From aa2eb572a373400b5549ac544c4f9a5febb5717e Mon Sep 17 00:00:00 2001 From: Michael Chou Date: Wed, 10 Jun 2026 16:01:13 -0700 Subject: [PATCH] update task params in db; send temporal signal to workflow --- agentex/openapi.yaml | 78 +++++++++++++++++++ agentex/src/api/routes/tasks.py | 27 +++++++ agentex/src/api/schemas/tasks.py | 32 ++++++++ .../domain/repositories/task_repository.py | 42 +++++++++- agentex/src/domain/services/task_service.py | 10 +++ .../src/domain/use_cases/tasks_use_case.py | 46 +++++++++++ 6 files changed, 234 insertions(+), 1 deletion(-) diff --git a/agentex/openapi.yaml b/agentex/openapi.yaml index b122020e..0ad7f0fb 100644 --- a/agentex/openapi.yaml +++ b/agentex/openapi.yaml @@ -824,6 +824,42 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /tasks/{task_id}/signal: + post: + tags: + - Tasks + summary: Signal Task + description: Dispatch a Temporal signal to the task's workflow. The workflow + must have a `@workflow.signal(name=...)` handler registered for the supplied + `signal_name`; the optional `payload` is forwarded as the signal's single + argument. Returns 4xx if the task isn't currently RUNNING. + operationId: signal_task_tasks__task_id__signal_post + parameters: + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/SignalTaskRequest' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/Task' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /tasks/{task_id}/stream: get: tags: @@ -5638,6 +5674,48 @@ components: required: - content title: SendMessageRequest + SignalTaskRequest: + properties: + signal_name: + type: string + title: Name of the Temporal signal to dispatch + payload: + anyOf: + - additionalProperties: true + type: object + - type: 'null' + title: Optional JSON payload forwarded to the workflow's signal handler + merge_params: + anyOf: + - additionalProperties: true + type: object + - type: 'null' + title: Optional shallow-merge patch applied to the task's params column + after the signal succeeds. Top-level keys overwrite; pass full nested + objects to change subfields. + type: object + required: + - signal_name + title: SignalTaskRequest + description: 'Dispatch a Temporal signal to a running task''s workflow. + + + The workflow must register a matching `@workflow.signal(name=...)` + + handler for the supplied ``signal_name``. ``payload`` is forwarded as + + the signal''s single argument; the workflow is responsible for any + + shape validation. + + + ``merge_params`` is an optional shallow-merge into the task''s stored + + ``params`` JSONB column. Used by live-config flows that want the + + persisted task row to reflect the new config alongside the workflow + + signal (e.g. ConfigModal Save).' Span: properties: id: diff --git a/agentex/src/api/routes/tasks.py b/agentex/src/api/routes/tasks.py index dc557ed3..af196dec 100644 --- a/agentex/src/api/routes/tasks.py +++ b/agentex/src/api/routes/tasks.py @@ -12,6 +12,7 @@ ) from src.api.schemas.delete_response import DeleteResponse from src.api.schemas.tasks import ( + SignalTaskRequest, Task, TaskRelationships, TaskResponse, @@ -302,6 +303,32 @@ async def timeout_task( return Task.model_validate(updated) +@router.post( + "/{task_id}/signal", + response_model=Task, + summary="Signal Task", + description=( + "Dispatch a Temporal signal to the task's workflow. The workflow " + "must have a `@workflow.signal(name=...)` handler registered for " + "the supplied `signal_name`; the optional `payload` is forwarded " + "as the signal's single argument. Returns 4xx if the task isn't " + "currently RUNNING." + ), +) +async def signal_task( + task_id: DAuthorizedId(AgentexResourceType.task, AuthorizedOperationType.update), + request: SignalTaskRequest, + task_use_case: DTaskUseCase, +) -> Task: + updated = await task_use_case.signal_task( + id=task_id, + signal_name=request.signal_name, + payload=request.payload, + merge_params=request.merge_params, + ) + return Task.model_validate(updated) + + @router.get( "/{task_id}/stream", summary="Stream Task Events by ID", diff --git a/agentex/src/api/schemas/tasks.py b/agentex/src/api/schemas/tasks.py index 9a8a53dd..bf6e52f6 100644 --- a/agentex/src/api/schemas/tasks.py +++ b/agentex/src/api/schemas/tasks.py @@ -84,3 +84,35 @@ class TaskStatusReasonRequest(BaseModel): None, title="Optional reason for the status change", ) + + +class SignalTaskRequest(BaseModel): + """Dispatch a Temporal signal to a running task's workflow. + + The workflow must register a matching `@workflow.signal(name=...)` + handler for the supplied ``signal_name``. ``payload`` is forwarded as + the signal's single argument; the workflow is responsible for any + shape validation. + + ``merge_params`` is an optional shallow-merge into the task's stored + ``params`` JSONB column. Used by live-config flows that want the + persisted task row to reflect the new config alongside the workflow + signal (e.g. ConfigModal Save). + """ + + signal_name: str = Field( + ..., + title="Name of the Temporal signal to dispatch", + ) + payload: dict[str, Any] | None = Field( + None, + title="Optional JSON payload forwarded to the workflow's signal handler", + ) + merge_params: dict[str, Any] | None = Field( + None, + title=( + "Optional shallow-merge patch applied to the task's params column " + "after the signal succeeds. Top-level keys overwrite; pass full " + "nested objects to change subfields." + ), + ) diff --git a/agentex/src/domain/repositories/task_repository.py b/agentex/src/domain/repositories/task_repository.py index 231c0e9c..96949bc3 100644 --- a/agentex/src/domain/repositories/task_repository.py +++ b/agentex/src/domain/repositories/task_repository.py @@ -3,7 +3,8 @@ from typing import Annotated, Literal from fastapi import Depends -from sqlalchemy import distinct, func, select, update +from sqlalchemy import cast, distinct, func, select, update +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import selectinload from src.adapters.crud_store.adapter_postgres import ( ColumnPrimitiveValue, @@ -213,6 +214,45 @@ async def update(self, task: TaskEntity) -> TaskEntity: # Return with agents populated return TaskEntity.model_validate(modified_orm) + async def merge_params(self, task_id: str, patch: dict) -> TaskEntity | None: + """Atomically shallow-merge ``patch`` into the task's ``params`` + JSONB column. Returns the updated entity, or ``None`` if no task + with that id exists. + + Uses Postgres' ``||`` operator on JSONB so the read-modify-write + is collapsed to a single statement (no race with concurrent + writers). ``patch`` keys overwrite existing keys at the top + level; nested objects are NOT deep-merged — pass the full nested + replacement if you need to change a subfield. + + Intended for surfaces that need ``tasks.params`` to reflect the + latest agent config after a live edit (see + ``TasksUseCase.signal_task``); not a general-purpose updater. + """ + + async with ( + self.start_async_db_session(True) as session, + async_sql_exception_handler(), + ): + # ``COALESCE(params, '{}'::jsonb)`` so a NULL existing value + # doesn't poison the concat to NULL. Both operands cast to + # JSONB explicitly so Postgres picks the JSONB ``||`` operator + # (not the text concat overload). + existing = func.coalesce(TaskORM.params, cast({}, JSONB)) + merged = existing.op("||", return_type=JSONB)(cast(patch, JSONB)) + stmt = ( + update(TaskORM) + .where(TaskORM.id == task_id) + .values(params=merged) + .returning(TaskORM) + ) + result = await session.execute(stmt) + row = result.scalar_one_or_none() + await session.commit() + if row is None: + return None + return TaskEntity.model_validate(row) + async def transition_status( self, task_id: str, diff --git a/agentex/src/domain/services/task_service.py b/agentex/src/domain/services/task_service.py index 809dd954..9cdee218 100644 --- a/agentex/src/domain/services/task_service.py +++ b/agentex/src/domain/services/task_service.py @@ -242,6 +242,16 @@ async def update_task(self, task: TaskEntity) -> TaskEntity: return updated_task + async def merge_task_params(self, task_id: str, patch: dict) -> TaskEntity | None: + """Atomically merge ``patch`` into ``tasks.params``. Returns the + updated entity, or ``None`` if no task with ``task_id`` exists. + + Used by live-config flows (e.g. ConfigModal Save → signal + + DB persistence) so the persisted task row reflects the new + agent config without waiting for the next task to be created. + """ + return await self.task_repository.merge_params(task_id, patch) + async def delete_task(self, id: str | None = None, name: str | None = None) -> None: """ Delete a task from the repository. diff --git a/agentex/src/domain/use_cases/tasks_use_case.py b/agentex/src/domain/use_cases/tasks_use_case.py index 4ad1a61a..f3e30816 100644 --- a/agentex/src/domain/use_cases/tasks_use_case.py +++ b/agentex/src/domain/use_cases/tasks_use_case.py @@ -3,6 +3,7 @@ from fastapi import Depends from src.adapters.crud_store.exceptions import ItemDoesNotExist +from src.adapters.temporal.adapter_temporal import DTemporalAdapter from src.domain.entities.tasks import TaskEntity, TaskRelationships, TaskStatus from src.domain.exceptions import ClientError from src.domain.services.task_service import DAgentTaskService @@ -19,8 +20,10 @@ class TasksUseCase: def __init__( self, task_service: DAgentTaskService, + temporal_adapter: DTemporalAdapter, ): self.task_service = task_service + self.temporal_adapter = temporal_adapter async def get_task( self, @@ -192,5 +195,48 @@ async def timeout_task( TaskStatus.TIMED_OUT, id=id, name=name, reason=reason ) + async def signal_task( + self, + id: str, + signal_name: str, + payload: dict[str, Any] | None = None, + merge_params: dict[str, Any] | None = None, + ) -> TaskEntity: + """Dispatch a Temporal signal to a running task's workflow. + + Validates that the task is currently RUNNING before signaling — + Temporal will raise its own ``WorkflowNotFoundError`` for closed + workflows, but returning a 4xx-mapped ``ClientError`` upfront is + friendlier than the bare-exception round-trip. The signal name + + payload are forwarded as-is; the workflow's registered + ``@workflow.signal`` handler owns shape validation. + + ``workflow_id`` matches the task id in this codebase (see + TasksService.create_task — Temporal workflows are started with + ``workflow_id=task.id``). + + Optional ``merge_params``: if supplied, shallow-merges into the + task's ``params`` JSONB column after the signal dispatch + succeeds. Used by live-config flows so the persisted task row + reflects the new agent config without waiting for the next task + to be created. Returns the task entity post-merge (or pre-merge + if no patch was given). + """ + task = await self.task_service.get_task(id=id) + if task.status != TaskStatus.RUNNING: + raise ClientError( + f"Task {id} is not running (status={task.status}); cannot signal." + ) + await self.temporal_adapter.signal_workflow( + workflow_id=id, + signal=signal_name, + arg=payload, + ) + if merge_params: + merged = await self.task_service.merge_task_params(id, merge_params) + if merged is not None: + return merged + return task + DTaskUseCase = Annotated[TasksUseCase, Depends(TasksUseCase)]