diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 78bab1d2e6..4370c41c23 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -60,6 +60,8 @@ use std::slice; use std::sync::Arc; use std::time::Duration; +use datadog_sidecar::setup::{connect_to_master, MasterListener}; + #[no_mangle] #[cfg(target_os = "windows")] pub extern "C" fn ddog_setup_crashtracking( @@ -310,6 +312,44 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - MaybeError::None } +#[no_mangle] +pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { + let cfg = datadog_sidecar::config::FromEnv::config(); + try_c!(MasterListener::start(pid, cfg)); + + MaybeError::None +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_connect_worker( + pid: i32, + connection: &mut *mut SidecarTransport, +) -> MaybeError { + let transport = try_c!(connect_to_master(pid)); + *connection = Box::into_raw(transport); + + MaybeError::None +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError { + try_c!(MasterListener::shutdown()); + + MaybeError::None +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool { + MasterListener::is_active(pid) +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError { + try_c!(MasterListener::clear_inherited_state()); + + MaybeError::None +} + #[no_mangle] pub extern "C" fn ddog_sidecar_ping(transport: &mut Box) -> MaybeError { try_c!(blocking::ping(transport)); diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index 218ec23252..14c5594fe4 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -15,7 +15,7 @@ use std::{ }, time::{Duration, Instant}, }; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; #[cfg(unix)] use crate::crashtracker::crashtracker_unix_socket_path; @@ -32,7 +32,28 @@ use crate::tracer::SHM_LIMITER; 
use crate::watchdog::Watchdog; use crate::{ddog_daemon_entry_point, setup_daemon_process}; -async fn main_loop(listener: L, cancel: Arc) -> io::Result<()> +/// Configuration for main_loop behavior +pub struct MainLoopConfig { + pub enable_ctrl_c_handler: bool, + pub enable_crashtracker: bool, + pub external_shutdown_rx: Option>, +} + +impl Default for MainLoopConfig { + fn default() -> Self { + Self { + enable_ctrl_c_handler: true, + enable_crashtracker: true, + external_shutdown_rx: None, + } + } +} + +pub async fn main_loop( + listener: L, + cancel: Arc, + loop_config: MainLoopConfig, +) -> io::Result<()> where L: FnOnce(Box) -> Fut, Fut: Future>, @@ -64,29 +85,45 @@ where } }); - tokio::spawn(async move { - if let Err(err) = tokio::signal::ctrl_c().await { - tracing::error!("Error setting up signal handler {}", err); - } - tracing::info!("Received Ctrl-C Signal, shutting down"); - cancel(); - }); + if let Some(shutdown_rx) = loop_config.external_shutdown_rx { + let cancel = cancel.clone(); + tokio::spawn(async move { + let _ = shutdown_rx.await; + tracing::info!("External shutdown signal received"); + cancel(); + }); + } + + if loop_config.enable_ctrl_c_handler { + let cancel = cancel.clone(); + tokio::spawn(async move { + if let Err(err) = tokio::signal::ctrl_c().await { + tracing::error!("Error setting up signal handler {}", err); + } + tracing::info!("Received Ctrl-C Signal, shutting down"); + cancel(); + }); + } #[cfg(unix)] - tokio::spawn(async move { - let socket_path = crashtracker_unix_socket_path(); - match libdd_crashtracker::get_receiver_unix_socket(socket_path.to_str().unwrap_or_default()) - { - Ok(listener) => loop { - if let Err(e) = - libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener).await - { - tracing::warn!("Got error while receiving crash report: {e}"); - } - }, - Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), - } - }); + if loop_config.enable_crashtracker { + tokio::spawn(async move { + 
let socket_path = crashtracker_unix_socket_path(); + match libdd_crashtracker::get_receiver_unix_socket( + socket_path.to_str().unwrap_or_default(), + ) { + Ok(listener) => loop { + if let Err(e) = + libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener) + .await + { + tracing::warn!("Got error while receiving crash report: {e}"); + } + }, + Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), + } + }); + } // Init. Early, before we start listening. drop(SHM_LIMITER.lock()); @@ -143,6 +180,19 @@ where } pub fn enter_listener_loop(acquire_listener: F) -> anyhow::Result<()> +where + F: FnOnce() -> io::Result<(L, C)>, + L: FnOnce(Box) -> Fut, + Fut: Future>, + C: Fn() + Sync + Send + 'static, +{ + enter_listener_loop_with_config(acquire_listener, MainLoopConfig::default()) +} + +pub fn enter_listener_loop_with_config( + acquire_listener: F, + loop_config: MainLoopConfig, +) -> anyhow::Result<()> where F: FnOnce() -> io::Result<(L, C)>, L: FnOnce(Box) -> Fut, @@ -159,7 +209,7 @@ where let (listener, cancel) = acquire_listener()?; runtime - .block_on(main_loop(listener, Arc::new(cancel))) + .block_on(main_loop(listener, Arc::new(cancel), loop_config)) .map_err(|e| e.into()) } diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index 07c837aab0..b82587b217 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -12,6 +12,18 @@ mod windows; #[cfg(windows)] pub use self::windows::*; +// Thread-based listener module (Unix) +#[cfg(unix)] +pub mod thread_listener; +#[cfg(unix)] +pub use thread_listener::{connect_to_master, MasterListener}; + +// Thread-based listener module (Windows) +#[cfg(windows)] +pub mod thread_listener_windows; +#[cfg(windows)] +pub use thread_listener_windows::{connect_to_master, MasterListener}; + use datadog_ipc::platform::Channel; use std::io; diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs 
new file mode 100644 index 0000000000..815fe1466d --- /dev/null +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -0,0 +1,217 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::io; +use std::sync::{Mutex, OnceLock}; +use std::thread::{self, JoinHandle}; +use tokio::net::UnixListener; +use tokio::sync::oneshot; +use tracing::{error, info}; + +use crate::config::Config; +use crate::config::IpcMode::{InstancePerProcess, Shared}; +use crate::entry::MainLoopConfig; +use crate::service::blocking::SidecarTransport; +use crate::setup::{Liaison, SharedDirLiaison}; +use datadog_ipc::transport::blocking::BlockingTransport; + +static MASTER_LISTENER: OnceLock>> = OnceLock::new(); + +pub struct MasterListener { + shutdown_tx: Option>, + thread_handle: Option>, + pid: i32, +} + +impl MasterListener { + /// Start the master listener thread. + /// + /// This spawns a new OS thread that calls enter_listener_loop_with_config + /// to create a Tokio runtime and listen for worker connections. + /// Only one listener can be active per process. 
+    pub fn start(pid: i32, config: Config) -> io::Result<()> {
+        // Errors: fails if the listener lock is poisoned, if a listener thread
+        // is still alive, or if the OS refuses to spawn the thread.
+        let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
+        let mut listener_guard = listener_mutex
+            .lock()
+            .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
+
+        // Only a listener whose thread is still alive blocks a new start. The
+        // thread can exit on its own (run_listener returns an error, or the
+        // accept loop breaks); without this check such a stale entry would make
+        // every later start fail with "already running" until shutdown is called.
+        let still_running = listener_guard.as_ref().is_some_and(|listener| {
+            listener
+                .thread_handle
+                .as_ref()
+                .is_some_and(|handle| !handle.is_finished())
+        });
+        if still_running {
+            return Err(io::Error::other("Master listener is already running"));
+        }
+
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+
+        let thread_handle = thread::Builder::new()
+            .name(format!("ddtrace-sidecar-listener-{}", pid))
+            .spawn(move || {
+                if let Err(e) = run_listener(config, shutdown_rx) {
+                    error!("Listener thread error: {}", e);
+                }
+            })
+            .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?;
+
+        // Overwriting the slot also drops any stale entry left by a finished thread.
+        *listener_guard = Some(MasterListener {
+            shutdown_tx: Some(shutdown_tx),
+            thread_handle: Some(thread_handle),
+            pid,
+        });
+
+        Ok(())
+    }
+
+    /// Shutdown the master listener thread.
+    ///
+    /// Sends shutdown signal and joins the listener thread. This is blocking
+    /// and will wait for the thread to exit cleanly.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the listener lock is poisoned, if no listener is recorded, or
+    /// if joining the listener thread fails.
+    pub fn shutdown() -> io::Result<()> {
+        let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
+        let mut listener_guard = listener_mutex
+            .lock()
+            .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
+
+        // `take()` empties the slot first, so the state is cleared even when the
+        // join below reports an error. The guard stays held across the join.
+        if let Some(mut master) = listener_guard.take() {
+            // The send result is ignored on purpose: shutdown proceeds to the
+            // join either way.
+            if let Some(tx) = master.shutdown_tx.take() {
+                let _ = tx.send(());
+            }
+
+            if let Some(handle) = master.thread_handle.take() {
+                handle
+                    .join()
+                    .map_err(|_| io::Error::other("Failed to join listener thread"))?;
+            }
+
+            info!("Master listener thread shut down successfully");
+            Ok(())
+        } else {
+            Err(io::Error::other("No master listener is running"))
+        }
+    }
+
+    /// Check if the master listener is active for the given PID.
+    ///
+    /// Used for fork detection: child processes inherit the listener state
+    /// but don't own the actual thread.
+ pub fn is_active(pid: i32) -> bool { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + if let Ok(listener_guard) = listener_mutex.lock() { + listener_guard.as_ref().is_some_and(|l| l.pid == pid) + } else { + false + } + } + + /// Clear inherited listener state after fork. + /// + /// Child processes must call this to prevent attempting to use the + /// parent's listener thread, which doesn't exist in the child. + pub fn clear_inherited_state() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + info!("Clearing inherited master listener state in child process"); + *listener_guard = None; + } + + Ok(()) + } +} + +/// Accept connections in a loop for thread mode. +async fn accept_socket_loop_thread( + listener: UnixListener, + handler: Box, + mut shutdown_rx: oneshot::Receiver<()>, +) -> io::Result<()> { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + info!("Shutdown signal received in thread listener"); + break; + } + accept = listener.accept() => { + match accept { + Ok((socket, _)) => { + info!("Accepted new worker connection"); + handler(socket); + } + Err(e) => { + error!("Failed to accept worker connection: {}", e); + break; + } + } + } + } + } + Ok(()) +} + +/// Entry point for thread listener - calls enter_listener_loop_with_config +fn run_listener(config: Config, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { + info!("Listener thread running, creating IPC server"); + + let acquire_listener = move || { + let liaison: SharedDirLiaison = match config.ipc_mode { + Shared => Liaison::ipc_shared(), + InstancePerProcess => Liaison::ipc_per_process(), + }; + + let std_listener = liaison + .attempt_listen()? 
+ .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; + + std_listener.set_nonblocking(true)?; + let listener = UnixListener::from_std(std_listener)?; + + info!("IPC server listening for worker connections"); + + let cancel = || {}; + Ok(( + move |handler| accept_socket_loop_thread(listener, handler, shutdown_rx), + cancel, + )) + }; + + let loop_config = MainLoopConfig { + enable_ctrl_c_handler: false, + enable_crashtracker: false, + external_shutdown_rx: None, + }; + + crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config) + .map_err(|e| io::Error::other(format!("Thread listener failed: {}", e)))?; + + info!("Listener thread exiting"); + Ok(()) +} + +/// Connect to the master listener as a worker. +/// +/// Establishes a connection to the master listener thread for the given PID. +pub fn connect_to_master(pid: i32) -> io::Result> { + info!("Connecting to master listener (PID {})", pid); + + let config = Config::get(); + + let liaison: SharedDirLiaison = match config.ipc_mode { + Shared => Liaison::ipc_shared(), + InstancePerProcess => Liaison::ipc_per_process(), + }; + + let channel = liaison + .connect_to_server() + .map_err(|e| io::Error::other(format!("Failed to connect to master listener: {}", e)))?; + + let transport = BlockingTransport::from(channel); + + let sidecar_transport = Box::new(SidecarTransport { + inner: Mutex::new(transport), + reconnect_fn: None, + }); + + info!("Successfully connected to master listener"); + Ok(sidecar_transport) +} diff --git a/datadog-sidecar/src/setup/thread_listener_windows.rs b/datadog-sidecar/src/setup/thread_listener_windows.rs new file mode 100644 index 0000000000..2414ff0a14 --- /dev/null +++ b/datadog-sidecar/src/setup/thread_listener_windows.rs @@ -0,0 +1,211 @@ +// Copyright 2021-Present Datadog, Inc. 
https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::io; +use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle}; +use std::sync::{Mutex, OnceLock}; +use std::thread::{self, JoinHandle}; +use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions}; +use tokio::sync::oneshot; +use tracing::{error, info}; + +use crate::config::Config; +use crate::entry::MainLoopConfig; +use crate::service::blocking::SidecarTransport; +use datadog_ipc::platform::metadata::ProcessHandle; +use datadog_ipc::platform::Channel; +use datadog_ipc::transport::blocking::BlockingTransport; + +static MASTER_LISTENER: OnceLock>> = OnceLock::new(); + +pub struct MasterListener { + shutdown_tx: Option>, + thread_handle: Option>, + pid: i32, +} + +impl MasterListener { + /// Start the master listener thread using Windows Named Pipes. + /// + /// This spawns a new OS thread that creates a named pipe server + /// to listen for worker connections. Only one listener can be active per process. 
+    pub fn start(pid: i32, _config: Config) -> io::Result<()> {
+        // Errors: fails if the listener lock is poisoned, if a listener thread
+        // is still alive, or if the OS refuses to spawn the thread.
+        let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None));
+        let mut listener_guard = listener_mutex
+            .lock()
+            .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?;
+
+        // Only a listener whose thread is still alive blocks a new start. The
+        // thread can exit on its own (e.g. the pipe could not be created);
+        // without this check such a stale entry would make every later start
+        // fail with "already running" until shutdown is called.
+        let still_running = listener_guard.as_ref().is_some_and(|listener| {
+            listener
+                .thread_handle
+                .as_ref()
+                .is_some_and(|handle| !handle.is_finished())
+        });
+        if still_running {
+            return Err(io::Error::other("Master listener is already running"));
+        }
+
+        // The pipe name is derived from the PID; workers rebuild the same name
+        // in connect_to_master.
+        let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid);
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+
+        let thread_handle = thread::Builder::new()
+            .name(format!("ddtrace-sidecar-listener-{}", pid))
+            .spawn(move || {
+                if let Err(e) = run_listener_windows(pipe_name, shutdown_rx) {
+                    error!("Listener thread error: {}", e);
+                }
+            })
+            .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?;
+
+        // Overwriting the slot also drops any stale entry left by a finished thread.
+        *listener_guard = Some(MasterListener {
+            shutdown_tx: Some(shutdown_tx),
+            thread_handle: Some(thread_handle),
+            pid,
+        });
+
+        info!("Started Windows named pipe listener (PID {})", pid);
+        Ok(())
+    }
+
+    /// Shutdown the master listener thread.
+    ///
+    /// Sends shutdown signal and joins the listener thread. This is blocking
+    /// and will wait for the thread to exit cleanly.
+ pub fn shutdown() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if let Some(mut master) = listener_guard.take() { + // Signal shutdown by sending to the oneshot sender + if let Some(tx) = master.shutdown_tx.take() { + let _ = tx.send(()); + } + + if let Some(handle) = master.thread_handle.take() { + handle + .join() + .map_err(|_| io::Error::other("Failed to join listener thread"))?; + } + + info!("Master listener thread shut down successfully"); + Ok(()) + } else { + Err(io::Error::other("No master listener is running")) + } + } + + /// Check if the master listener is active for the given PID. + pub fn is_active(pid: i32) -> bool { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + if let Ok(listener_guard) = listener_mutex.lock() { + listener_guard.as_ref().is_some_and(|l| l.pid == pid) + } else { + false + } + } + + /// Clear inherited listener state. + /// Kept for API compatibility with Unix version. + pub fn clear_inherited_state() -> io::Result<()> { + Ok(()) + } +} + +/// Accept connections in a loop for Windows named pipes. +async fn accept_pipe_loop_windows( + pipe_name: String, + handler: Box, + mut shutdown_rx: oneshot::Receiver<()>, +) -> io::Result<()> { + let mut server = ServerOptions::new() + .first_pipe_instance(true) + .max_instances(254) // Windows allows up to 255 instances + .create(&pipe_name)?; + + info!("Named pipe server created at: {}", pipe_name); + + loop { + tokio::select! 
{ + _ = &mut shutdown_rx => { + info!("Shutdown signal received in Windows pipe listener"); + break; + } + result = server.connect() => { + match result { + Ok(_) => { + info!("Accepted new worker connection on named pipe"); + handler(server); + + server = ServerOptions::new() + .create(&pipe_name)?; + } + Err(e) => { + error!("Failed to accept worker connection: {}", e); + match ServerOptions::new().create(&pipe_name) { + Ok(new_server) => server = new_server, + Err(e2) => { + error!("Failed to recover named pipe: {}", e2); + break; + } + } + } + } + } + } + } + Ok(()) +} + +/// Entry point for Windows named pipe listener +fn run_listener_windows(pipe_name: String, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { + info!("Listener thread running, creating Windows named pipe server"); + + let acquire_listener = move || { + let cancel = || {}; + let pipe_name_clone = pipe_name.clone(); + Ok(( + move |handler| accept_pipe_loop_windows(pipe_name_clone, handler, shutdown_rx), + cancel, + )) + }; + + let loop_config = MainLoopConfig { + enable_ctrl_c_handler: false, + enable_crashtracker: false, + external_shutdown_rx: None, + }; + + crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config) + .map_err(|e| io::Error::other(format!("Windows thread listener failed: {}", e)))?; + + info!("Listener thread exiting"); + Ok(()) +} + +/// Connect to the master listener as a worker using Windows Named Pipes. +/// +/// Establishes a connection to the master listener thread for the given PID. 
+pub fn connect_to_master(pid: i32) -> io::Result<Box<SidecarTransport>> {
+    // Errors: returns `InvalidInput` for a negative PID, otherwise whatever
+    // error opening the named pipe produced.
+    //
+    // NOTE(review): tokio documents that `ClientOptions::open` errors when it is
+    // called outside a Tokio runtime with I/O enabled — confirm that the FFI
+    // callers of this function satisfy that.
+
+    // Win32 error code reported when every pipe instance is currently connected.
+    const ERROR_PIPE_BUSY: i32 = 231;
+    // Upper bound on open attempts while the pipe is busy.
+    const MAX_OPEN_ATTEMPTS: u32 = 10;
+
+    info!("Connecting to master listener via named pipe (PID {})", pid);
+
+    // Reject negative PIDs explicitly instead of letting `as u32` wrap them
+    // into an unrelated process id.
+    let server_pid = u32::try_from(pid).map_err(|_| {
+        io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("Invalid master PID: {}", pid),
+        )
+    })?;
+
+    let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid);
+
+    // The server hands a connected instance to its handler before creating the
+    // next one, so a worker can briefly observe ERROR_PIPE_BUSY. Retry a
+    // bounded number of times; every other error is returned immediately.
+    let mut attempt = 1;
+    let client = loop {
+        match ClientOptions::new().open(&pipe_name) {
+            Ok(client) => break client,
+            Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && attempt < MAX_OPEN_ATTEMPTS => {
+                attempt += 1;
+                std::thread::sleep(std::time::Duration::from_millis(50));
+            }
+            Err(e) => return Err(e),
+        }
+    };
+
+    info!("Connected to named pipe: {}", pipe_name);
+
+    // Move ownership of the OS handle out of the tokio client: forget the
+    // client first so there is never a moment with two owners of the handle.
+    let raw_handle = client.as_raw_handle();
+    std::mem::forget(client);
+    // SAFETY: `raw_handle` is the open pipe handle that `client` owned. `client`
+    // was forgotten above, so its destructor never closes the handle and
+    // `owned_handle` is the sole owner from here on.
+    let owned_handle = unsafe { OwnedHandle::from_raw_handle(raw_handle) };
+
+    let process_handle =
+        ProcessHandle::Getter(Box::new(move || Ok(ProcessHandle::Pid(server_pid))));
+    let channel = Channel::from_client_handle_and_pid(owned_handle, process_handle);
+
+    let transport = BlockingTransport::from(channel);
+
+    let sidecar_transport = Box::new(SidecarTransport {
+        inner: Mutex::new(transport),
+        reconnect_fn: None,
+    });
+
+    info!("Successfully connected to master listener");
+    Ok(sidecar_transport)
+}