Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,19 @@ begin

execute format('comment on column durable.%I.state is %L', 'c_' || p_queue_name, 'User-defined. Checkpoint value from ctx.step(). Any JSON-serializable value.');

-- Event table WITH metadata column
execute format(
'create table if not exists durable.%I (
event_name text primary key,
payload jsonb,
metadata jsonb,
emitted_at timestamptz not null default durable.current_time()
)',
'e_' || p_queue_name
);

execute format('comment on column durable.%I.payload is %L', 'e_' || p_queue_name, 'User-defined. Event payload. Internal child events use: {"status": "completed"|"failed"|"cancelled", "result"?: <json>, "error"?: <json>}');
execute format('comment on column durable.%I.metadata is %L', 'e_' || p_queue_name, 'System metadata (e.g., trace context for distributed tracing). Format: {"durable::otel_context": {...}}');

execute format(
'create table if not exists durable.%I (
Expand Down Expand Up @@ -1210,7 +1213,8 @@ create function durable.await_event (
)
returns table (
should_suspend boolean,
payload jsonb
payload jsonb,
metadata jsonb
)
language plpgsql
as $$
Expand All @@ -1219,6 +1223,7 @@ declare
v_run_task_id uuid;
v_existing_payload jsonb;
v_event_payload jsonb;
v_event_metadata jsonb;
v_checkpoint_payload jsonb;
v_resolved_payload jsonb;
v_timeout_at timestamptz;
Expand Down Expand Up @@ -1252,7 +1257,8 @@ begin
using p_task_id, p_step_name;

if v_checkpoint_payload is not null then
return query select false, v_checkpoint_payload;
-- Return from checkpoint without metadata (trace linking already done)
return query select false, v_checkpoint_payload, null::jsonb;
return;
end if;
-- Ensure a row exists for this event so we can take a row-level lock.
Expand Down Expand Up @@ -1313,13 +1319,14 @@ begin
raise exception 'Run "%" does not belong to task "%"', p_run_id, p_task_id;
end if;

-- Fetch event payload and metadata
execute format(
'select payload
'select payload, metadata
from durable.%I
where event_name = $1',
'e_' || p_queue_name
)
into v_event_payload
into v_event_payload, v_event_metadata
using p_event_name;

if v_existing_payload is not null then
Expand Down Expand Up @@ -1354,7 +1361,7 @@ begin
updated_at = excluded.updated_at',
'c_' || p_queue_name
) using p_task_id, p_step_name, v_resolved_payload, p_run_id, v_now;
return query select false, v_resolved_payload;
return query select false, v_resolved_payload, v_event_metadata;
return;
end if;

Expand All @@ -1366,7 +1373,7 @@ begin
'update durable.%I set wake_event = null where run_id = $1',
'r_' || p_queue_name
) using p_run_id;
return query select false, null::jsonb;
return query select false, null::jsonb, null::jsonb;
return;
end if;

Expand Down Expand Up @@ -1400,7 +1407,7 @@ begin
't_' || p_queue_name
) using p_task_id;

return query select true, null::jsonb;
return query select true, null::jsonb, null::jsonb;
return;
end;
$$;
Expand All @@ -1409,7 +1416,8 @@ $$;
create function durable.emit_event (
p_queue_name text,
p_event_name text,
p_payload jsonb default null
p_payload jsonb default null,
p_metadata jsonb default null
)
returns void
language plpgsql
Expand All @@ -1428,14 +1436,16 @@ begin
-- We use DO UPDATE WHERE payload IS NULL to handle the case where await_event
-- created a placeholder row before emit_event ran.
execute format(
'insert into durable.%I (event_name, payload, emitted_at)
values ($1, $2, $3)
'insert into durable.%I (event_name, payload, metadata, emitted_at)
values ($1, $2, $3, $4)
on conflict (event_name) do update
set payload = excluded.payload, emitted_at = excluded.emitted_at
set payload = excluded.payload,
metadata = excluded.metadata,
emitted_at = excluded.emitted_at
where durable.%I.payload is null',
'e_' || p_queue_name,
'e_' || p_queue_name
) using p_event_name, v_payload, v_now;
) using p_event_name, v_payload, p_metadata, v_now;

get diagnostics v_inserted_count = row_count;

Expand Down
15 changes: 14 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,11 +686,24 @@ where

let payload_json = serde_json::to_value(payload)?;

let query = "SELECT durable.emit_event($1, $2, $3)";
#[allow(unused_mut)]
let mut metadata: Option<JsonValue> = None;

#[cfg(feature = "telemetry")]
{
let mut trace_context = HashMap::new();
crate::telemetry::inject_trace_context(&mut trace_context);
if !trace_context.is_empty() {
metadata = serde_json::to_value(trace_context).ok();
}
}

let query = "SELECT durable.emit_event($1, $2, $3, $4)";
sqlx::query(query)
.bind(queue)
.bind(event_name)
.bind(&payload_json)
.bind(&metadata)
.execute(executor)
.await?;

Expand Down
65 changes: 44 additions & 21 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ where

