Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/receiver-ui/src-tauri/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ async fn get_streams(state: State<'_, Arc<AppState>>) -> CmdResult<control_api::
Ok(control_api::get_streams(&state).await)
}

#[tauri::command]
async fn get_stream_metrics(
state: State<'_, Arc<AppState>>,
) -> CmdResult<Vec<receiver::ui_events::StreamMetricsPayload>> {
Ok(control_api::get_stream_metrics(&state).await)
}

#[tauri::command]
async fn put_earliest_epoch(
state: State<'_, Arc<AppState>>,
Expand Down Expand Up @@ -905,6 +912,7 @@ fn main() {
get_mode,
put_mode,
get_streams,
get_stream_metrics,
put_earliest_epoch,
get_races,
create_race,
Expand Down
4 changes: 4 additions & 0 deletions apps/receiver-ui/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ export async function getStreams(): Promise<StreamsResponse> {
return invoke<StreamsResponse>("get_streams");
}

export async function getStreamMetrics(): Promise<StreamMetrics[]> {
return invoke<StreamMetrics[]>("get_stream_metrics");
}

export async function putSubscriptions(
subscriptions: SubscriptionItem[],
): Promise<void> {
Expand Down
1 change: 1 addition & 0 deletions apps/receiver-ui/src/lib/components/ForwardersTab.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";

const apiMocks = vi.hoisted(() => ({
getForwarders: vi.fn(),
getStreamMetrics: vi.fn().mockResolvedValue([]),
}));

vi.mock("$lib/api", () => apiMocks);
Expand Down
1 change: 1 addition & 0 deletions apps/receiver-ui/src/lib/store-updater.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const apiMocks = vi.hoisted(() => ({
putDbfConfig: vi.fn().mockResolvedValue(undefined),
clearDbf: vi.fn().mockResolvedValue(undefined),
updateSubscriptionEventType: vi.fn().mockResolvedValue(undefined),
getStreamMetrics: vi.fn().mockResolvedValue([]),
}));

const desktopUpdaterMocks = vi.hoisted(() => ({
Expand Down
15 changes: 15 additions & 0 deletions apps/receiver-ui/src/lib/store.svelte.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ export async function loadAll(): Promise<void> {
nextMode,
nextRaces,
nextForwarders,
nextMetrics,
] = await Promise.all([
api.getStatus(),
api.getStreams(),
Expand All @@ -654,6 +655,13 @@ export async function loadAll(): Promise<void> {
ok: false as const,
error: String(error),
})),
api.getStreamMetrics().catch((e: unknown) => {
console.warn(
"getStreamMetrics failed, will rely on real-time updates:",
e,
);
return [] as api.StreamMetrics[];
}),
]);

await loadDbfConfig();
Expand All @@ -667,6 +675,13 @@ export async function loadAll(): Promise<void> {
);
void prefetchEarliestEpochOptions(nextStreams.streams);
}
if (nextMetrics.length > 0) {
const merged = new Map(store.streamMetrics);
for (const m of nextMetrics) {
merged.set(streamKey(m.forwarder_id, m.reader_ip), m);
}
store.streamMetrics = merged;
}
store.logEntries = nextLogs.entries;
if (nextRaces) {
const prevRaceId = store.raceIdDraft;
Expand Down
29 changes: 29 additions & 0 deletions apps/receiver-ui/src/test/+layout.svelte.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const apiMocks = vi.hoisted(() => ({
getDbfConfig: vi.fn().mockResolvedValue({ enabled: false, path: "" }),
putDbfConfig: vi.fn().mockResolvedValue(undefined),
clearDbf: vi.fn().mockResolvedValue(undefined),
getStreamMetrics: vi.fn().mockResolvedValue([]),
}));

vi.mock("$lib/api", () => apiMocks);
Expand Down Expand Up @@ -233,6 +234,34 @@ describe("receiver layout SSE updates", () => {
expect(store.forwarders).toBeNull();
});

it("merges cached stream metrics into store on initial load", async () => {
apiMocks.getStreamMetrics.mockResolvedValueOnce([
{
forwarder_id: "fwd-1",
reader_ip: "10.0.0.1:10000",
raw_count: 50,
dedup_count: 45,
retransmit_count: 5,
lag_ms: 100,
epoch_raw_count: 20,
epoch_dedup_count: 18,
epoch_retransmit_count: 2,
unique_chips: 10,
epoch_last_received_at: "2026-03-22T12:00:00Z",
epoch_lag_ms: 50,
},
]);

render(Layout);

await waitFor(() => {
const entry = store.streamMetrics.get("fwd-1/10.0.0.1:10000");
expect(entry).toBeTruthy();
expect(entry?.raw_count).toBe(50);
expect(entry?.unique_chips).toBe(10);
});
});

it("renders nested route content", async () => {
pageState.pathname = "/admin";
render(LayoutChildrenHarness);
Expand Down
61 changes: 61 additions & 0 deletions services/receiver/src/control_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub struct AppState {
pub upstream_url: Arc<RwLock<Option<String>>>,
pub ui_tx: broadcast::Sender<ReceiverUiEvent>,
pub stream_counts: crate::cache::StreamCounts,
pub stream_metrics_cache:
Arc<RwLock<HashMap<(String, String), crate::ui_events::StreamMetricsPayload>>>,
pub receiver_id: Arc<RwLock<String>>,
pub db_integrity_ok: bool,
pub http_client: reqwest::Client,
Expand Down Expand Up @@ -95,6 +97,7 @@ impl AppState {
upstream_url: Arc::new(RwLock::new(None)),
ui_tx,
stream_counts: crate::cache::StreamCounts::new(),
stream_metrics_cache: Arc::new(RwLock::new(HashMap::new())),
receiver_id: Arc::new(RwLock::new(receiver_id)),
db_integrity_ok,
http_client,
Expand All @@ -121,6 +124,27 @@ impl AppState {
self.dbf_config_version.subscribe()
}

pub async fn cache_stream_metrics(&self, payload: &crate::ui_events::StreamMetricsPayload) {
let key = (payload.forwarder_id.clone(), payload.reader_ip.clone());
self.stream_metrics_cache
.write()
.await
.insert(key, payload.clone());
}

pub async fn clear_stream_metrics_cache(&self) {
self.stream_metrics_cache.write().await.clear();
}

pub async fn get_stream_metrics_snapshot(&self) -> Vec<crate::ui_events::StreamMetricsPayload> {
self.stream_metrics_cache
.read()
.await
.values()
.cloned()
.collect()
}

pub fn request_disconnect_shutdown(&self) {
let _ = self.shutdown_tx.send(ShutdownSignal::Disconnect);
}
Expand Down Expand Up @@ -932,6 +956,10 @@ pub async fn get_streams(state: &AppState) -> StreamsResponse {
state.build_streams_response().await
}

pub async fn get_stream_metrics(state: &AppState) -> Vec<crate::ui_events::StreamMetricsPayload> {
state.get_stream_metrics_snapshot().await
}

// ---------------------------------------------------------------------------
// Race management (via WS proxy session)
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1752,6 +1780,7 @@ pub async fn disconnect(state: &AppState) {
if current == ConnectionState::Disconnected {
return;
}
state.clear_stream_metrics_cache().await;
state
.set_connection_state(ConnectionState::Disconnecting)
.await;
Expand Down Expand Up @@ -2199,6 +2228,38 @@ mod tests {
assert_eq!(*shutdown_rx.borrow(), ShutdownSignal::Terminate);
}

#[tokio::test]
async fn disconnect_clears_cached_stream_metrics() {
let db = Db::open_in_memory().unwrap();
let (state, _shutdown_rx) = AppState::new(db, "recv-test".to_owned());
state.set_connection_state(ConnectionState::Connected).await;

state
.cache_stream_metrics(&crate::ui_events::StreamMetricsPayload {
forwarder_id: "fwd-1".to_owned(),
reader_ip: "10.0.0.1:10000".to_owned(),
raw_count: 10,
dedup_count: 9,
retransmit_count: 1,
lag_ms: Some(250),
epoch_raw_count: 4,
epoch_dedup_count: 3,
epoch_retransmit_count: 1,
unique_chips: 2,
epoch_last_received_at: Some("2026-03-22T12:00:00Z".to_owned()),
epoch_lag_ms: Some(125),
})
.await;

disconnect(&state).await;

assert!(state.get_stream_metrics_snapshot().await.is_empty());
assert_eq!(
*state.connection_state.borrow(),
ConnectionState::Disconnecting
);
}

async fn run_test_server(router: Router) -> std::net::SocketAddr {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
Expand Down
Loading
Loading