diff --git a/crates/video-streamer/src/streamer/mod.rs b/crates/video-streamer/src/streamer/mod.rs index 3642270b4..a3af14737 100644 --- a/crates/video-streamer/src/streamer/mod.rs +++ b/crates/video-streamer/src/streamer/mod.rs @@ -99,9 +99,15 @@ pub fn webm_stream( } } - const MAX_RETRY_COUNT: usize = 3; - // To make sure we don't retry forever - // Retry is set to 0 when we successfully read a tag + // NOTE: MAX_RETRY_COUNT is intentionally set to 25. With the 3-second delay + // between retries, this yields a worst-case wait of 75 seconds before we + // give up on the current stream. This is acceptable in our live-streaming + // context because downstream components already enforce a stricter overall + // timeout, and tolerating longer temporary input stalls here reduces + // unnecessary reconnect churn on brief network or encoder hiccups. + // The counter resets to 0 on every successful tag read, so this only + // triggers on continuous EOF with zero progress. + const MAX_RETRY_COUNT: usize = 25; let mut retry_count = 0; let result = loop { @@ -190,6 +196,10 @@ pub fn webm_stream( }, _ = stop_notifier.notified() => { let _ = tx.send(WhenEofControlFlow::Break); + }, + _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { + trace!("EOF wait timed out, retrying"); + let _ = tx.send(WhenEofControlFlow::Continue); } } }); diff --git a/crates/video-streamer/src/streamer/tag_writers.rs b/crates/video-streamer/src/streamer/tag_writers.rs index a98001fe1..7cae2398f 100644 --- a/crates/video-streamer/src/streamer/tag_writers.rs +++ b/crates/video-streamer/src/streamer/tag_writers.rs @@ -1,4 +1,4 @@ -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::Context; use cadeau::xmf::vpx::{VpxCodec, VpxDecoder, VpxEncoder}; @@ -13,8 +13,15 @@ use crate::debug::mastroka_spec_name; const VPX_EFLAG_FORCE_KF: u32 = 0x00000001; +/// Maximum number of consecutive frames that can be skipped when adaptive frame +/// skipping is active. 
Controls the trade-off between latency and frame delivery: +/// - 1 = skip-encode alternating pattern, 50% max information loss (default) +/// - 2 = up to 2 skips then 1 encode, 67% max loss +/// - 0 = never skip (effectively disables adaptive skipping) +const MAX_CONSECUTIVE_FRAME_SKIPS: u32 = 1; + #[cfg(feature = "perf-diagnostics")] -fn duration_as_millis_u64(duration: std::time::Duration) -> u64 { +fn duration_as_millis_u64(duration: Duration) -> u64 { u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) } @@ -105,7 +112,9 @@ where last_encoded_abs_time: Option, // Adaptive frame skipping state + #[cfg(feature = "perf-diagnostics")] stream_start: Instant, + processing_time: Duration, last_ratio: f64, frames_since_last_encode: u32, adaptive_frame_skip: bool, @@ -192,7 +201,9 @@ where decoder, cut_block_state: CutBlockState::HaventMet, last_encoded_abs_time: None, + #[cfg(feature = "perf-diagnostics")] stream_start: Instant::now(), + processing_time: Duration::ZERO, last_ratio: 1.0, frames_since_last_encode: 0, adaptive_frame_skip: config.adaptive_frame_skip, @@ -263,7 +274,9 @@ where "VideoBlock created" ); + let processing_started = Instant::now(); self.process_current_block(&video_block)?; + self.processing_time += processing_started.elapsed(); Ok(WriterResult::Continue) } @@ -369,13 +382,17 @@ where Ok(frame) } + /// Calculates the ratio of media time advanced to active processing time. + /// This uses `processing_time`, which is intended to exclude idle waits (for example, + /// time spent waiting for file growth during EOF retries), so that temporary stalls + /// do not permanently corrupt the frame skip decision. 
fn current_realtime_ratio(&self, media_advanced_ms: u64) -> f64 { #[allow(clippy::cast_possible_truncation)] // u64 max is ~584 million years in ms; no real truncation risk - let wall_ms = self.stream_start.elapsed().as_millis() as u64; - if wall_ms == 0 { + let processing_ms = self.processing_time.as_millis() as u64; + if processing_ms == 0 { 1.0 } else { - media_advanced_ms as f64 / wall_ms as f64 + media_advanced_ms as f64 / processing_ms as f64 } } @@ -383,7 +400,12 @@ where // Skip encoding when falling behind real-time. The ratio naturally self-regulates: // skipping makes processing faster (decode-only), which pushes ratio back above 1.0, // which resumes encoding. This bang-bang control keeps the stream near real-time. - self.adaptive_frame_skip && self.last_ratio < 1.0 + // + // Allow at most MAX_CONSECUTIVE_FRAME_SKIPS consecutive skips to cap information loss. + // With the default of 1, the pattern is skip-encode-skip-encode (50% max loss). + // Accepts growing delay over forced frame drops — preferable for shadow sessions + // where a few seconds of latency is tolerable but frozen frames are not. + self.adaptive_frame_skip && self.last_ratio < 1.0 && self.frames_since_last_encode < MAX_CONSECUTIVE_FRAME_SKIPS } #[cfg(feature = "perf-diagnostics")]