Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions crates/cli/src/commands/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,13 @@ pub struct TestConfigArgs {
}

/// Lists available test case names for a given test category.
/// TODO: Fill with enums TestCases of each category
fn list_test_cases(category: TestCategory) -> Vec<String> {
// Returns available test case names for each category.
match category {
TestCategory::Validator => {
// From validator::supported_validator_test_cases()
vec![
"Ping".to_string(),
"PingMeasure".to_string(),
"PingLoad".to_string(),
]
}
TestCategory::Validator => validator::ValidatorTestCase::all()
.iter()
.map(|tc| tc.name().to_string())
.collect(),
TestCategory::Beacon => {
// TODO: Extract from beacon::supported_beacon_test_cases()
vec![]
Expand Down
285 changes: 276 additions & 9 deletions crates/cli/src/commands/test/validator.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,56 @@
//! Validator client connectivity tests.

use super::{TestCategoryResult, TestConfigArgs};
use crate::error::Result;
use clap::Args;
use std::{io::Write, time::Duration};

use clap::Args;
use rand::Rng;
use tokio::{
net::TcpStream,
sync::mpsc,
time::{Instant, timeout},
};

use super::{
AllCategoriesResult, TestCategory, TestCategoryResult, TestConfigArgs, TestResult, TestVerdict,
calculate_score, evaluate_highest_rtt, evaluate_rtt, publish_result_to_obol_api,
write_result_to_file, write_result_to_writer,
};
use crate::{duration::Duration as CliDuration, error::Result};

// Thresholds (from Go implementation)
const THRESHOLD_MEASURE_AVG: Duration = Duration::from_millis(50);
const THRESHOLD_MEASURE_POOR: Duration = Duration::from_millis(240);
const THRESHOLD_LOAD_AVG: Duration = Duration::from_millis(50);
const THRESHOLD_LOAD_POOR: Duration = Duration::from_millis(240);

/// Validator test cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ValidatorTestCase {
Ping,
PingMeasure,
PingLoad,
}

impl ValidatorTestCase {
/// Returns all validator test cases.
pub fn all() -> &'static [ValidatorTestCase] {
&[
ValidatorTestCase::Ping,
ValidatorTestCase::PingMeasure,
ValidatorTestCase::PingLoad,
]
}

/// Returns the test name as a string.
pub fn name(&self) -> &'static str {
match self {
ValidatorTestCase::Ping => "Ping",
ValidatorTestCase::PingMeasure => "PingMeasure",
ValidatorTestCase::PingLoad => "PingLoad",
}
}
}

