From 1720e31b87cdb1fba5e0ee81f5407e9874c5b8bb Mon Sep 17 00:00:00 2001 From: Dawei Date: Tue, 9 Jun 2026 16:40:01 -0700 Subject: [PATCH] Add River RPC span classification attributes --- tracing/index.ts | 42 +++++++++++++++++++------- tracing/tracing.test.ts | 65 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 96 insertions(+), 11 deletions(-) diff --git a/tracing/index.ts b/tracing/index.ts index 273d5fd0..504748f4 100644 --- a/tracing/index.ts +++ b/tracing/index.ts @@ -25,6 +25,26 @@ export interface TelemetryInfo { ctx: Context; } +function createProcedureAttributes( + kind: ValidProcType, + serviceName: string, + procedureName: string, + streamId: string, +) { + return { + component: 'river', + 'rpc.system': 'river', + 'rpc.service': serviceName, + 'rpc.method': procedureName, + 'river.method.kind': kind, + 'river.method.service': serviceName, + 'river.method.name': procedureName, + 'river.rpc.kind': kind, + 'river.rpc.streaming': kind !== 'rpc', + 'river.streamId': streamId, + }; +} + export function getPropagationContext( ctx: Context, ): PropagationContext | undefined { @@ -101,11 +121,12 @@ export function createProcTelemetryInfo( `river.client.${serviceName}.${procedureName}`, { attributes: { - component: 'river', - 'river.method.kind': kind, - 'river.method.service': serviceName, - 'river.method.name': procedureName, - 'river.streamId': streamId, + ...createProcedureAttributes( + kind, + serviceName, + procedureName, + streamId, + ), 'span.kind': 'client', }, links: [{ context: session.telemetry.span.spanContext() }], @@ -153,11 +174,12 @@ export function createHandlerSpan unknown>( `river.server.${serviceName}.${procedureName}`, { attributes: { - component: 'river', - 'river.method.kind': kind, - 'river.method.service': serviceName, - 'river.method.name': procedureName, - 'river.streamId': streamId, + ...createProcedureAttributes( + kind, + serviceName, + procedureName, + streamId, + ), 'span.kind': 'server', }, links: [{ context: session.telemetry.span.spanContext() }], diff --git a/tracing/tracing.test.ts b/tracing/tracing.test.ts index af9a1037..f12d144f 100644 --- a/tracing/tracing.test.ts +++ b/tracing/tracing.test.ts @@ -15,7 +15,12 @@ import { } from '@opentelemetry/sdk-trace-base'; import { W3CTraceContextPropagator } from '@opentelemetry/core'; import { AsyncHooksContextManager } from '@opentelemetry/context-async-hooks'; -import { createSessionTelemetryInfo, getPropagationContext } from './index'; +import { + createHandlerSpan, + createProcTelemetryInfo, + createSessionTelemetryInfo, + getPropagationContext, +} from './index'; import { testMatrix } from '../testUtil/fixtures/matrix'; import { cleanupTransports, @@ -65,6 +70,64 @@ describe('Basic tracing tests', () => { ) as Span, ).toBeTruthy(); }); + + test('procedure spans include rpc classification attributes', () => { + spanExporter.reset(); + const tracer = trace.getTracer('test'); + const session = dummySession(); + + const { span: clientSpan } = createProcTelemetryInfo( + tracer, + session, + 'subscription', + 'example', + 'watch', + 'client-stream-id', + ); + clientSpan.end(); + + createHandlerSpan( + tracer, + session, + 'stream', + 'example', + 'chat', + 'server-stream-id', + undefined, + (span) => { + span.end(); + }, + ); + + const spans = spanExporter.getFinishedSpans(); + const clientProcSpan = spans.find( + (span) => span.name === 'river.client.example.watch', + ); + const serverProcSpan = spans.find( + (span) => span.name === 'river.server.example.chat', + ); + + expect(clientProcSpan?.attributes).toMatchObject({ + 'rpc.system': 'river', + 'rpc.service': 'example', + 'rpc.method': 'watch', + 'river.method.kind': 'subscription', + 'river.rpc.kind': 'subscription', + 'river.rpc.streaming': true, + 'river.streamId': 'client-stream-id', + 'span.kind': 'client', + }); + expect(serverProcSpan?.attributes).toMatchObject({ + 'rpc.system': 'river', + 'rpc.service': 'example', + 'rpc.method': 'chat', + 'river.method.kind': 'stream', + 'river.rpc.kind': 'stream', + 'river.rpc.streaming': true, + 'river.streamId': 'server-stream-id', + 'span.kind': 'server', + }); + }); }); describe.each(testMatrix())(