
Automatically update BigQuery table schema when unknown fields are seen. #38058

Open

reuvenlax wants to merge 11 commits into apache:master from reuvenlax:bq_add_new_columns

Conversation

@reuvenlax
Contributor

No description provided.

@reuvenlax
Contributor Author

/gemini review

@gemini-code-assist
Contributor

Warning

Gemini encountered an error creating the summary. You can try again by commenting /gemini summary.

@reuvenlax
Contributor Author

/gemini summary

@gemini-code-assist
Contributor

Summary of Changes

This pull request introduces a significant enhancement to the BigQuery Storage Write API connector, allowing it to automatically adapt to evolving data schemas. When data with new fields or relaxed field constraints arrives, the system will now detect these changes, buffer the affected records, and dynamically update the BigQuery table's schema. This capability greatly simplifies data ingestion pipelines by removing the need for manual schema management in response to minor data evolution, ensuring data flow continues uninterrupted.

Highlights

  • Automatic Schema Evolution: Implemented a mechanism for BigQuery Storage Write API to automatically detect and apply schema updates (field additions and relaxation) to target tables when unknown fields or schema mismatches are encountered in incoming data.
  • Schema Mismatch Detection and Buffering: Introduced logic to identify when an incoming record's schema differs from the BigQuery table's current schema. Records requiring schema updates are buffered and reprocessed after the table schema has been patched.
  • Incremental Schema Generation and Patching: Developed utilities to generate an 'incremental schema' based on detected differences, which is then used to patch the BigQuery table. This process is throttled to prevent excessive API calls.
  • Enhanced Error Collection: Refactored TableRowToStorageApiProto to include an ErrorCollector that gathers schema conversion exceptions, allowing for more granular error handling and the generation of incremental schema updates.
  • API Integration for Schema Updates: Extended BigQueryServices with a patchTableSchema method, enabling programmatic updates to BigQuery table schemas directly from the Beam pipeline.
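The schema-merge behavior described in the highlights (add unknown fields, relax constraints) can be sketched in plain Java. This is illustrative only: the PR's `UpgradeTableSchema.mergeSchemas` operates on BigQuery `TableSchema` protos, while here a field is modeled simply as a name-to-mode map so the two merge rules are easy to see.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch, not the PR's implementation: models a schema as
// field name -> mode ("REQUIRED" or "NULLABLE") to show the merge rules.
public class SchemaMergeSketch {
  public static Map<String, String> merge(
      Map<String, String> base, Map<String, String> incoming) {
    Map<String, String> merged = new LinkedHashMap<>(base);
    for (Map.Entry<String, String> field : incoming.entrySet()) {
      String existingMode = merged.get(field.getKey());
      if (existingMode == null) {
        // Field addition: a previously unknown field arrives as a NULLABLE column.
        merged.put(field.getKey(), "NULLABLE");
      } else if ("REQUIRED".equals(existingMode) && "NULLABLE".equals(field.getValue())) {
        // Field relaxation: records omitting a REQUIRED field relax it to NULLABLE.
        merged.put(field.getKey(), "NULLABLE");
      }
    }
    return merged;
  }
}
```

These are also the only two mutations BigQuery's `tables.patch` accepts for an existing schema, which is why the PR restricts itself to additions and relaxations.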


Changelog
  • examples/java/src/main/java/org/apache/beam/examples/GroupIntoBatchesExample.java
    • Updated class declaration with an extra space for formatting consistency.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
    • Imported Preconditions for utility functions.
    • Modified encodeUnknownFields to pass ErrorCollector.DONT_COLLECT to messageFromTableRow.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
    • Added validation to prevent schema update options from being used with auto schema update, Beam schemas, or direct proto writes.
    • Passed elementCoder and destinationCoder to the StorageApiLoads constructor.
    • Updated StorageApiDynamicDestinationsTableRow instantiation to include schemaUpdateOptions.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
    • Added patchTableSchema method to the BigQueryRpc interface for updating table schemas.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
    • Implemented the patchTableSchema method in BigQueryRpcImpl to update BigQuery table schemas.
    • Added DONT_RETRY_INVALID_ARG_OR_PRECONDITION retry function for specific API errors.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/MergeSchemaCombineFn.java
    • Added new class MergeSchemaCombineFn which extends Combine.CombineFn to merge BigQuery TableSchema objects.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
    • Imported isPayloadSchemaOutOfDate and Descriptors.
    • Added schemaMismatchSeen field to Value and SplittingIterable.
    • Updated constructor to accept getCurrentTableSchemaHash and getCurrentTableSchemaDescriptor suppliers.
    • Modified next method to detect and track schema mismatches using isPayloadSchemaOutOfDate.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java
    • Refactored expand method to integrate schema update logic, including new PatchTableSchemaDoFn and SchemaUpdateHoldingFn.
    • Added new output tags (patchTableSchemaTag, elementsWaitingForSchemaTag) for schema patching and buffering elements.
    • Introduced SchemaUpdateHoldingFn to buffer elements awaiting schema updates and PatchTableSchemaDoFn to apply schema patches.
    • Modified ConvertMessagesDoFn to use an ErrorCollector for schema conversion errors and to output elements for schema patching or retry.
    • Added BufferedCollectorInformation inner class to track schema errors per destination.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
    • Added getSchemaHash and updateSchemaFromTable methods to the MessageConverter interface.
    • Modified toMessage method in MessageConverter to accept an ErrorCollector parameter.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
    • Implemented getSchemaHash and updateSchemaFromTable methods.
    • Updated toMessage method to accept collectedExceptions parameter.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
    • Implemented getSchemaHash and updateSchemaFromTable methods.
    • Updated toMessage method to accept collectedExceptions parameter.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsProto.java
    • Implemented getSchemaHash and updateSchemaFromTable methods.
    • Updated toMessage method to accept collectedExceptions parameter.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
    • Introduced SchemaUpgradingTableRowConverter to manage schema updates for TableRow conversions.
    • Added schemaUpdateOptions parameter to the constructor.
    • Implemented getSchemaHash and updateSchemaFromTable methods.
    • Modified toMessage method to accept collectedExceptions and include schema hash in the payload.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
    • Added elementCoder and destinationCoder to the constructor.
    • Updated StorageApiConvertMessages instantiation to pass elementCoder and destinationCoder.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java
    • Added getSchemaHash and setSchemaHash methods to StorageApiWritePayload for tracking schema versions.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
    • Imported isPayloadSchemaOutOfDate for schema mismatch detection.
    • Added schema mismatch detection logic within addMessage and flush methods.
    • Updated SplittingIterable constructor call to pass schema hash and descriptor suppliers.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
    • Added schema mismatch detection logic within process method.
    • Updated SplittingIterable constructor call to pass schema hash and descriptor suppliers.
    • Refined error handling for append failures, including closing clients for non-quota errors.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
    • Imported toSet for stream collectors.
    • Introduced ErrorCollector class to gather schema conversion exceptions.
    • Added new SchemaConversionException types: SchemaTooNarrowException and SchemaMissingRequiredFieldException.
    • Implemented schema hashing logic (tableSchemaHash) for TableSchema and TableFieldSchema.
    • Modified messageFromMap and messageFromTableRow to accept and utilize ErrorCollector for schema evolution handling.
    • Updated messageValueFromFieldValue and singularFieldToProtoValue to propagate ErrorCollector and handle missing required fields.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
    • Added putSchema method to explicitly set a schema in the cache.
    • Modified refreshSchema to use putIfAbsent for tablesToRefresh to avoid overwriting existing refresh requests.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpgradeTableSchema.java
    • Added new class UpgradeTableSchema to encapsulate schema upgrade logic.
    • Implemented newErrorCollector to create an ErrorCollector instance.
    • Provided getIncrementalSchema to construct a new schema based on collected errors and an old schema.
    • Implemented mergeSchemas and mergeFields for combining BigQuery table schemas.
    • Added isPayloadSchemaOutOfDate to check for schema mismatches between payload and current table schema.
    • Included hasUnknownFields to recursively check for unknown fields in a protobuf message.
  • sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
    • Added getStorageException helper for creating Storage API exceptions.
    • Modified appendRows to return Exceptions.StorageException instead of throwing RuntimeException.
    • Implemented patchTableSchema to simulate BigQuery schema updates.
    • Added checkSchemaChanges and checkSchemaChangesProtos to validate schema evolution rules.
    • Removed check for unknown fields in appendRows to align with new schema evolution behavior.
  • sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
    • Added testAutoPatchTableSchemaTest to verify automatic schema patching functionality.
  • sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
    • Updated calls to messageFromTableRow to include the new ErrorCollector parameter.
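The `StorageApiWritePayload` schema-hash tracking listed above drives the `isPayloadSchemaOutOfDate` check. A minimal sketch of the idea, with illustrative names rather than the PR's exact API: each serialized payload carries the hash of the table schema it was encoded against, and the writer compares that against the current table schema's hash before appending.

```java
// Illustrative sketch of the schema-hash staleness check; field and class
// names here are hypothetical stand-ins for StorageApiWritePayload.
public class PayloadSchemaCheckSketch {
  public static final class Payload {
    public final byte[] bytes;
    public final long schemaHash; // hash of the schema this payload was encoded with

    public Payload(byte[] bytes, long schemaHash) {
      this.bytes = bytes;
      this.schemaHash = schemaHash;
    }
  }

  /** True when the payload predates the table's current schema and must be re-encoded. */
  public static boolean isPayloadSchemaOutOfDate(Payload payload, long currentTableSchemaHash) {
    return payload.schemaHash != currentTableSchemaHash;
  }
}
```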
Activity
  • reuvenlax requested a Gemini review.
  • gemini-code-assist[bot] reported an error creating the summary.
  • reuvenlax requested a Gemini summary again.

@reuvenlax
Contributor Author

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request implements automatic schema evolution for the BigQuery Storage Write API. It introduces a mechanism to detect schema mismatches using hashing and handles missing field errors by automatically patching the BigQuery table schema when permitted. Key changes include the UpgradeTableSchema utility for schema merging and a stateful buffering mechanism in StorageApiConvertMessages to hold records during schema updates. The review feedback recommends replacing System.err calls with proper logging, optimizing object allocations in loops, removing redundant semicolons, and using a fixed charset for hashing to ensure consistency across environments.

ElementT value = element.getValue();
boolean needsNewTimer = false;
if (value != null) {
System.err.println("BUFFERING ELEMENT");

medium

This debug print should be removed or replaced with a proper logger call. System.err is not appropriate for production library code as it bypasses logging configurations.
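A sketch of the reviewer's suggestion. Beam library code conventionally uses SLF4J (`LoggerFactory.getLogger(...)`); `java.util.logging` is used below only so the example is self-contained without extra dependencies, and the method name is illustrative.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: route the buffering message through a logger at debug level
// instead of an unconditional System.err.println.
public class BufferingLogSketch {
  private static final Logger LOG = Logger.getLogger(BufferingLogSketch.class.getName());

  public static void onElementBuffered(String destination) {
    // Debug-level logging respects the deployment's logging configuration,
    // so production pipelines are not spammed on stderr.
    LOG.log(Level.FINE, "Buffering element for destination {0}", destination);
  }
}
```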

boolean schemaOutOfDate = false;
do {
try {
System.err.println("TRYING TO PATCH TO " + updatedSchema);

medium

This debug print should be removed or replaced with a proper logger call.

}
return;
} catch (IOException e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();

medium

ApiErrorExtractor is stateless and can be instantiated once outside of the retry loop to avoid unnecessary object allocation in each iteration.

Comment on lines +668 to +669
collectors.clear();
;

medium

There is a redundant semicolon here.

Suggested change
collectors.clear();
;
collectors.clear();

Comment on lines +156 to +157
this.isStruct = isStruct;
;

medium

There is a redundant semicolon here.

Suggested change
this.isStruct = isStruct;
;
this.isStruct = isStruct;

prefix.isEmpty()
? tableFieldSchema.getName()
: String.join(".", prefix, tableFieldSchema.getName());
hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));

medium

Using Charset.defaultCharset() can lead to inconsistent results across different environments. It is safer to use a fixed charset like StandardCharsets.UTF_8 for hashing.

Suggested change
hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));
hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), StandardCharsets.UTF_8));
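The charset pitfall the reviewer flags can be shown without Beam. The PR hashes field names with Guava's `Hashing`; `MessageDigest` is used below only to keep the sketch dependency-free. The same caveat applies to `toLowerCase()` itself, which without an explicit `Locale` is also environment-dependent (e.g. the Turkish dotless-i).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;

// Sketch: deterministic field-name hashing. MessageDigest stands in for the
// Guava hash function used in the PR.
public class FieldNameHashSketch {
  public static byte[] hashFieldName(String name) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      // Lowercase with an explicit locale and encode with a fixed charset so the
      // hash is identical on every worker, regardless of platform defaults.
      return digest.digest(name.toLowerCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8));
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e); // SHA-256 is mandatory in every JDK.
    }
  }
}
```

With `Charset.defaultCharset()`, two workers with different platform encodings could compute different hashes for the same schema and spuriously report a mismatch.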

@github-actions
Contributor

github-actions bot commented Apr 3, 2026

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@reuvenlax force-pushed the bq_add_new_columns branch from 402535f to 863352d on April 3, 2026 at 22:45
@github-actions github-actions bot removed the examples label Apr 3, 2026
@reuvenlax
Contributor Author

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces automatic schema updates for the BigQuery Storage Write API, enabling field additions and relaxations during streaming. It implements a detection mechanism for schema mismatches, asynchronous table patching, and a stateful buffering stage to hold records until updates are propagated. The changes also include an error collection system for proto conversion and optimized schema caching. A redundant license header was noted in ConvertMessagesDoFn.java.

Comment on lines +19 to +35
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

medium

This license header block appears to be a duplicate of the one at the top of the file. Please remove this redundant block to improve code clarity.

@reuvenlax
Contributor Author

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces support for automatic BigQuery table schema updates during Storage Write API operations. It adds new components like ConvertMessagesDoFn, PatchTableSchemaDoFn, and SchemaUpdateHoldingFn to buffer and retry elements when schema mismatches occur. My review identified several critical issues: the schema patching loop can become infinite on invalid updates, the ConvertMessagesDoFn is incorrectly shared across transforms, the windowing logic for schema notifications is flawed, and there are several instances of dead code, debug logs, and overly aggressive error handling that could crash the pipeline.

Comment on lines +110 to +149
while (true) {
TableSchema baseSchema = messageConverter.getTableSchema();
TableSchema updatedSchema = UpgradeTableSchema.mergeSchemas(baseSchema, tableSchemaDiff);
// Check first to see if the schema still needs updating.
if (baseSchema.equals(updatedSchema)) {
return;
}
BackOff backoff =
new ExponentialBackOff.Builder()
.setMaxElapsedTimeMillis((int) TimeUnit.MINUTES.toMillis(1))
.build();
boolean schemaOutOfDate = false;
Exception lastException = null;
do {
try {
getDatasetService(pipelineOptions)
.patchTableSchema(
dynamicDestinations.getTable(destination).getTableReference(),
TableRowToStorageApiProto.protoSchemaToTableSchema(updatedSchema));
// Indicate that we've patched this schema.
o.output(KV.of(destination, null));
return;
} catch (IOException e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if (errorExtractor.preconditionNotMet(e) || errorExtractor.badRequest(e)) {
schemaOutOfDate = true;
break;
} else {
lastException = e;
}
}
} while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff));
if (schemaOutOfDate) {
// This could be due to an out-of-date schema.
messageConverter.updateSchemaFromTable();
} else {
// We ran out of retries.
throw new RuntimeException("Failed to patch table schema.", lastException);
}
}

high

The while (true) loop, combined with the logic that sets schemaOutOfDate = true on any badRequest or preconditionNotMet error, can lead to an infinite loop. If a schema update is inherently invalid (e.g., attempting an incompatible type change), BigQuery will return a 400 Bad Request. The current implementation will interpret this as the local schema being out of date, refresh it, and retry the same invalid update indefinitely. A maximum retry count or a more specific check on the error reason should be implemented.
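The bounded-retry structure the reviewer asks for can be sketched as follows. The names here (`PatchAttempt`, `MAX_SCHEMA_REFRESHES`) are hypothetical, not from the PR; the point is simply that a "schema out of date" signal may trigger a local refresh only a capped number of times, so an inherently invalid patch surfaces as an error instead of looping forever.

```java
import java.io.IOException;

// Sketch: cap the number of refresh-and-retry cycles around a schema patch.
public class BoundedPatchSketch {
  private static final int MAX_SCHEMA_REFRESHES = 3; // hypothetical budget

  /** Returns true once the patch succeeds; throws after the refresh budget is spent. */
  public static boolean patchWithBoundedRefreshes(PatchAttempt attempt) throws IOException {
    IOException last = null;
    for (int refreshes = 0; refreshes <= MAX_SCHEMA_REFRESHES; refreshes++) {
      try {
        attempt.patch();
        return true;
      } catch (IOException e) {
        // Treat the failure as "maybe our schema is stale" at most N times;
        // after that, assume the update itself is invalid and give up.
        last = e;
      }
    }
    throw new IOException(
        "Giving up after " + MAX_SCHEMA_REFRESHES + " schema refreshes", last);
  }

  public interface PatchAttempt {
    void patch() throws IOException;
  }
}
```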

Comment on lines +109 to +220
ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn =
new ConvertMessagesDoFn<>(
dynamicDestinations,
bqServices,
operationName,
failedWritesTag,
successfulWritesTag,
patchTableSchemaTag,
elementsWaitingForSchemaTag,
rowMutationFn,
badRecordRouter,
input.getCoder());

PCollectionTuple result =
input.apply(
"Convert to message",
ParDo.of(
new ConvertMessagesDoFn<>(
dynamicDestinations,
bqServices,
operationName,
failedWritesTag,
successfulWritesTag,
rowMutationFn,
badRecordRouter,
input.getCoder()))
ParDo.of(convertMessagesDoFn)
.withOutputTags(
successfulWritesTag,
TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG)))
TupleTagList.of(
ImmutableList.of(
failedWritesTag,
BAD_RECORD_TAG,
patchTableSchemaTag,
elementsWaitingForSchemaTag)))
.withSideInputs(dynamicDestinations.getSideInputs()));
result.get(successfulWritesTag).setCoder(successCoder);
result.get(failedWritesTag).setCoder(errorCoder);
result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
return result;
result
.get(patchTableSchemaTag)
.setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)));
result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, elementCoder));

final int numShards =
input
.getPipeline()
.getOptions()
.as(BigQueryOptions.class)
.getSchemaUpgradeBufferingShards();

// Throttle the stream to the patch-table function so that only a single update per table per
// second gets processed. The combiner merges incremental schemas, so we won't miss any updates.
PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
result
.get(patchTableSchemaTag)
.apply(
"rewindow",
Window.<KV<DestinationT, TableSchema>>configure()
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1))))
.discardingFiredPanes())
.apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
.setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
.apply(
"Patch table schema",
ParDo.of(
new PatchTableSchemaDoFn<>(operationName, bqServices, dynamicDestinations)))
.setCoder(KvCoder.of(destinationCoder, NullableCoder.of(elementCoder)))
// We need to make sure that all shards of the buffering transform are notified.
.apply(
"fanout to all shards",
FlatMapElements.via(
new SimpleFunction<
KV<DestinationT, ElementT>,
Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
@Override
public Iterable<KV<ShardedKey<DestinationT>, ElementT>> apply(
KV<DestinationT, ElementT> elem) {
return IntStream.range(0, numShards)
.mapToObj(
i ->
KV.of(
StorageApiConvertMessages.AssignShardFn.getShardedKey(
elem.getKey(), i, numShards),
elem.getValue()))
.collect(Collectors.toList());
}
}))
.setCoder(
KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
.apply(
Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
.triggering(DefaultTrigger.of()));

// Any elements that are waiting for a schema update are sent to this stateful DoFn to be
// buffered.
// Note: we currently do not provide the DynamicDestinations object access to the side input in
// this path.
// This is because side inputs are not currently available from timer callbacks. Since side
// inputs are generally
// used for getSchema and in this case we read the schema from the table, this is unlikely to be
// a problem.
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements =
result
.get(elementsWaitingForSchemaTag)
// TODO: Consider using GroupIntoBatches.withShardedKey to get auto sharding here
// instead of fixed sharding.
.apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
.setCoder(
KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)));

PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> waitingElementsList =
PCollectionList.of(shardedWaitingElements).and(tablesPatched);
PCollectionTuple retryResult =
waitingElementsList
.apply("Buffered flatten", Flatten.pCollections())
.apply(
"bufferElements",
ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, convertMessagesDoFn))

high

The convertMessagesDoFn instance is being used both as a standalone DoFn in a ParDo (line 125) and as an embedded helper inside SchemaUpdateHoldingFn (line 220). In Apache Beam, DoFn instances have specific lifecycles (setup, startBundle, finishBundle, teardown) managed by the runner. Sharing a DoFn instance across different transforms or embedding it within another DoFn is highly discouraged and can lead to unexpected behavior or serialization issues. The shared logic should be refactored into a separate helper class that is not a DoFn.
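The refactor the reviewer suggests can be sketched without Beam types: hoist the shared conversion logic into a plain `Serializable` helper and give each DoFn its own instance, so no DoFn is ever embedded in or shared with another. All class names below are illustrative stand-ins for the PR's classes.

```java
import java.io.Serializable;

// Sketch: shared logic lives in a plain helper with no DoFn lifecycle.
public class SharedConversionSketch {
  /** Plain helper: safe to construct independently in each transform. */
  public static class MessageConversionLogic implements Serializable {
    public String convert(String element) {
      return "proto:" + element; // stand-in for TableRow -> proto conversion
    }
  }

  // Each "DoFn" owns a private helper rather than sharing a DoFn instance,
  // so runner-managed lifecycles (setup/teardown) never overlap.
  public static class ConvertMessagesFn implements Serializable {
    private final MessageConversionLogic logic = new MessageConversionLogic();
    public String process(String element) { return logic.convert(element); }
  }

  public static class SchemaUpdateHoldingFn implements Serializable {
    private final MessageConversionLogic logic = new MessageConversionLogic();
    public String reprocessBuffered(String element) { return logic.convert(element); }
  }
}
```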

Comment on lines +152 to +194
PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
result
.get(patchTableSchemaTag)
.apply(
"rewindow",
Window.<KV<DestinationT, TableSchema>>configure()
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1))))
.discardingFiredPanes())
.apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
.setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
.apply(
"Patch table schema",
ParDo.of(
new PatchTableSchemaDoFn<>(operationName, bqServices, dynamicDestinations)))
.setCoder(KvCoder.of(destinationCoder, NullableCoder.of(elementCoder)))
// We need to make sure that all shards of the buffering transform are notified.
.apply(
"fanout to all shards",
FlatMapElements.via(
new SimpleFunction<
KV<DestinationT, ElementT>,
Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
@Override
public Iterable<KV<ShardedKey<DestinationT>, ElementT>> apply(
KV<DestinationT, ElementT> elem) {
return IntStream.range(0, numShards)
.mapToObj(
i ->
KV.of(
StorageApiConvertMessages.AssignShardFn.getShardedKey(
elem.getKey(), i, numShards),
elem.getValue()))
.collect(Collectors.toList());
}
}))
.setCoder(
KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
.apply(
Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
.triggering(DefaultTrigger.of()));

medium

The schema update notification (tablesPatched) is emitted in the GlobalWindow via ConvertMessagesDoFn.finishBundle. However, the elements waiting for schema updates in SchemaUpdateHoldingFn may belong to various windows. Since SchemaUpdateHoldingFn is a stateful DoFn, its state and timers are window-scoped. A notification in the GlobalWindow will not trigger the processing of state held in other windows. While the pollTimer fallback ensures eventual processing, the 'fast-path' notification mechanism is currently ineffective for windowed data.

Comment on lines +1038 to +1042
new RetryManager<>(
Duration.standardSeconds(1),
Duration.standardSeconds(20),
maxRetries,
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));

medium

This block instantiates a new RetryManager but does not assign it to any variable or use it. It appears to be a leftover from a refactoring or a copy-paste error, as the retryManager used in the subsequent logic is correctly initialized on line 1037.


AppendClientInfo getAppendClientInfo(
boolean lookupCache, final @Nullable TableSchema updatedSchema) {
lookupCache = false;

medium

The lookupCache parameter is hardcoded to false at the start of the method, which makes the parameter passed by callers irrelevant. If this was intended for debugging, it should be removed before merging.

Comment on lines +784 to +792
// TODO: WHY ARE WE HITTING THIS FAILURE!!!!!!! WE successfully reopend the
// connection with
// a new TableSchema, yet Vortex is failing the individual rows.
// APPEARS THAT STREAMWRITER isn't getting updated?
LOG.error(
"UNEXPECTED. DUMPING ERROR {}, CONVERTER SCHEMA {}, ACI SCHEMA {}",
error.getRowIndexToErrorMessage().get(failedIndex),
messageConverter.getTableSchema(),
aci.getTableSchema());

medium

This block contains a debug-style log message with a 'TODO' and multiple exclamation marks. This should be cleaned up or converted into a proper production-level error log with a clear explanation of the expected failure mode.

return;
}
} while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff));
throw new RuntimeException("Failed to flush elements on window expiration!");

medium

Throwing a RuntimeException during onWindowExpiration is quite aggressive for a streaming pipeline. If a schema update is delayed or fails permanently, this will cause the entire pipeline to crash and potentially enter a crash loop. Consider routing these elements to a dead-letter queue or failing them through the standard WriteResult mechanism instead of crashing the worker.
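The reviewer's dead-letter alternative can be sketched as follows. In the real transform this would be an additional output tag feeding the existing failed-rows path of `WriteResult`; a `List` stands in for that output here, and all names are illustrative rather than the PR's API.

```java
import java.util.List;

// Sketch: on window expiration, fail stranded rows instead of crashing.
public class ExpirationDrainSketch {
  public static void drainOnWindowExpiration(
      List<String> buffered,
      boolean schemaUpdated,
      List<String> mainOutput,
      List<String> deadLetterOutput) {
    for (String element : buffered) {
      if (schemaUpdated) {
        mainOutput.add(element); // schema arrived in time: emit normally
      } else {
        // Schema never arrived: route to the failed-rows output so the
        // pipeline keeps running rather than entering a crash loop.
        deadLetterOutput.add(element);
      }
    }
    buffered.clear();
  }
}
```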

@github-actions
Contributor

github-actions bot commented Apr 7, 2026

Assigning reviewers:

R: @chamikaramj for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@reuvenlax
Contributor Author

R: @ahmedabu98

@github-actions
Contributor

github-actions bot commented Apr 7, 2026

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

