Merged
22 changes: 21 additions & 1 deletion sql/schema.sql
@@ -792,7 +792,10 @@ begin
perform durable.emit_event(
p_queue_name,
'$child:' || p_task_id::text,
jsonb_build_object('status', p_status) || coalesce(p_payload, '{}'::jsonb)
jsonb_build_object(
'inner', jsonb_build_object('status', p_status) || coalesce(p_payload, '{}'::jsonb),
'metadata', '{}'::jsonb
)
);
end if;

@@ -1423,6 +1426,23 @@ begin
raise exception 'event_name must be provided';
end if;

-- Validate that if p_payload is not null, it has exactly the allowed keys ('inner' and 'metadata')
if p_payload is not null and jsonb_typeof(p_payload) = 'object' then
if exists (
select 1
from jsonb_object_keys(p_payload) as k
where k not in ('inner', 'metadata')
) then
raise exception 'p_payload may only contain ''inner'' and ''metadata'' keys';
end if;
if not p_payload ? 'inner' then
raise exception 'p_payload must contain an ''inner'' key';
end if;
if not p_payload ? 'metadata' then
raise exception 'p_payload must contain a ''metadata'' key';
end if;
end if;

SQL validation allows non-object payloads causing Rust errors

Low Severity

The SQL validation in emit_event only checks the payload when jsonb_typeof(p_payload) = 'object'. Non-object JSON values (strings, numbers, arrays, booleans) bypass validation entirely but will cause deserialization failures in Rust, since AwaitEventResult.payload expects a DurableEventPayload structure. The reviewer asked to "enforce that exactly these two keys are present", which implies that non-conforming payloads should be rejected rather than silently allowed. Direct SQL calls with a non-object payload would therefore store invalid data.
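
A possible tightening, sketched below, would reject any non-null payload that is not a JSON object before the key checks run. The parameter names follow the diff above; the error messages are illustrative, not taken from the codebase.

-- Sketch only: reject non-object payloads outright, then enforce exactly the two allowed keys.
if p_payload is not null then
    if jsonb_typeof(p_payload) <> 'object' then
        raise exception 'p_payload must be a JSON object with ''inner'' and ''metadata'' keys';
    end if;
    if exists (
        select 1
        from jsonb_object_keys(p_payload) as k
        where k not in ('inner', 'metadata')
    ) then
        raise exception 'p_payload may only contain ''inner'' and ''metadata'' keys';
    end if;
    if not (p_payload ? 'inner' and p_payload ? 'metadata') then
        raise exception 'p_payload must contain both ''inner'' and ''metadata'' keys';
    end if;
end if;

For reference, a direct SQL call that conforms to the contract might look like the following (queue and event names are hypothetical; only the payload shape matters here):

select durable.emit_event(
    'my_queue',
    'my_event',
    jsonb_build_object(
        'inner', jsonb_build_object('status', 'ok'),
        'metadata', '{}'::jsonb
    )
);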

-- Insert the event into the events table (first-writer-wins).
-- Subsequent emits for the same event are no-ops.
-- We use DO UPDATE WHERE payload IS NULL to handle the case where await_event
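
The INSERT those comments describe is collapsed out of the hunk. A minimal sketch of the first-writer-wins upsert they document might look like this, with the durable.events table and its column names assumed rather than read from the schema:

-- Sketch only: table and column names are assumed, not taken from the actual schema.
-- A later emit only fills in a payload that await_event left NULL; otherwise it is a no-op.
insert into durable.events (queue_name, event_name, payload)
values (p_queue_name, p_event_name, p_payload)
on conflict (queue_name, event_name)
do update set payload = excluded.payload
where events.payload is null;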
20 changes: 17 additions & 3 deletions src/client.rs
@@ -11,8 +11,8 @@ use uuid::Uuid;
use crate::error::{DurableError, DurableResult};
use crate::task::{Task, TaskRegistry, TaskWrapper};
use crate::types::{
CancellationPolicy, RetryStrategy, SpawnDefaults, SpawnOptions, SpawnResult, SpawnResultRow,
WorkerOptions,
CancellationPolicy, DurableEventPayload, RetryStrategy, SpawnDefaults, SpawnOptions,
SpawnResult, SpawnResultRow, WorkerOptions,
};

/// Internal struct for serializing spawn options to the database.
@@ -684,7 +684,21 @@ where
#[cfg(feature = "telemetry")]
tracing::Span::current().record("queue", queue);

let payload_json = serde_json::to_value(payload)?;
let inner_payload_json = serde_json::to_value(payload)?;

let mut payload_wrapper = DurableEventPayload {
inner: inner_payload_json,
metadata: JsonValue::Null,
};

#[allow(unused_mut)] // mut is needed when telemetry feature is enabled
let mut metadata_map: HashMap<String, JsonValue> = HashMap::new();

#[cfg(feature = "telemetry")]
crate::telemetry::inject_trace_context(&mut metadata_map);
payload_wrapper.metadata = serde_json::to_value(metadata_map)?;

let payload_json = serde_json::to_value(payload_wrapper)?;

let query = "SELECT durable.emit_event($1, $2, $3)";
sqlx::query(query)
88 changes: 60 additions & 28 deletions src/context.rs
@@ -8,6 +8,7 @@ use uuid::Uuid;
use crate::Durable;
use crate::error::{ControlFlow, TaskError, TaskResult};
use crate::task::Task;
use crate::types::DurableEventPayload;
use crate::types::{
AwaitEventResult, CheckpointRow, ChildCompletePayload, ChildStatus, ClaimedTask, SpawnOptions,
TaskHandle,
@@ -351,7 +352,9 @@ where

// Check cache for already-received event
if let Some(cached) = self.checkpoint_cache.get(&checkpoint_name) {
return Ok(serde_json::from_value(cached.clone())?);
let durable_event_payload: DurableEventPayload =
serde_json::from_value(cached.clone())?;
return self.process_event_payload_wrapper(durable_event_payload);
}

// Check if we were woken by this event but it timed out (null payload)
@@ -383,10 +386,39 @@
}

// Event arrived - cache and return
let payload = result.payload.unwrap_or(JsonValue::Null);
self.checkpoint_cache
.insert(checkpoint_name, payload.clone());
Ok(serde_json::from_value(payload)?)
let durable_event_payload = result.payload.unwrap_or(DurableEventPayload {
inner: JsonValue::Null,
metadata: JsonValue::Null,
});
self.checkpoint_cache.insert(
checkpoint_name,
serde_json::to_value(durable_event_payload.clone())?,
);

self.process_event_payload_wrapper(durable_event_payload)
}

fn process_event_payload_wrapper<T: DeserializeOwned>(
&self,
value: DurableEventPayload,
) -> TaskResult<T> {
#[cfg(feature = "telemetry")]
{
use opentelemetry::KeyValue;
use opentelemetry::trace::TraceContextExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;

let metadata: Option<HashMap<String, JsonValue>> =
serde_json::from_value(value.metadata)?;
if let Some(metadata) = metadata {
let context = crate::telemetry::extract_trace_context(&metadata);
tracing::Span::current().add_link_with_attributes(
context.span().span_context().clone(),
vec![KeyValue::new("sentry.link.type", "previous_trace")],
);
}
}
Ok(serde_json::from_value(value.inner)?)
}

/// Emit an event to this task's queue.
@@ -404,22 +436,13 @@
)
)]
pub async fn emit_event<T: Serialize>(&self, event_name: &str, payload: &T) -> TaskResult<()> {
if event_name.is_empty() {
return Err(TaskError::Validation {
message: "event_name must be non-empty".to_string(),
});
}

let payload_json = serde_json::to_value(payload)?;
let query = "SELECT durable.emit_event($1, $2, $3)";
sqlx::query(query)
.bind(self.durable.queue_name())
.bind(event_name)
.bind(&payload_json)
.execute(self.durable.pool())
.await?;

Ok(())
self.durable
.emit_event(event_name, payload, None)
.await
.map_err(|e| TaskError::EmitEventFailed {
event_name: event_name.to_string(),
error: e,
})
}

/// Extend the task's lease to prevent timeout.
@@ -693,8 +716,11 @@ where

// Check cache for already-received event
if let Some(cached) = self.checkpoint_cache.get(&checkpoint_name) {
let payload: ChildCompletePayload = serde_json::from_value(cached.clone())?;
return Self::process_child_payload(&step_name, payload);
let durable_event_payload: DurableEventPayload =
serde_json::from_value(cached.clone())?;
let child_complete_payload: ChildCompletePayload =
self.process_event_payload_wrapper(durable_event_payload)?;
return Self::process_child_payload(&step_name, child_complete_payload);
}

// Check if we were woken by this event but it timed out (null payload)
Expand Down Expand Up @@ -724,12 +750,18 @@ where
}

// Event arrived - parse and return
let payload_json = result.payload.unwrap_or(JsonValue::Null);
self.checkpoint_cache
.insert(checkpoint_name, payload_json.clone());
let durable_event_payload = result.payload.unwrap_or(DurableEventPayload {
inner: JsonValue::Null,
metadata: JsonValue::Null,
});
self.checkpoint_cache.insert(
checkpoint_name,
serde_json::to_value(durable_event_payload.clone())?,
);

let payload: ChildCompletePayload = serde_json::from_value(payload_json)?;
Self::process_child_payload(&step_name, payload)
let child_complete_payload: ChildCompletePayload =
self.process_event_payload_wrapper(durable_event_payload)?;
Self::process_child_payload(&step_name, child_complete_payload)
}

/// Process the child completion payload and return the appropriate result.
14 changes: 14 additions & 0 deletions src/error.rs
@@ -77,6 +77,13 @@ pub enum TaskError {
#[error("failed to spawn subtask `{name}`: {error}")]
SubtaskSpawnFailed { name: String, error: DurableError },

/// Error occurred while trying to emit an event.
#[error("failed to emit event `{event_name}`: {error}")]
EmitEventFailed {
event_name: String,
error: DurableError,
},

/// A child task failed.
///
/// Returned by [`TaskContext::join`](crate::TaskContext::join) when the child
@@ -231,6 +238,13 @@ pub fn serialize_task_error(err: &TaskError) -> JsonValue {
"subtask_name": name,
})
}
TaskError::EmitEventFailed { event_name, error } => {
serde_json::json!({
"name": "EmitEventFailed",
"message": error.to_string(),
"event_name": event_name,
})
}
TaskError::ChildFailed { step_name, message } => {
serde_json::json!({
"name": "ChildFailed",
4 changes: 2 additions & 2 deletions src/lib.rs
@@ -109,8 +109,8 @@ pub use context::TaskContext;
pub use error::{ControlFlow, DurableError, DurableResult, TaskError, TaskResult};
pub use task::{ErasedTask, Task, TaskWrapper};
pub use types::{
CancellationPolicy, ClaimedTask, RetryStrategy, SpawnDefaults, SpawnOptions, SpawnResult,
TaskHandle, WorkerOptions,
CancellationPolicy, ClaimedTask, DurableEventPayload, RetryStrategy, SpawnDefaults,
SpawnOptions, SpawnResult, TaskHandle, WorkerOptions,
};
pub use worker::Worker;
