Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1087,9 +1087,56 @@
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/CompositeTracer</className>
<method>void recordGFELatency(java.lang.Float)</method>
</difference><difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/CompositeTracer</className>
<method>void recordGfeHeaderMissingCount(java.lang.Long)</method>
</difference>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/CompositeTracer</className>
<method>void recordGfeHeaderMissingCount(java.lang.Long)</method>
</difference>

<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerException</className>
<method>void setRequestId(com.google.cloud.spanner.XGoogSpannerRequestId)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerExceptionFactory</className>
<method>com.google.cloud.spanner.SpannerBatchUpdateException newSpannerBatchUpdateException(com.google.cloud.spanner.ErrorCode, java.lang.String, long[], com.google.cloud.spanner.XGoogSpannerRequestId)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerExceptionFactory</className>
<method>com.google.cloud.spanner.SpannerException newSpannerException(com.google.cloud.spanner.ErrorCode, java.lang.String, java.lang.Throwable, com.google.cloud.spanner.XGoogSpannerRequestId)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerExceptionFactory</className>
<method>com.google.cloud.spanner.SpannerException newSpannerException(com.google.cloud.spanner.ErrorCode, java.lang.String, com.google.cloud.spanner.XGoogSpannerRequestId)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerExceptionFactory</className>
<method>com.google.cloud.spanner.SpannerException newSpannerException(java.lang.Throwable, com.google.cloud.spanner.XGoogSpannerRequestId)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerExceptionFactory</className>
<method>com.google.cloud.spanner.SpannerException newSpannerException(io.grpc.Context, java.lang.Throwable, com.google.cloud.spanner.XGoogSpannerRequestId)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/spanner/SpannerExceptionFactory</className>
<method>com.google.cloud.spanner.SpannerException propagateInterrupt(java.lang.InterruptedException, com.google.cloud.spanner.XGoogSpannerRequestId)</method>
</difference>
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/spanner/XGoogSpannerRequestId</className>
<field>REQUEST_HEADER_KEY</field>
</difference>
<difference>
<differenceType>6001</differenceType>
<className>com/google/cloud/spanner/XGoogSpannerRequestId</className>
<field>REQUEST_ID</field>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,16 @@ public class AbortedException extends SpannerException {
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
AbortedException(
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
this(token, message, cause, null, null);
this(token, message, cause, null);
}

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
AbortedException(
DoNotConstructDirectly token,
@Nullable String message,
@Nullable Throwable cause,
@Nullable ApiException apiException,
@Nullable XGoogSpannerRequestId reqId) {
super(token, ErrorCode.ABORTED, IS_RETRYABLE, message, cause, apiException, reqId);
@Nullable ApiException apiException) {
super(token, ErrorCode.ABORTED, IS_RETRYABLE, message, cause, apiException);
if (cause instanceof AbortedException) {
this.transactionID = ((AbortedException) cause).getTransactionID();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,30 +457,22 @@ void initTransaction() {
}

private void initTransactionInternal(BeginTransactionRequest request) {
XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
try {
Transaction transaction =
rpc.beginTransaction(
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
"Missing expected transaction.read_timestamp metadata field",
reqId);
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
}
if (transaction.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field", reqId);
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
}
try {
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
} catch (IllegalArgumentException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
"Bad value in transaction.read_timestamp metadata field",
e,
reqId);
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
}
transactionId = transaction.getId();
span.addAnnotation(
Expand Down Expand Up @@ -816,7 +808,8 @@ ResultSet executeQueryInternalWithOptions(
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
AsyncResultSet.StreamMessageListener streamListener,
XGoogSpannerRequestId requestId) {
GrpcStreamIterator stream =
new GrpcStreamIterator(
statement,
Expand All @@ -839,12 +832,12 @@ CloseableIterator<PartialResultSet> startStream(
if (selector != null) {
request.setTransaction(selector);
}
this.ensureNonNullXGoogRequestId();
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(),
stream.consumer(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
getTransactionChannelHint(),
requestId,
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, request.getTransaction().hasBegin());
Expand All @@ -860,7 +853,7 @@ boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
stream, this, options.hasDecodeMode() ? options.decodeMode() : defaultDecodeMode);
}

Map<SpannerRpc.Option, ?> getChannelHintOptions(
static Map<SpannerRpc.Option, ?> getChannelHintOptions(
Map<SpannerRpc.Option, ?> channelHintForSession, Long channelHintForTransaction) {
if (channelHintForSession != null) {
return channelHintForSession;
Expand Down Expand Up @@ -1030,7 +1023,8 @@ ResultSet readInternalWithOptions(
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
AsyncResultSet.StreamMessageListener streamListener) {
AsyncResultSet.StreamMessageListener streamListener,
XGoogSpannerRequestId requestId) {
GrpcStreamIterator stream =
new GrpcStreamIterator(
lastStatement, prefetchChunks, cancelQueryWhenClientIsClosed);
Expand All @@ -1048,13 +1042,12 @@ CloseableIterator<PartialResultSet> startStream(
builder.setTransaction(selector);
}
builder.setRequestOptions(buildRequestOptions(readOptions));
this.incrementXGoogRequestIdAttempt();
this.xGoogRequestId.setChannelId(session.getChannel());
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(),
stream.consumer(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
getTransactionChannelHint(),
requestId,
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,15 @@ public class AdminRequestsPerMinuteExceededException extends SpannerException {
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
AdminRequestsPerMinuteExceededException(
DoNotConstructDirectly token, @Nullable String message, @Nullable Throwable cause) {
this(token, message, cause, null, null);
this(token, message, cause, null);
}

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
AdminRequestsPerMinuteExceededException(
DoNotConstructDirectly token,
@Nullable String message,
@Nullable Throwable cause,
@Nullable ApiException apiException,
@Nullable XGoogSpannerRequestId reqId) {
super(token, ErrorCode.RESOURCE_EXHAUSTED, true, message, cause, apiException, reqId);
@Nullable ApiException apiException) {
super(token, ErrorCode.RESOURCE_EXHAUSTED, true, message, cause, apiException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,9 @@ private List<Partition> partitionReadUsingIndex(
}
builder.setPartitionOptions(pbuilder.build());

XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
final PartitionReadRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionRead(request, reqId.withOptions(options));
PartitionResponse response = rpc.partitionRead(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand All @@ -274,7 +272,6 @@ private List<Partition> partitionReadUsingIndex(
return partitionReadUsingIndex(
partitionOptions, table, index, keys, columns, true, option);
}
e.setRequestId(reqId);
throw e;
}
}
Expand Down Expand Up @@ -316,11 +313,9 @@ private List<Partition> partitionQuery(
}
builder.setPartitionOptions(pbuilder.build());

XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
final PartitionQueryRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options));
PartitionResponse response = rpc.partitionQuery(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand All @@ -333,7 +328,6 @@ private List<Partition> partitionQuery(
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
return partitionQuery(partitionOptions, statement, true, option);
}
e.setRequestId(reqId);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID;
import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID_HEADER_NAME;

import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.OpenTelemetryMetricsRecorder;
Expand Down Expand Up @@ -98,10 +98,12 @@ public class BuiltInMetricsConstant {
AttributeKey.stringKey("directpath_enabled");
public static final AttributeKey<String> DIRECT_PATH_USED_KEY =
AttributeKey.stringKey("directpath_used");
public static final AttributeKey<String> REQUEST_ID_KEY = AttributeKey.stringKey(REQUEST_ID);
public static final AttributeKey<String> REQUEST_ID_KEY =
AttributeKey.stringKey(REQUEST_ID_HEADER_NAME);
public static final AttributeKey<String> GRPC_XDS_RESOURCE_TYPE_KEY =
AttributeKey.stringKey("grpc.xds.resource_type");
public static Set<String> ALLOWED_EXEMPLARS_ATTRIBUTES = new HashSet<>(Arrays.asList(REQUEST_ID));
public static Set<String> ALLOWED_EXEMPLARS_ATTRIBUTES =
new HashSet<>(Arrays.asList(REQUEST_ID_HEADER_NAME));

// IP address prefixes allocated for DirectPath backends.
public static final String DP_IPV6_PREFIX = "2001:4860:8040";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class DatabaseNotFoundException extends ResourceNotFoundException {
@Nullable String message,
ResourceInfo resourceInfo,
@Nullable Throwable cause) {
this(token, message, resourceInfo, cause, null, null);
this(token, message, resourceInfo, cause, null);
}

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
Expand All @@ -44,8 +44,7 @@ public class DatabaseNotFoundException extends ResourceNotFoundException {
@Nullable String message,
ResourceInfo resourceInfo,
@Nullable Throwable cause,
@Nullable ApiException apiException,
@Nullable XGoogSpannerRequestId reqId) {
super(token, message, resourceInfo, cause, apiException, reqId);
@Nullable ApiException apiException) {
super(token, message, resourceInfo, cause, apiException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class InstanceNotFoundException extends ResourceNotFoundException {
@Nullable String message,
ResourceInfo resourceInfo,
@Nullable Throwable cause) {
this(token, message, resourceInfo, cause, null, null);
this(token, message, resourceInfo, cause, null);
}

/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
Expand All @@ -44,8 +44,7 @@ public class InstanceNotFoundException extends ResourceNotFoundException {
@Nullable String message,
ResourceInfo resourceInfo,
@Nullable Throwable cause,
@Nullable ApiException apiException,
@Nullable XGoogSpannerRequestId reqId) {
super(token, message, resourceInfo, cause, apiException, reqId);
@Nullable ApiException apiException) {
super(token, message, resourceInfo, cause, apiException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ public class MissingDefaultSequenceKindException extends SpannerException {
ErrorCode errorCode,
String message,
Throwable cause,
@Nullable ApiException apiException,
@Nullable XGoogSpannerRequestId reqId) {
super(token, errorCode, /* retryable= */ false, message, cause, apiException, reqId);
@Nullable ApiException apiException) {
super(token, errorCode, /* retryable= */ false, message, cause, apiException);
}

static boolean isMissingDefaultSequenceKindException(Throwable cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
return createMultiplexedSessionTransaction(/* singleUse= */ true)
return createMultiplexedSessionTransaction(/* singleUse= */ false)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitionedDML is not a single-use transaction. It invokes the BeginTransaction RPC and the ExecuteStreamingSql RPC. These two calls should use the same channel.

.executePartitionedUpdate(stmt, options);
}

Expand Down
Loading