
Commit 863352d: auto update schema

Parent: 211cd66

26 files changed: 2534 additions & 250 deletions

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java

Lines changed: 12 additions & 9 deletions

@@ -33,6 +33,7 @@
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.util.Preconditions;

 /**
  * Container class used by {@link StorageApiWritesShardedRecords} and {@link
@@ -145,15 +146,17 @@ boolean hasSchemaChanged(TableSchema updatedTableSchema) {
   public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)
       throws TableRowToStorageApiProto.SchemaConversionException {
     Message msg =
-        TableRowToStorageApiProto.messageFromTableRow(
-            getSchemaInformation(),
-            getDescriptorIgnoreRequired(),
-            unknown,
-            ignoreUnknownValues,
-            true,
-            null,
-            null,
-            null);
+        Preconditions.checkArgumentNotNull(
+            TableRowToStorageApiProto.messageFromTableRow(
+                getSchemaInformation(),
+                getDescriptorIgnoreRequired(),
+                unknown,
+                ignoreUnknownValues,
+                true,
+                null,
+                null,
+                null,
+                TableRowToStorageApiProto.ErrorCollector.DONT_COLLECT));
     return msg.toByteString();
   }
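
The wrapper added here exists to satisfy Beam's nullness checker: org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull throws IllegalArgumentException on null and returns the value typed as non-null. A minimal sketch of the pattern in isolation (the lookup helper is hypothetical):

import java.util.Map;
import org.apache.beam.sdk.util.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

public class CheckNotNullSketch {
  // Hypothetical helper that may return null.
  static @Nullable String lookup(Map<String, String> m, String key) {
    return m.get(key);
  }

  public static void main(String[] args) {
    // Throws IllegalArgumentException if the value is null; otherwise returns
    // it with a @NonNull type, so downstream code needs no null checks.
    String v = Preconditions.checkArgumentNotNull(lookup(Map.of("k", "v"), "k"));
    System.out.println(v);
  }
}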

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 23 additions & 2 deletions

@@ -4202,9 +4202,19 @@ private <DestinationT> WriteResult continueExpandTyped(
       }
       return input.apply(batchLoads);
     } else if (method == Method.STORAGE_WRITE_API || method == Method.STORAGE_API_AT_LEAST_ONCE) {
+      boolean useSchemaUpdate =
+          getSchemaUpdateOptions() != null && !getSchemaUpdateOptions().isEmpty();
+      if (useSchemaUpdate) {
+        checkArgument(
+            !getAutoSchemaUpdate(),
+            "Schema update options are not supported when using auto schema update");
+        checkArgument(!getIgnoreUnknownValues());
+      }
       BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
       StorageApiDynamicDestinations<T, DestinationT> storageApiDynamicDestinations;
       if (getUseBeamSchema()) {
+        checkArgument(
+            !useSchemaUpdate, "SchemaUpdateOptions are not supported when using Beam schemas");
         // This ensures that the Beam rows are directly translated into protos for Storage API
         // writes, with no
         // need to round trip through JSON TableRow objects.
@@ -4216,6 +4226,9 @@ private <DestinationT> WriteResult continueExpandTyped(
                 getFormatRecordOnFailureFunction(),
                 getRowMutationInformationFn() != null);
       } else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
+        checkArgument(
+            !useSchemaUpdate, "SchemaUpdateOptions are not supported when writing protos");
+
         // We could support both of these by falling back to
         // StorageApiDynamicDestinationsTableRow. This
         // would defeat the optimization (we would be forced to create a new dynamic proto message
@@ -4233,13 +4246,18 @@ private <DestinationT> WriteResult continueExpandTyped(
             !getIgnoreUnknownValues(),
             "ignoreUnknownValues not supported when using writeProtos."
                 + " Try setting withDirectWriteProtos(false)");
+        checkArgument(!useSchemaUpdate);
+
         storageApiDynamicDestinations =
             (StorageApiDynamicDestinations<T, DestinationT>)
                 new StorageApiDynamicDestinationsProto(
                     dynamicDestinations,
                     getWriteProtosClass(),
                     getFormatRecordOnFailureFunction());
       } else if (getAvroRowWriterFactory() != null) {
+        checkArgument(
+            !useSchemaUpdate, "SchemaUpdateOptions are not supported when writing avros");
+
         // we can configure the avro to storage write api proto converter for this
         // assuming the format function returns an Avro GenericRecord
         // and there is a schema defined
@@ -4248,7 +4266,6 @@ private <DestinationT> WriteResult continueExpandTyped(
                 || getDynamicDestinations() != null
                 || getSchemaFromView() != null,
             "A schema must be provided for avro rows to be used with StorageWrite API.");
-
         RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>
             recordWriterFactory =
                 (RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>)
@@ -4275,7 +4292,10 @@ private <DestinationT> WriteResult continueExpandTyped(
                 getRowMutationInformationFn() != null,
                 getCreateDisposition(),
                 getIgnoreUnknownValues(),
-                getAutoSchemaUpdate());
+                getAutoSchemaUpdate(),
+                getSchemaUpdateOptions() == null
+                    ? Collections.emptySet()
+                    : getSchemaUpdateOptions());
       }

       int numShards = getStorageApiNumStreams(bqOptions);
@@ -4287,6 +4307,7 @@ private <DestinationT> WriteResult continueExpandTyped(
       StorageApiLoads<DestinationT, T> storageApiLoads =
           new StorageApiLoads<>(
              destinationCoder,
+              elementCoder,
              storageApiDynamicDestinations,
              getRowMutationInformationFn(),
              getCreateDisposition(),
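
Taken together, the new checkArgument calls restrict SchemaUpdateOptions on the Storage Write API path: they may not be combined with auto schema update or ignoreUnknownValues, and only the TableRow-based sink supports them. A hedged sketch of a configuration that passes these checks (the destination is a placeholder, and CREATE_NEVER assumes the table already exists so no schema needs to be supplied):

import com.google.api.services.bigquery.model.TableRow;
import java.util.EnumSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class SchemaUpdateWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(Create.of(new TableRow().set("name", "beam")).withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder destination
                .withMethod(Method.STORAGE_WRITE_API)
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                // Allowed here only because auto schema update and
                // ignoreUnknownValues are both left off.
                .withSchemaUpdateOptions(
                    EnumSet.of(
                        SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                        SchemaUpdateOption.ALLOW_FIELD_RELAXATION)));
    p.run();
  }
}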

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java

Lines changed: 8 additions & 0 deletions

@@ -245,4 +245,12 @@ public interface BigQueryOptions
   Boolean getGroupFilesFileLoad();

   void setGroupFilesFileLoad(Boolean value);
+
+  @Hidden
+  @Description(
+      "The number of parallel shards to use for buffering elements when upgrading table schemas.")
+  @Default.Integer(50)
+  Integer getSchemaUpgradeBufferingShards();
+
+  void setSchemaUpgradeBufferingShards(Integer value);
 }
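
The option is @Hidden, so it will not appear in --help output, but it can be set like any other pipeline option. A small sketch, assuming only the usual flag-name convention derived from the getter:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SchemaUpgradeShardsSketch {
  public static void main(String[] args) {
    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs("--schemaUpgradeBufferingShards=100")
            .as(BigQueryOptions.class);
    System.out.println(options.getSchemaUpgradeBufferingShards()); // 100; defaults to 50 if unset
  }
}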

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java

Lines changed: 4 additions & 0 deletions

@@ -204,6 +204,10 @@ <T> long insertAll(
     /** Patch BigQuery {@link Table} description. */
     Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
         throws IOException, InterruptedException;
+
+    Table patchTableSchema(
+        TableReference tableReference, com.google.api.services.bigquery.model.TableSchema newSchema)
+        throws IOException, InterruptedException;
   }

   /** An interface to get, create and flush Cloud BigQuery STORAGE API write streams. */

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java

Lines changed: 30 additions & 0 deletions

@@ -1459,6 +1459,31 @@ public Table patchTableDescription(
         ALWAYS_RETRY);
   }

+  @Override
+  public Table patchTableSchema(
+      TableReference tableReference, com.google.api.services.bigquery.model.TableSchema newSchema)
+      throws IOException, InterruptedException {
+    Table newTable = new Table();
+    newTable.setSchema(newSchema);
+
+    Tables.Patch request =
+        client
+            .tables()
+            .patch(
+                tableReference.getProjectId(),
+                tableReference.getDatasetId(),
+                tableReference.getTableId(),
+                newTable);
+    return executeWithRetries(
+        request,
+        String.format(
+            "Unable to patch table: %s, aborting after %d retries.",
+            tableReference, MAX_RPC_RETRIES),
+        Sleeper.DEFAULT,
+        createDefaultBackoff(),
+        DONT_RETRY_INVALID_ARG_OR_PRECONDITION);
+  }
+
   @Override
   public void close() throws Exception {
     // Nothing to close
@@ -1664,6 +1689,11 @@ public void close() throws Exception {
         return !errorExtractor.itemNotFound(input);
       };

+  static final SerializableFunction<IOException, Boolean> DONT_RETRY_INVALID_ARG_OR_PRECONDITION =
+      input -> {
+        ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+        return !errorExtractor.preconditionNotMet(input) && !errorExtractor.badRequest(input);
+      };
   static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY = input -> true;

   @VisibleForTesting
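
The new patchTableSchema sets only the schema on the patch body, relying on PATCH semantics to leave other table metadata untouched, and DONT_RETRY_INVALID_ARG_OR_PRECONDITION deliberately gives up on bad-request and failed-precondition errors, which signal a schema conflict rather than a transient failure. A hedged sketch of the same call made directly against the BigQuery client, outside Beam (project, dataset, and table names are placeholders; BigQuery requires the patched schema to be a superset of the current one, which is why callers merge before patching):

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import java.util.Arrays;

public class PatchSchemaSketch {
  public static void main(String[] args) throws Exception {
    Bigquery client =
        new Bigquery.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                GsonFactory.getDefaultInstance(),
                new HttpCredentialsAdapter(
                    GoogleCredentials.getApplicationDefault()
                        .createScoped("https://www.googleapis.com/auth/bigquery")))
            .setApplicationName("patch-schema-sketch")
            .build();

    // PATCH only modifies the fields set on this Table object, so setting just
    // the schema leaves the rest of the table definition intact. The schema
    // must include every existing field plus the additions.
    Table patch =
        new Table()
            .setSchema(
                new TableSchema()
                    .setFields(
                        Arrays.asList(
                            new TableFieldSchema().setName("name").setType("STRING"),
                            new TableFieldSchema()
                                .setName("added_field") // new field: must be NULLABLE
                                .setType("STRING")
                                .setMode("NULLABLE"))));
    client.tables().patch("my-project", "my_dataset", "my_table", patch).execute();
  }
}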

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/MergeSchemaCombineFn.java

Lines changed: 76 additions & 0 deletions

@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import java.util.Iterator;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+
+/** Merge schemas. */
+public class MergeSchemaCombineFn extends Combine.CombineFn<TableSchema, TableSchema, TableSchema> {
+  @Override
+  public TableSchema createAccumulator() {
+    return TableSchema.newBuilder().build();
+  }
+
+  @Override
+  public TableSchema addInput(TableSchema accumulator, TableSchema input) {
+    if (input.equals(accumulator)) {
+      return accumulator;
+    }
+    return UpgradeTableSchema.mergeSchemas(accumulator, input);
+  }
+
+  @Override
+  public Coder<TableSchema> getAccumulatorCoder(
+      CoderRegistry registry, Coder<TableSchema> inputCoder) {
+    return inputCoder;
+  }
+
+  @Override
+  public Coder<TableSchema> getDefaultOutputCoder(
+      CoderRegistry registry, Coder<TableSchema> inputCoder) throws CannotProvideCoderException {
+    return inputCoder;
+  }
+
+  @Override
+  public TableSchema mergeAccumulators(Iterable<TableSchema> accumulators) {
+    checkNotNull(accumulators, "accumulators must be non-null");
+
+    Iterator<TableSchema> iter = accumulators.iterator();
+    if (!iter.hasNext()) {
+      return createAccumulator();
+    }
+
+    TableSchema merged = iter.next();
+    while (iter.hasNext()) {
+      merged = addInput(merged, iter.next());
+    }
+    return merged;
+  }
+
+  @Override
+  public TableSchema extractOutput(TableSchema accumulator) {
+    return accumulator;
+  }
+}
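
A plausible wiring of this CombineFn, collapsing the schema deltas observed for each destination into a single merged delta per key. This excerpt does not show the commit's actual wiring, so the string keys and the use of ProtoCoder (from the beam-sdks-java-extensions-protobuf artifact) are assumptions:

import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.gcp.bigquery.MergeSchemaCombineFn;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class MergeSchemaSketch {
  // Builds a single-field NULLABLE schema delta, the shape the patching stage expects.
  static TableSchema deltaWith(String fieldName) {
    return TableSchema.newBuilder()
        .addFields(
            TableFieldSchema.newBuilder()
                .setName(fieldName)
                .setType(TableFieldSchema.Type.STRING)
                .setMode(TableFieldSchema.Mode.NULLABLE))
        .build();
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(
            Create.of(
                    KV.of("dataset.table", deltaWith("new_field_a")),
                    KV.of("dataset.table", deltaWith("new_field_b")))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), ProtoCoder.of(TableSchema.class))))
        // One merged schema delta per destination table.
        .apply(Combine.perKey(new MergeSchemaCombineFn()));
    p.run();
  }
}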

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PatchTableSchemaDoFn.java

Lines changed: 117 additions & 0 deletions

@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * This DoFn is responsible for updating a BigQuery table's schema. The input is a TableSchema
+ * containing only the schema delta (new fields, relaxed fields). It outputs elements for all
+ * updated tables, which act as notifications to the buffering stage that the elements can be
+ * retried.
+ */
+public class PatchTableSchemaDoFn<DestinationT extends @NonNull Object, ElementT>
+    extends DoFn<KV<DestinationT, TableSchema>, KV<DestinationT, ElementT>> {
+  private final BigQueryServices bqServices;
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
+  private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
+  private transient BigQueryServices.@Nullable DatasetService datasetServiceInternal = null;
+
+  PatchTableSchemaDoFn(
+      String operationName,
+      BigQueryServices bqServices,
+      StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations) {
+    this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
+    this.bqServices = bqServices;
+    this.dynamicDestinations = dynamicDestinations;
+  }
+
+  private BigQueryServices.DatasetService getDatasetService(PipelineOptions pipelineOptions)
+      throws IOException {
+    if (datasetServiceInternal == null) {
+      datasetServiceInternal =
+          bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+    }
+    return datasetServiceInternal;
+  }
+
+  @ProcessElement
+  public void processElement(
+      @Element KV<DestinationT, TableSchema> element,
+      OutputReceiver<KV<DestinationT, @Nullable ElementT>> o,
+      ProcessContext context,
+      PipelineOptions pipelineOptions)
+      throws Exception {
+    dynamicDestinations.setSideInputAccessorFromProcessContext(context);
+    DestinationT destination = element.getKey();
+    TableSchema tableSchemaDiff = element.getValue();
+
+    StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter =
+        messageConverters.get(destination, dynamicDestinations, getDatasetService(pipelineOptions));
+    messageConverter.updateSchemaFromTable();
+
+    while (true) {
+      // Check first to see if the schema still needs updating.
+      TableSchema baseSchema = messageConverter.getTableSchema();
+      TableSchema updatedSchema = UpgradeTableSchema.mergeSchemas(baseSchema, tableSchemaDiff);
+      if (baseSchema.equals(updatedSchema)) {
+        return;
+      }
+      BackOff backoff =
+          new ExponentialBackOff.Builder()
+              .setMaxElapsedTimeMillis((int) TimeUnit.MINUTES.toMillis(1))
+              .build();
+      boolean schemaOutOfDate = false;
+      do {
+        try {
+          getDatasetService(pipelineOptions)
+              .patchTableSchema(
+                  dynamicDestinations.getTable(destination).getTableReference(),
+                  TableRowToStorageApiProto.protoSchemaToTableSchema(updatedSchema));
+          // Indicate that we've patched this schema.
+          o.output(KV.of(destination, null));
+          return;
+        } catch (IOException e) {
+          ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+          if (errorExtractor.preconditionNotMet(e) || errorExtractor.badRequest(e)) {
+            schemaOutOfDate = true;
+            break;
+          }
+        }
+      } while (BackOffUtils.next(com.google.api.client.util.Sleeper.DEFAULT, backoff));
+      if (schemaOutOfDate) {
+        // The patch likely failed because our base schema is out of date; refresh it and loop.
+        messageConverter.updateSchemaFromTable();
+      } else {
+        // We ran out of retries.
+        throw new RuntimeException("Failed to patch table schema");
+      }
+    }
+  }
+}
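
The inner do/while is the standard google-http-client backoff idiom: BackOffUtils.next sleeps for the next interval and returns true until the BackOff is exhausted. A self-contained sketch of the same loop shape, with a hypothetical flaky call standing in for tables().patch():

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class BackoffLoopSketch {
  // Hypothetical stand-in for the patch RPC; fails transiently about half the time.
  static void flakyCall() throws IOException {
    if (Math.random() < 0.5) {
      throw new IOException("transient failure");
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    // Same shape as the DoFn's loop: retry with exponential backoff until the
    // call succeeds or roughly one minute of backoff has elapsed in total.
    BackOff backoff =
        new ExponentialBackOff.Builder()
            .setMaxElapsedTimeMillis((int) TimeUnit.MINUTES.toMillis(1))
            .build();
    do {
      try {
        flakyCall();
        return; // success
      } catch (IOException e) {
        // fall through: back off, then retry
      }
    } while (BackOffUtils.next(Sleeper.DEFAULT, backoff)); // false once exhausted
    throw new IOException("retries exhausted");
  }
}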
