Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,16 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
}
if (plainText != null && Boolean.TRUE.equals(plainText.get())) {
builder.setChannelConfigurator(b -> b.usePlaintext());
builder.setCredentials(NoCredentials.getInstance());
}
ValueProvider<String> clientCert = spannerConfig.getClientCertPath();
ValueProvider<String> clientKey = spannerConfig.getClientCertKeyPath();
if (clientCert != null
&& clientKey != null
&& clientCert.isAccessible()
&& clientKey.isAccessible()
&& !Strings.isNullOrEmpty(clientCert.get())
&& !Strings.isNullOrEmpty(clientKey.get())) {
builder.useClientCert(clientCert.get(), clientKey.get());
}
Comment thread
sagnghos marked this conversation as resolved.
}
Comment thread
sagnghos marked this conversation as resolved.

Expand All @@ -273,6 +282,8 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
ValueProvider<Credentials> credentials = spannerConfig.getCredentials();
if (credentials != null && credentials.get() != null) {
builder.setCredentials(credentials.get());
} else if (experimentalHost != null && !Strings.isNullOrEmpty(experimentalHost.get())) {
builder.setCredentials(NoCredentials.getInstance());
}

ValueProvider<java.time.Duration> waitForSessionCreationDuration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public String getHostValue() {

public abstract @Nullable ValueProvider<Boolean> getPlainText();

public abstract @Nullable ValueProvider<String> getClientCertPath();

public abstract @Nullable ValueProvider<String> getClientCertKeyPath();

@VisibleForTesting
abstract @Nullable ServiceFactory<Spanner, SpannerOptions> getServiceFactory();

Expand Down Expand Up @@ -194,6 +198,10 @@ abstract Builder setExecuteStreamingSqlRetrySettings(
abstract Builder setWaitForSessionCreationDuration(
ValueProvider<java.time.Duration> waitForSessionCreationDuration);

abstract Builder setClientCertPath(ValueProvider<String> clientCertPath);

abstract Builder setClientCertKeyPath(ValueProvider<String> clientCertKeyPath);

public abstract SpannerConfig build();
}

Expand Down Expand Up @@ -414,4 +422,33 @@ public SpannerConfig withWaitForSessionCreationDuration(
return withWaitForSessionCreationDuration(
ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using a Spanner Omni instance (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public SpannerConfig withClientCert(
ValueProvider<String> certPath, ValueProvider<String> keyPath) {
return toBuilder().setClientCertPath(certPath).setClientCertKeyPath(keyPath).build();
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using a Spanner Omni instance (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public SpannerConfig withClientCert(String certPath, String keyPath) {
return withClientCert(
ValueProvider.StaticValueProvider.of(certPath),
ValueProvider.StaticValueProvider.of(keyPath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceFactory;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
Expand Down Expand Up @@ -146,6 +147,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -657,6 +659,35 @@ public ReadAll withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public ReadAll withClientCert(ValueProvider<String> certPath, ValueProvider<String> keyPath) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withClientCert(certPath, keyPath));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public ReadAll withClientCert(String certPath, String keyPath) {
return withClientCert(
ValueProvider.StaticValueProvider.of(certPath),
ValueProvider.StaticValueProvider.of(keyPath));
}

/** Specifies the Cloud Spanner database. */
public ReadAll withDatabaseId(ValueProvider<String> databaseId) {
SpannerConfig config = getSpannerConfig();
Expand Down Expand Up @@ -917,6 +948,35 @@ public Read withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public Read withClientCert(ValueProvider<String> certPath, ValueProvider<String> keyPath) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withClientCert(certPath, keyPath));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public Read withClientCert(String certPath, String keyPath) {
return withClientCert(
ValueProvider.StaticValueProvider.of(certPath),
ValueProvider.StaticValueProvider.of(keyPath));
}

/** If true the uses Cloud Spanner batch API. */
public Read withBatching(boolean batching) {
return toBuilder().setBatching(batching).build();
Expand Down Expand Up @@ -1244,6 +1304,36 @@ public CreateTransaction withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public CreateTransaction withClientCert(
ValueProvider<String> certPath, ValueProvider<String> keyPath) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withClientCert(certPath, keyPath));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public CreateTransaction withClientCert(String certPath, String keyPath) {
return withClientCert(
ValueProvider.StaticValueProvider.of(certPath),
ValueProvider.StaticValueProvider.of(keyPath));
}

@VisibleForTesting
CreateTransaction withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
SpannerConfig config = getSpannerConfig();
Expand Down Expand Up @@ -1412,6 +1502,35 @@ public Write withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public Write withClientCert(ValueProvider<String> certPath, ValueProvider<String> keyPath) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withClientCert(certPath, keyPath));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public Write withClientCert(String certPath, String keyPath) {
return withClientCert(
ValueProvider.StaticValueProvider.of(certPath),
ValueProvider.StaticValueProvider.of(keyPath));
}

public Write withDialectView(PCollectionView<Dialect> dialect) {
return toBuilder().setDialectView(dialect).build();
}
Expand Down Expand Up @@ -1770,6 +1889,10 @@ public abstract static class ReadChangeStream

abstract @Nullable ValueProvider<Boolean> getPlainText();

abstract @Nullable ValueProvider<String> getClientCertPath();

abstract @Nullable ValueProvider<String> getClientCertKeyPath();

abstract Duration getRealTimeCheckpointInterval();

abstract int getHeartbeatMillis();
Expand Down Expand Up @@ -1807,6 +1930,10 @@ abstract static class Builder {

abstract Builder setPlainText(ValueProvider<Boolean> plainText);

abstract Builder setClientCertPath(ValueProvider<String> clientCertPath);

abstract Builder setClientCertKeyPath(ValueProvider<String> clientCertKeyPath);

/**
* When caught up to real-time, checkpoint processing of change stream this often. This sets a
* bound on latency of processing if a steady trickle of elements prevents the heartbeat
Expand Down Expand Up @@ -1946,6 +2073,36 @@ public ReadChangeStream withUsingPlainTextChannel(boolean plainText) {
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public ReadChangeStream withClientCert(
ValueProvider<String> certPath, ValueProvider<String> keyPath) {
SpannerConfig config = getSpannerConfig();
return withSpannerConfig(config.withClientCert(certPath, keyPath));
}

/**
* Specifies certificate paths to use for mTLS channel.
*
* <p>Note: These parameters are only valid when using Spanner Omni (set via {@code
* withExperimentalHost}).
*
* @param certPath Path to the client certificate file.
* @param keyPath Path to the client certificate key file.
*/
public ReadChangeStream withClientCert(String certPath, String keyPath) {
return withClientCert(
ValueProvider.StaticValueProvider.of(certPath),
ValueProvider.StaticValueProvider.of(keyPath));
}

/**
* Configures low latency experiment for readChangeStream transform. Example usage:
*
Expand Down Expand Up @@ -2177,9 +2334,17 @@ SpannerConfig buildChangeStreamSpannerConfig() {
static SpannerConfig buildSpannerConfigWithCredential(
SpannerConfig spannerConfig, PipelineOptions pipelineOptions) {
if (spannerConfig.getCredentials() == null && pipelineOptions != null) {
final Credentials credentials = pipelineOptions.as(GcpOptions.class).getGcpCredential();
if (credentials != null) {
spannerConfig = spannerConfig.withCredentials(credentials);
boolean isExperimentalHostEmpty =
spannerConfig.getExperimentalHost() == null
|| (spannerConfig.getExperimentalHost().isAccessible()
&& Strings.isNullOrEmpty(spannerConfig.getExperimentalHost().get()));
if (isExperimentalHostEmpty) {
final Credentials credentials = pipelineOptions.as(GcpOptions.class).getGcpCredential();
if (credentials != null) {
spannerConfig = spannerConfig.withCredentials(credentials);
}
} else {
spannerConfig = spannerConfig.withCredentials(NoCredentials.getInstance());
}
}
return spannerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public abstract static class CrossLanguageConfiguration {
@Nullable String emulatorHost;
@Nullable String experimentalHost;
@Nullable Boolean plainText;
@Nullable String clientCertPath;
@Nullable String clientCertKeyPath;

public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
Expand Down Expand Up @@ -110,6 +112,14 @@ public void setPlainText(@Nullable Boolean plainText) {
this.plainText = plainText;
}

public void setClientCertPath(@Nullable String clientCertPath) {
this.clientCertPath = clientCertPath;
}

public void setClientCertKeyPath(@Nullable String clientCertKeyPath) {
this.clientCertKeyPath = clientCertKeyPath;
}

void checkMandatoryFields() {
if (projectId.isEmpty()) {
throw new IllegalArgumentException("projectId can't be empty");
Comment thread
sagnghos marked this conversation as resolved.
Expand All @@ -120,6 +130,10 @@ void checkMandatoryFields() {
if (instanceId.isEmpty()) {
throw new IllegalArgumentException("instanceId can't be empty");
}
if ((clientCertPath != null) != (clientCertKeyPath != null)) {
throw new IllegalArgumentException(
"Both clientCertPath and clientCertKeyPath must be specified together.");
}
}
}

Expand Down Expand Up @@ -249,6 +263,11 @@ public PTransform<PBegin, PCollection<Row>> buildExternal(
if (configuration.plainText != null) {
readTransform = readTransform.withUsingPlainTextChannel(configuration.plainText);
}
if (configuration.clientCertPath != null && configuration.clientCertKeyPath != null) {
readTransform =
readTransform.withClientCert(
configuration.clientCertPath, configuration.clientCertKeyPath);
}
@Nullable TimestampBound timestampBound = configuration.getTimestampBound();
if (timestampBound != null) {
readTransform = readTransform.withTimestampBound(timestampBound);
Expand Down Expand Up @@ -393,6 +412,11 @@ public PTransform<PCollection<Row>, PDone> buildExternal(
if (configuration.plainText != null) {
writeTransform = writeTransform.withUsingPlainTextChannel(configuration.plainText);
}
if (configuration.clientCertPath != null && configuration.clientCertKeyPath != null) {
writeTransform =
writeTransform.withClientCert(
configuration.clientCertPath, configuration.clientCertKeyPath);
}
if (configuration.commitDeadline != null) {
writeTransform = writeTransform.withCommitDeadline(configuration.commitDeadline);
}
Expand Down Expand Up @@ -504,6 +528,12 @@ public PTransform<PBegin, PCollection<String>> buildExternal(
readChangeStream = readChangeStream.withMetadataTable(configuration.metadataTable);
}

if (configuration.clientCertPath != null && configuration.clientCertKeyPath != null) {
readChangeStream =
readChangeStream.withClientCert(
configuration.clientCertPath, configuration.clientCertKeyPath);
}

if (configuration.rpcPriority != null) {

readChangeStream = readChangeStream.withRpcPriority(configuration.rpcPriority);
Expand Down
Loading
Loading