Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions agentex/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,42 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/tasks/{task_id}/signal:
post:
tags:
- Tasks
summary: Signal Task
description: Dispatch a Temporal signal to the task's workflow. The workflow
must have a `@workflow.signal(name=...)` handler registered for the supplied
`signal_name`; the optional `payload` is forwarded as the signal's single
argument. Returns 4xx if the task isn't currently RUNNING.
operationId: signal_task_tasks__task_id__signal_post
parameters:
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/SignalTaskRequest'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/Task'
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/tasks/{task_id}/stream:
get:
tags:
Expand Down Expand Up @@ -5638,6 +5674,48 @@ components:
required:
- content
title: SendMessageRequest
SignalTaskRequest:
properties:
signal_name:
type: string
title: Name of the Temporal signal to dispatch
payload:
anyOf:
- additionalProperties: true
type: object
- type: 'null'
title: Optional JSON payload forwarded to the workflow's signal handler
merge_params:
anyOf:
- additionalProperties: true
type: object
- type: 'null'
title: Optional shallow-merge patch applied to the task's params column
after the signal succeeds. Top-level keys overwrite; pass full nested
objects to change subfields.
type: object
required:
- signal_name
title: SignalTaskRequest
description: 'Dispatch a Temporal signal to a running task''s workflow.


The workflow must register a matching `@workflow.signal(name=...)`

handler for the supplied ``signal_name``. ``payload`` is forwarded as

the signal''s single argument; the workflow is responsible for any

shape validation.


``merge_params`` is an optional shallow-merge into the task''s stored

``params`` JSONB column. Used by live-config flows that want the

persisted task row to reflect the new config alongside the workflow

