diff --git a/google-cloud-spanner-executor/pom.xml b/google-cloud-spanner-executor/pom.xml index ee00087f472..fcbb20517ae 100644 --- a/google-cloud-spanner-executor/pom.xml +++ b/google-cloud-spanner-executor/pom.xml @@ -59,6 +59,11 @@ + + com.google.cloud + grpc-gcp + ${grpc.gcp.version} + io.opentelemetry.semconv opentelemetry-semconv @@ -296,7 +301,7 @@ org.apache.maven.plugins maven-dependency-plugin - com.google.api:gax,org.apache.maven.surefire:surefire-junit4,io.opentelemetry.semconv:opentelemetry-semconv,com.google.cloud.opentelemetry:shared-resourcemapping + com.google.api:gax,org.apache.maven.surefire:surefire-junit4,io.opentelemetry.semconv:opentelemetry-semconv,com.google.cloud.opentelemetry:shared-resourcemapping,com.google.cloud:grpc-gcp diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index cacb13c8592..14519670dfb 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -166,6 +166,7 @@ com.google.cloud grpc-gcp + ${grpc.gcp.version} io.grpc diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 765114dc68d..20ed760c18d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -1025,7 +1025,7 @@ public static class Builder private DatabaseAdminStubSettings.Builder databaseAdminStubSettingsBuilder = DatabaseAdminStubSettings.newBuilder(); private Duration partitionedDmlTimeout = Duration.ofHours(2L); - private boolean grpcGcpExtensionEnabled = false; + private boolean grpcGcpExtensionEnabled = true; private GcpManagedChannelOptions grpcGcpOptions; private RetrySettings retryAdministrativeRequestsSettings = DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS; @@ -1557,20 +1557,14 @@ public Builder setExperimentalHost(String host) { return this; } - /** - * Enables gRPC-GCP extension with the default settings. Do not set - * GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS to true in combination with this option, as - * Multiplexed sessions are not supported for gRPC-GCP. - */ + /** Enables gRPC-GCP extension with the default settings. This option is enabled by default. */ public Builder enableGrpcGcpExtension() { return this.enableGrpcGcpExtension(null); } /** * Enables gRPC-GCP extension and uses provided options for configuration. The metric registry - * and default Spanner metric labels will be added automatically. Do not set - * GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS to true in combination with this option, as - * Multiplexed sessions are not supported for gRPC-GCP. + * and default Spanner metric labels will be added automatically. */ public Builder enableGrpcGcpExtension(GcpManagedChannelOptions options) { this.grpcGcpExtensionEnabled = true; @@ -1578,7 +1572,7 @@ public Builder enableGrpcGcpExtension(GcpManagedChannelOptions options) { return this; } - /** Disables gRPC-GCP extension. */ + /** Disables gRPC-GCP extension and uses GAX channel pool instead. */ public Builder disableGrpcGcpExtension() { this.grpcGcpExtensionEnabled = false; return this; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 306bd9842ac..dc7ee8de9c2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -188,7 +188,6 @@ import io.grpc.Context; import io.grpc.ManagedChannelBuilder; import io.grpc.MethodDescriptor; -import io.opencensus.metrics.Metrics; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; @@ -570,7 +569,7 @@ private static String parseGrpcGcpApiConfig() { } } - // Enhance metric options for gRPC-GCP extension. Adds metric registry if not specified. + // Enhance metric options for gRPC-GCP extension. private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) { GcpManagedChannelOptions grpcGcpOptions = MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions()); @@ -578,9 +577,6 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions MoreObjects.firstNonNull( grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build()); GcpMetricsOptions.Builder metricsOptionsBuilder = GcpMetricsOptions.newBuilder(metricsOptions); - if (metricsOptions.getMetricRegistry() == null) { - metricsOptionsBuilder.withMetricRegistry(Metrics.getMetricRegistry()); - } // TODO: Add default labels with values: client_id, database, instance_id. if (metricsOptions.getNamePrefix().equals("")) { metricsOptionsBuilder.withNamePrefix("cloud.google.com/java/spanner/gcp-channel-pool/"); diff --git a/google-cloud-spanner/src/main/resources/META-INF/native-image/com.google.cloud.spanner/grpc-gcp-reflect-config.json b/google-cloud-spanner/src/main/resources/META-INF/native-image/com.google.cloud.spanner/grpc-gcp-reflect-config.json new file mode 100644 index 00000000000..a92f2c29737 --- /dev/null +++ b/google-cloud-spanner/src/main/resources/META-INF/native-image/com.google.cloud.spanner/grpc-gcp-reflect-config.json @@ -0,0 +1,56 @@ +[ + { + "name": "com.google.cloud.grpc.proto.ApiConfig", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + }, + { + "name": "com.google.cloud.grpc.proto.ApiConfig$Builder", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + }, + { + "name": "com.google.cloud.grpc.proto.ChannelPoolConfig", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + }, + { + "name": "com.google.cloud.grpc.proto.ChannelPoolConfig$Builder", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + }, + { + "name": "com.google.cloud.grpc.proto.MethodConfig", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + }, + { + "name": "com.google.cloud.grpc.proto.MethodConfig$Builder", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + }, + { + "name": "com.google.cloud.grpc.proto.AffinityConfig", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + }, + { + "name": "com.google.cloud.grpc.proto.AffinityConfig$Builder", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + }, + { + "name": "com.google.cloud.grpc.proto.AffinityConfig$Command", + "allDeclaredFields": true, + "allDeclaredMethods": true, + "allDeclaredConstructors": true + } +] diff --git a/google-cloud-spanner/src/main/resources/META-INF/native-image/native-image.properties b/google-cloud-spanner/src/main/resources/META-INF/native-image/native-image.properties index 44bcd53941a..566244d3e59 100644 --- a/google-cloud-spanner/src/main/resources/META-INF/native-image/native-image.properties +++ b/google-cloud-spanner/src/main/resources/META-INF/native-image/native-image.properties @@ -2,4 +2,5 @@ Args = --initialize-at-build-time=com.google.cloud.spanner.IntegrationTestEnv,\ org.junit.experimental.categories.CategoryValidator,\ org.junit.validator.AnnotationValidator,\ java.lang.annotation.Annotation \ + -H:ReflectionConfigurationResources=${.}/com.google.cloud.spanner/grpc-gcp-reflect-config.json \ --features=com.google.cloud.spanner.nativeimage.SpannerFeature diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java index a06eeb91662..89a73237381 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java @@ -17,10 +17,9 @@ package com.google.cloud.spanner; import static com.google.cloud.spanner.DisableDefaultMtlsProvider.disableDefaultMtlsProvider; -import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; +import static java.util.stream.Collectors.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeFalse; import com.google.cloud.NoCredentials; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; @@ -32,7 +31,6 @@ import com.google.spanner.v1.StructType; import com.google.spanner.v1.StructType.Field; import com.google.spanner.v1.TypeCode; -import io.grpc.Attributes; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.Metadata; @@ -70,13 +68,9 @@ public class ChannelUsageTest { @Parameter(0) public int numChannels; - @Parameter(1) - public boolean enableGcpPool; - - @Parameters(name = "num channels = {0}, enable GCP pool = {1}") + @Parameters(name = "num channels = {0}") public static Collection data() { - return Arrays.asList( - new Object[][] {{1, true}, {1, false}, {2, true}, {2, false}, {4, true}, {4, false}}); + return Arrays.asList(new Object[][] {{1}, {2}, {4}}); } private static final Statement SELECT1 = Statement.of("SELECT 1 AS COL1"); @@ -106,9 +100,9 @@ public static Collection data() { private static MockSpannerServiceImpl mockSpanner; private static Server server; private static InetSocketAddress address; - private static final Set batchCreateSessionLocalIps = - ConcurrentHashMap.newKeySet(); - private static final Set executeSqlLocalIps = ConcurrentHashMap.newKeySet(); + // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method + private static final Set batchCreateSessionChannelHints = ConcurrentHashMap.newKeySet(); + private static final Set executeSqlChannelHints = ConcurrentHashMap.newKeySet(); private static Level originalLogLevel; @@ -123,8 +117,8 @@ public static void startServer() throws Exception { server = NettyServerBuilder.forAddress(address) .addService(mockSpanner) - // Add a server interceptor to register the remote addresses that we are seeing. This - // indicates how many channels are used client side to communicate with the server. + // Add a server interceptor to extract channel hints from X-Goog-Spanner-Request-Id + // header. This verifies that the client uses all configured channels. .intercept( new ServerInterceptor() { @Override @@ -138,22 +132,26 @@ public ServerCall.Listener interceptCall( headers.get( Metadata.Key.of( "x-response-encoding", Metadata.ASCII_STRING_MARSHALLER))); - Attributes attributes = call.getAttributes(); - @SuppressWarnings({"unchecked", "deprecation"}) - Attributes.Key key = - (Attributes.Key) - attributes.keys().stream() - .filter(k -> k.equals(TRANSPORT_ATTR_REMOTE_ADDR)) - .findFirst() - .orElse(null); - if (key != null) { - if (call.getMethodDescriptor() - .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { - batchCreateSessionLocalIps.add(attributes.get(key)); - } - if (call.getMethodDescriptor() - .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { - executeSqlLocalIps.add(attributes.get(key)); + // Extract channel hint from X-Goog-Spanner-Request-Id header + String requestId = headers.get(XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY); + if (requestId != null) { + // Format: + // ..... + String[] parts = requestId.split("\\."); + if (parts.length >= 4) { + try { + long channelHint = Long.parseLong(parts[3]); + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { + batchCreateSessionChannelHints.add(channelHint); + } + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { + executeSqlChannelHints.add(channelHint); + } + } catch (NumberFormatException e) { + // Ignore parse errors + } } } return Contexts.interceptCall(Context.current(), call, headers, next); @@ -185,8 +183,8 @@ public static void resetLogging() { @After public void reset() { mockSpanner.reset(); - batchCreateSessionLocalIps.clear(); - executeSqlLocalIps.clear(); + batchCreateSessionChannelHints.clear(); + executeSqlChannelHints.clear(); } private SpannerOptions createSpannerOptions() { @@ -208,34 +206,14 @@ private SpannerOptions createSpannerOptions() { .build()) .setHost("http://" + endpoint) .setCredentials(NoCredentials.getInstance()); - if (enableGcpPool) { - builder.enableGrpcGcpExtension(); - } return builder.build(); } - @Test - public void testCreatesNumChannels() { - try (Spanner spanner = createSpannerOptions().getService()) { - assumeFalse( - "GRPC-GCP is currently not supported with multiplexed sessions", - isMultiplexedSessionsEnabled(spanner) && enableGcpPool); - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) { - while (resultSet.next()) {} - } - } - assertEquals(numChannels, batchCreateSessionLocalIps.size()); - } - @Test public void testUsesAllChannels() throws InterruptedException { final int multiplier = 2; try (Spanner spanner = createSpannerOptions().getService()) { - assumeFalse( - "GRPC-GCP is currently not supported with multiplexed sessions", - isMultiplexedSessionsEnabled(spanner)); DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numChannels * multiplier)); @@ -263,13 +241,23 @@ public void testUsesAllChannels() throws InterruptedException { executor.shutdown(); assertTrue(executor.awaitTermination(Duration.ofSeconds(10L))); } - assertEquals(numChannels, executeSqlLocalIps.size()); - } - - private boolean isMultiplexedSessionsEnabled(Spanner spanner) { - if (spanner.getOptions() == null || spanner.getOptions().getSessionPoolOptions() == null) { - return false; + // Bound the channel hints to numChannels (matching gRPC-GCP behavior) and verify + // that channels are being distributed. The raw channel hints may be unbounded (based on + // session index), but gRPC-GCP bounds them to the actual number of channels. + Set boundedChannelHints = + executeSqlChannelHints.stream().map(hint -> hint % numChannels).collect(toSet()); + // Verify that channel distribution is working: + // - For numChannels=1, exactly 1 channel should be used + // - For numChannels>1, multiple channels should be used (at least half) + if (numChannels == 1) { + assertEquals(1, boundedChannelHints.size()); + } else { + assertTrue( + "Expected at least " + + (numChannels / 2) + + " channels to be used, but got " + + boundedChannelHints.size(), + boundedChannelHints.size() >= numChannels / 2); } - return spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession(); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java index e7ef9955d4f..a744163063a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnDifferentGrpcChannelMockServerTest.java @@ -27,7 +27,6 @@ import com.google.cloud.NoCredentials; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.connection.AbstractMockServerTest; -import com.google.common.collect.ImmutableSet; import com.google.spanner.v1.BatchCreateSessionsRequest; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.ExecuteSqlRequest; @@ -64,6 +63,7 @@ @RunWith(JUnit4.class) public class RetryOnDifferentGrpcChannelMockServerTest extends AbstractMockServerTest { private static final Map> SERVER_ADDRESSES = new HashMap<>(); + private static final Map> CHANNEL_HINTS = new HashMap<>(); @BeforeClass public static void startStaticServer() throws IOException { @@ -79,6 +79,7 @@ public static void removeSystemProperty() { @After public void clearRequests() { SERVER_ADDRESSES.clear(); + CHANNEL_HINTS.clear(); mockSpanner.clearRequests(); mockSpanner.removeAllExecutionTimes(); } @@ -91,6 +92,7 @@ public Listener interceptCall( Metadata metadata, ServerCallHandler serverCallHandler) { Attributes attributes = serverCall.getAttributes(); + String methodName = serverCall.getMethodDescriptor().getFullMethodName(); //noinspection unchecked,deprecation Attributes.Key key = (Attributes.Key) @@ -102,11 +104,26 @@ public Listener interceptCall( InetSocketAddress address = attributes.get(key); synchronized (SERVER_ADDRESSES) { Set addresses = - SERVER_ADDRESSES.getOrDefault( - serverCall.getMethodDescriptor().getFullMethodName(), new HashSet<>()); + SERVER_ADDRESSES.getOrDefault(methodName, new HashSet<>()); addresses.add(address); - SERVER_ADDRESSES.putIfAbsent( - serverCall.getMethodDescriptor().getFullMethodName(), addresses); + SERVER_ADDRESSES.putIfAbsent(methodName, addresses); + } + } + String requestId = metadata.get(XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY); + if (requestId != null) { + // REQUEST_ID format: version.randProcessId.nthClientId.nthChannelId.nthRequest.attempt + String[] parts = requestId.split("\\."); + if (parts.length >= 6) { + try { + long channelHint = Long.parseLong(parts[3]); + synchronized (CHANNEL_HINTS) { + Set hints = CHANNEL_HINTS.getOrDefault(methodName, new HashSet<>()); + hints.add(channelHint); + CHANNEL_HINTS.putIfAbsent(methodName, hints); + } + } catch (NumberFormatException ignore) { + // Ignore malformed header values in tests. + } } } return serverCallHandler.startCall(serverCall, metadata); @@ -157,8 +174,8 @@ public void testReadWriteTransaction_retriesOnNewChannel() { assertNotEquals(requests.get(0).getSession(), requests.get(1).getSession()); assertEquals( 2, - SERVER_ADDRESSES - .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", ImmutableSet.of()) + CHANNEL_HINTS + .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", new HashSet<>()) .size()); } @@ -201,8 +218,8 @@ public void testReadWriteTransaction_stopsRetrying() { assertEquals(numChannels, sessions.size()); assertEquals( numChannels, - SERVER_ADDRESSES - .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", ImmutableSet.of()) + CHANNEL_HINTS + .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", new HashSet<>()) .size()); } } @@ -275,8 +292,8 @@ public void testDenyListedChannelIsCleared() { assertEquals(numChannels + 1, sessions.size()); assertEquals( numChannels, - SERVER_ADDRESSES - .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", ImmutableSet.of()) + CHANNEL_HINTS + .getOrDefault("google.spanner.v1.Spanner/BeginTransaction", new HashSet<>()) .size()); assertEquals(numChannels, mockSpanner.countRequestsOfType(BatchCreateSessionsRequest.class)); } @@ -303,11 +320,11 @@ public void testSingleUseQuery_retriesOnNewChannel() { List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); // The requests use the same multiplexed session. assertEquals(requests.get(0).getSession(), requests.get(1).getSession()); - // The requests use two different gRPC channels. + // The requests use two different channel hints (which may map to same physical channel). assertEquals( 2, - SERVER_ADDRESSES - .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", ImmutableSet.of()) + CHANNEL_HINTS + .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", new HashSet<>()) .size()); } @@ -327,19 +344,19 @@ public void testSingleUseQuery_stopsRetrying() { assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); } int numChannels = spanner.getOptions().getNumChannels(); - assertEquals(numChannels, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class)); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); // The requests use the same multiplexed session. String session = requests.get(0).getSession(); for (ExecuteSqlRequest request : requests) { assertEquals(session, request.getSession()); } - // The requests use all gRPC channels. - assertEquals( - numChannels, - SERVER_ADDRESSES - .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", ImmutableSet.of()) - .size()); + // Each attempt, including retries, must use a distinct channel hint. + int totalRequests = mockSpanner.countRequestsOfType(ExecuteSqlRequest.class); + int distinctHints = + CHANNEL_HINTS + .getOrDefault("google.spanner.v1.Spanner/ExecuteStreamingSql", new HashSet<>()) + .size(); + assertEquals(totalRequests, distinctHints); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 9fc065f944c..be292eab5aa 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -1100,6 +1100,7 @@ public void testDefaultNumChannelsWithGrpcGcpExtensionDisabled() { SpannerOptions.newBuilder() .setProjectId("test-project") .setCredentials(NoCredentials.getInstance()) + .disableGrpcGcpExtension() .build(); assertEquals(SpannerOptions.DEFAULT_CHANNELS, options.getNumChannels()); @@ -1135,7 +1136,8 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() { @Test public void checkCreatedInstanceWhenGrpcGcpExtensionDisabled() { - SpannerOptions options = SpannerOptions.newBuilder().setProjectId("test-project").build(); + SpannerOptions options = + SpannerOptions.newBuilder().setProjectId("test-project").disableGrpcGcpExtension().build(); SpannerOptions options1 = options.toBuilder().build(); assertEquals(false, options.isGrpcGcpExtensionEnabled()); assertEquals(options.isGrpcGcpExtensionEnabled(), options1.isGrpcGcpExtensionEnabled()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java index b68ef4667d5..c8f3162255f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionChannelHintTest.java @@ -21,7 +21,6 @@ import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_RESULTSET; import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT; import static com.google.cloud.spanner.MockSpannerTestUtil.READ_TABLE_NAME; -import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -35,7 +34,6 @@ import com.google.spanner.v1.StructType; import com.google.spanner.v1.StructType.Field; import com.google.spanner.v1.TypeCode; -import io.grpc.Attributes; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.Metadata; @@ -62,7 +60,8 @@ * transaction, they go via same channel. For regular session, the hint is stored per session. For * multiplexed sessions this hint is stored per transaction. * - *

The below tests assert this behavior for both kinds of sessions. + *

The below tests assert this behavior by verifying that all operations within a transaction use + * the same channel hint (extracted from the X-Goog-Spanner-Request-Id header). */ @RunWith(JUnit4.class) public class TransactionChannelHintTest { @@ -94,10 +93,10 @@ public class TransactionChannelHintTest { private static MockSpannerServiceImpl mockSpanner; private static Server server; private static InetSocketAddress address; - private static final Set executeSqlLocalIps = ConcurrentHashMap.newKeySet(); - private static final Set beginTransactionLocalIps = - ConcurrentHashMap.newKeySet(); - private static final Set streamingReadLocalIps = ConcurrentHashMap.newKeySet(); + // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method + private static final Set executeSqlChannelHints = ConcurrentHashMap.newKeySet(); + private static final Set beginTransactionChannelHints = ConcurrentHashMap.newKeySet(); + private static final Set streamingReadChannelHints = ConcurrentHashMap.newKeySet(); private static Level originalLogLevel; @BeforeClass @@ -113,8 +112,8 @@ public static void startServer() throws Exception { server = NettyServerBuilder.forAddress(address) .addService(mockSpanner) - // Add a server interceptor to register the remote addresses that we are seeing. This - // indicates how many channels are used client side to communicate with the server. + // Add a server interceptor to extract channel hints from X-Goog-Spanner-Request-Id + // header. This verifies that all operations in a transaction use the same channel hint. .intercept( new ServerInterceptor() { @Override @@ -122,25 +121,30 @@ public ServerCall.Listener interceptCall( ServerCall call, Metadata headers, ServerCallHandler next) { - Attributes attributes = call.getAttributes(); - @SuppressWarnings({"unchecked", "deprecation"}) - Attributes.Key key = - (Attributes.Key) - attributes.keys().stream() - .filter(k -> k.equals(TRANSPORT_ATTR_REMOTE_ADDR)) - .findFirst() - .orElse(null); - if (key != null) { - if (call.getMethodDescriptor() - .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { - executeSqlLocalIps.add(attributes.get(key)); - } - if (call.getMethodDescriptor().equals(SpannerGrpc.getStreamingReadMethod())) { - streamingReadLocalIps.add(attributes.get(key)); - } - if (call.getMethodDescriptor() - .equals(SpannerGrpc.getBeginTransactionMethod())) { - beginTransactionLocalIps.add(attributes.get(key)); + // Extract channel hint from X-Goog-Spanner-Request-Id header + String requestId = headers.get(XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY); + if (requestId != null) { + // Format: + // ..... + String[] parts = requestId.split("\\."); + if (parts.length >= 4) { + try { + long channelHint = Long.parseLong(parts[3]); + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { + executeSqlChannelHints.add(channelHint); + } + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getStreamingReadMethod())) { + streamingReadChannelHints.add(channelHint); + } + if (call.getMethodDescriptor() + .equals(SpannerGrpc.getBeginTransactionMethod())) { + beginTransactionChannelHints.add(channelHint); + } + } catch (NumberFormatException e) { + // Ignore parse errors + } } } return Contexts.interceptCall(Context.current(), call, headers, next); @@ -172,9 +176,9 @@ public static void resetLogging() { @After public void reset() { mockSpanner.reset(); - executeSqlLocalIps.clear(); - streamingReadLocalIps.clear(); - beginTransactionLocalIps.clear(); + executeSqlChannelHints.clear(); + streamingReadChannelHints.clear(); + beginTransactionChannelHints.clear(); } private SpannerOptions createSpannerOptions() { @@ -195,18 +199,18 @@ private SpannerOptions createSpannerOptions() { } @Test - public void testSingleUseReadOnlyTransaction_usesSingleChannel() { + public void testSingleUseReadOnlyTransaction_usesSingleChannelHint() { try (Spanner spanner = createSpannerOptions().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); try (ResultSet resultSet = client.singleUseReadOnlyTransaction().executeQuery(SELECT1)) { while (resultSet.next()) {} } } - assertEquals(1, executeSqlLocalIps.size()); + assertEquals(1, executeSqlChannelHints.size()); } @Test - public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChannel() { + public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChannelHint() { try (Spanner spanner = createSpannerOptions().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); try (ResultSet resultSet = @@ -216,11 +220,11 @@ public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChanne while (resultSet.next()) {} } } - assertEquals(1, executeSqlLocalIps.size()); + assertEquals(1, executeSqlChannelHints.size()); } @Test - public void testReadOnlyTransaction_usesSingleChannel() { + public void testReadOnlyTransaction_usesSingleChannelHint() { try (Spanner spanner = createSpannerOptions().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { @@ -232,13 +236,14 @@ public void testReadOnlyTransaction_usesSingleChannel() { } } } - assertEquals(1, executeSqlLocalIps.size()); - assertEquals(1, beginTransactionLocalIps.size()); - assertEquals(executeSqlLocalIps, beginTransactionLocalIps); + // All ExecuteSql calls within the transaction should use the same channel hint + assertEquals(1, executeSqlChannelHints.size()); + // BeginTransaction should use a single channel hint + assertEquals(1, beginTransactionChannelHints.size()); } @Test - public void testReadOnlyTransaction_withTimestampBound_usesSingleChannel() { + public void testReadOnlyTransaction_withTimestampBound_usesSingleChannelHint() { try (Spanner spanner = createSpannerOptions().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); try (ReadOnlyTransaction transaction = @@ -251,13 +256,14 @@ public void testReadOnlyTransaction_withTimestampBound_usesSingleChannel() { } } } - assertEquals(1, executeSqlLocalIps.size()); - assertEquals(1, beginTransactionLocalIps.size()); - assertEquals(executeSqlLocalIps, beginTransactionLocalIps); + // All ExecuteSql calls within the transaction should use the same channel hint + assertEquals(1, executeSqlChannelHints.size()); + // BeginTransaction should use a single channel hint + assertEquals(1, beginTransactionChannelHints.size()); } @Test - public void testTransactionManager_usesSingleChannel() { + public void testTransactionManager_usesSingleChannelHint() { try (Spanner spanner = createSpannerOptions().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); try (TransactionManager manager = client.transactionManager()) { @@ -282,11 +288,11 @@ public void testTransactionManager_usesSingleChannel() { } } } - assertEquals(1, executeSqlLocalIps.size()); + assertEquals(1, executeSqlChannelHints.size()); } @Test - public void testTransactionRunner_usesSingleChannel() { + public void testTransactionRunner_usesSingleChannelHint() { try (Spanner spanner = createSpannerOptions().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); TransactionRunner runner = client.readWriteTransaction(); @@ -312,6 +318,6 @@ public void testTransactionRunner_usesSingleChannel() { return null; }); } - assertEquals(1, streamingReadLocalIps.size()); + assertEquals(1, streamingReadChannelHints.size()); } } diff --git a/pom.xml b/pom.xml index 29e65ec3502..ce528ca5cc7 100644 --- a/pom.xml +++ b/pom.xml @@ -54,6 +54,7 @@ UTF-8 github google-cloud-spanner-parent + 1.8.0 @@ -103,7 +104,6 @@ google-cloud-spanner 6.104.0 - com.google.cloud google-cloud-shared-dependencies