Automatically update BigQuery table schema when unknown fields are seen. #38058
reuvenlax wants to merge 11 commits into apache:master from
Conversation
/gemini review

Warning: Gemini encountered an error creating the summary. You can try again by commenting

/gemini summary
Summary of Changes

This pull request introduces a significant enhancement to the BigQuery Storage Write API connector, allowing it to automatically adapt to evolving data schemas. When data with new fields or relaxed field constraints arrives, the system will now detect these changes, buffer the affected records, and dynamically update the BigQuery table's schema. This capability greatly simplifies data ingestion pipelines by removing the need for manual schema management in response to minor data evolution, ensuring data flow continues uninterrupted.

Highlights
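The schema-widening flow described above (detect new or relaxed fields, merge them into the base schema) can be pictured with a minimal, hypothetical field-merge sketch. `Field`, `merge`, and the string-typed modes below are simplifications for illustration, not the connector's real types:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the field-merge idea: fields present in the incoming
// diff but missing from the base schema are appended, and REQUIRED fields may
// be relaxed to NULLABLE. Field order of the base schema is preserved.
public class MergeSchemasDemo {
    record Field(String name, String type, String mode) {}

    static List<Field> merge(List<Field> base, List<Field> diff) {
        Map<String, Field> merged = new LinkedHashMap<>();
        for (Field f : base) merged.put(f.name(), f);
        for (Field f : diff) {
            Field existing = merged.get(f.name());
            if (existing == null) {
                merged.put(f.name(), f); // new field seen in the data
            } else if ("REQUIRED".equals(existing.mode()) && "NULLABLE".equals(f.mode())) {
                // relaxation: keep the existing type, loosen the mode
                merged.put(f.name(), new Field(f.name(), existing.type(), "NULLABLE"));
            }
        }
        return new ArrayList<>(merged.values());
    }

    public static void main(String[] args) {
        List<Field> base = List.of(new Field("id", "INT64", "REQUIRED"));
        List<Field> diff = List.of(
            new Field("id", "INT64", "NULLABLE"),
            new Field("note", "STRING", "NULLABLE"));
        System.out.println(merge(base, diff));
    }
}
```

The real merge must also recurse into STRUCT fields and validate type compatibility; this sketch only shows the add-and-relax behavior.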
Changelog
Activity
/gemini review
Code Review
This pull request implements automatic schema evolution for the BigQuery Storage Write API. It introduces a mechanism to detect schema mismatches using hashing and handles missing field errors by automatically patching the BigQuery table schema when permitted. Key changes include the UpgradeTableSchema utility for schema merging and a stateful buffering mechanism in StorageApiConvertMessages to hold records during schema updates. The review feedback recommends replacing System.err calls with proper logging, optimizing object allocations in loops, removing redundant semicolons, and using a fixed charset for hashing to ensure consistency across environments.
ElementT value = element.getValue();
boolean needsNewTimer = false;
if (value != null) {
  System.err.println("BUFFERING ELEMENT");

boolean schemaOutOfDate = false;
do {
  try {
    System.err.println("TRYING TO PATCH TO " + updatedSchema);

  }
  return;
} catch (IOException e) {
  ApiErrorExtractor errorExtractor = new ApiErrorExtractor();

collectors.clear();
;

this.isStruct = isStruct;
;

prefix.isEmpty()
    ? tableFieldSchema.getName()
    : String.join(".", prefix, tableFieldSchema.getName());
hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));
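The `System.err.println` debug calls quoted above would normally go through a proper logger, as the review suggests. A minimal sketch of that pattern follows; Beam itself uses SLF4J, but `java.util.logging` stands in here to keep the example self-contained, and the message text is illustrative only:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: route debug output through a level-guarded logger instead of
// System.err.println, so it can be disabled in production without code changes.
public class SchemaPatchLogging {
    private static final Logger LOG = Logger.getLogger(SchemaPatchLogging.class.getName());

    static String patchMessage(String updatedSchema) {
        return "Trying to patch table schema to " + updatedSchema;
    }

    public static void main(String[] args) {
        // The Supplier overload defers message construction until the level is enabled.
        LOG.log(Level.FINE, () -> patchMessage("{fields: [...]}"));
        System.out.println(patchMessage("{fields: [...]}"));
    }
}
```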
Using Charset.defaultCharset() can lead to inconsistent results across different environments. It is safer to use a fixed charset like StandardCharsets.UTF_8 for hashing.
- hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), Charset.defaultCharset()));
+ hashCodes.add(SCHEMA_HASH_FUNCTION.hashString(name.toLowerCase(), StandardCharsets.UTF_8));
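The reason for this suggestion can be demonstrated with a standalone sketch: hashing bytes produced with an explicit charset gives the same digest on every JVM, whereas `Charset.defaultCharset()` depends on the platform. `MessageDigest` stands in for Guava's `SCHEMA_HASH_FUNCTION` here to keep the example self-contained:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;

// Sketch: a charset-stable hash of a case-normalized field name.
public class CharsetHashDemo {
    static String hashFieldName(String name) throws Exception {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        // Explicit UTF-8 encoding makes the digest identical regardless of
        // the platform default charset (e.g. windows-1252 vs UTF-8).
        byte[] digest = md.digest(name.toLowerCase().getBytes(StandardCharsets.UTF_8));
        return HexFormat.of().formatHex(digest);
    }

    public static void main(String[] args) throws Exception {
        // Case-normalized names hash identically, independent of environment.
        System.out.println(hashFieldName("My_Field").equals(hashFieldName("my_field")));
    }
}
```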
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
402535f to 863352d (Compare)
/gemini review
Code Review
This pull request introduces automatic schema updates for the BigQuery Storage Write API, enabling field additions and relaxations during streaming. It implements a detection mechanism for schema mismatches, asynchronous table patching, and a stateful buffering stage to hold records until updates are propagated. The changes also include an error collection system for proto conversion and optimized schema caching. A redundant license header was noted in ConvertMessagesDoFn.java.
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/gemini review
Code Review
This pull request introduces support for automatic BigQuery table schema updates during Storage Write API operations. It adds new components like ConvertMessagesDoFn, PatchTableSchemaDoFn, and SchemaUpdateHoldingFn to buffer and retry elements when schema mismatches occur. My review identified several critical issues: the schema patching loop can become infinite on invalid updates, the ConvertMessagesDoFn is incorrectly shared across transforms, the windowing logic for schema notifications is flawed, and there are several instances of dead code, debug logs, and overly aggressive error handling that could crash the pipeline.
while (true) {
  TableSchema baseSchema = messageConverter.getTableSchema();
  TableSchema updatedSchema = UpgradeTableSchema.mergeSchemas(baseSchema, tableSchemaDiff);
  // Check first to see if the schema still needs updating.
  if (baseSchema.equals(updatedSchema)) {
    return;
  }
  BackOff backoff =
      new ExponentialBackOff.Builder()
          .setMaxElapsedTimeMillis((int) TimeUnit.MINUTES.toMillis(1))
          .build();
  boolean schemaOutOfDate = false;
  Exception lastException = null;
  do {
    try {
      getDatasetService(pipelineOptions)
          .patchTableSchema(
              dynamicDestinations.getTable(destination).getTableReference(),
              TableRowToStorageApiProto.protoSchemaToTableSchema(updatedSchema));
      // Indicate that we've patched this schema.
      o.output(KV.of(destination, null));
      return;
    } catch (IOException e) {
      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
      if (errorExtractor.preconditionNotMet(e) || errorExtractor.badRequest(e)) {
        schemaOutOfDate = true;
        break;
      } else {
        lastException = e;
      }
    }
  } while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff));
  if (schemaOutOfDate) {
    // This could be due to an out-of-date schema.
    messageConverter.updateSchemaFromTable();
  } else {
    // We ran out of retries.
    throw new RuntimeException("Failed to patch table schema.", lastException);
  }
}
The while (true) loop, combined with the logic that sets schemaOutOfDate = true on any badRequest or preconditionNotMet error, can lead to an infinite loop. If a schema update is inherently invalid (e.g., attempting an incompatible type change), BigQuery will return a 400 Bad Request. The current implementation will interpret this as the local schema being out of date, refresh it, and retry the same invalid update indefinitely. A maximum retry count or a more specific check on the error reason should be implemented.
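One way to bound the loop flagged above is to cap the number of local-schema refreshes so that an inherently invalid update surfaces as an error instead of retrying forever. A minimal sketch follows; `Patcher` and `MAX_SCHEMA_REFRESHES` are hypothetical stand-ins for the real BigQuery service call and its configuration:

```java
// Sketch: bounded schema-refresh retries around a patch attempt.
public class BoundedPatchDemo {
    static final int MAX_SCHEMA_REFRESHES = 3;

    interface Patcher {
        boolean patchOnce(); // true == patch accepted by the service
    }

    static boolean patchWithBound(Patcher patcher) {
        for (int refreshes = 0; refreshes < MAX_SCHEMA_REFRESHES; refreshes++) {
            if (patcher.patchOnce()) {
                return true;
            }
            // On a 400 / precondition failure: refresh the cached table schema
            // here, then retry -- but only up to MAX_SCHEMA_REFRESHES times.
        }
        return false; // surfaced to the caller rather than looping infinitely
    }

    public static void main(String[] args) {
        // A patch that always fails terminates after the bounded retries.
        System.out.println(patchWithBound(() -> false));
    }
}
```

Checking the structured error reason (e.g. distinguishing `invalid` from `conditionNotMet`) would allow failing fast on genuinely incompatible updates without consuming the retry budget.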
ConvertMessagesDoFn<DestinationT, ElementT> convertMessagesDoFn =
    new ConvertMessagesDoFn<>(
        dynamicDestinations,
        bqServices,
        operationName,
        failedWritesTag,
        successfulWritesTag,
        patchTableSchemaTag,
        elementsWaitingForSchemaTag,
        rowMutationFn,
        badRecordRouter,
        input.getCoder());
PCollectionTuple result =
    input.apply(
        "Convert to message",
        ParDo.of(
            new ConvertMessagesDoFn<>(
                dynamicDestinations,
                bqServices,
                operationName,
                failedWritesTag,
                successfulWritesTag,
                rowMutationFn,
                badRecordRouter,
                input.getCoder()))
        ParDo.of(convertMessagesDoFn)
            .withOutputTags(
                successfulWritesTag,
                TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG)))
                TupleTagList.of(
                    ImmutableList.of(
                        failedWritesTag,
                        BAD_RECORD_TAG,
                        patchTableSchemaTag,
                        elementsWaitingForSchemaTag)))
            .withSideInputs(dynamicDestinations.getSideInputs()));
result.get(successfulWritesTag).setCoder(successCoder);
result.get(failedWritesTag).setCoder(errorCoder);
result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));
return result;
result
    .get(patchTableSchemaTag)
    .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)));
result.get(elementsWaitingForSchemaTag).setCoder(KvCoder.of(destinationCoder, elementCoder));
final int numShards =
    input
        .getPipeline()
        .getOptions()
        .as(BigQueryOptions.class)
        .getSchemaUpgradeBufferingShards();
// Throttle the stream to the patch-table function so that only a single update per table per
// second gets processed. The combiner merges incremental schemas, so we won't miss any updates.
PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
    result
        .get(patchTableSchemaTag)
        .apply(
            "rewindow",
            Window.<KV<DestinationT, TableSchema>>configure()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(1))))
                .discardingFiredPanes())
        .apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
        .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
        .apply(
            "Patch table schema",
            ParDo.of(
                new PatchTableSchemaDoFn<>(operationName, bqServices, dynamicDestinations)))
        .setCoder(KvCoder.of(destinationCoder, NullableCoder.of(elementCoder)))
        // We need to make sure that all shards of the buffering transform are notified.
        .apply(
            "fanout to all shards",
            FlatMapElements.via(
                new SimpleFunction<
                    KV<DestinationT, ElementT>,
                    Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
                  @Override
                  public Iterable<KV<ShardedKey<DestinationT>, ElementT>> apply(
                      KV<DestinationT, ElementT> elem) {
                    return IntStream.range(0, numShards)
                        .mapToObj(
                            i ->
                                KV.of(
                                    StorageApiConvertMessages.AssignShardFn.getShardedKey(
                                        elem.getKey(), i, numShards),
                                    elem.getValue()))
                        .collect(Collectors.toList());
                  }
                }))
        .setCoder(
            KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
        .apply(
            Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
                .triggering(DefaultTrigger.of()));
// Any elements that are waiting for a schema update are sent to this stateful DoFn to be
// buffered.
// Note: we currently do not provide the DynamicDestinations object access to the side input
// in this path. This is because side inputs are not currently available from timer callbacks.
// Since side inputs are generally used for getSchema, and in this case we read the schema
// from the table, this is unlikely to be a problem.
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements =
    result
        .get(elementsWaitingForSchemaTag)
        // TODO: Consider using GroupIntoBatches.withShardedKey to get auto sharding here
        // instead of fixed sharding.
        .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
        .setCoder(
            KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)));
PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> waitingElementsList =
    PCollectionList.of(shardedWaitingElements).and(tablesPatched);
PCollectionTuple retryResult =
    waitingElementsList
        .apply("Buffered flatten", Flatten.pCollections())
        .apply(
            "bufferElements",
            ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, convertMessagesDoFn))
The convertMessagesDoFn instance is being used both as a standalone DoFn in a ParDo (line 125) and as an embedded helper inside SchemaUpdateHoldingFn (line 220). In Apache Beam, DoFn instances have specific lifecycles (setup, startBundle, finishBundle, teardown) managed by the runner. Sharing a DoFn instance across different transforms or embedding it within another DoFn is highly discouraged and can lead to unexpected behavior or serialization issues. The shared logic should be refactored into a separate helper class that is not a DoFn.
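The suggested refactor — extracting the shared conversion logic into a plain `Serializable` helper that each DoFn constructs for itself — can be sketched as follows. `MessageConverterHelper` and its string-based `convert` method are hypothetical simplifications, not the actual conversion code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: a non-DoFn helper holds the shared logic, so no DoFn lifecycle
// (setup/startBundle/finishBundle/teardown) is shared across transforms.
public class HelperRefactorDemo {
    static class MessageConverterHelper implements Serializable {
        String convert(String element) {
            return "proto:" + element; // placeholder for real element-to-proto conversion
        }
    }

    public static void main(String[] args) throws Exception {
        MessageConverterHelper helper = new MessageConverterHelper();
        // Each transform would deserialize its own independent copy of the helper,
        // the way Beam runners deserialize DoFn fields per worker.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(helper);
        oos.flush();
        MessageConverterHelper copy = (MessageConverterHelper)
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(copy.convert("row1"));
    }
}
```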
PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
    result
        .get(patchTableSchemaTag)
        .apply(
            "rewindow",
            Window.<KV<DestinationT, TableSchema>>configure()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(1))))
                .discardingFiredPanes())
        .apply("merge schemas", Combine.perKey(new MergeSchemaCombineFn()))
        .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
        .apply(
            "Patch table schema",
            ParDo.of(
                new PatchTableSchemaDoFn<>(operationName, bqServices, dynamicDestinations)))
        .setCoder(KvCoder.of(destinationCoder, NullableCoder.of(elementCoder)))
        // We need to make sure that all shards of the buffering transform are notified.
        .apply(
            "fanout to all shards",
            FlatMapElements.via(
                new SimpleFunction<
                    KV<DestinationT, ElementT>,
                    Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
                  @Override
                  public Iterable<KV<ShardedKey<DestinationT>, ElementT>> apply(
                      KV<DestinationT, ElementT> elem) {
                    return IntStream.range(0, numShards)
                        .mapToObj(
                            i ->
                                KV.of(
                                    StorageApiConvertMessages.AssignShardFn.getShardedKey(
                                        elem.getKey(), i, numShards),
                                    elem.getValue()))
                        .collect(Collectors.toList());
                  }
                }))
        .setCoder(
            KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
        .apply(
            Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
                .triggering(DefaultTrigger.of()));
The schema update notification (tablesPatched) is emitted in the GlobalWindow via ConvertMessagesDoFn.finishBundle. However, the elements waiting for schema updates in SchemaUpdateHoldingFn may belong to various windows. Since SchemaUpdateHoldingFn is a stateful DoFn, its state and timers are window-scoped. A notification in the GlobalWindow will not trigger the processing of state held in other windows. While the pollTimer fallback ensures eventual processing, the 'fast-path' notification mechanism is currently ineffective for windowed data.
new RetryManager<>(
    Duration.standardSeconds(1),
    Duration.standardSeconds(20),
    maxRetries,
    BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
AppendClientInfo getAppendClientInfo(
    boolean lookupCache, final @Nullable TableSchema updatedSchema) {
  lookupCache = false;

// TODO: WHY ARE WE HITTING THIS FAILURE!!!!!!! WE successfully reopened the
// connection with a new TableSchema, yet Vortex is failing the individual rows.
// APPEARS THAT STREAMWRITER isn't getting updated?
LOG.error(
    "UNEXPECTED. DUMPING ERROR {}, CONVERTER SCHEMA {}, ACI SCHEMA {}",
    error.getRowIndexToErrorMessage().get(failedIndex),
    messageConverter.getTableSchema(),
    aci.getTableSchema());
    return;
  }
} while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff));
throw new RuntimeException("Failed to flush elements on window expiration!");
Throwing a RuntimeException during onWindowExpiration is quite aggressive for a streaming pipeline. If a schema update is delayed or fails permanently, this will cause the entire pipeline to crash and potentially enter a crash loop. Consider routing these elements to a dead-letter queue or failing them through the standard WriteResult mechanism instead of crashing the worker.
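The dead-letter alternative suggested above can be sketched as follows: elements that still cannot be flushed after the retry budget are collected as failures rather than crashing the worker. `flushOnce` and the failure list are hypothetical stand-ins for the real flush call and the failed-rows output:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: on exhausted retries, route the element to a dead-letter collection
// instead of throwing from onWindowExpiration.
public class DeadLetterDemo {
    static List<String> deadLetters = new ArrayList<>();

    static boolean flushOnce(String element) {
        return false; // simulate a persistently failing flush
    }

    static void flushOrDeadLetter(String element, int maxAttempts) {
        for (int i = 0; i < maxAttempts; i++) {
            if (flushOnce(element)) {
                return;
            }
        }
        // Route to the failed-rows output (e.g. WriteResult.getFailedStorageApiInserts
        // in the real connector) instead of raising a RuntimeException.
        deadLetters.add(element);
    }

    public static void main(String[] args) {
        flushOrDeadLetter("buffered-row", 3);
        System.out.println(deadLetters);
    }
}
```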
Assigning reviewers: R: @chamikaramj for label java. Note: If you would like to opt out of this review, comment. Available commands:

The PR bot will only process comments in the main thread (not review comments).
R: @ahmedabu98 |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
No description provided.