Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions crates/video-streamer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,15 @@ pub fn webm_stream(
}
}

const MAX_RETRY_COUNT: usize = 3;
// To make sure we don't retry forever
// Retry is set to 0 when we successfully read a tag
// NOTE: MAX_RETRY_COUNT is intentionally set to 25. With the 3-second delay
// between retries, this yields a worst-case wait of 75 seconds before we
// give up on the current stream. This is acceptable in our live-streaming
// context because downstream components already enforce a stricter overall
// timeout, and tolerating longer temporary input stalls here reduces
// unnecessary reconnect churn on brief network or encoder hiccups.
// The counter resets to 0 on every successful tag read, so this only
// triggers on continuous EOF with zero progress.
const MAX_RETRY_COUNT: usize = 25;
let mut retry_count = 0;

let result = loop {
Expand Down Expand Up @@ -190,6 +196,10 @@ pub fn webm_stream(
},
_ = stop_notifier.notified() => {
let _ = tx.send(WhenEofControlFlow::Break);
},
_ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
trace!("EOF wait timed out, retrying");
let _ = tx.send(WhenEofControlFlow::Continue);
}
}
});
Expand Down
34 changes: 28 additions & 6 deletions crates/video-streamer/src/streamer/tag_writers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Instant;
use std::time::{Duration, Instant};

use anyhow::Context;
use cadeau::xmf::vpx::{VpxCodec, VpxDecoder, VpxEncoder};
Expand All @@ -13,8 +13,15 @@ use crate::debug::mastroka_spec_name;

const VPX_EFLAG_FORCE_KF: u32 = 0x00000001;

/// Maximum number of consecutive frames that can be skipped when adaptive frame
/// skipping is active. Controls the trade-off between latency and frame delivery:
/// - 1 = skip-encode alternating pattern, 50% max information loss (default)
/// - 2 = up to 2 skips then 1 encode, 67% max loss
/// - 0 = never skip (disables adaptive skipping effectively)
const MAX_CONSECUTIVE_FRAME_SKIPS: u32 = 1;

#[cfg(feature = "perf-diagnostics")]
fn duration_as_millis_u64(duration: std::time::Duration) -> u64 {
fn duration_as_millis_u64(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}

Expand Down Expand Up @@ -105,7 +112,9 @@ where
last_encoded_abs_time: Option<u64>,

// Adaptive frame skipping state
#[cfg(feature = "perf-diagnostics")]
stream_start: Instant,
processing_time: Duration,
last_ratio: f64,
frames_since_last_encode: u32,
adaptive_frame_skip: bool,
Expand Down Expand Up @@ -192,7 +201,9 @@ where
decoder,
cut_block_state: CutBlockState::HaventMet,
last_encoded_abs_time: None,
#[cfg(feature = "perf-diagnostics")]
stream_start: Instant::now(),
processing_time: Duration::ZERO,
last_ratio: 1.0,
frames_since_last_encode: 0,
adaptive_frame_skip: config.adaptive_frame_skip,
Expand Down Expand Up @@ -263,7 +274,9 @@ where
"VideoBlock created"
);

let processing_started = Instant::now();
self.process_current_block(&video_block)?;
self.processing_time += processing_started.elapsed();

Ok(WriterResult::Continue)
}
Expand Down Expand Up @@ -369,21 +382,30 @@ where
Ok(frame)
}

/// Calculates the ratio of media time advanced to active processing time.
/// This uses `processing_time`, which is intended to exclude idle waits (for example,
/// time spent waiting for file growth during EOF retries), so that temporary stalls
/// do not permanently corrupt the frame skip decision.
fn current_realtime_ratio(&self, media_advanced_ms: u64) -> f64 {
#[allow(clippy::cast_possible_truncation)] // u64 max is ~584 million years in ms; no real truncation risk
let wall_ms = self.stream_start.elapsed().as_millis() as u64;
if wall_ms == 0 {
let processing_ms = self.processing_time.as_millis() as u64;
if processing_ms == 0 {
1.0
} else {
media_advanced_ms as f64 / wall_ms as f64
media_advanced_ms as f64 / processing_ms as f64
}
}

fn should_skip_encode(&self) -> bool {
// Skip encoding when falling behind real-time. The ratio naturally self-regulates:
// skipping makes processing faster (decode-only), which pushes ratio back above 1.0,
// which resumes encoding. This bang-bang control keeps the stream near real-time.
self.adaptive_frame_skip && self.last_ratio < 1.0
//
// Allow at most MAX_CONSECUTIVE_FRAME_SKIPS consecutive skips to cap information loss.
// With the default of 1, the pattern is skip-encode-skip-encode (50% max loss).
// Accepts growing delay over forced frame drops — preferable for shadow sessions
// where a few seconds of latency is tolerable but frozen frames are not.
self.adaptive_frame_skip && self.last_ratio < 1.0 && self.frames_since_last_encode < MAX_CONSECUTIVE_FRAME_SKIPS
}

#[cfg(feature = "perf-diagnostics")]
Expand Down