// Check cache for already-received event
if let Some(cached) = self.checkpoint_cache.get(&checkpoint_name) {
// No trace linking needed - already done on original receipt
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trace linking fails when task wakes from suspension

High Severity

The trace context metadata is never linked when a task resumes after being woken by emit_event. When emit_event wakes a sleeping task, it creates a checkpoint containing only the payload (not the metadata). On resume, TaskContext::create loads this checkpoint into checkpoint_cache. When await_event is called, it hits the cache at line 353 and returns directly, bypassing the SQL query that would fetch metadata from the events table. The trace linking code at lines 392-395 is never reached, breaking the PR's stated goal of linking traces from await_event to emit_event.

Additional Locations (1)

Fix in Cursor Fix in Web

return Ok(serde_json::from_value(cached.clone())?);
}

Expand All @@ -365,7 +366,7 @@ where
// Call await_event stored procedure
let timeout_secs = timeout.map(|d| d.as_secs() as i32);

let query = "SELECT should_suspend, payload
let query = "SELECT should_suspend, payload, metadata
FROM durable.await_event($1, $2, $3, $4, $5, $6)";

let result: AwaitEventResult = sqlx::query_as(query)
Expand All @@ -383,10 +384,35 @@ where
}

// Event arrived - cache and return
let payload = result.payload.unwrap_or(JsonValue::Null);
let payload_json = result.payload.unwrap_or(JsonValue::Null);
self.checkpoint_cache
.insert(checkpoint_name, payload.clone());
Ok(serde_json::from_value(payload)?)
.insert(checkpoint_name, payload_json.clone());

// Link trace context if metadata is present
#[cfg(feature = "telemetry")]
if let Some(ref metadata) = result.metadata {
Self::link_trace_from_metadata(metadata);
}

Ok(serde_json::from_value(payload_json)?)
}

#[cfg(feature = "telemetry")]
fn link_trace_from_metadata(metadata: &JsonValue) {
    use opentelemetry::KeyValue;
    use opentelemetry::trace::TraceContextExt;
    use std::collections::HashMap;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    // Deserialize the metadata blob into the string-keyed carrier map the
    // propagator expects. If the shape doesn't match (e.g. metadata written
    // by an older emitter), skip linking rather than fail the task.
    let carrier: HashMap<String, JsonValue> = match serde_json::from_value(metadata.clone()) {
        Ok(map) => map,
        Err(_) => return,
    };

    // Recover the remote span context that emit_event injected, then attach
    // it to the current span as a link so the await side of the trace points
    // back at the emitting trace.
    let remote = crate::telemetry::extract_trace_context(&carrier);
    let remote_span_context = remote.span().span_context().clone();
    tracing::Span::current().add_link_with_attributes(
        remote_span_context,
        vec![KeyValue::new("sentry.link.type", "previous_trace")],
    );
}

/// Emit an event to this task's queue.
Expand All @@ -404,22 +430,13 @@ where
)
)]
pub async fn emit_event<T: Serialize>(&self, event_name: &str, payload: &T) -> TaskResult<()> {
if event_name.is_empty() {
return Err(TaskError::Validation {
message: "event_name must be non-empty".to_string(),
});
}

let payload_json = serde_json::to_value(payload)?;
let query = "SELECT durable.emit_event($1, $2, $3)";
sqlx::query(query)
.bind(self.durable.queue_name())
.bind(event_name)
.bind(&payload_json)
.execute(self.durable.pool())
.await?;

Ok(())
self.durable
.emit_event(event_name, payload, None)
.await
.map_err(|e| TaskError::EmitEventFailed {
event_name: event_name.to_string(),
error: e,
})
}

/// Extend the task's lease to prevent timeout.
Expand Down Expand Up @@ -706,7 +723,7 @@ where
}

// Call await_event stored procedure (no timeout for join - we wait indefinitely)
let query = "SELECT should_suspend, payload
let query = "SELECT should_suspend, payload, metadata
FROM durable.await_event($1, $2, $3, $4, $5, $6)";

let result: AwaitEventResult = sqlx::query_as(query)
Expand All @@ -728,6 +745,12 @@ where
self.checkpoint_cache
.insert(checkpoint_name, payload_json.clone());

// Link trace context if metadata is present
#[cfg(feature = "telemetry")]
if let Some(ref metadata) = result.metadata {
Self::link_trace_from_metadata(metadata);
}

let payload: ChildCompletePayload = serde_json::from_value(payload_json)?;
Self::process_child_payload(&step_name, payload)
}
Expand Down
14 changes: 14 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ pub enum TaskError {
#[error("failed to spawn subtask `{name}`: {error}")]
SubtaskSpawnFailed { name: String, error: DurableError },

/// Error occurred while trying to emit an event.
#[error("failed to emit event `{event_name}`: {error}")]
EmitEventFailed {
event_name: String,
error: DurableError,
},

/// A child task failed.
///
/// Returned by [`TaskContext::join`](crate::TaskContext::join) when the child
Expand Down Expand Up @@ -231,6 +238,13 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue {
"subtask_name": name,
})
}
TaskError::EmitEventFailed { event_name, error } => {
serde_json::json!({
"name": "EmitEventFailed",
"message": error.to_string(),
"event_name": event_name,
})
}
TaskError::ChildFailed { step_name, message } => {
serde_json::json!({
"name": "ChildFailed",
Expand Down
Loading