Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

122 changes: 73 additions & 49 deletions apps/api/src/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,82 @@ export const listenSocketHandler: Handler = async (c, next) => {
const clientUrl = new URL(c.req.url, "http://localhost");
const provider = clientUrl.searchParams.get("provider") ?? "deepgram";

let connection: WsProxyConnection;
try {
connection = createProxyFromRequest(clientUrl, c.req.raw.headers);
await connection.preconnectUpstream();
Metrics.websocketConnected(provider);
} catch (error) {
Sentry.captureException(error, {
tags: { provider, operation: "upstream_connect" },
});
const detail =
error instanceof Error ? error.message : "upstream_connect_failed";
const status = detail === "upstream_connect_timeout" ? 504 : 502;
return c.json({ error: "upstream_connect_failed", detail }, status);
}
const sentryTrace = c.req.header("sentry-trace");
const baggage = c.req.header("baggage");

const connectionStartTime = performance.now();
return Sentry.continueTrace({ sentryTrace, baggage }, () => {
return Sentry.startSpan(
{ name: `WebSocket /listen ${provider}`, op: "websocket.server" },
async () => {
let connection: WsProxyConnection;
try {
connection = createProxyFromRequest(clientUrl, c.req.raw.headers);
await connection.preconnectUpstream();
Metrics.websocketConnected(provider);
} catch (error) {
Sentry.addBreadcrumb({
category: "websocket",
message: "Upstream connection failed",
level: "error",
data: { provider, error: String(error) },
});
Sentry.captureException(error, {
tags: { provider, operation: "upstream_connect" },
});
const detail =
error instanceof Error ? error.message : "upstream_connect_failed";
const status = detail === "upstream_connect_timeout" ? 504 : 502;
return c.json({ error: "upstream_connect_failed", detail }, status);
}
Comment on lines +23 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor: Potential metrics imbalance on upgrade failure.

Metrics.websocketConnected is called after upstream preconnection succeeds (line 27), but if the WebSocket upgrade fails later (lines 86-88), Metrics.websocketDisconnected is never called because onClose won't fire. This creates an imbalance where a connection is counted but the corresponding disconnection isn't.

Consider tracking whether metrics were recorded and calling Metrics.websocketDisconnected in the upgrade failure path:

         const response = await handler(c, next);
         if (!response) {
           connection.closeConnections();
+          Metrics.websocketDisconnected(provider, performance.now() - connectionStartTime);
           return c.json({ error: "upgrade_failed" }, 400);
         }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In apps/api/src/listen.ts around lines 23 to 42, Metrics.websocketConnected is
incremented after preconnectUpstream but may never be decremented if the
subsequent WebSocket upgrade fails; add a local boolean (e.g.,
connectedMetricRecorded = false) set to true immediately after calling
Metrics.websocketConnected, then in the upgrade failure/error path (the code
handling the upgrade failure around lines 86–88) check that flag and call
Metrics.websocketDisconnected before returning/closing so every successful
websocketConnected call always has a corresponding websocketDisconnected call.


const handler = upgradeWebSocket(() => {
return {
onOpen(_event, ws) {
connection.initializeUpstream(ws.raw);
},
async onMessage(event) {
const payload = await normalizeWsData(event.data);
if (!payload) {
return;
const connectionStartTime = performance.now();

const handler = upgradeWebSocket(() => {
return {
onOpen(_event, ws) {
connection.initializeUpstream(ws.raw);
},
async onMessage(event) {
const payload = await normalizeWsData(event.data);
if (!payload) {
return;
}
await connection.sendToUpstream(payload);
},
onClose(event) {
const code = event?.code ?? 1000;
const reason = event?.reason || "client_closed";
connection.closeConnections(code, reason);
Metrics.websocketDisconnected(
provider,
performance.now() - connectionStartTime,
);
},
onError(event) {
Sentry.addBreadcrumb({
category: "websocket",
message: "Client WebSocket error",
level: "error",
data: { provider },
});
Sentry.captureException(
event instanceof Error
? event
: new Error("websocket_client_error"),
{ tags: { provider, operation: "websocket" } },
);
connection.closeConnections(1011, "client_error");
},
};
});

const response = await handler(c, next);
if (!response) {
connection.closeConnections();
return c.json({ error: "upgrade_failed" }, 400);
}
await connection.sendToUpstream(payload);
return response;
},
onClose(event) {
const code = event?.code ?? 1000;
const reason = event?.reason || "client_closed";
connection.closeConnections(code, reason);
Metrics.websocketDisconnected(
provider,
performance.now() - connectionStartTime,
);
},
onError(event) {
Sentry.captureException(
event instanceof Error ? event : new Error("websocket_client_error"),
{ tags: { provider, operation: "websocket" } },
);
connection.closeConnections(1011, "client_error");
},
};
);
});

const response = await handler(c, next);
if (!response) {
connection.closeConnections();
return c.json({ error: "upgrade_failed" }, 400);
}
return response;
};
23 changes: 23 additions & 0 deletions owhisper/owhisper-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,23 @@ pub struct ListenClientBuilder<A: RealtimeSttAdapter = DeepgramAdapter> {
api_base: Option<String>,
api_key: Option<String>,
params: Option<owhisper_interface::ListenParams>,
trace_headers: Option<TraceHeaders>,
_marker: PhantomData<A>,
}

#[derive(Clone, Default)]
pub struct TraceHeaders {
pub sentry_trace: Option<String>,
pub baggage: Option<String>,
}

impl Default for ListenClientBuilder {
fn default() -> Self {
Self {
api_base: None,
api_key: None,
params: None,
trace_headers: None,
_marker: PhantomData,
}
}
Expand All @@ -53,11 +61,17 @@ impl<A: RealtimeSttAdapter> ListenClientBuilder<A> {
self
}

pub fn trace_headers(mut self, headers: TraceHeaders) -> Self {
self.trace_headers = Some(headers);
self
}

pub fn adapter<B: RealtimeSttAdapter>(self) -> ListenClientBuilder<B> {
ListenClientBuilder {
api_base: self.api_base,
api_key: self.api_key,
params: self.params,
trace_headers: self.trace_headers,
_marker: PhantomData,
}
}
Expand Down Expand Up @@ -86,6 +100,15 @@ impl<A: RealtimeSttAdapter> ListenClientBuilder<A> {
request = request.with_header(header_name, header_value);
}

if let Some(ref trace_headers) = self.trace_headers {
if let Some(ref sentry_trace) = trace_headers.sentry_trace {
request = request.with_header("sentry-trace", sentry_trace.clone());
}
if let Some(ref baggage) = trace_headers.baggage {
request = request.with_header("baggage", baggage.clone());
}
}

request
}

Expand Down
2 changes: 2 additions & 0 deletions plugins/listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,5 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }

sentry = { workspace = true }
72 changes: 69 additions & 3 deletions plugins/listener/src/actors/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::time::error::Elapsed;

use owhisper_client::{
AdapterKind, ArgmaxAdapter, AssemblyAIAdapter, DeepgramAdapter, FinalizeHandle,
FireworksAdapter, OpenAIAdapter, RealtimeSttAdapter, SonioxAdapter,
FireworksAdapter, OpenAIAdapter, RealtimeSttAdapter, SonioxAdapter, TraceHeaders,
};
use owhisper_interface::stream::{Extra, StreamResponse};
use owhisper_interface::{ControlMessage, MixedMessage};
Expand Down Expand Up @@ -270,6 +270,46 @@ fn build_extra(args: &ListenerArgs) -> (f64, Extra) {
(session_offset_secs, extra)
}

fn build_trace_headers(args: &ListenerArgs, provider_name: &str) -> TraceHeaders {
let mut trace_headers = TraceHeaders::default();

sentry::with_scope(
|scope| {
scope.set_tag("session_id", &args.session_id);
scope.set_tag("stt_provider", provider_name);
scope.set_tag("channel_mode", format!("{:?}", args.mode));
scope.set_tag("model", &args.model);
scope.set_tag(
"languages",
args.languages
.iter()
.map(|l| l.iso639().code())
.collect::<Vec<_>>()
.join(","),
);
scope.set_tag("onboarding", args.onboarding.to_string());
},
|| {
sentry::configure_scope(|scope| {
if let Some(span) = scope.get_span() {
trace_headers.sentry_trace = span
.iter_headers()
.find(|(k, _)| *k == "sentry-trace")
.map(|(_, v)| v.to_string())
.filter(|s| !s.is_empty());
trace_headers.baggage = span
.iter_headers()
.find(|(k, _)| *k == "baggage")
.map(|(_, v)| v.to_string())
.filter(|s| !s.is_empty());
}
});
},
);

trace_headers
}

async fn spawn_rx_task_single_with_adapter<A: RealtimeSttAdapter>(
args: ListenerArgs,
myself: ActorRef<ListenerMsg>,
Expand All @@ -286,11 +326,23 @@ async fn spawn_rx_task_single_with_adapter<A: RealtimeSttAdapter>(

let (tx, rx) = tokio::sync::mpsc::channel::<MixedMessage<Bytes, ControlMessage>>(32);

let adapter = A::default();
let trace_headers = build_trace_headers(&args, adapter.provider_name());

tracing::info!(
session_id = %args.session_id,
channel_mode = ?args.mode,
model = %args.model,
provider = adapter.provider_name(),
"listener_actor_starting(single)"
);

let client = owhisper_client::ListenClient::builder()
.adapter::<A>()
.api_base(args.base_url.clone())
.api_key(args.api_key.clone())
.params(build_listen_params(&args))
.trace_headers(trace_headers)
.build_single();

let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
Expand All @@ -302,12 +354,13 @@ async fn spawn_rx_task_single_with_adapter<A: RealtimeSttAdapter>(
Err(_elapsed) => {
tracing::error!(
timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(),
session_id = %args.session_id,
"listen_ws_connect_timeout(single)"
);
return Err(actor_error("listen_ws_connect_timeout"));
}
Ok(Err(e)) => {
tracing::error!(error = ?e, "listen_ws_connect_failed(single)");
tracing::error!(error = ?e, session_id = %args.session_id, "listen_ws_connect_failed(single)");
return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e)));
}
Ok(Ok(res)) => res,
Expand Down Expand Up @@ -345,11 +398,23 @@ async fn spawn_rx_task_dual_with_adapter<A: RealtimeSttAdapter>(

let (tx, rx) = tokio::sync::mpsc::channel::<MixedMessage<(Bytes, Bytes), ControlMessage>>(32);

let adapter = A::default();
let trace_headers = build_trace_headers(&args, adapter.provider_name());

tracing::info!(
session_id = %args.session_id,
channel_mode = ?args.mode,
model = %args.model,
provider = adapter.provider_name(),
"listener_actor_starting(dual)"
);

let client = owhisper_client::ListenClient::builder()
.adapter::<A>()
.api_base(args.base_url.clone())
.api_key(args.api_key.clone())
.params(build_listen_params(&args))
.trace_headers(trace_headers)
.build_dual();

let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
Expand All @@ -361,12 +426,13 @@ async fn spawn_rx_task_dual_with_adapter<A: RealtimeSttAdapter>(
Err(_elapsed) => {
tracing::error!(
timeout_secs = LISTEN_CONNECT_TIMEOUT.as_secs_f32(),
session_id = %args.session_id,
"listen_ws_connect_timeout(dual)"
);
return Err(actor_error("listen_ws_connect_timeout"));
}
Ok(Err(e)) => {
tracing::error!(error = ?e, "listen_ws_connect_failed(dual)");
tracing::error!(error = ?e, session_id = %args.session_id, "listen_ws_connect_failed(dual)");
return Err(actor_error(format!("listen_ws_connect_failed: {:?}", e)));
}
Ok(Ok(res)) => res,
Expand Down