/// Arguments for the validator test command.
#[derive(Args, Clone, Debug)]
pub struct TestValidatorArgs {
Expand All @@ -30,10 +76,231 @@ pub struct TestValidatorArgs {
}

/// Runs the validator client tests.
pub async fn run(_args: TestValidatorArgs, _writer: &mut dyn Write) -> Result<TestCategoryResult> {
// TODO: Implement validator tests
// - Ping
// - PingMeasure
// - PingLoad
unimplemented!("validator test not yet implemented")
pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result<TestCategoryResult> {
tracing::info!("Starting validator client test");

let start_time = Instant::now();

// Get and filter test cases
let queued_tests: Vec<ValidatorTestCase> = if let Some(ref filter) = args.test_config.test_cases
{
ValidatorTestCase::all()
.iter()
.filter(|tc| filter.contains(&tc.name().to_string()))
.copied()
.collect()
} else {
ValidatorTestCase::all().to_vec()
};

if queued_tests.is_empty() {
return Err(crate::error::CliError::Other(
"test case not supported".into(),
));
}

// Run tests with timeout
let test_results = run_tests_with_timeout(&args, &queued_tests).await;

let score = calculate_score(&test_results);

let mut res = TestCategoryResult::new(TestCategory::Validator);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i was a bit confused as this was the first time writing this, so I took AI's suggestion to write the run function. Do take a look if any changes are needed here.

res.targets.insert(args.api_address.clone(), test_results);
res.execution_time = Some(CliDuration::new(start_time.elapsed()));
res.score = Some(score);

if !args.test_config.quiet {
write_result_to_writer(&res, writer)?;
}

if !args.test_config.output_json.is_empty() {
write_result_to_file(&res, args.test_config.output_json.as_ref()).await?;
}

if args.test_config.publish {
let all = AllCategoriesResult {
validator: Some(res.clone()),
..Default::default()
};
publish_result_to_obol_api(
all,
&args.test_config.publish_addr,
&args.test_config.publish_private_key_file,
)
.await?;
}

Ok(res)
}

/// Timeout error message
const ERR_TIMEOUT_INTERRUPTED: &str = "timeout";

/// Runs tests with timeout, keeping completed tests on timeout.
async fn run_tests_with_timeout(
args: &TestValidatorArgs,
tests: &[ValidatorTestCase],
) -> Vec<TestResult> {
let (tx, mut rx) = mpsc::channel::<TestResult>(100);
let mut test_iter = tests.iter().peekable();

let timeout_result = tokio::time::timeout(args.test_config.timeout, async {
for &test_case in test_iter.by_ref() {
let result = run_single_test(args, test_case).await;
let _ = tx.send(result).await;
}
})
.await;

// Collect all completed results
drop(tx);
let mut results = Vec::new();
while let Ok(result) = rx.try_recv() {
results.push(result);
}

if timeout_result.is_err()
&& let Some(&interrupted_test) = test_iter.peek()
{
results.push(
TestResult::new(interrupted_test.name())
.fail(std::io::Error::other(ERR_TIMEOUT_INTERRUPTED)),
);
}

results
}

/// Runs a single test case.
async fn run_single_test(args: &TestValidatorArgs, test_case: ValidatorTestCase) -> TestResult {
match test_case {
ValidatorTestCase::Ping => ping_test(args).await,
ValidatorTestCase::PingMeasure => ping_measure_test(args).await,
ValidatorTestCase::PingLoad => ping_load_test(args).await,
}
}

async fn ping_test(args: &TestValidatorArgs) -> TestResult {
let mut result = TestResult::new(ValidatorTestCase::Ping.name());

match timeout(
Duration::from_secs(1),
TcpStream::connect(&args.api_address),
)
.await
{
Ok(Ok(_conn)) => {
result.verdict = TestVerdict::Ok;
}
Ok(Err(e)) => {
return result.fail(e);
}
Err(_) => {
return result.fail(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"connection timeout",
));
}
}

result
}

async fn ping_measure_test(args: &TestValidatorArgs) -> TestResult {
let mut result = TestResult::new(ValidatorTestCase::PingMeasure.name());
let before = Instant::now();

match timeout(
Duration::from_secs(1),
TcpStream::connect(&args.api_address),
)
.await
{
Ok(Ok(_conn)) => {
let rtt = before.elapsed();
result = evaluate_rtt(rtt, result, THRESHOLD_MEASURE_AVG, THRESHOLD_MEASURE_POOR);
}
Ok(Err(e)) => {
return result.fail(e);
}
Err(_) => {
return result.fail(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"connection timeout",
));
}
}

result
}

