Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ use std::slice;
use std::sync::Arc;
use std::time::Duration;

use datadog_sidecar::setup::{connect_to_master, MasterListener};

#[no_mangle]
#[cfg(target_os = "windows")]
pub extern "C" fn ddog_setup_crashtracking(
Expand Down Expand Up @@ -310,6 +312,44 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError {
let cfg = datadog_sidecar::config::FromEnv::config();
try_c!(MasterListener::start(pid, cfg));

MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_connect_worker(
pid: i32,
connection: &mut *mut SidecarTransport,
) -> MaybeError {
let transport = try_c!(connect_to_master(pid));
*connection = Box::into_raw(transport);

MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
try_c!(MasterListener::shutdown());

MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool {
MasterListener::is_active(pid)
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError {
try_c!(MasterListener::clear_inherited_state());

MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
try_c!(blocking::ping(transport));
Expand Down
98 changes: 74 additions & 24 deletions datadog-sidecar/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
},
time::{Duration, Instant},
};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};

#[cfg(unix)]
use crate::crashtracker::crashtracker_unix_socket_path;
Expand All @@ -32,7 +32,28 @@ use crate::tracer::SHM_LIMITER;
use crate::watchdog::Watchdog;
use crate::{ddog_daemon_entry_point, setup_daemon_process};

async fn main_loop<L, C, Fut>(listener: L, cancel: Arc<C>) -> io::Result<()>
/// Configuration for main_loop behavior
pub struct MainLoopConfig {
pub enable_ctrl_c_handler: bool,
pub enable_crashtracker: bool,
pub external_shutdown_rx: Option<oneshot::Receiver<()>>,
}

impl Default for MainLoopConfig {
fn default() -> Self {
Self {
enable_ctrl_c_handler: true,
enable_crashtracker: true,
external_shutdown_rx: None,
}
}
}

pub async fn main_loop<L, C, Fut>(
listener: L,
cancel: Arc<C>,
loop_config: MainLoopConfig,
) -> io::Result<()>
where
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
Fut: Future<Output = io::Result<()>>,
Expand Down Expand Up @@ -64,29 +85,45 @@ where
}
});

tokio::spawn(async move {
if let Err(err) = tokio::signal::ctrl_c().await {
tracing::error!("Error setting up signal handler {}", err);
}
tracing::info!("Received Ctrl-C Signal, shutting down");
cancel();
});
if let Some(shutdown_rx) = loop_config.external_shutdown_rx {
let cancel = cancel.clone();
tokio::spawn(async move {
let _ = shutdown_rx.await;
tracing::info!("External shutdown signal received");
cancel();
});
}

if loop_config.enable_ctrl_c_handler {
let cancel = cancel.clone();
tokio::spawn(async move {
if let Err(err) = tokio::signal::ctrl_c().await {
tracing::error!("Error setting up signal handler {}", err);
}
tracing::info!("Received Ctrl-C Signal, shutting down");
cancel();
});
}

#[cfg(unix)]
tokio::spawn(async move {
let socket_path = crashtracker_unix_socket_path();
match libdd_crashtracker::get_receiver_unix_socket(socket_path.to_str().unwrap_or_default())
{
Ok(listener) => loop {
if let Err(e) =
libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener).await
{
tracing::warn!("Got error while receiving crash report: {e}");
}
},
Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"),
}
});
if loop_config.enable_crashtracker {
tokio::spawn(async move {
let socket_path = crashtracker_unix_socket_path();
match libdd_crashtracker::get_receiver_unix_socket(
socket_path.to_str().unwrap_or_default(),
) {
Ok(listener) => loop {
if let Err(e) =
libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener)
.await
{
tracing::warn!("Got error while receiving crash report: {e}");
}
},
Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"),
}
});
}

// Init. Early, before we start listening.
drop(SHM_LIMITER.lock());
Expand Down Expand Up @@ -143,6 +180,19 @@ where
}

pub fn enter_listener_loop<F, L, Fut, C>(acquire_listener: F) -> anyhow::Result<()>
where
F: FnOnce() -> io::Result<(L, C)>,
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
Fut: Future<Output = io::Result<()>>,
C: Fn() + Sync + Send + 'static,
{
enter_listener_loop_with_config(acquire_listener, MainLoopConfig::default())
}

pub fn enter_listener_loop_with_config<F, L, Fut, C>(
acquire_listener: F,
loop_config: MainLoopConfig,
) -> anyhow::Result<()>
where
F: FnOnce() -> io::Result<(L, C)>,
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
Expand All @@ -159,7 +209,7 @@ where
let (listener, cancel) = acquire_listener()?;

runtime
.block_on(main_loop(listener, Arc::new(cancel)))
.block_on(main_loop(listener, Arc::new(cancel), loop_config))
.map_err(|e| e.into())
}

Expand Down
12 changes: 12 additions & 0 deletions datadog-sidecar/src/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ mod windows;
#[cfg(windows)]
pub use self::windows::*;

// Thread-based listener module (Unix)
#[cfg(unix)]
pub mod thread_listener;
#[cfg(unix)]
pub use thread_listener::{connect_to_master, MasterListener};

// Thread-based listener module (Windows)
#[cfg(windows)]
pub mod thread_listener_windows;
#[cfg(windows)]
pub use thread_listener_windows::{connect_to_master, MasterListener};

use datadog_ipc::platform::Channel;
use std::io;

Expand Down
Loading
Loading