diff --git a/Cargo.lock b/Cargo.lock index 6dab1900a0..23c1fa44e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15933,6 +15933,7 @@ dependencies = [ "ractor", "ractor-supervisor", "rodio", + "sentry", "serde", "serde_json", "specta", diff --git a/apps/api/src/listen.ts b/apps/api/src/listen.ts index e9a85cfd2d..921c5a809d 100644 --- a/apps/api/src/listen.ts +++ b/apps/api/src/listen.ts @@ -13,58 +13,82 @@ export const listenSocketHandler: Handler = async (c, next) => { const clientUrl = new URL(c.req.url, "http://localhost"); const provider = clientUrl.searchParams.get("provider") ?? "deepgram"; - let connection: WsProxyConnection; - try { - connection = createProxyFromRequest(clientUrl, c.req.raw.headers); - await connection.preconnectUpstream(); - Metrics.websocketConnected(provider); - } catch (error) { - Sentry.captureException(error, { - tags: { provider, operation: "upstream_connect" }, - }); - const detail = - error instanceof Error ? error.message : "upstream_connect_failed"; - const status = detail === "upstream_connect_timeout" ? 504 : 502; - return c.json({ error: "upstream_connect_failed", detail }, status); - } + const sentryTrace = c.req.header("sentry-trace"); + const baggage = c.req.header("baggage"); - const connectionStartTime = performance.now(); + return Sentry.continueTrace({ sentryTrace, baggage }, () => { + return Sentry.startSpan( + { name: `WebSocket /listen ${provider}`, op: "websocket.server" }, + async () => { + let connection: WsProxyConnection; + try { + connection = createProxyFromRequest(clientUrl, c.req.raw.headers); + await connection.preconnectUpstream(); + Metrics.websocketConnected(provider); + } catch (error) { + Sentry.addBreadcrumb({ + category: "websocket", + message: "Upstream connection failed", + level: "error", + data: { provider, error: String(error) }, + }); + Sentry.captureException(error, { + tags: { provider, operation: "upstream_connect" }, + }); + const detail = + error instanceof Error ? error.message : "upstream_connect_failed"; + const status = detail === "upstream_connect_timeout" ? 504 : 502; + return c.json({ error: "upstream_connect_failed", detail }, status); + } - const handler = upgradeWebSocket(() => { - return { - onOpen(_event, ws) { - connection.initializeUpstream(ws.raw); - }, - async onMessage(event) { - const payload = await normalizeWsData(event.data); - if (!payload) { - return; + const connectionStartTime = performance.now(); + + const handler = upgradeWebSocket(() => { + return { + onOpen(_event, ws) { + connection.initializeUpstream(ws.raw); + }, + async onMessage(event) { + const payload = await normalizeWsData(event.data); + if (!payload) { + return; + } + await connection.sendToUpstream(payload); + }, + onClose(event) { + const code = event?.code ?? 1000; + const reason = event?.reason || "client_closed"; + connection.closeConnections(code, reason); + Metrics.websocketDisconnected( + provider, + performance.now() - connectionStartTime, + ); + }, + onError(event) { + Sentry.addBreadcrumb({ + category: "websocket", + message: "Client WebSocket error", + level: "error", + data: { provider }, + }); + Sentry.captureException( + event instanceof Error + ? event + : new Error("websocket_client_error"), + { tags: { provider, operation: "websocket" } }, + ); + connection.closeConnections(1011, "client_error"); + }, + }; + }); + + const response = await handler(c, next); + if (!response) { + connection.closeConnections(); + return c.json({ error: "upgrade_failed" }, 400); } - await connection.sendToUpstream(payload); + return response; }, - onClose(event) { - const code = event?.code ?? 1000; - const reason = event?.reason || "client_closed"; - connection.closeConnections(code, reason); - Metrics.websocketDisconnected( - provider, - performance.now() - connectionStartTime, - ); - }, - onError(event) { - Sentry.captureException( - event instanceof Error ? event : new Error("websocket_client_error"), - { tags: { provider, operation: "websocket" } }, - ); - connection.closeConnections(1011, "client_error"); - }, - }; + ); }); - - const response = await handler(c, next); - if (!response) { - connection.closeConnections(); - return c.json({ error: "upgrade_failed" }, 400); - } - return response; }; diff --git a/owhisper/owhisper-client/src/lib.rs b/owhisper/owhisper-client/src/lib.rs index 9cda052329..164bdb2b7c 100644 --- a/owhisper/owhisper-client/src/lib.rs +++ b/owhisper/owhisper-client/src/lib.rs @@ -23,15 +23,23 @@ pub struct ListenClientBuilder { api_base: Option, api_key: Option, params: Option, + trace_headers: Option, _marker: PhantomData, } +#[derive(Clone, Default)] +pub struct TraceHeaders { + pub sentry_trace: Option, + pub baggage: Option, +} + impl Default for ListenClientBuilder { fn default() -> Self { Self { api_base: None, api_key: None, params: None, + trace_headers: None, _marker: PhantomData, } } @@ -53,11 +61,17 @@ impl ListenClientBuilder { self } + pub fn trace_headers(mut self, headers: TraceHeaders) -> Self { + self.trace_headers = Some(headers); + self + } + pub fn adapter(self) -> ListenClientBuilder { ListenClientBuilder { api_base: self.api_base, api_key: self.api_key, params: self.params, + trace_headers: self.trace_headers, _marker: PhantomData, } } @@ -86,6 +100,15 @@ impl ListenClientBuilder { request = request.with_header(header_name, header_value); } + if let Some(ref trace_headers) = self.trace_headers { + if let Some(ref sentry_trace) = trace_headers.sentry_trace { + request = request.with_header("sentry-trace", sentry_trace.clone()); + } + if let Some(ref baggage) = trace_headers.baggage { + request = request.with_header("baggage", baggage.clone()); + } + } + request } diff --git a/plugins/listener/Cargo.toml b/plugins/listener/Cargo.toml index f393347b91..0684fc62ed 100644 --- a/plugins/listener/Cargo.toml +++ b/plugins/listener/Cargo.toml @@ -63,3 +63,5 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tokio-stream = { workspace = true } tokio-util = { workspace = true } tracing = { workspace = true } + +sentry = { workspace = true } diff --git a/plugins/listener/src/actors/listener.rs b/plugins/listener/src/actors/listener.rs index fd50f92b07..e6439fc2d0 100644 --- a/plugins/listener/src/actors/listener.rs +++ b/plugins/listener/src/actors/listener.rs @@ -6,7 +6,7 @@ use tokio::time::error::Elapsed; use owhisper_client::{ AdapterKind, ArgmaxAdapter, AssemblyAIAdapter, DeepgramAdapter, FinalizeHandle, - FireworksAdapter, OpenAIAdapter, RealtimeSttAdapter, SonioxAdapter, + FireworksAdapter, OpenAIAdapter, RealtimeSttAdapter, SonioxAdapter, TraceHeaders, }; use owhisper_interface::stream::{Extra, StreamResponse}; use owhisper_interface::{ControlMessage, MixedMessage}; @@ -270,6 +270,46 @@ fn build_extra(args: &ListenerArgs) -> (f64, Extra) { (session_offset_secs, extra) } +fn build_trace_headers(args: &ListenerArgs, provider_name: &str) -> TraceHeaders { + let mut trace_headers = TraceHeaders::default(); + + sentry::with_scope( + |scope| { + scope.set_tag("session_id", &args.session_id); + scope.set_tag("stt_provider", provider_name); + scope.set_tag("channel_mode", format!("{:?}", args.mode)); + scope.set_tag("model", &args.model); + scope.set_tag( + "languages", + args.languages + .iter() + .map(|l| l.iso639().code()) + .collect::>() + .join(","), + ); + scope.set_tag("onboarding", args.onboarding.to_string()); + }, + || { + sentry::configure_scope(|scope| { + if let Some(span) = scope.get_span() { + trace_headers.sentry_trace = span + .iter_headers() + .find(|(k, _)| *k == "sentry-trace") + .map(|(_, v)| v.to_string()) + .filter(|s| !s.is_empty()); + trace_headers.baggage = span + .iter_headers() + .find(|(k, _)| *k == "baggage") + .map(|(_, v)| v.to_string()) + .filter(|s| !s.is_empty()); + } + }); + }, + ); + + trace_headers +} + async fn spawn_rx_task_single_with_adapter( args: ListenerArgs, myself: ActorRef, @@ -286,11 +326,23 @@ async fn spawn_rx_task_single_with_adapter( let (tx, rx) = tokio::sync::mpsc::channel::>(32); + let adapter = A::default(); + let trace_headers = build_trace_headers(&args, adapter.provider_name()); + + tracing::info!( + session_id = %args.session_id, + channel_mode = ?args.mode, + model = %args.model, + provider = adapter.provider_name(), + "listener_actor_starting(single)" + ); + let client = owhisper_client::ListenClient::builder() .adapter::() .api_base(args.base_url.clone()) .api_key(args.api_key.clone()) .params(build_listen_params(&args)) + .trace_headers(trace_headers) .build_single(); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); @@ -302,12 +354,13 @@ async fn spawn_rx_task_single_with_adapter( Err(_elapsed) => { tracing::error!( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), + session_id = %args.session_id, "listen_ws_connect_timeout(single)" ); return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { - tracing::error!(error = ?e, "listen_ws_connect_failed(single)"); + tracing::error!(error = ?e, session_id = %args.session_id, "listen_ws_connect_failed(single)"); return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e))); } Ok(Ok(res)) => res, @@ -345,11 +398,23 @@ async fn spawn_rx_task_dual_with_adapter( let (tx, rx) = tokio::sync::mpsc::channel::>(32); + let adapter = A::default(); + let trace_headers = build_trace_headers(&args, adapter.provider_name()); + + tracing::info!( + session_id = %args.session_id, + channel_mode = ?args.mode, + model = %args.model, + provider = adapter.provider_name(), + "listener_actor_starting(dual)" + ); + let client = owhisper_client::ListenClient::builder() .adapter::() .api_base(args.base_url.clone()) .api_key(args.api_key.clone()) .params(build_listen_params(&args)) + .trace_headers(trace_headers) .build_dual(); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); @@ -361,12 +426,13 @@ async fn spawn_rx_task_dual_with_adapter( Err(_elapsed) => { tracing::error!( timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(), + session_id = %args.session_id, "listen_ws_connect_timeout(dual)" ); return Err(actor_error("listen_ws_connect_timeout")); } Ok(Err(e)) => { - tracing::error!(error = ?e, "listen_ws_connect_failed(dual)"); + tracing::error!(error = ?e, session_id = %args.session_id, "listen_ws_connect_failed(dual)"); return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e))); } Ok(Ok(res)) => res,