async fn ping_load_test(args: &TestValidatorArgs) -> TestResult {
tracing::info!(
duration = ?args.load_test_duration,
target = %args.api_address,
"Running ping load tests..."
);

let mut result = TestResult::new(ValidatorTestCase::PingLoad.name());

let (tx, mut rx) = mpsc::channel::<Duration>(100);
let address = args.api_address.clone();
let duration = args.load_test_duration;

let handle = tokio::spawn(async move {
let start = Instant::now();
let mut interval = tokio::time::interval(Duration::from_secs(1));

Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interval tick at line 192 causes the first iteration to fire immediately, but then subsequent iterations wait 1 second. This creates inconsistent timing in the load test. The first ping_continuously task starts immediately, but subsequent tasks start at 1-second intervals. Consider using interval.tick().await before the loop or skip the first tick to ensure consistent spacing between spawned tasks.

Suggested change
// Discard the first immediate tick so that all subsequent ticks are spaced by 1 second.
interval.tick().await;

Copilot uses AI. Check for mistakes.
interval.tick().await;
while start.elapsed() < duration {
interval.tick().await;

let tx = tx.clone();
let addr = address.clone();
let remaining = duration.saturating_sub(start.elapsed());

tokio::spawn(async move {
ping_continuously(addr, tx, remaining).await;
});
}
Comment on lines +250 to +265
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The load test can spawn an unbounded number of concurrent tasks. For a 5-second test, up to 5 tasks are spawned (one per second), each running ping_continuously. However, for longer durations, this could spawn many concurrent tasks. Additionally, each ping_continuously task continuously creates connections in a loop with only a small random sleep between attempts (0-100ms). This could create hundreds or thousands of concurrent connections, potentially overwhelming the system or the target validator. Consider limiting the maximum number of concurrent tasks or the connection rate per task.

Copilot uses AI. Check for mistakes.
});

let _ = handle.await;

let mut rtts = Vec::new();
while let Ok(rtt) = rx.try_recv() {
rtts.push(rtt);
Comment on lines +271 to +272
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a race condition in the ping_load_test function. After the spawned tasks complete (line 204), the code immediately tries to collect RTT results using try_recv. However, the spawned tasks in lines 198-200 may not have finished sending their results yet because they're not awaited. This means some RTT measurements could be lost, leading to incomplete or inaccurate test results.

The issue is that when handle.await completes, it only means the spawning loop finished, not that all the spawned ping_continuously tasks have completed and sent their RTTs. Consider using a bounded channel or waiting for all tasks to complete before collecting results.

Suggested change
while let Ok(rtt) = rx.try_recv() {
rtts.push(rtt);
loop {
match rx.recv() {
Ok(rtt) => rtts.push(rtt),
Err(_) => break,
}

Copilot uses AI. Check for mistakes.
}

tracing::info!(target = %args.api_address, "Ping load tests finished");

Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no RTT measurements are collected (rtts is empty), evaluate_highest_rtt returns a zero duration which gets marked as "Poor" by evaluate_rtt. This could be misleading - if no measurements were collected due to connection failures, the test should probably be marked as "Fail" rather than "Poor". Consider explicitly checking if rtts is empty and returning an appropriate failure result.

Suggested change
if rtts.is_empty() {
return result.fail(std::io::Error::new(
std::io::ErrorKind::Other,
"no RTT measurements collected during ping load test",
));
}

Copilot uses AI. Check for mistakes.
result = evaluate_highest_rtt(rtts, result, THRESHOLD_LOAD_AVG, THRESHOLD_LOAD_POOR);

result
}

async fn ping_continuously(address: String, tx: mpsc::Sender<Duration>, max_duration: Duration) {
let start = Instant::now();

while start.elapsed() < max_duration {
let before = Instant::now();

match timeout(Duration::from_secs(1), TcpStream::connect(&address)).await {
Ok(Ok(conn)) => {
let rtt = before.elapsed();
if tx.send(rtt).await.is_err() {
drop(conn);
return;
}
}
Comment on lines +288 to +295
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TCP connections established during tests are not being explicitly closed. While Rust will drop them when the variable goes out of scope, it's better practice to explicitly close connections in test scenarios to avoid exhausting file descriptors during load tests. Consider explicitly dropping the connection or storing it in a variable that's immediately dropped after measuring the RTT.

Copilot uses AI. Check for mistakes.
Ok(Err(e)) => {
tracing::warn!(target = %address, error = ?e, "Ping connection attempt failed during load test");
}
Err(e) => {
tracing::warn!(target = %address, error = ?e, "Ping connection attempt timed out during load test");
}
}
let sleep_ms = rand::thread_rng().gen_range(0..100);
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
}
}