diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index b05884305dc..bf03354f806 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -159,14 +159,6 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) return isGeneratedMethod ? fullMethodName : "other"; } - private static Context otelContextWithBaggage() { - Baggage baggage = BAGGAGE_KEY.get(); - if (baggage == null) { - return Context.current(); - } - return Context.current().with(baggage); - } - private static final class ClientTracer extends ClientStreamTracer { @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; @Nullable private static final AtomicLongFieldUpdater inboundWireSizeUpdater; @@ -282,7 +274,6 @@ public void streamClosed(Status status) { } void recordFinishedAttempt() { - Context otelContext = otelContextWithBaggage(); AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, fullMethodName) .put(TARGET_KEY, target) @@ -308,15 +299,15 @@ void recordFinishedAttempt() { if (module.resource.clientAttemptDurationCounter() != null ) { module.resource.clientAttemptDurationCounter() - .record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext); + .record(attemptNanos * SECONDS_PER_NANO, attribute, attemptsState.otelContext); } if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) { module.resource.clientTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attribute, otelContext); + .record(outboundWireSize, attribute, attemptsState.otelContext); } if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.clientTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attribute, otelContext); + .record(inboundWireSize, attribute, attemptsState.otelContext); } } } @@ -331,6 +322,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory private boolean callEnded; private final String fullMethodName; private final List callPlugins; + private final Context otelContext; private Status status; private long retryDelayNanos; private long callLatencyNanos; @@ -347,11 +339,12 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory OpenTelemetryMetricsModule module, String target, String fullMethodName, - List callPlugins) { + List callPlugins, Context otelContext) { this.module = checkNotNull(module, "module"); this.target = checkNotNull(target, "target"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.callPlugins = checkNotNull(callPlugins, "callPlugins"); + this.otelContext = checkNotNull(otelContext, "otelContext"); this.attemptDelayStopwatch = module.stopwatchSupplier.get(); this.callStopWatch = module.stopwatchSupplier.get().start(); @@ -361,7 +354,7 @@ static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory // Record here in case mewClientStreamTracer() would never be called. if (module.resource.clientAttemptCountCounter() != null) { - module.resource.clientAttemptCountCounter().add(1, attribute); + module.resource.clientAttemptCountCounter().add(1, attribute, otelContext); } } @@ -385,7 +378,7 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metada io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName, TARGET_KEY, target); if (module.resource.clientAttemptCountCounter() != null) { - module.resource.clientAttemptCountCounter().add(1, attribute); + module.resource.clientAttemptCountCounter().add(1, attribute, otelContext); } } if (info.isTransparentRetry()) { @@ -448,7 +441,6 @@ void callEnded(Status status) { } void recordFinishedCall() { - Context otelContext = otelContextWithBaggage(); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS); @@ -548,6 +540,7 @@ private static final class ServerTracer extends ServerStreamTracer { private final OpenTelemetryMetricsModule module; private final String fullMethodName; private final List streamPlugins; + private Context otelContext = Context.root(); private volatile boolean isGeneratedMethod; private volatile int streamClosed; private final Stopwatch stopwatch; @@ -562,6 +555,17 @@ private static final class ServerTracer extends ServerStreamTracer { this.stopwatch = module.stopwatchSupplier.get().start(); } + @Override + public io.grpc.Context filterContext(io.grpc.Context context) { + Baggage baggage = BAGGAGE_KEY.get(context); + if (baggage != null) { + otelContext = Context.current().with(baggage); + } else { + otelContext = Context.current(); + } + return context; + } + @Override public void serverCallStarted(ServerCallInfo callInfo) { // Only record method name as an attribute if isSampledToLocalTracing is set to true, @@ -569,12 +573,13 @@ public void serverCallStarted(ServerCallInfo callInfo) { // created methods result in high cardinality metrics. boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing(); isGeneratedMethod = isSampledToLocalTracing; + io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of( METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing)); if (module.resource.serverCallCountCounter() != null) { - module.resource.serverCallCountCounter().add(1, attribute); + module.resource.serverCallCountCounter().add(1, attribute, otelContext); } } @@ -606,7 +611,6 @@ public void inboundWireSize(long bytes) { */ @Override public void streamClosed(Status status) { - Context otelContext = otelContextWithBaggage(); if (streamClosedUpdater != null) { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; @@ -657,7 +661,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata } streamPlugins = Collections.unmodifiableList(streamPluginsMutable); } - return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins); + return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, + streamPlugins); } } @@ -694,7 +699,7 @@ public ClientCall interceptCall( final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory( OpenTelemetryMetricsModule.this, target, recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()), - callPlugins); + callPlugins, Context.current()); ClientCall call = next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory)); return new SimpleForwardingClientCall(call) { @@ -717,3 +722,4 @@ public void onClose(Status status, Metadata trailers) { } } } + diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 391f94cefea..93f74a53ed0 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; @@ -48,23 +47,15 @@ import io.grpc.ServerStreamTracer.ServerCallInfo; import io.grpc.Status; import io.grpc.Status.Code; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.FakeClock; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; -import io.grpc.stub.MetadataUtils; -import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; -import io.grpc.testing.protobuf.SimpleRequest; -import io.grpc.testing.protobuf.SimpleResponse; -import io.grpc.testing.protobuf.SimpleServiceGrpc; -import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; @@ -174,14 +165,9 @@ public String parse(InputStream stream) { private ServerCall.Listener mockServerCallListener; @Captor private ArgumentCaptor statusCaptor; - @Mock - private DoubleHistogram mockServerCallDurationHistogram; - @Captor - private ArgumentCaptor contextCaptor; + private io.grpc.Server server; private io.grpc.ManagedChannel channel; - private OpenTelemetryMetricsResource resource; - private final String serverName = "E2ETestServer-" + Math.random(); private final FakeClock fakeClock = new FakeClock(); private final MethodDescriptor method = @@ -201,9 +187,7 @@ public String parse(InputStream stream) { public void setUp() throws Exception { testMeter = openTelemetryTesting.getOpenTelemetry() .getMeter(OpenTelemetryConstants.INSTRUMENTATION_SCOPE); - resource = OpenTelemetryMetricsResource.builder() - .serverCallDurationCounter(mockServerCallDurationHistogram) - .build(); + } @After @@ -278,7 +262,8 @@ public void clientBasicMetrics() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList(), + Context.root()); Metadata headers = new Metadata(); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, headers); @@ -445,7 +430,8 @@ public void clientBasicMetrics_withRetryMetricsEnabled_shouldRecordZeroOrBeAbsen enabledMetrics, disableDefaultMetrics); OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList(), + Context.root()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -513,7 +499,7 @@ public void recordAttemptMetrics() { OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, - method.getFullMethodName(), emptyList()); + method.getFullMethodName(), emptyList(), Context.root()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -970,7 +956,7 @@ public void recordAttemptMetrics_withRetryMetricsEnabled() { OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, - method.getFullMethodName(), emptyList()); + method.getFullMethodName(), emptyList(), Context.root()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -1058,7 +1044,7 @@ public void recordAttemptMetrics_withHedgedCalls() { OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, - method.getFullMethodName(), emptyList()); + method.getFullMethodName(), emptyList(), Context.root()); // Create a StreamInfo specifically for hedged attempts final ClientStreamTracer.StreamInfo hedgedStreamInfo = @@ -1139,7 +1125,7 @@ public void clientStreamNeverCreatedStillRecordMetrics() { OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new OpenTelemetryMetricsModule.CallAttemptsTracerFactory(module, target, - method.getFullMethodName(), emptyList()); + method.getFullMethodName(), emptyList(), Context.root()); fakeClock.forwardTime(3000, MILLISECONDS); Status status = Status.DEADLINE_EXCEEDED.withDescription("5 seconds"); callAttemptsTracerFactory.callEnded(status); @@ -1245,9 +1231,11 @@ public void clientLocalityMetrics_present() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList(), + Context.root()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -1313,9 +1301,11 @@ public void clientLocalityMetrics_missing() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList(), + Context.root()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -1380,7 +1370,8 @@ public void clientBackendServiceMetrics_present() { fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList(), + Context.root()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -1449,7 +1440,8 @@ public void clientBackendServiceMetrics_missing() { fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = - new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); + new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList(), + Context.root()); ClientStreamTracer tracer = callAttemptsTracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata()); @@ -1629,44 +1621,6 @@ public void serverBasicMetrics() { } - @Test - public void serverBaggagePropagationToMetrics() { - // 1. Create module and tracer factory using the mock resource - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); - ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); - ServerStreamTracer tracer = - tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); - - // 2. Define the test baggage and gRPC context - Baggage testBaggage = Baggage.builder() - .put("user-id", "67") - .build(); - - // This simulates the context that the Tracing module would have created - io.grpc.Context grpcContext = io.grpc.Context.current() - .withValue(OpenTelemetryConstants.BAGGAGE_KEY, testBaggage); - - // 3. Attach the gRPC context, trigger metric recording, and detach - io.grpc.Context previousContext = grpcContext.attach(); - try { - tracer.streamClosed(Status.OK); - } finally { - grpcContext.detach(previousContext); - } - - // 4. Verify the record call and capture the OTel Context - verify(mockServerCallDurationHistogram).record( - anyDouble(), - any(io.opentelemetry.api.common.Attributes.class), - contextCaptor.capture()); - - // 5. Assert on the captured OTel Context - io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); - Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); - - assertEquals("67", capturedBaggage.getEntryValue("user-id")); - } @Test public void targetAttributeFilter_notSet_usesOriginalTarget() { @@ -1808,7 +1762,8 @@ private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource, TargetFilter filter) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), filter); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + filter); } static class CallInfo extends ServerCallInfo { @@ -1843,62 +1798,135 @@ public String getAuthority() { } @Test - public void serverBaggagePropagation_EndToEnd() throws Exception { - // 1. Create Both Modules - OpenTelemetry otel = openTelemetryTesting.getOpenTelemetry(); - OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(otel); - OpenTelemetryMetricsModule metricsModule = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); - - // 2. Create Server with *both* tracer factories - server = InProcessServerBuilder.forName(serverName) - .addService(new SimpleServiceImpl()) // <-- Uses the helper class below - .addStreamTracerFactory(tracingModule.getServerTracerFactory()) - .addStreamTracerFactory(metricsModule.getServerTracerFactory()) - .build() - .start(); + public void serverMetrics_recordsBaggage() { + io.opentelemetry.api.metrics.DoubleHistogram mockDurationHistogram = org.mockito.Mockito + .mock(io.opentelemetry.api.metrics.DoubleHistogram.class); + OpenTelemetryMetricsResource mockResource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(mockDurationHistogram) + .build(); - // 3. Create Client Channel - channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(mockResource); + ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); - // 4. Manually create baggage headers - Metadata headers = new Metadata(); - headers.put(Metadata.Key.of("baggage", Metadata.ASCII_STRING_MARSHALLER), - "choice=red_pill_or_blue_pill"); + Baggage baggage = Baggage.builder() + .put("baggage-key-1", "baggage-val-1") + .build(); - // 5. Make the gRPC call with these headers - ClientInterceptor headerAttachingInterceptor = - MetadataUtils.newAttachHeadersInterceptor(headers); + io.grpc.Context grpcContext = io.grpc.Context.ROOT + .withValue(OpenTelemetryConstants.BAGGAGE_KEY, baggage); + io.grpc.Context previous = grpcContext.attach(); - // Now, create the stub and apply that interceptor - SimpleServiceGrpc.SimpleServiceBlockingStub stub = - SimpleServiceGrpc.newBlockingStub(channel) - .withInterceptors(headerAttachingInterceptor); + ServerStreamTracer tracer; + try { + tracer = tracerFactory.newServerStreamTracer( + method.getFullMethodName(), new Metadata()); + tracer.filterContext(grpcContext); + tracer.serverCallStarted( + new CallInfo<>(method, Attributes.EMPTY, null)); + } finally { + grpcContext.detach(previous); + } - // Use the imported SimpleRequest - stub.unaryRpc(SimpleRequest.getDefaultInstance()); + try (io.opentelemetry.context.Scope scope = Context.root().makeCurrent()) { + tracer.streamClosed(Status.CANCELLED); + } - // 6. Verify the Mock - verify(mockServerCallDurationHistogram).record( - anyDouble(), - any(io.opentelemetry.api.common.Attributes.class), + org.mockito.ArgumentCaptor contextCaptor = org.mockito.ArgumentCaptor + .forClass(Context.class); + org.mockito.Mockito.verify(mockDurationHistogram).record( + org.mockito.ArgumentMatchers.anyDouble(), + org.mockito.ArgumentMatchers.any(), contextCaptor.capture()); - // 7. Assert on the captured Context - io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); - Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); - - assertEquals("red_pill_or_blue_pill", capturedBaggage.getEntryValue("choice")); + Baggage capturedBaggage = Baggage.fromContext(contextCaptor.getValue()); + org.junit.Assert.assertNotNull("Captured context should have baggage", capturedBaggage); + org.junit.Assert.assertEquals( + "baggage-val-1", capturedBaggage.getEntryValue("baggage-key-1")); } - /** - * A simple service implementation for the E2E test. - */ - private static class SimpleServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase { - @Override - public void unaryRpc(SimpleRequest request, StreamObserver responseObserver) { - responseObserver.onNext(SimpleResponse.getDefaultInstance()); - responseObserver.onCompleted(); + @Test + public void serverMetrics_recordsBaggage_endToEnd() throws Exception { + io.opentelemetry.api.metrics.DoubleHistogram mockDurationHistogram = org.mockito.Mockito + .mock(io.opentelemetry.api.metrics.DoubleHistogram.class); + OpenTelemetryMetricsResource mockResource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(mockDurationHistogram) + .build(); + + io.opentelemetry.api.OpenTelemetry openTelemetry = io.opentelemetry.sdk.OpenTelemetrySdk + .builder() + .setPropagators(io.opentelemetry.context.propagation.ContextPropagators.create( + io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator.getInstance())) + .build(); + + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(mockResource); + OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(openTelemetry); + + String serverName = io.grpc.inprocess.InProcessServerBuilder.generateName(); + io.grpc.inprocess.InProcessServerBuilder serverBuilder = io.grpc.inprocess + .InProcessServerBuilder + .forName(serverName).directExecutor(); + + serverBuilder.addStreamTracerFactory(tracingModule.getServerTracerFactory()); + serverBuilder.intercept(tracingModule.getServerSpanPropagationInterceptor()); + serverBuilder.addStreamTracerFactory(module.getServerTracerFactory()); + + serverBuilder.addService(ServerServiceDefinition.builder( + io.grpc.ServiceDescriptor.newBuilder("package1.service2") + .addMethod(method) + .build()) + .addMethod(method, new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + call.sendHeaders(new Metadata()); + call.sendMessage("response"); + call.close(Status.OK, new Metadata()); + return new ServerCall.Listener() { + }; + } + }).build()); + io.grpc.Server server = serverBuilder.build().start(); + + io.grpc.inprocess.InProcessChannelBuilder channelBuilder = io.grpc.inprocess + .InProcessChannelBuilder + .forName(serverName).directExecutor(); + channelBuilder.intercept(tracingModule.getClientInterceptor()); + channelBuilder.intercept(module.getClientInterceptor(serverName)); + Channel channel = channelBuilder.intercept(new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return next.newCall(method, callOptions); + } + }).build(); + + Baggage baggage = Baggage.builder() + .put("baggage-key-1", "baggage-val-1") + .build(); + + io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.root() + .with(baggage); + io.opentelemetry.context.Scope scope = otelContext.makeCurrent(); + + try { + io.grpc.stub.ClientCalls.blockingUnaryCall(channel, + method, CallOptions.DEFAULT, "request"); + } finally { + scope.close(); } + + server.shutdown().awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS); + + org.mockito.ArgumentCaptor contextCaptor = org.mockito.ArgumentCaptor + .forClass(Context.class); + org.mockito.Mockito.verify(mockDurationHistogram).record( + org.mockito.ArgumentMatchers.anyDouble(), + org.mockito.ArgumentMatchers.any(), + contextCaptor.capture()); + + Baggage capturedBaggage = Baggage.fromContext(contextCaptor.getValue()); + org.junit.Assert.assertNotNull("Captured context should have baggage", capturedBaggage); + org.junit.Assert.assertEquals( + "baggage-val-1", capturedBaggage.getEntryValue("baggage-key-1")); } }