From efd421b7f0c6c46fdbc46dd7ebead7a9133f1eca Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Tue, 13 Jan 2026 11:34:00 +0100 Subject: [PATCH 1/3] feat(sidecar): implement thread listener module Signed-off-by: Alexandre Rulleau --- datadog-sidecar-ffi/src/lib.rs | 53 +++++ datadog-sidecar/src/setup/mod.rs | 6 + datadog-sidecar/src/setup/thread_listener.rs | 222 +++++++++++++++++++ 3 files changed, 281 insertions(+) create mode 100644 datadog-sidecar/src/setup/thread_listener.rs diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 78bab1d2e6..4659b70ae6 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -310,6 +310,59 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - MaybeError::None } +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { + use datadog_sidecar::setup::MasterListener; + + let cfg = datadog_sidecar::config::FromEnv::config(); + try_c!(MasterListener::start(pid, cfg)); + + MaybeError::None +} + +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_connect_worker( + pid: i32, + connection: &mut *mut SidecarTransport, +) -> MaybeError { + use datadog_sidecar::setup::connect_to_master; + + let transport = try_c!(connect_to_master(pid)); + *connection = Box::into_raw(transport); + + MaybeError::None +} + +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError { + use datadog_sidecar::setup::MasterListener; + + try_c!(MasterListener::shutdown()); + + MaybeError::None +} + +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool { + use datadog_sidecar::setup::MasterListener; + + MasterListener::is_active(pid) +} + +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError { + use datadog_sidecar::setup::MasterListener; + + try_c!(MasterListener::clear_inherited_state()); + + MaybeError::None +} + #[no_mangle] pub extern "C" fn ddog_sidecar_ping(transport: &mut Box) -> MaybeError { try_c!(blocking::ping(transport)); diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index 07c837aab0..fc0ed95ff5 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -12,6 +12,12 @@ mod windows; #[cfg(windows)] pub use self::windows::*; +// Thread-based listener module (Unix only) +#[cfg(unix)] +pub mod thread_listener; +#[cfg(unix)] +pub use thread_listener::{MasterListener, connect_to_master}; + use datadog_ipc::platform::Channel; use std::io; diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs new file mode 100644 index 0000000000..6783e2cde2 --- /dev/null +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -0,0 +1,222 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::{Arc, Mutex, OnceLock, mpsc}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; +use std::io; +use tokio::net::UnixListener; +use tokio::runtime::Runtime; +use tracing::{info, error}; + +use crate::config::Config; +use crate::config::IpcMode::{InstancePerProcess, Shared}; +use crate::service::blocking::SidecarTransport; +use crate::service::SidecarServer; +use crate::setup::{Liaison, SharedDirLiaison}; +use datadog_ipc::platform::AsyncChannel; +use datadog_ipc::transport::blocking::BlockingTransport; + +static MASTER_LISTENER: OnceLock>> = OnceLock::new(); + +pub struct MasterListener { + shutdown_tx: mpsc::Sender<()>, + thread_handle: Option>, + pid: i32, +} + +impl MasterListener { + /// Start the master listener thread. + /// + /// This spawns a new OS thread with a Tokio runtime that listens for + /// worker connections. Only one listener can be active per process. + pub fn start(pid: i32, config: Config) -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex.lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + return Err(io::Error::other("Master listener is already running")); + } + + let (shutdown_tx, shutdown_rx) = mpsc::channel(); + + // Wrap shutdown receiver in Arc> for sharing with async function + let shutdown_rx = Arc::new(Mutex::new(shutdown_rx)); + + let runtime = Runtime::new() + .map_err(|e| io::Error::other(format!("Failed to create Tokio runtime: {}", e)))?; + + let thread_handle = thread::Builder::new() + .name(format!("ddtrace-sidecar-listener-{}", pid)) + .spawn(move || { + runtime.block_on(async { + if let Err(e) = run_listener(config, shutdown_rx).await { + error!("Listener thread error: {}", e); + } + }); + }) + .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?; + + *listener_guard = Some(MasterListener { + shutdown_tx, + thread_handle: Some(thread_handle), + pid, + }); + + Ok(()) + } + + /// Shutdown the master listener thread. + /// + /// Sends shutdown signal and joins the listener thread. This is blocking + /// and will wait for the thread to exit cleanly. + pub fn shutdown() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex.lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if let Some(mut master) = listener_guard.take() { + let _ = master.shutdown_tx.send(()); + + // Give the runtime a moment to process shutdown + std::thread::sleep(Duration::from_millis(100)); + + if let Some(handle) = master.thread_handle.take() { + handle.join() + .map_err(|_| io::Error::other("Failed to join listener thread"))?; + } + + info!("Master listener thread shut down successfully"); + Ok(()) + } else { + Err(io::Error::other("No master listener is running")) + } + } + + /// Check if the master listener is active for the given PID. + /// + /// Used for fork detection: child processes inherit the listener state + /// but don't own the actual thread. + pub fn is_active(pid: i32) -> bool { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + if let Ok(listener_guard) = listener_mutex.lock() { + listener_guard.as_ref().is_some_and(|l| l.pid == pid) + } else { + false + } + } + + /// Clear inherited listener state after fork. + /// + /// Child processes must call this to prevent attempting to use the + /// parent's listener thread, which doesn't exist in the child. + pub fn clear_inherited_state() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex.lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + info!("Clearing inherited master listener state in child process"); + *listener_guard = None; + } + + Ok(()) + } +} + +/// Async listener loop that accepts worker connections. +/// +/// This runs in the listener thread's Tokio runtime and handles: +/// - Accepting new worker connections +/// - Spawning handlers for each connection +/// - Graceful shutdown on signal +async fn run_listener(config: Config, shutdown_rx: Arc>>) -> io::Result<()> { + info!("Listener thread running, creating IPC server"); + + // Create IPC server using the platform-specific Liaison + let liaison: SharedDirLiaison = match config.ipc_mode { + Shared => Liaison::ipc_shared(), + InstancePerProcess => Liaison::ipc_per_process(), + }; + + let std_listener = liaison.attempt_listen()? + .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; + + std_listener.set_nonblocking(true)?; + let ipc_server = UnixListener::from_std(std_listener)?; + + info!("IPC server listening for worker connections"); + + let server = SidecarServer::default(); + + loop { + if let Ok(rx) = shutdown_rx.lock() { + if rx.try_recv().is_ok() || matches!(rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)) { + info!("Shutdown signal received, exiting listener loop"); + break; + } + } + + match tokio::time::timeout(Duration::from_millis(100), ipc_server.accept()).await { + Ok(Ok((client, _addr))) => { + info!("Accepted new worker connection"); + let server_clone = server.clone(); + + tokio::spawn(async move { + handle_worker_connection(client, server_clone).await; + }); + } + Ok(Err(e)) => { + error!("Failed to accept worker connection: {}", e); + } + Err(_) => { + // Timeout - continue loop to check shutdown signal + continue; + } + } + } + + info!("Listener thread shutting down"); + Ok(()) +} + +/// Handle a single worker connection. +/// +/// Processes requests from the worker and sends responses until the +/// connection is closed. +async fn handle_worker_connection( + client: tokio::net::UnixStream, + server: SidecarServer, +) { + info!("Handling worker connection"); + server.accept_connection(AsyncChannel::from(client)).await; + info!("Worker connection handler exiting"); +} + +/// Connect to the master listener as a worker. +/// +/// Establishes a connection to the master listener thread for the given PID. +pub fn connect_to_master(pid: i32) -> io::Result> { + info!("Connecting to master listener (PID {})", pid); + + let config = Config::get(); + + let liaison: SharedDirLiaison = match config.ipc_mode { + Shared => Liaison::ipc_shared(), + InstancePerProcess => Liaison::ipc_per_process(), + }; + + let channel = liaison.connect_to_server() + .map_err(|e| io::Error::other(format!("Failed to connect to master listener: {}", e)))?; + + let transport = BlockingTransport::from(channel); + + let sidecar_transport = Box::new(SidecarTransport { + inner: Mutex::new(transport), + reconnect_fn: None, // Reconnection handled by caller + }); + + info!("Successfully connected to master listener"); + Ok(sidecar_transport) +} From 7a21e1d80de408db4de560016465bf582f9a3e1b Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 23 Jan 2026 11:29:30 +0100 Subject: [PATCH 2/3] feat(sidecar): apply feedbacks Signed-off-by: Alexandre Rulleau --- datadog-sidecar/src/entry.rs | 98 ++++++++--- datadog-sidecar/src/setup/mod.rs | 2 +- datadog-sidecar/src/setup/thread_listener.rs | 173 +++++++++---------- 3 files changed, 159 insertions(+), 114 deletions(-) diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index 218ec23252..14c5594fe4 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -15,7 +15,7 @@ use std::{ }, time::{Duration, Instant}, }; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; #[cfg(unix)] use crate::crashtracker::crashtracker_unix_socket_path; @@ -32,7 +32,28 @@ use crate::tracer::SHM_LIMITER; use crate::watchdog::Watchdog; use crate::{ddog_daemon_entry_point, setup_daemon_process}; -async fn main_loop(listener: L, cancel: Arc) -> io::Result<()> +/// Configuration for main_loop behavior +pub struct MainLoopConfig { + pub enable_ctrl_c_handler: bool, + pub enable_crashtracker: bool, + pub external_shutdown_rx: Option>, +} + +impl Default for MainLoopConfig { + fn default() -> Self { + Self { + enable_ctrl_c_handler: true, + enable_crashtracker: true, + external_shutdown_rx: None, + } + } +} + +pub async fn main_loop( + listener: L, + cancel: Arc, + loop_config: MainLoopConfig, +) -> io::Result<()> where L: FnOnce(Box) -> Fut, Fut: Future>, @@ -64,29 +85,45 @@ where } }); - tokio::spawn(async move { - if let Err(err) = tokio::signal::ctrl_c().await { - tracing::error!("Error setting up signal handler {}", err); - } - tracing::info!("Received Ctrl-C Signal, shutting down"); - cancel(); - }); + if let Some(shutdown_rx) = loop_config.external_shutdown_rx { + let cancel = cancel.clone(); + tokio::spawn(async move { + let _ = shutdown_rx.await; + tracing::info!("External shutdown signal received"); + cancel(); + }); + } + + if loop_config.enable_ctrl_c_handler { + let cancel = cancel.clone(); + tokio::spawn(async move { + if let Err(err) = tokio::signal::ctrl_c().await { + tracing::error!("Error setting up signal handler {}", err); + } + tracing::info!("Received Ctrl-C Signal, shutting down"); + cancel(); + }); + } #[cfg(unix)] - tokio::spawn(async move { - let socket_path = crashtracker_unix_socket_path(); - match libdd_crashtracker::get_receiver_unix_socket(socket_path.to_str().unwrap_or_default()) - { - Ok(listener) => loop { - if let Err(e) = - libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener).await - { - tracing::warn!("Got error while receiving crash report: {e}"); - } - }, - Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), - } - }); + if loop_config.enable_crashtracker { + tokio::spawn(async move { + let socket_path = crashtracker_unix_socket_path(); + match libdd_crashtracker::get_receiver_unix_socket( + socket_path.to_str().unwrap_or_default(), + ) { + Ok(listener) => loop { + if let Err(e) = + libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener) + .await + { + tracing::warn!("Got error while receiving crash report: {e}"); + } + }, + Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), + } + }); + } // Init. Early, before we start listening. drop(SHM_LIMITER.lock()); @@ -143,6 +180,19 @@ where } pub fn enter_listener_loop(acquire_listener: F) -> anyhow::Result<()> +where + F: FnOnce() -> io::Result<(L, C)>, + L: FnOnce(Box) -> Fut, + Fut: Future>, + C: Fn() + Sync + Send + 'static, +{ + enter_listener_loop_with_config(acquire_listener, MainLoopConfig::default()) +} + +pub fn enter_listener_loop_with_config( + acquire_listener: F, + loop_config: MainLoopConfig, +) -> anyhow::Result<()> where F: FnOnce() -> io::Result<(L, C)>, L: FnOnce(Box) -> Fut, @@ -159,7 +209,7 @@ where let (listener, cancel) = acquire_listener()?; runtime - .block_on(main_loop(listener, Arc::new(cancel))) + .block_on(main_loop(listener, Arc::new(cancel), loop_config)) .map_err(|e| e.into()) } diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index fc0ed95ff5..1417c8bde1 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -16,7 +16,7 @@ pub use self::windows::*; #[cfg(unix)] pub mod thread_listener; #[cfg(unix)] -pub use thread_listener::{MasterListener, connect_to_master}; +pub use thread_listener::{connect_to_master, MasterListener}; use datadog_ipc::platform::Channel; use std::io; diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs index 6783e2cde2..dd41d0f7d7 100644 --- a/datadog-sidecar/src/setup/thread_listener.rs +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -1,26 +1,24 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::sync::{Arc, Mutex, OnceLock, mpsc}; -use std::thread::{self, JoinHandle}; -use std::time::Duration; use std::io; +use std::sync::{Mutex, OnceLock}; +use std::thread::{self, JoinHandle}; use tokio::net::UnixListener; -use tokio::runtime::Runtime; -use tracing::{info, error}; +use tokio::sync::oneshot; +use tracing::{error, info}; use crate::config::Config; use crate::config::IpcMode::{InstancePerProcess, Shared}; +use crate::entry::MainLoopConfig; use crate::service::blocking::SidecarTransport; -use crate::service::SidecarServer; use crate::setup::{Liaison, SharedDirLiaison}; -use datadog_ipc::platform::AsyncChannel; use datadog_ipc::transport::blocking::BlockingTransport; static MASTER_LISTENER: OnceLock>> = OnceLock::new(); pub struct MasterListener { - shutdown_tx: mpsc::Sender<()>, + shutdown_tx: Option>, thread_handle: Option>, pid: i32, } @@ -28,38 +26,32 @@ pub struct MasterListener { impl MasterListener { /// Start the master listener thread. /// - /// This spawns a new OS thread with a Tokio runtime that listens for - /// worker connections. Only one listener can be active per process. + /// This spawns a new OS thread that calls enter_listener_loop_with_config + /// to create a Tokio runtime and listen for worker connections. + /// Only one listener can be active per process. pub fn start(pid: i32, config: Config) -> io::Result<()> { let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); - let mut listener_guard = listener_mutex.lock() + let mut listener_guard = listener_mutex + .lock() .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; if listener_guard.is_some() { return Err(io::Error::other("Master listener is already running")); } - let (shutdown_tx, shutdown_rx) = mpsc::channel(); - - // Wrap shutdown receiver in Arc> for sharing with async function - let shutdown_rx = Arc::new(Mutex::new(shutdown_rx)); - - let runtime = Runtime::new() - .map_err(|e| io::Error::other(format!("Failed to create Tokio runtime: {}", e)))?; + let (shutdown_tx, shutdown_rx) = oneshot::channel(); let thread_handle = thread::Builder::new() .name(format!("ddtrace-sidecar-listener-{}", pid)) .spawn(move || { - runtime.block_on(async { - if let Err(e) = run_listener(config, shutdown_rx).await { - error!("Listener thread error: {}", e); - } - }); + if let Err(e) = run_listener(config, shutdown_rx) { + error!("Listener thread error: {}", e); + } }) .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?; *listener_guard = Some(MasterListener { - shutdown_tx, + shutdown_tx: Some(shutdown_tx), thread_handle: Some(thread_handle), pid, }); @@ -73,17 +65,19 @@ impl MasterListener { /// and will wait for the thread to exit cleanly. pub fn shutdown() -> io::Result<()> { let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); - let mut listener_guard = listener_mutex.lock() + let mut listener_guard = listener_mutex + .lock() .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; if let Some(mut master) = listener_guard.take() { - let _ = master.shutdown_tx.send(()); - - // Give the runtime a moment to process shutdown - std::thread::sleep(Duration::from_millis(100)); + // Signal shutdown by sending to or dropping the oneshot sender + if let Some(tx) = master.shutdown_tx.take() { + let _ = tx.send(()); + } if let Some(handle) = master.thread_handle.take() { - handle.join() + handle + .join() .map_err(|_| io::Error::other("Failed to join listener thread"))?; } @@ -113,7 +107,8 @@ impl MasterListener { /// parent's listener thread, which doesn't exist in the child. pub fn clear_inherited_state() -> io::Result<()> { let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); - let mut listener_guard = listener_mutex.lock() + let mut listener_guard = listener_mutex + .lock() .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; if listener_guard.is_some() { @@ -125,75 +120,74 @@ impl MasterListener { } } -/// Async listener loop that accepts worker connections. -/// -/// This runs in the listener thread's Tokio runtime and handles: -/// - Accepting new worker connections -/// - Spawning handlers for each connection -/// - Graceful shutdown on signal -async fn run_listener(config: Config, shutdown_rx: Arc>>) -> io::Result<()> { - info!("Listener thread running, creating IPC server"); +/// Accept connections in a loop for thread mode. +async fn accept_socket_loop_thread( + listener: UnixListener, + handler: Box, + mut shutdown_rx: oneshot::Receiver<()>, +) -> io::Result<()> { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + info!("Shutdown signal received in thread listener"); + break; + } + accept = listener.accept() => { + match accept { + Ok((socket, _)) => { + info!("Accepted new worker connection"); + handler(socket); + } + Err(e) => { + error!("Failed to accept worker connection: {}", e); + break; + } + } + } + } + } + Ok(()) +} - // Create IPC server using the platform-specific Liaison - let liaison: SharedDirLiaison = match config.ipc_mode { - Shared => Liaison::ipc_shared(), - InstancePerProcess => Liaison::ipc_per_process(), - }; +/// Entry point for thread listener - calls enter_listener_loop_with_config +fn run_listener(config: Config, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { + info!("Listener thread running, creating IPC server"); - let std_listener = liaison.attempt_listen()? - .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; + let acquire_listener = move || { + let liaison: SharedDirLiaison = match config.ipc_mode { + Shared => Liaison::ipc_shared(), + InstancePerProcess => Liaison::ipc_per_process(), + }; - std_listener.set_nonblocking(true)?; - let ipc_server = UnixListener::from_std(std_listener)?; + let std_listener = liaison + .attempt_listen()? + .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; - info!("IPC server listening for worker connections"); + std_listener.set_nonblocking(true)?; + let listener = UnixListener::from_std(std_listener)?; - let server = SidecarServer::default(); + info!("IPC server listening for worker connections"); - loop { - if let Ok(rx) = shutdown_rx.lock() { - if rx.try_recv().is_ok() || matches!(rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)) { - info!("Shutdown signal received, exiting listener loop"); - break; - } - } + let cancel = || {}; + Ok(( + move |handler| accept_socket_loop_thread(listener, handler, shutdown_rx), + cancel, + )) + }; - match tokio::time::timeout(Duration::from_millis(100), ipc_server.accept()).await { - Ok(Ok((client, _addr))) => { - info!("Accepted new worker connection"); - let server_clone = server.clone(); + let loop_config = MainLoopConfig { + enable_ctrl_c_handler: false, + enable_crashtracker: false, + external_shutdown_rx: None, + }; - tokio::spawn(async move { - handle_worker_connection(client, server_clone).await; - }); - } - Ok(Err(e)) => { - error!("Failed to accept worker connection: {}", e); - } - Err(_) => { - // Timeout - continue loop to check shutdown signal - continue; - } - } - } + crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config) + .map_err(|e| io::Error::other(format!("Thread listener failed: {}", e)))?; - info!("Listener thread shutting down"); + info!("Listener thread exiting"); Ok(()) } -/// Handle a single worker connection. -/// -/// Processes requests from the worker and sends responses until the -/// connection is closed. -async fn handle_worker_connection( - client: tokio::net::UnixStream, - server: SidecarServer, -) { - info!("Handling worker connection"); - server.accept_connection(AsyncChannel::from(client)).await; - info!("Worker connection handler exiting"); -} - /// Connect to the master listener as a worker. /// /// Establishes a connection to the master listener thread for the given PID. @@ -207,7 +201,8 @@ pub fn connect_to_master(pid: i32) -> io::Result> { InstancePerProcess => Liaison::ipc_per_process(), }; - let channel = liaison.connect_to_server() + let channel = liaison + .connect_to_server() .map_err(|e| io::Error::other(format!("Failed to connect to master listener: {}", e)))?; let transport = BlockingTransport::from(channel); From cc2fb3d1ae47cc0cabee323cb96202a318866bbe Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 23 Jan 2026 13:27:43 +0100 Subject: [PATCH 3/3] feat(sidecar): support threaded connection for windows Signed-off-by: Alexandre Rulleau --- datadog-sidecar-ffi/src/lib.rs | 17 +- datadog-sidecar/src/setup/mod.rs | 8 +- datadog-sidecar/src/setup/thread_listener.rs | 2 +- .../src/setup/thread_listener_windows.rs | 211 ++++++++++++++++++ 4 files changed, 221 insertions(+), 17 deletions(-) create mode 100644 datadog-sidecar/src/setup/thread_listener_windows.rs diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 4659b70ae6..4370c41c23 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -60,6 +60,8 @@ use std::slice; use std::sync::Arc; use std::time::Duration; +use datadog_sidecar::setup::{connect_to_master, MasterListener}; + #[no_mangle] #[cfg(target_os = "windows")] pub extern "C" fn ddog_setup_crashtracking( @@ -311,10 +313,7 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { - use datadog_sidecar::setup::MasterListener; - let cfg = datadog_sidecar::config::FromEnv::config(); try_c!(MasterListener::start(pid, cfg)); @@ -322,13 +321,10 @@ pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_connect_worker( pid: i32, connection: &mut *mut SidecarTransport, ) -> MaybeError { - use datadog_sidecar::setup::connect_to_master; - let transport = try_c!(connect_to_master(pid)); *connection = Box::into_raw(transport); @@ -336,28 +332,19 @@ pub extern "C" fn ddog_sidecar_connect_worker( } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError { - use datadog_sidecar::setup::MasterListener; - try_c!(MasterListener::shutdown()); MaybeError::None } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool { - use datadog_sidecar::setup::MasterListener; - MasterListener::is_active(pid) } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError { - use datadog_sidecar::setup::MasterListener; - try_c!(MasterListener::clear_inherited_state()); MaybeError::None diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index 1417c8bde1..b82587b217 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -12,12 +12,18 @@ mod windows; #[cfg(windows)] pub use self::windows::*; -// Thread-based listener module (Unix only) +// Thread-based listener module (Unix) #[cfg(unix)] pub mod thread_listener; #[cfg(unix)] pub use thread_listener::{connect_to_master, MasterListener}; +// Thread-based listener module (Windows) +#[cfg(windows)] +pub mod thread_listener_windows; +#[cfg(windows)] +pub use thread_listener_windows::{connect_to_master, MasterListener}; + use datadog_ipc::platform::Channel; use std::io; diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs index dd41d0f7d7..815fe1466d 100644 --- a/datadog-sidecar/src/setup/thread_listener.rs +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -209,7 +209,7 @@ pub fn connect_to_master(pid: i32) -> io::Result> { let sidecar_transport = Box::new(SidecarTransport { inner: Mutex::new(transport), - reconnect_fn: None, // Reconnection handled by caller + reconnect_fn: None, }); info!("Successfully connected to master listener"); diff --git a/datadog-sidecar/src/setup/thread_listener_windows.rs b/datadog-sidecar/src/setup/thread_listener_windows.rs new file mode 100644 index 0000000000..2414ff0a14 --- /dev/null +++ b/datadog-sidecar/src/setup/thread_listener_windows.rs @@ -0,0 +1,211 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::io; +use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle}; +use std::sync::{Mutex, OnceLock}; +use std::thread::{self, JoinHandle}; +use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions}; +use tokio::sync::oneshot; +use tracing::{error, info}; + +use crate::config::Config; +use crate::entry::MainLoopConfig; +use crate::service::blocking::SidecarTransport; +use datadog_ipc::platform::metadata::ProcessHandle; +use datadog_ipc::platform::Channel; +use datadog_ipc::transport::blocking::BlockingTransport; + +static MASTER_LISTENER: OnceLock>> = OnceLock::new(); + +pub struct MasterListener { + shutdown_tx: Option>, + thread_handle: Option>, + pid: i32, +} + +impl MasterListener { + /// Start the master listener thread using Windows Named Pipes. + /// + /// This spawns a new OS thread that creates a named pipe server + /// to listen for worker connections. Only one listener can be active per process. + pub fn start(pid: i32, _config: Config) -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + return Err(io::Error::other("Master listener is already running")); + } + + let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let thread_handle = thread::Builder::new() + .name(format!("ddtrace-sidecar-listener-{}", pid)) + .spawn(move || { + if let Err(e) = run_listener_windows(pipe_name, shutdown_rx) { + error!("Listener thread error: {}", e); + } + }) + .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?; + + *listener_guard = Some(MasterListener { + shutdown_tx: Some(shutdown_tx), + thread_handle: Some(thread_handle), + pid, + }); + + info!("Started Windows named pipe listener (PID {})", pid); + Ok(()) + } + + /// Shutdown the master listener thread. + /// + /// Sends shutdown signal and joins the listener thread. This is blocking + /// and will wait for the thread to exit cleanly. + pub fn shutdown() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if let Some(mut master) = listener_guard.take() { + // Signal shutdown by sending to the oneshot sender + if let Some(tx) = master.shutdown_tx.take() { + let _ = tx.send(()); + } + + if let Some(handle) = master.thread_handle.take() { + handle + .join() + .map_err(|_| io::Error::other("Failed to join listener thread"))?; + } + + info!("Master listener thread shut down successfully"); + Ok(()) + } else { + Err(io::Error::other("No master listener is running")) + } + } + + /// Check if the master listener is active for the given PID. + pub fn is_active(pid: i32) -> bool { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + if let Ok(listener_guard) = listener_mutex.lock() { + listener_guard.as_ref().is_some_and(|l| l.pid == pid) + } else { + false + } + } + + /// Clear inherited listener state. + /// Kept for API compatibility with Unix version. + pub fn clear_inherited_state() -> io::Result<()> { + Ok(()) + } +} + +/// Accept connections in a loop for Windows named pipes. +async fn accept_pipe_loop_windows( + pipe_name: String, + handler: Box, + mut shutdown_rx: oneshot::Receiver<()>, +) -> io::Result<()> { + let mut server = ServerOptions::new() + .first_pipe_instance(true) + .max_instances(254) // Windows allows up to 255 instances + .create(&pipe_name)?; + + info!("Named pipe server created at: {}", pipe_name); + + loop { + tokio::select! { + _ = &mut shutdown_rx => { + info!("Shutdown signal received in Windows pipe listener"); + break; + } + result = server.connect() => { + match result { + Ok(_) => { + info!("Accepted new worker connection on named pipe"); + handler(server); + + server = ServerOptions::new() + .create(&pipe_name)?; + } + Err(e) => { + error!("Failed to accept worker connection: {}", e); + match ServerOptions::new().create(&pipe_name) { + Ok(new_server) => server = new_server, + Err(e2) => { + error!("Failed to recover named pipe: {}", e2); + break; + } + } + } + } + } + } + } + Ok(()) +} + +/// Entry point for Windows named pipe listener +fn run_listener_windows(pipe_name: String, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { + info!("Listener thread running, creating Windows named pipe server"); + + let acquire_listener = move || { + let cancel = || {}; + let pipe_name_clone = pipe_name.clone(); + Ok(( + move |handler| accept_pipe_loop_windows(pipe_name_clone, handler, shutdown_rx), + cancel, + )) + }; + + let loop_config = MainLoopConfig { + enable_ctrl_c_handler: false, + enable_crashtracker: false, + external_shutdown_rx: None, + }; + + crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config) + .map_err(|e| io::Error::other(format!("Windows thread listener failed: {}", e)))?; + + info!("Listener thread exiting"); + Ok(()) +} + +/// Connect to the master listener as a worker using Windows Named Pipes. +/// +/// Establishes a connection to the master listener thread for the given PID. +pub fn connect_to_master(pid: i32) -> io::Result> { + info!("Connecting to master listener via named pipe (PID {})", pid); + + let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid); + + let client = ClientOptions::new().open(&pipe_name)?; + + info!("Connected to named pipe: {}", pipe_name); + + let raw_handle = client.as_raw_handle(); + let owned_handle = unsafe { OwnedHandle::from_raw_handle(raw_handle) }; + + std::mem::forget(client); + + let process_handle = + ProcessHandle::Getter(Box::new(move || Ok(ProcessHandle::Pid(pid as u32)))); + let channel = Channel::from_client_handle_and_pid(owned_handle, process_handle); + + let transport = BlockingTransport::from(channel); + + let sidecar_transport = Box::new(SidecarTransport { + inner: Mutex::new(transport), + reconnect_fn: None, + }); + + info!("Successfully connected to master listener"); + Ok(sidecar_transport) +}