signal (e.g. ConfigModal Save).'
Span:
properties:
id:
Expand Down
27 changes: 27 additions & 0 deletions agentex/src/api/routes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)
from src.api.schemas.delete_response import DeleteResponse
from src.api.schemas.tasks import (
SignalTaskRequest,
Task,
TaskRelationships,
TaskResponse,
Expand Down Expand Up @@ -302,6 +303,32 @@ async def timeout_task(
return Task.model_validate(updated)


@router.post(
"/{task_id}/signal",
response_model=Task,
summary="Signal Task",
description=(
"Dispatch a Temporal signal to the task's workflow. The workflow "
"must have a `@workflow.signal(name=...)` handler registered for "
"the supplied `signal_name`; the optional `payload` is forwarded "
"as the signal's single argument. Returns 4xx if the task isn't "
"currently RUNNING."
),
)
async def signal_task(
task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.update),
request: SignalTaskRequest,
task_use_case: DTaskUseCase,
) -> Task:
updated = await task_use_case.signal_task(
id=task_id,
signal_name=request.signal_name,
payload=request.payload,
merge_params=request.merge_params,
)
return Task.model_validate(updated)


@router.get(
"/{task_id}/stream",
summary="Stream Task Events by ID",
Expand Down
32 changes: 32 additions & 0 deletions agentex/src/api/schemas/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,35 @@ class TaskStatusReasonRequest(BaseModel):
None,
title="Optional reason for the status change",
)


class SignalTaskRequest(BaseModel):
"""Dispatch a Temporal signal to a running task's workflow.

The workflow must register a matching `@workflow.signal(name=...)`
handler for the supplied ``signal_name``. ``payload`` is forwarded as
the signal's single argument; the workflow is responsible for any
shape validation.

``merge_params`` is an optional shallow-merge into the task's stored
``params`` JSONB column. Used by live-config flows that want the
persisted task row to reflect the new config alongside the workflow
signal (e.g. ConfigModal Save).
"""

signal_name: str = Field(
...,
title="Name of the Temporal signal to dispatch",
)
payload: dict[str, Any] | None = Field(
None,
title="Optional JSON payload forwarded to the workflow's signal handler",
)
merge_params: dict[str, Any] | None = Field(
None,
title=(
"Optional shallow-merge patch applied to the task's params column "
"after the signal succeeds. Top-level keys overwrite; pass full "
"nested objects to change subfields."
),
)
42 changes: 41 additions & 1 deletion agentex/src/domain/repositories/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import Annotated, Literal

from fastapi import Depends
from sqlalchemy import distinct, func, select, update
from sqlalchemy import cast, distinct, func, select, update
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import selectinload
from src.adapters.crud_store.adapter_postgres import (
ColumnPrimitiveValue,
Expand Down Expand Up @@ -213,6 +214,45 @@ async def update(self, task: TaskEntity) -> TaskEntity:
# Return with agents populated
return TaskEntity.model_validate(modified_orm)

async def merge_params(self, task_id: str, patch: dict) -> TaskEntity | None:
"""Atomically shallow-merge ``patch`` into the task's ``params``
JSONB column. Returns the updated entity, or ``None`` if no task
with that id exists.

Uses Postgres' ``||`` operator on JSONB so the read-modify-write
is collapsed to a single statement (no race with concurrent
writers). ``patch`` keys overwrite existing keys at the top
level; nested objects are NOT deep-merged — pass the full nested
replacement if you need to change a subfield.

Intended for surfaces that need ``tasks.params`` to reflect the
latest agent config after a live edit (see
``TasksUseCase.signal_task``); not a general-purpose updater.
"""

async with (
self.start_async_db_session(True) as session,
async_sql_exception_handler(),
):
# ``COALESCE(params, '{}'::jsonb)`` so a NULL existing value
# doesn't poison the concat to NULL. Both operands cast to
# JSONB explicitly so Postgres picks the JSONB ``||`` operator
# (not the text concat overload).
existing = func.coalesce(TaskORM.params, cast({}, JSONB))
merged = existing.op("||", return_type=JSONB)(cast(patch, JSONB))
stmt = (
update(TaskORM)
.where(TaskORM.id == task_id)
.values(params=merged)
.returning(TaskORM)
)
result = await session.execute(stmt)
row = result.scalar_one_or_none()
await session.commit()
if row is None:
return None
return TaskEntity.model_validate(row)

async def transition_status(
self,
task_id: str,
Expand Down
10 changes: 10 additions & 0 deletions agentex/src/domain/services/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,16 @@ async def update_task(self, task: TaskEntity) -> TaskEntity:

return updated_task

async def merge_task_params(self, task_id: str, patch: dict) -> TaskEntity | None:
"""Atomically merge ``patch`` into ``tasks.params``. Returns the
updated entity, or ``None`` if no task with ``task_id`` exists.

Used by live-config flows (e.g. ConfigModal Save → signal +
DB persistence) so the persisted task row reflects the new
agent config without waiting for the next task to be created.
"""
return await self.task_repository.merge_params(task_id, patch)

async def delete_task(self, id: str | None = None, name: str | None = None) -> None:
"""
Delete a task from the repository.
Expand Down
46 changes: 46 additions & 0 deletions agentex/src/domain/use_cases/tasks_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from fastapi import Depends

from src.adapters.crud_store.exceptions import ItemDoesNotExist
from src.adapters.temporal.adapter_temporal import DTemporalAdapter
from src.domain.entities.tasks import TaskEntity, TaskRelationships, TaskStatus
from src.domain.exceptions import ClientError
from src.domain.services.task_service import DAgentTaskService
Expand All @@ -19,8 +20,10 @@ class TasksUseCase:
def __init__(
self,
task_service: DAgentTaskService,
temporal_adapter: DTemporalAdapter,
):
self.task_service = task_service
self.temporal_adapter = temporal_adapter

async def get_task(
self,
Expand Down Expand Up @@ -192,5 +195,48 @@ async def timeout_task(
TaskStatus.TIMED_OUT, id=id, name=name, reason=reason
)

async def signal_task(
self,
id: str,
signal_name: str,
payload: dict[str, Any] | None = None,
merge_params: dict[str, Any] | None = None,
) -> TaskEntity:
"""Dispatch a Temporal signal to a running task's workflow.

Validates that the task is currently RUNNING before signaling —
Temporal will raise its own ``WorkflowNotFoundError`` for closed
workflows, but returning a 4xx-mapped ``ClientError`` upfront is
friendlier than the bare-exception round-trip. The signal name +
payload are forwarded as-is; the workflow's registered
``@workflow.signal`` handler owns shape validation.

``workflow_id`` matches the task id in this codebase (see
TasksService.create_task — Temporal workflows are started with
``workflow_id=task.id``).

Optional ``merge_params``: if supplied, shallow-merges into the
task's ``params`` JSONB column after the signal dispatch
succeeds. Used by live-config flows so the persisted task row
reflects the new agent config without waiting for the next task
to be created. Returns the task entity post-merge (or pre-merge
if no patch was given).
"""
task = await self.task_service.get_task(id=id)
if task.status != TaskStatus.RUNNING:
raise ClientError(
f"Task {id} is not running (status={task.status}); cannot signal."
)
await self.temporal_adapter.signal_workflow(
workflow_id=id,
signal=signal_name,
arg=payload,
)
if merge_params:
merged = await self.task_service.merge_task_params(id, merge_params)
if merged is not None:
return merged
return task
Comment on lines +235 to +239

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Silent stale-entity return when DB merge is skipped

When merge_params is provided, merge_task_params returns None only if the task row wasn't found by the UPDATE ... WHERE id = ? — meaning the task was deleted between the RUNNING guard above and the DB write. In that case the signal was already dispatched to Temporal (the workflow proceeds with the new config), but the code silently falls through to return task, handing the caller a 200 with the pre-signal entity and never surfacing that the DB write failed. The caller has no way to distinguish "merge succeeded" from "merge was silently dropped".

Prompt To Fix With AI
This is a comment left during a code review.
Path: agentex/src/domain/use_cases/tasks_use_case.py
Line: 235-239

Comment:
**Silent stale-entity return when DB merge is skipped**

When `merge_params` is provided, `merge_task_params` returns `None` only if the task row wasn't found by the `UPDATE ... WHERE id = ?` — meaning the task was deleted between the RUNNING guard above and the DB write. In that case the signal was already dispatched to Temporal (the workflow proceeds with the new config), but the code silently falls through to `return task`, handing the caller a 200 with the *pre-signal* entity and never surfacing that the DB write failed. The caller has no way to distinguish "merge succeeded" from "merge was silently dropped".

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex



DTaskUseCase = Annotated[TasksUseCase, Depends(TasksUseCase)]
Loading