From b45292d7b9cc06b79aff482d2a7c6cb044d87e88 Mon Sep 17 00:00:00 2001 From: PoulavBhowmick03 Date: Thu, 19 Feb 2026 18:47:40 +0530 Subject: [PATCH 1/3] feat: added cli validator tests --- crates/cli/src/commands/test/validator.rs | 216 +++++++++++++++++++++- 1 file changed, 207 insertions(+), 9 deletions(-) diff --git a/crates/cli/src/commands/test/validator.rs b/crates/cli/src/commands/test/validator.rs index 26cd64d1..7b5e34ab 100644 --- a/crates/cli/src/commands/test/validator.rs +++ b/crates/cli/src/commands/test/validator.rs @@ -1,9 +1,26 @@ //! Validator client connectivity tests. -use super::{TestCategoryResult, TestConfigArgs}; -use crate::error::Result; +use std::{collections::HashMap, io::Write, sync::mpsc, time::Duration}; + use clap::Args; -use std::{io::Write, time::Duration}; +use rand::Rng; +use tokio::{ + net::TcpStream, + time::{Instant, timeout}, +}; + +use super::{ + AllCategoriesResult, TestCaseName, TestCategory, TestCategoryResult, TestConfigArgs, + TestResult, TestVerdict, calculate_score, evaluate_highest_rtt, evaluate_rtt, filter_tests, + publish_result_to_obol_api, sort_tests, write_result_to_file, write_result_to_writer, +}; +use crate::{duration::Duration as CliDuration, error::Result}; + +// Thresholds (from Go implementation) +const THRESHOLD_MEASURE_AVG: Duration = Duration::from_millis(50); +const THRESHOLD_MEASURE_POOR: Duration = Duration::from_millis(240); +const THRESHOLD_LOAD_AVG: Duration = Duration::from_millis(50); +const THRESHOLD_LOAD_POOR: Duration = Duration::from_millis(240); /// Arguments for the validator test command. #[derive(Args, Clone, Debug)] @@ -30,10 +47,191 @@ pub struct TestValidatorArgs { } /// Runs the validator client tests. -pub async fn run(_args: TestValidatorArgs, _writer: &mut dyn Write) -> Result { - // TODO: Implement validator tests - // - Ping - // - PingMeasure - // - PingLoad - unimplemented!("validator test not yet implemented") +pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result { + tracing::info!("Starting validator client test"); + + let start_time = Instant::now(); + + // Get and filter test cases + let all_test_cases = HashMap::from([ + (TestCaseName::new("Ping", 1), ()), + (TestCaseName::new("PingMeasure", 2), ()), + (TestCaseName::new("PingLoad", 3), ()), + ]); + let mut queued_tests = filter_tests(&all_test_cases, args.test_config.test_cases.as_deref()); + + if queued_tests.is_empty() { + return Err(crate::error::CliError::Other( + "test case not supported".into(), + )); + } + + sort_tests(&mut queued_tests); + + // Run tests with timeout + let test_results = tokio::time::timeout(args.test_config.timeout, async { + let mut results = Vec::new(); + for test in queued_tests.iter() { + let result = match test.name.as_str() { + "Ping" => ping_test(&args).await, + "PingMeasure" => ping_measure_test(&args).await, + "PingLoad" => ping_load_test(&args).await, + _ => TestResult::new(&test.name).fail(std::io::Error::other("unknown test")), + }; + results.push(result); + } + results + }) + .await + .unwrap_or_else(|_| { + vec![TestResult::new("Timeout").fail(std::io::Error::other("timeout interrupted"))] + }); + let score = calculate_score(&test_results); + + let mut res = TestCategoryResult::new(TestCategory::Validator); + res.targets.insert(args.api_address.clone(), test_results); + res.execution_time = Some(CliDuration::new(start_time.elapsed())); + res.score = Some(score); + + if !args.test_config.quiet { + write_result_to_writer(&res, writer)?; + } + + if !args.test_config.output_json.is_empty() { + write_result_to_file(&res, args.test_config.output_json.as_ref()).await?; + } + + if args.test_config.publish { + let all = AllCategoriesResult { + validator: Some(res.clone()), + ..Default::default() + }; + publish_result_to_obol_api( + all, + &args.test_config.publish_addr, + &args.test_config.publish_private_key_file, + ) + .await?; + } + + Ok(res) +} + +async fn ping_test(args: &TestValidatorArgs) -> TestResult { + let mut result = TestResult::new("Ping"); + + match timeout( + Duration::from_secs(1), + TcpStream::connect(&args.api_address), + ) + .await + { + Ok(Ok(_conn)) => { + result.verdict = TestVerdict::Ok; + } + Ok(Err(e)) => { + return result.fail(e); + } + Err(_) => { + return result.fail(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "connection timeout", + )); + } + } + + result +} + +async fn ping_measure_test(args: &TestValidatorArgs) -> TestResult { + let mut result = TestResult::new("PingMeasure"); + let before = Instant::now(); + + match timeout( + Duration::from_secs(1), + TcpStream::connect(&args.api_address), + ) + .await + { + Ok(Ok(_conn)) => { + let rtt = before.elapsed(); + result = evaluate_rtt(rtt, result, THRESHOLD_MEASURE_AVG, THRESHOLD_MEASURE_POOR); + } + Ok(Err(e)) => { + return result.fail(e); + } + Err(_) => { + return result.fail(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "connection timeout", + )); + } + } + + result +} + +async fn ping_load_test(args: &TestValidatorArgs) -> TestResult { + tracing::info!( + duration = ?args.load_test_duration, + target = %args.api_address, + "Running ping load tests..." + ); + + let mut result = TestResult::new("PingLoad"); + + let (tx, rx) = mpsc::channel::(); + let address = args.api_address.clone(); + let duration = args.load_test_duration; + + let handle = tokio::spawn(async move { + let start = Instant::now(); + let mut interval = tokio::time::interval(Duration::from_secs(1)); + + while start.elapsed() < duration { + interval.tick().await; + + let tx = tx.clone(); + let addr = address.clone(); + let remaining = duration.saturating_sub(start.elapsed()); + + tokio::spawn(async move { + ping_continuously(addr, tx, remaining).await; + }); + } + }); + + let _ = handle.await; + + let mut rtts = Vec::new(); + while let Ok(rtt) = rx.try_recv() { + rtts.push(rtt); + } + + tracing::info!(target = %args.api_address, "Ping load tests finished"); + + result = evaluate_highest_rtt(rtts, result, THRESHOLD_LOAD_AVG, THRESHOLD_LOAD_POOR); + + result +} + +async fn ping_continuously(address: String, tx: mpsc::Sender, max_duration: Duration) { + let start = Instant::now(); + + while start.elapsed() < max_duration { + let before = Instant::now(); + + match timeout(Duration::from_secs(1), TcpStream::connect(&address)).await { + Ok(Ok(_conn)) => { + let rtt = before.elapsed(); + if tx.send(rtt).is_err() { + return; + } + } + _ => return, + } + + let sleep_ms = rand::thread_rng().gen_range(0..100); + tokio::time::sleep(Duration::from_millis(sleep_ms)).await; + } } From 2ee355bfe9b12deb0bb4507fb0c8743b9044fa92 Mon Sep 17 00:00:00 2001 From: PoulavBhowmick03 Date: Thu, 26 Feb 2026 21:22:30 +0530 Subject: [PATCH 2/3] suggestions --- crates/cli/src/commands/test/mod.rs | 13 +- crates/cli/src/commands/test/validator.rs | 143 ++++++++++++++++------ 2 files changed, 110 insertions(+), 46 deletions(-) diff --git a/crates/cli/src/commands/test/mod.rs b/crates/cli/src/commands/test/mod.rs index 800d571c..d16f097d 100644 --- a/crates/cli/src/commands/test/mod.rs +++ b/crates/cli/src/commands/test/mod.rs @@ -120,18 +120,13 @@ pub struct TestConfigArgs { } /// Lists available test case names for a given test category. -/// TODO: Fill with enums TestCases of each category fn list_test_cases(category: TestCategory) -> Vec { // Returns available test case names for each category. match category { - TestCategory::Validator => { - // From validator::supported_validator_test_cases() - vec![ - "Ping".to_string(), - "PingMeasure".to_string(), - "PingLoad".to_string(), - ] - } + TestCategory::Validator => validator::ValidatorTestCase::all() + .iter() + .map(|tc| tc.name().to_string()) + .collect(), TestCategory::Beacon => { // TODO: Extract from beacon::supported_beacon_test_cases() vec![] diff --git a/crates/cli/src/commands/test/validator.rs b/crates/cli/src/commands/test/validator.rs index 7b5e34ab..82bcfb18 100644 --- a/crates/cli/src/commands/test/validator.rs +++ b/crates/cli/src/commands/test/validator.rs @@ -1,18 +1,19 @@ //! Validator client connectivity tests. -use std::{collections::HashMap, io::Write, sync::mpsc, time::Duration}; +use std::{io::Write, time::Duration}; use clap::Args; use rand::Rng; use tokio::{ net::TcpStream, + sync::mpsc, time::{Instant, timeout}, }; use super::{ - AllCategoriesResult, TestCaseName, TestCategory, TestCategoryResult, TestConfigArgs, - TestResult, TestVerdict, calculate_score, evaluate_highest_rtt, evaluate_rtt, filter_tests, - publish_result_to_obol_api, sort_tests, write_result_to_file, write_result_to_writer, + AllCategoriesResult, TestCategory, TestCategoryResult, TestConfigArgs, TestResult, TestVerdict, + calculate_score, evaluate_highest_rtt, evaluate_rtt, publish_result_to_obol_api, + write_result_to_file, write_result_to_writer, }; use crate::{duration::Duration as CliDuration, error::Result}; @@ -22,6 +23,34 @@ const THRESHOLD_MEASURE_POOR: Duration = Duration::from_millis(240); const THRESHOLD_LOAD_AVG: Duration = Duration::from_millis(50); const THRESHOLD_LOAD_POOR: Duration = Duration::from_millis(240); +/// Validator test cases. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ValidatorTestCase { + Ping, + PingMeasure, + PingLoad, +} + +impl ValidatorTestCase { + /// Returns all validator test cases. + pub fn all() -> &'static [ValidatorTestCase] { + &[ + ValidatorTestCase::Ping, + ValidatorTestCase::PingMeasure, + ValidatorTestCase::PingLoad, + ] + } + + /// Returns the test name as a string. + pub fn name(&self) -> &'static str { + match self { + ValidatorTestCase::Ping => "Ping", + ValidatorTestCase::PingMeasure => "PingMeasure", + ValidatorTestCase::PingLoad => "PingLoad", + } + } +} + /// Arguments for the validator test command. #[derive(Args, Clone, Debug)] pub struct TestValidatorArgs { @@ -53,12 +82,16 @@ pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result = if let Some(ref filter) = args.test_config.test_cases + { + ValidatorTestCase::all() + .iter() + .filter(|tc| filter.contains(&tc.name().to_string())) + .copied() + .collect() + } else { + ValidatorTestCase::all().to_vec() + }; if queued_tests.is_empty() { return Err(crate::error::CliError::Other( @@ -66,26 +99,9 @@ pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result ping_test(&args).await, - "PingMeasure" => ping_measure_test(&args).await, - "PingLoad" => ping_load_test(&args).await, - _ => TestResult::new(&test.name).fail(std::io::Error::other("unknown test")), - }; - results.push(result); - } - results - }) - .await - .unwrap_or_else(|_| { - vec![TestResult::new("Timeout").fail(std::io::Error::other("timeout interrupted"))] - }); + let test_results = run_tests_with_timeout(&args, &queued_tests).await; + let score = calculate_score(&test_results); let mut res = TestCategoryResult::new(TestCategory::Validator); @@ -117,8 +133,55 @@ pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result Vec { + let (tx, mut rx) = mpsc::channel::(100); + let mut test_iter = tests.iter().peekable(); + + let timeout_result = tokio::time::timeout(args.test_config.timeout, async { + while let Some(&test_case) = test_iter.next() { + let result = run_single_test(args, test_case).await; + let _ = tx.send(result); + } + }) + .await; + + // Collect all completed results + drop(tx); + let mut results = Vec::new(); + while let Ok(result) = rx.try_recv() { + results.push(result); + } + + if timeout_result.is_err() { + if let Some(&interrupted_test) = test_iter.peek() { + results.push( + TestResult::new(interrupted_test.name()) + .fail(std::io::Error::other(ERR_TIMEOUT_INTERRUPTED)), + ); + } + } + + results +} + +/// Runs a single test case. +async fn run_single_test(args: &TestValidatorArgs, test_case: ValidatorTestCase) -> TestResult { + match test_case { + ValidatorTestCase::Ping => ping_test(args).await, + ValidatorTestCase::PingMeasure => ping_measure_test(args).await, + ValidatorTestCase::PingLoad => ping_load_test(args).await, + } +} + async fn ping_test(args: &TestValidatorArgs) -> TestResult { - let mut result = TestResult::new("Ping"); + let mut result = TestResult::new(ValidatorTestCase::Ping.name()); match timeout( Duration::from_secs(1), @@ -144,7 +207,7 @@ async fn ping_test(args: &TestValidatorArgs) -> TestResult { } async fn ping_measure_test(args: &TestValidatorArgs) -> TestResult { - let mut result = TestResult::new("PingMeasure"); + let mut result = TestResult::new(ValidatorTestCase::PingMeasure.name()); let before = Instant::now(); match timeout( @@ -178,9 +241,9 @@ async fn ping_load_test(args: &TestValidatorArgs) -> TestResult { "Running ping load tests..." ); - let mut result = TestResult::new("PingLoad"); + let mut result = TestResult::new(ValidatorTestCase::PingLoad.name()); - let (tx, rx) = mpsc::channel::(); + let (tx, mut rx) = mpsc::channel::(100); let address = args.api_address.clone(); let duration = args.load_test_duration; @@ -188,6 +251,7 @@ async fn ping_load_test(args: &TestValidatorArgs) -> TestResult { let start = Instant::now(); let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.tick().await; while start.elapsed() < duration { interval.tick().await; @@ -222,15 +286,20 @@ async fn ping_continuously(address: String, tx: mpsc::Sender, max_dura let before = Instant::now(); match timeout(Duration::from_secs(1), TcpStream::connect(&address)).await { - Ok(Ok(_conn)) => { + Ok(Ok(conn)) => { let rtt = before.elapsed(); - if tx.send(rtt).is_err() { + if tx.send(rtt).await.is_err() { + drop(conn); return; } } - _ => return, + Ok(Err(e)) => { + tracing::warn!(target = %address, error = ?e, "Ping connection attempt failed during load test"); + } + Err(e) => { + tracing::warn!(target = %address, error = ?e, "Ping connection attempt timed out during load test"); + } } - let sleep_ms = rand::thread_rng().gen_range(0..100); tokio::time::sleep(Duration::from_millis(sleep_ms)).await; } From 073a39d37f60866fed1ae6aaf45984e3bee2ff9e Mon Sep 17 00:00:00 2001 From: PoulavBhowmick03 Date: Thu, 26 Feb 2026 21:27:41 +0530 Subject: [PATCH 3/3] clippy --- crates/cli/src/commands/test/validator.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/crates/cli/src/commands/test/validator.rs b/crates/cli/src/commands/test/validator.rs index 82bcfb18..ded915f2 100644 --- a/crates/cli/src/commands/test/validator.rs +++ b/crates/cli/src/commands/test/validator.rs @@ -145,9 +145,9 @@ async fn run_tests_with_timeout( let mut test_iter = tests.iter().peekable(); let timeout_result = tokio::time::timeout(args.test_config.timeout, async { - while let Some(&test_case) = test_iter.next() { + for &test_case in test_iter.by_ref() { let result = run_single_test(args, test_case).await; - let _ = tx.send(result); + let _ = tx.send(result).await; } }) .await; @@ -159,13 +159,13 @@ async fn run_tests_with_timeout( results.push(result); } - if timeout_result.is_err() { - if let Some(&interrupted_test) = test_iter.peek() { - results.push( - TestResult::new(interrupted_test.name()) - .fail(std::io::Error::other(ERR_TIMEOUT_INTERRUPTED)), - ); - } + if timeout_result.is_err() + && let Some(&interrupted_test) = test_iter.peek() + { + results.push( + TestResult::new(interrupted_test.name()) + .fail(std::io::Error::other(ERR_TIMEOUT_INTERRUPTED)), + ); } results