|
8 | 8 |
|
9 | 9 | import anyio |
10 | 10 | from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream |
| 11 | +from opentelemetry.propagate import inject |
11 | 12 | from pydantic import BaseModel, TypeAdapter |
12 | 13 | from typing_extensions import Self |
13 | 14 |
|
@@ -263,6 +264,9 @@ async def send_request( |
263 | 264 | # Store the callback for this request |
264 | 265 | self._progress_callbacks[request_id] = progress_callback |
265 | 266 |
|
| 267 | + # Propagate opentelemetry trace context |
| 268 | + self._inject_otel_context(request_data) |
| 269 | + |
266 | 270 | try: |
267 | 271 | jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data) |
268 | 272 | await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata)) |
@@ -295,18 +299,37 @@ async def send_notification( |
295 | 299 | related_request_id: RequestId | None = None, |
296 | 300 | ) -> None: |
297 | 301 | """Emits a notification, which is a one-way message that does not expect a response.""" |
| 302 | + |
| 303 | + request_data = notification.model_dump(by_alias=True, mode="json", exclude_none=True) |
| 304 | + # Propagate opentelemetry trace context |
| 305 | + self._inject_otel_context(request_data) |
| 306 | + jsonrpc_notification = JSONRPCNotification(jsonrpc="2.0", **request_data) |
| 307 | + |
298 | 308 | # Some transport implementations may need to set the related_request_id |
299 | 309 | # to attribute to the notifications to the request that triggered them. |
300 | | - jsonrpc_notification = JSONRPCNotification( |
301 | | - jsonrpc="2.0", |
302 | | - **notification.model_dump(by_alias=True, mode="json", exclude_none=True), |
303 | | - ) |
304 | 310 | session_message = SessionMessage( |
305 | 311 | message=jsonrpc_notification, |
306 | 312 | metadata=ServerMessageMetadata(related_request_id=related_request_id) if related_request_id else None, |
307 | 313 | ) |
308 | 314 | await self._write_stream.send(session_message) |
309 | 315 |
|
| 316 | + def _inject_otel_context(self, request: dict[str, Any]) -> None: |
| 317 | + """Propagate OpenTelemetry context in `_meta`. |
| 318 | +
|
| 319 | + See |
| 320 | + - SEP414 https://github.com/modelcontextprotocol/modelcontextprotocol/pull/414 |
| 321 | + - OpenTelemetry semantic conventions |
| 322 | + https://github.com/open-telemetry/semantic-conventions/blob/v1.39.0/docs/gen-ai/mcp.md |
| 323 | + """ |
| 324 | + |
| 325 | + carrier: dict[str, str] = {} |
| 326 | + inject(carrier) |
| 327 | + if not carrier: |
| 328 | + return |
| 329 | + |
| 330 | + meta: dict[str, Any] = request.setdefault("params", {}).setdefault("_meta", {}) |
| 331 | + meta.update(carrier) |
| 332 | + |
310 | 333 | async def _send_response(self, request_id: RequestId, response: SendResultT | ErrorData) -> None: |
311 | 334 | if isinstance(response, ErrorData): |
312 | 335 | jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=request_id, error=response) |
|
0 commit comments