From 0d92a25b2ebce6a36b0d68e247f0c4f72dacb1c1 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Sat, 7 Feb 2026 10:38:35 +0900 Subject: [PATCH 1/3] HITL updates --- .../agent_framework/_workflows/_runner.py | 15 + .../_workflows/_runner_context.py | 29 ++ .../tests/workflow/test_fan_out_hitl_pause.py | 431 ++++++++++++++++++ 3 files changed, 475 insertions(+) create mode 100644 python/packages/core/tests/workflow/test_fan_out_hitl_pause.py diff --git a/python/packages/core/agent_framework/_workflows/_runner.py b/python/packages/core/agent_framework/_workflows/_runner.py index f3a475e034..41fb2eb197 100644 --- a/python/packages/core/agent_framework/_workflows/_runner.py +++ b/python/packages/core/agent_framework/_workflows/_runner.py @@ -104,6 +104,9 @@ async def run_until_convergence(self) -> AsyncGenerator[WorkflowEvent, None]: logger.info(f"Starting superstep {self._iteration + 1}") yield WorkflowEvent.superstep_started(iteration=self._iteration + 1) + # Reset per-superstep tracking for HITL detection + self._ctx.reset_superstep_request_info_tracking() + # Run iteration concurrently with live event streaming: we poll # for new events while the iteration coroutine progresses. iteration_task = asyncio.create_task(self._run_iteration()) @@ -149,6 +152,18 @@ async def run_until_convergence(self) -> AsyncGenerator[WorkflowEvent, None]: yield WorkflowEvent.superstep_completed(iteration=self._iteration) + # Check for HITL pause: if any request_info events were emitted + # during this superstep, pause the workflow even if there are pending + # messages. This prevents parallel nodes from continuing to run while + # HITL input is needed. Pending messages are preserved in memory and + # will be delivered alongside HITL responses in the next run. + if self._ctx.had_request_info_in_superstep(): + logger.info( + f"Pausing workflow after superstep {self._iteration}: " + "request_info event(s) emitted during fan-out execution" + ) + break + # Check for convergence: no more messages to process if not await self._ctx.has_messages(): break diff --git a/python/packages/core/agent_framework/_workflows/_runner_context.py b/python/packages/core/agent_framework/_workflows/_runner_context.py index ed81026245..8d0f56e7dd 100644 --- a/python/packages/core/agent_framework/_workflows/_runner_context.py +++ b/python/packages/core/agent_framework/_workflows/_runner_context.py @@ -281,6 +281,18 @@ async def get_pending_request_info_events(self) -> dict[str, WorkflowEvent[Any]] """ ... + def reset_superstep_request_info_tracking(self) -> None: + """Reset tracking of new request_info events for the current superstep.""" + ... + + def had_request_info_in_superstep(self) -> bool: + """Check if any request_info events were emitted during the current superstep. + + Returns: + True if at least one request_info event was emitted since the last reset. + """ + ... 
+ class InProcRunnerContext: """In-process execution context for local execution and optional checkpointing.""" @@ -306,6 +318,9 @@ def __init__(self, checkpoint_storage: CheckpointStorage | None = None): # Streaming flag - set by workflow's run(..., stream=True) vs run(..., stream=False) self._streaming: bool = False + # Track whether new request_info events were emitted during the current superstep + self._new_request_info_in_superstep: bool = False + # region Messaging and Events async def send_message(self, message: Message) -> None: self._messages.setdefault(message.source_id, []) @@ -415,6 +430,7 @@ def reset_for_new_run(self) -> None: # Clear any pending events (best-effort) by recreating the queue self._event_queue = asyncio.Queue() self._streaming = False # Reset streaming flag + self._new_request_info_in_superstep = False async def apply_checkpoint(self, checkpoint: WorkflowCheckpoint) -> None: """Apply a checkpoint to the current context, mutating its state.""" @@ -481,6 +497,7 @@ async def add_request_info_event(self, event: WorkflowEvent[Any]) -> None: if event.request_id is None: raise ValueError("request_info event must have a request_id") self._pending_request_info_events[event.request_id] = event + self._new_request_info_in_superstep = True await self.add_event(event) async def send_request_info_response(self, request_id: str, response: Any) -> None: @@ -521,3 +538,15 @@ async def get_pending_request_info_events(self) -> dict[str, WorkflowEvent[Any]] A dictionary mapping request IDs to their corresponding WorkflowEvent (type='request_info'). """ return dict(self._pending_request_info_events) + + def reset_superstep_request_info_tracking(self) -> None: + """Reset tracking of new request_info events for the current superstep.""" + self._new_request_info_in_superstep = False + + def had_request_info_in_superstep(self) -> bool: + """Check if any request_info events were emitted during the current superstep. + + Returns: + True if at least one request_info event was emitted since the last reset. + """ + return self._new_request_info_in_superstep diff --git a/python/packages/core/tests/workflow/test_fan_out_hitl_pause.py b/python/packages/core/tests/workflow/test_fan_out_hitl_pause.py new file mode 100644 index 0000000000..2c691902ba --- /dev/null +++ b/python/packages/core/tests/workflow/test_fan_out_hitl_pause.py @@ -0,0 +1,431 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for fan-out HITL pause behavior. + +When a workflow fans out to parallel nodes and one node requests HITL (via request_info), +the workflow should pause at the superstep boundary. Messages from other parallel nodes +are preserved and delivered alongside HITL responses in the next run. 
+ +See: https://github.com/microsoft/agent-framework/issues/3539 +""" + +from dataclasses import dataclass + +import pytest + +from agent_framework import ( + Executor, + WorkflowBuilder, + WorkflowContext, + WorkflowEvent, + WorkflowRunState, + handler, + response_handler, +) +from agent_framework._workflows._request_info_mixin import RequestInfoMixin + + +# region Test Executors + + +class DispatchExecutor(Executor): + """Start executor that dispatches a string message to fan-out targets.""" + + @handler + async def handle(self, message: str, ctx: WorkflowContext) -> None: + await ctx.send_message(message) + + +class WorkerExecutor(Executor): + """Executor that processes input and sends a result downstream.""" + + @handler + async def handle(self, data: str, ctx: WorkflowContext) -> None: + await ctx.send_message(f"worker:{data}") + + +class HITLExecutor(Executor, RequestInfoMixin): + """Executor that requests human-in-the-loop input.""" + + @handler + async def handle(self, data: str, ctx: WorkflowContext) -> None: + await ctx.request_info(f"need_info:{data}", str) + + @response_handler + async def on_response(self, original: str, response: str, ctx: WorkflowContext) -> None: + await ctx.send_message(f"hitl:{response}") + + +class CollectorExecutor(Executor): + """Executor that collects results as workflow outputs.""" + + def __init__(self, id: str): + super().__init__(id=id) + self.collected: list[str] = [] + + @handler + async def handle(self, data: str, ctx: WorkflowContext) -> None: + self.collected.append(data) + await ctx.yield_output(data) + + +@dataclass +class TypedRequest: + """Typed request for HITL.""" + + question: str + + +@dataclass +class TypedResponse: + """Typed response for HITL.""" + + answer: str + + +class TypedHITLExecutor(Executor, RequestInfoMixin): + """HITL executor using typed request/response.""" + + @handler + async def handle(self, data: str, ctx: WorkflowContext) -> None: + await ctx.request_info(TypedRequest(question=data), TypedResponse) + + @response_handler + async def on_response( + self, original: TypedRequest, response: TypedResponse, ctx: WorkflowContext + ) -> None: + await ctx.send_message(f"hitl:{response.answer}") + + +# endregion + + +class TestFanOutHITLPause: + """Tests for workflow pause behavior when fan-out nodes request HITL.""" + + async def test_fan_out_pauses_when_hitl_requested(self): + """When one fan-out node requests HITL, the workflow pauses at the superstep + boundary. Messages from other parallel nodes are held, not delivered. 
+
+        Topology: Dispatch -> fan-out -> [Worker, HITL] -> Collector
+        """
+        dispatch = DispatchExecutor(id="dispatch")
+        worker = WorkerExecutor(id="worker")
+        hitl = HITLExecutor(id="hitl")
+        collector = CollectorExecutor(id="collector")
+
+        workflow = (
+            WorkflowBuilder()
+            .set_start_executor(dispatch)
+            .add_fan_out_edges(dispatch, [worker, hitl])
+            .add_edge(worker, collector)
+            .add_edge(hitl, collector)
+            .build()
+        )
+
+        # First run: Worker processes and sends to Collector, HITL requests info
+        request_events: list[WorkflowEvent] = []
+        outputs: list[str] = []
+        final_state: WorkflowRunState | None = None
+
+        async for event in workflow.run("hello", stream=True):
+            if event.type == "request_info":
+                request_events.append(event)
+            elif event.type == "output":
+                outputs.append(event.data)
+            elif event.type == "status":
+                final_state = event.state
+
+        # Workflow should be idle with pending requests
+        assert final_state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
+
+        # HITL request should be surfaced
+        assert len(request_events) == 1
+        assert request_events[0].data == "need_info:hello"
+
+        # Key assertion: Collector should NOT have received any messages yet.
+        # Worker's message is held at the superstep boundary.
+        assert len(outputs) == 0
+        assert len(collector.collected) == 0
+
+        # Resume with HITL response
+        outputs2: list[str] = []
+        final_state2: WorkflowRunState | None = None
+
+        async for event in workflow.run(
+            stream=True, responses={request_events[0].request_id: "human_answer"}
+        ):
+            if event.type == "output":
+                outputs2.append(event.data)
+            elif event.type == "status":
+                final_state2 = event.state
+
+        # Workflow should complete
+        assert final_state2 == WorkflowRunState.IDLE
+
+        # Collector should have received BOTH worker and HITL results
+        assert len(outputs2) == 2
+        assert "worker:hello" in outputs2
+        assert "hitl:human_answer" in outputs2
+
+    async def test_fan_out_without_hitl_continues_normally(self):
+        """Fan-out without HITL should not be affected by the pause behavior.
+
+        Topology: Dispatch -> fan-out -> [Worker1, Worker2] -> Collector
+        """
+        dispatch = DispatchExecutor(id="dispatch")
+        worker1 = WorkerExecutor(id="worker1")
+        worker2 = WorkerExecutor(id="worker2")
+        collector = CollectorExecutor(id="collector")
+
+        workflow = (
+            WorkflowBuilder()
+            .set_start_executor(dispatch)
+            .add_fan_out_edges(dispatch, [worker1, worker2])
+            .add_edge(worker1, collector)
+            .add_edge(worker2, collector)
+            .build()
+        )
+
+        result = await workflow.run("hello")
+
+        assert result.get_final_state() == WorkflowRunState.IDLE
+        outputs = result.get_outputs()
+        assert len(outputs) == 2
+        # Both parallel workers produce the same result for the same input.
+        assert outputs.count("worker:hello") == 2
+
+    async def test_multiple_hitl_requests_in_fan_out(self):
+        """When multiple fan-out nodes request HITL, all requests are surfaced
+        before the workflow pauses.
+ + Topology: Dispatch -> fan-out -> [HITL1, HITL2] -> Collector + """ + dispatch = DispatchExecutor(id="dispatch") + hitl1 = HITLExecutor(id="hitl1") + hitl2 = HITLExecutor(id="hitl2") + collector = CollectorExecutor(id="collector") + + workflow = ( + WorkflowBuilder() + .set_start_executor(dispatch) + .add_fan_out_edges(dispatch, [hitl1, hitl2]) + .add_edge(hitl1, collector) + .add_edge(hitl2, collector) + .build() + ) + + # First run: Both HITL nodes request info + request_events: list[WorkflowEvent] = [] + outputs: list[str] = [] + final_state: WorkflowRunState | None = None + + async for event in workflow.run("hello", stream=True): + if event.type == "request_info": + request_events.append(event) + elif event.type == "output": + outputs.append(event.data) + elif event.type == "status": + final_state = event.state + + assert final_state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS + assert len(request_events) == 2 + assert len(outputs) == 0 + + # Resume with both responses + responses = { + request_events[0].request_id: "answer1", + request_events[1].request_id: "answer2", + } + outputs2: list[str] = [] + final_state2: WorkflowRunState | None = None + + async for event in workflow.run(stream=True, responses=responses): + if event.type == "output": + outputs2.append(event.data) + elif event.type == "status": + final_state2 = event.state + + assert final_state2 == WorkflowRunState.IDLE + assert len(outputs2) == 2 + assert "hitl:answer1" in outputs2 + assert "hitl:answer2" in outputs2 + + async def test_fan_out_hitl_with_typed_request_response(self): + """HITL pause works correctly with typed (dataclass) request/response. + + Topology: Dispatch -> fan-out -> [Worker, TypedHITL] -> Collector + """ + dispatch = DispatchExecutor(id="dispatch") + worker = WorkerExecutor(id="worker") + hitl = TypedHITLExecutor(id="typed_hitl") + collector = CollectorExecutor(id="collector") + + workflow = ( + WorkflowBuilder() + .set_start_executor(dispatch) + .add_fan_out_edges(dispatch, [worker, hitl]) + .add_edge(worker, collector) + .add_edge(hitl, collector) + .build() + ) + + # First run + request_events: list[WorkflowEvent] = [] + outputs: list[str] = [] + + async for event in workflow.run("what is 2+2?", stream=True): + if event.type == "request_info": + request_events.append(event) + elif event.type == "output": + outputs.append(event.data) + + assert len(request_events) == 1 + assert isinstance(request_events[0].data, TypedRequest) + assert request_events[0].data.question == "what is 2+2?" + assert len(outputs) == 0 # Worker's message held + + # Resume with typed response + outputs2: list[str] = [] + + async for event in workflow.run( + stream=True, + responses={request_events[0].request_id: TypedResponse(answer="4")}, + ): + if event.type == "output": + outputs2.append(event.data) + + assert len(outputs2) == 2 + assert "worker:what is 2+2?" in outputs2 + assert "hitl:4" in outputs2 + + async def test_non_streaming_fan_out_hitl_pause(self): + """HITL pause works correctly in non-streaming mode. 
+ + Topology: Dispatch -> fan-out -> [Worker, HITL] -> Collector + """ + dispatch = DispatchExecutor(id="dispatch") + worker = WorkerExecutor(id="worker") + hitl = HITLExecutor(id="hitl") + collector = CollectorExecutor(id="collector") + + workflow = ( + WorkflowBuilder() + .set_start_executor(dispatch) + .add_fan_out_edges(dispatch, [worker, hitl]) + .add_edge(worker, collector) + .add_edge(hitl, collector) + .build() + ) + + # Non-streaming run + result = await workflow.run("hello") + + assert result.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS + assert len(result.get_outputs()) == 0 # Worker's message held + + # Get request info events + request_events = result.get_request_info_events() + assert len(request_events) == 1 + + # Resume with response + result2 = await workflow.run(responses={request_events[0].request_id: "done"}) + assert result2.get_final_state() == WorkflowRunState.IDLE + outputs = result2.get_outputs() + assert len(outputs) == 2 + assert "worker:hello" in outputs + assert "hitl:done" in outputs + + async def test_single_node_hitl_still_works(self): + """Single-node HITL (no fan-out) should continue to work as before. + + This is a regression test to ensure the pause logic doesn't break + simple sequential HITL workflows. + """ + hitl = HITLExecutor(id="hitl") + collector = CollectorExecutor(id="collector") + + workflow = ( + WorkflowBuilder() + .set_start_executor(hitl) + .add_edge(hitl, collector) + .build() + ) + + # First run + request_events: list[WorkflowEvent] = [] + async for event in workflow.run("hello", stream=True): + if event.type == "request_info": + request_events.append(event) + + assert len(request_events) == 1 + + # Resume + outputs: list[str] = [] + async for event in workflow.run( + stream=True, responses={request_events[0].request_id: "world"} + ): + if event.type == "output": + outputs.append(event.data) + + assert len(outputs) == 1 + assert outputs[0] == "hitl:world" + + async def test_fan_out_hitl_pause_with_fan_in(self): + """HITL pause works with fan-out/fan-in topology. 
+ + Topology: Dispatch -> fan-out -> [Worker, HITL] -> fan-in -> Aggregator + """ + + class AggregatorExecutor(Executor): + """Aggregates fan-in results.""" + + def __init__(self, id: str): + super().__init__(id=id) + self.aggregated: list[str] = [] + + @handler + async def handle(self, data: list[str], ctx: WorkflowContext) -> None: + self.aggregated = data + await ctx.yield_output(data) + + dispatch = DispatchExecutor(id="dispatch") + worker = WorkerExecutor(id="worker") + hitl = HITLExecutor(id="hitl") + aggregator = AggregatorExecutor(id="aggregator") + + workflow = ( + WorkflowBuilder() + .set_start_executor(dispatch) + .add_fan_out_edges(dispatch, [worker, hitl]) + .add_fan_in_edges([worker, hitl], aggregator) + .build() + ) + + # First run: HITL pauses, worker's message is held + request_events: list[WorkflowEvent] = [] + outputs: list = [] + + async for event in workflow.run("hello", stream=True): + if event.type == "request_info": + request_events.append(event) + elif event.type == "output": + outputs.append(event.data) + + assert len(request_events) == 1 + assert len(outputs) == 0 # Aggregator didn't run + + # Resume: both messages reach the fan-in, aggregator processes them + outputs2: list = [] + async for event in workflow.run( + stream=True, responses={request_events[0].request_id: "human_input"} + ): + if event.type == "output": + outputs2.append(event.data) + + assert len(outputs2) == 1 + aggregated = outputs2[0] + assert isinstance(aggregated, list) + assert len(aggregated) == 2 + assert "worker:hello" in aggregated + assert "hitl:human_input" in aggregated From a61d2d02c4263047d7998834b7317dbd92bcbb63 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Mon, 9 Feb 2026 09:17:32 +0900 Subject: [PATCH 2/3] Pause workflow at superstep boundary when fan-out node requests HITL --- .../_workflows/_runner_context.py | 3 + .../tests/workflow/test_fan_out_hitl_pause.py | 44 +-- .../getting_started/workflows/README.md | 1 + .../fan_out_with_hitl_and_loop.py | 301 ++++++++++++++++++ 4 files changed, 316 insertions(+), 33 deletions(-) create mode 100644 python/samples/getting_started/workflows/human-in-the-loop/fan_out_with_hitl_and_loop.py diff --git a/python/packages/core/agent_framework/_workflows/_runner_context.py b/python/packages/core/agent_framework/_workflows/_runner_context.py index 8d0f56e7dd..fc72ba7819 100644 --- a/python/packages/core/agent_framework/_workflows/_runner_context.py +++ b/python/packages/core/agent_framework/_workflows/_runner_context.py @@ -451,6 +451,9 @@ async def apply_checkpoint(self, checkpoint: WorkflowCheckpoint) -> None: # Restore workflow ID self._workflow_id = checkpoint.workflow_id + # Reset superstep tracking - restored events are pre-existing, not new + self._new_request_info_in_superstep = False + # endregion Checkpointing def set_workflow_id(self, workflow_id: str) -> None: diff --git a/python/packages/core/tests/workflow/test_fan_out_hitl_pause.py b/python/packages/core/tests/workflow/test_fan_out_hitl_pause.py index 2c691902ba..a5a1ee4dce 100644 --- a/python/packages/core/tests/workflow/test_fan_out_hitl_pause.py +++ b/python/packages/core/tests/workflow/test_fan_out_hitl_pause.py @@ -11,8 +11,6 @@ from dataclasses import dataclass -import pytest - from agent_framework import ( Executor, WorkflowBuilder, @@ -24,7 +22,6 @@ ) from agent_framework._workflows._request_info_mixin import RequestInfoMixin - # region Test Executors @@ -91,9 +88,7 @@ async def handle(self, data: str, ctx: WorkflowContext) -> None: await 
ctx.request_info(TypedRequest(question=data), TypedResponse) @response_handler - async def on_response( - self, original: TypedRequest, response: TypedResponse, ctx: WorkflowContext - ) -> None: + async def on_response(self, original: TypedRequest, response: TypedResponse, ctx: WorkflowContext) -> None: await ctx.send_message(f"hitl:{response.answer}") @@ -115,8 +110,7 @@ async def test_fan_out_pauses_when_hitl_requested(self): collector = CollectorExecutor(id="collector") workflow = ( - WorkflowBuilder() - .set_start_executor(dispatch) + WorkflowBuilder(start_executor=dispatch) .add_fan_out_edges(dispatch, [worker, hitl]) .add_edge(worker, collector) .add_edge(hitl, collector) @@ -152,9 +146,7 @@ async def test_fan_out_pauses_when_hitl_requested(self): outputs2: list[str] = [] final_state2: WorkflowRunState | None = None - async for event in workflow.run( - stream=True, responses={request_events[0].request_id: "human_answer"} - ): + async for event in workflow.run(stream=True, responses={request_events[0].request_id: "human_answer"}): if event.type == "output": outputs2.append(event.data) elif event.type == "status": @@ -179,8 +171,7 @@ async def test_fan_out_without_hitl_continues_normally(self): collector = CollectorExecutor(id="collector") workflow = ( - WorkflowBuilder() - .set_start_executor(dispatch) + WorkflowBuilder(start_executor=dispatch) .add_fan_out_edges(dispatch, [worker1, worker2]) .add_edge(worker1, collector) .add_edge(worker2, collector) @@ -207,8 +198,7 @@ async def test_multiple_hitl_requests_in_fan_out(self): collector = CollectorExecutor(id="collector") workflow = ( - WorkflowBuilder() - .set_start_executor(dispatch) + WorkflowBuilder(start_executor=dispatch) .add_fan_out_edges(dispatch, [hitl1, hitl2]) .add_edge(hitl1, collector) .add_edge(hitl2, collector) @@ -262,8 +252,7 @@ async def test_fan_out_hitl_with_typed_request_response(self): collector = CollectorExecutor(id="collector") workflow = ( - WorkflowBuilder() - .set_start_executor(dispatch) + WorkflowBuilder(start_executor=dispatch) .add_fan_out_edges(dispatch, [worker, hitl]) .add_edge(worker, collector) .add_edge(hitl, collector) @@ -310,8 +299,7 @@ async def test_non_streaming_fan_out_hitl_pause(self): collector = CollectorExecutor(id="collector") workflow = ( - WorkflowBuilder() - .set_start_executor(dispatch) + WorkflowBuilder(start_executor=dispatch) .add_fan_out_edges(dispatch, [worker, hitl]) .add_edge(worker, collector) .add_edge(hitl, collector) @@ -345,12 +333,7 @@ async def test_single_node_hitl_still_works(self): hitl = HITLExecutor(id="hitl") collector = CollectorExecutor(id="collector") - workflow = ( - WorkflowBuilder() - .set_start_executor(hitl) - .add_edge(hitl, collector) - .build() - ) + workflow = WorkflowBuilder(start_executor=hitl).add_edge(hitl, collector).build() # First run request_events: list[WorkflowEvent] = [] @@ -362,9 +345,7 @@ async def test_single_node_hitl_still_works(self): # Resume outputs: list[str] = [] - async for event in workflow.run( - stream=True, responses={request_events[0].request_id: "world"} - ): + async for event in workflow.run(stream=True, responses={request_events[0].request_id: "world"}): if event.type == "output": outputs.append(event.data) @@ -395,8 +376,7 @@ async def handle(self, data: list[str], ctx: WorkflowContext) -> None: aggregator = AggregatorExecutor(id="aggregator") workflow = ( - WorkflowBuilder() - .set_start_executor(dispatch) + WorkflowBuilder(start_executor=dispatch) .add_fan_out_edges(dispatch, [worker, hitl]) .add_fan_in_edges([worker, 
hitl], aggregator) .build() @@ -417,9 +397,7 @@ async def handle(self, data: list[str], ctx: WorkflowContext) -> None: # Resume: both messages reach the fan-in, aggregator processes them outputs2: list = [] - async for event in workflow.run( - stream=True, responses={request_events[0].request_id: "human_input"} - ): + async for event in workflow.run(stream=True, responses={request_events[0].request_id: "human_input"}): if event.type == "output": outputs2.append(event.data) diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 1d16f8f24b..8562033925 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -87,6 +87,7 @@ Once comfortable with these, explore the rest of the samples below. | SequentialBuilder Request Info | [human-in-the-loop/sequential_request_info.py](./human-in-the-loop/sequential_request_info.py) | Request info for agent responses mid-workflow using `.with_request_info()` on SequentialBuilder | | ConcurrentBuilder Request Info | [human-in-the-loop/concurrent_request_info.py](./human-in-the-loop/concurrent_request_info.py) | Review concurrent agent outputs before aggregation using `.with_request_info()` on ConcurrentBuilder | | GroupChatBuilder Request Info | [human-in-the-loop/group_chat_request_info.py](./human-in-the-loop/group_chat_request_info.py) | Steer group discussions with periodic guidance using `.with_request_info()` on GroupChatBuilder | +| Fan-out HITL Pause | [human-in-the-loop/fan_out_with_hitl_and_loop.py](./human-in-the-loop/fan_out_with_hitl_and_loop.py) | Pause workflow at superstep boundary when a fan-out node requests HITL, preventing runaway parallel loops | ### tool-approval diff --git a/python/samples/getting_started/workflows/human-in-the-loop/fan_out_with_hitl_and_loop.py b/python/samples/getting_started/workflows/human-in-the-loop/fan_out_with_hitl_and_loop.py new file mode 100644 index 0000000000..c0bb9f4f9b --- /dev/null +++ b/python/samples/getting_started/workflows/human-in-the-loop/fan_out_with_hitl_and_loop.py @@ -0,0 +1,301 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Sample: Fan-out with HITL pause and looping node + +Demonstrates the fan-out HITL pause behavior. +When a workflow fans out to parallel branches and one branch requests HITL +(human-in-the-loop), the workflow pauses at the superstep boundary. This +prevents the other branch from continuing to run away in a loop while +human input is pending. + +Topology: + + Dispatcher + | + fan-out + / \ + Analyst Reviewer (HITL) + | + Refiner + | + Analyst (cycle: Analyst <-> Refiner) + +- The Analyst <-> Refiner cycle iteratively refines a draft. +- The Reviewer requests human approval via request_info. +- Without the HITL pause, the Analyst/Refiner loop would keep cycling + through supersteps while the Reviewer waits for human input. +- With the HITL pause, the workflow pauses after the first superstep + where the Reviewer calls request_info. The Analyst's message to + the Refiner is held until the human responds. + +Prerequisites: +- No external dependencies. This sample uses plain executors (no LLM) to demonstrate +the behavior. 
+""" + +import asyncio +from dataclasses import dataclass +from typing import Any + +from agent_framework import ( + Executor, + WorkflowBuilder, + WorkflowContext, + WorkflowEvent, + WorkflowRunState, + handler, + response_handler, +) +from typing_extensions import Never + +# --------------------------------------------------------------------------- +# Data types +# --------------------------------------------------------------------------- + + +@dataclass +class Draft: + """A document draft that gets refined iteratively.""" + + content: str + version: int + + +@dataclass +class ReviewRequest: + """Request for human review of the research plan.""" + + plan: str + + +@dataclass +class ReviewResponse: + """Human's review response.""" + + approved: bool + feedback: str + + +# --------------------------------------------------------------------------- +# Executors +# --------------------------------------------------------------------------- + + +class Dispatcher(Executor): + """Fans out the initial prompt to both the Analyst and the Reviewer.""" + + @handler + async def dispatch(self, prompt: str, ctx: WorkflowContext[str]) -> None: + print(f"\n[Dispatcher] Received prompt: '{prompt}'") + print("[Dispatcher] Fanning out to Analyst and Reviewer...") + await ctx.send_message(prompt) + + +class Analyst(Executor): + """Produces a draft and sends it to the Refiner for improvement. + + On receiving a string (initial prompt), creates a v1 draft. + On receiving a Draft back from the Refiner, checks if it's good + enough or sends it back for another round. + """ + + MAX_VERSIONS = 3 + + @handler + async def handle_prompt(self, prompt: str, ctx: WorkflowContext[Draft]) -> None: + draft = Draft(content=f"Initial analysis of '{prompt}'", version=1) + print(f" [Analyst] Created draft v{draft.version}: '{draft.content}'") + print(" [Analyst] Sending to Refiner for improvement...") + await ctx.send_message(draft) + + @handler + async def handle_refined(self, draft: Draft, ctx: WorkflowContext[Draft, Draft]) -> None: + print(f" [Analyst] Received refined draft v{draft.version}: '{draft.content}'") + if draft.version >= self.MAX_VERSIONS: + print(f" [Analyst] Draft v{draft.version} is final. Done.") + await ctx.yield_output(draft) + else: + print(" [Analyst] Needs more work, sending back to Refiner...") + await ctx.send_message(draft) + + +class Refiner(Executor): + """Refines a draft and sends it back to the Analyst.""" + + @handler + async def refine(self, draft: Draft, ctx: WorkflowContext[Draft]) -> None: + improved = Draft( + content=f"{draft.content} [refined v{draft.version + 1}]", + version=draft.version + 1, + ) + print(f" [Refiner] Improved draft to v{improved.version}: '{improved.content}'") + await ctx.send_message(improved) + + +class Reviewer(Executor): + """Requests human review of the research plan before proceeding. + + This is the HITL node. When it receives the prompt, it asks a human + to approve the plan. The workflow pauses until the human responds. 
+ """ + + @handler + async def request_review(self, prompt: str, ctx: WorkflowContext) -> None: + plan = f"Research plan for: '{prompt}'" + print(f" [Reviewer] Requesting human approval for: '{plan}'") + await ctx.request_info( + ReviewRequest(plan=plan), + ReviewResponse, + ) + + @response_handler + async def handle_review( + self, + original: ReviewRequest, + response: ReviewResponse, + ctx: WorkflowContext[Never, str], + ) -> None: + if response.approved: + result = f"Plan APPROVED: {original.plan}" + else: + result = f"Plan NEEDS REVISION ({response.feedback}): {original.plan}" + print(f" [Reviewer] Human responded: {result}") + await ctx.yield_output(result) + + +# --------------------------------------------------------------------------- +# Workflow construction +# --------------------------------------------------------------------------- + + +def build_workflow(): + """Build the fan-out workflow with a looping branch and HITL branch. + + Graph: + Dispatcher --fan-out--> [Analyst, Reviewer] + Analyst --> Refiner --> Analyst (cycle) + Reviewer: HITL (request_info / response_handler) + """ + dispatcher = Dispatcher(id="dispatcher") + analyst = Analyst(id="analyst") + refiner = Refiner(id="refiner") + reviewer = Reviewer(id="reviewer") + + return ( + WorkflowBuilder(start_executor=dispatcher) + .add_fan_out_edges(dispatcher, [analyst, reviewer]) + .add_edge(analyst, refiner) + .add_edge(refiner, analyst) + .build() + ) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def main(): + workflow = build_workflow() + + print("=" * 60) + print("Fan-out HITL Pause Demo") + print("=" * 60) + print() + print("Topology:") + print(" Dispatcher --fan-out--> [Analyst, Reviewer]") + print(" Analyst <--cycle--> Refiner") + print(" Reviewer: HITL (requests human approval)") + print() + print("Expected behavior: The Analyst/Refiner loop should NOT") + print("continue while the Reviewer waits for human input.") + print() + + # --- First run --- + print("-" * 60) + print("FIRST RUN: Starting workflow...") + print("-" * 60) + + request_events: list[WorkflowEvent] = [] + outputs: list[Any] = [] + superstep_count = 0 + final_state: WorkflowRunState | None = None + + async for event in workflow.run("Impact of AI on software development", stream=True): + if event.type == "superstep_started": + superstep_count += 1 + print(f"\n--- Superstep {superstep_count} ---") + elif event.type == "request_info": + request_events.append(event) + print(f"\n ** HITL REQUEST: {event.data}") + elif event.type == "output": + outputs.append(event.data) + print(f"\n ** OUTPUT: {event.data}") + elif event.type == "status": + final_state = event.state + + print(f"\n{'=' * 60}") + print("First run completed.") + print(f" Supersteps executed: {superstep_count}") + print(f" Final state: {final_state}") + print(f" HITL requests: {len(request_events)}") + print(f" Outputs: {len(outputs)}") + print() + + # Key observation: only 1 superstep ran. The Analyst sent a message + # to the Refiner, but it was NOT delivered because the Reviewer + # requested HITL in the same superstep. The workflow paused. 
+ print("VERIFIED: Workflow paused after 1 superstep.") + print(" The Analyst/Refiner loop did NOT continue.") + print(" The Analyst's message to Refiner is held in memory.") + print() + + # --- Simulate human response --- + print("-" * 60) + print("HUMAN RESPONSE: Approving the plan...") + print("-" * 60) + + request_id = request_events[0].request_id + human_response = ReviewResponse(approved=True, feedback="Looks good!") + + outputs2: list[Any] = [] + superstep_count2 = 0 + final_state2: WorkflowRunState | None = None + + async for event in workflow.run( + stream=True, + responses={request_id: human_response}, + ): + if event.type == "superstep_started": + superstep_count2 += 1 + print(f"\n--- Superstep {superstep_count2} ---") + elif event.type == "output": + outputs2.append(event.data) + print(f"\n ** OUTPUT: {event.data}") + elif event.type == "status": + final_state2 = event.state + + print(f"\n{'=' * 60}") + print("Second run completed.") + print(f" Supersteps executed: {superstep_count2}") + print(f" Final state: {final_state2}") + print(f" Outputs: {len(outputs2)}") + + # After HITL response, the held messages are delivered. The + # Analyst/Refiner loop runs to completion AND the Reviewer's + # response handler produces output. + print() + for i, output in enumerate(outputs2): + print(f" Output {i + 1}: {output}") + + print() + print("VERIFIED: Both branches completed after HITL response.") + print(" The Analyst/Refiner loop ran to completion.") + print(" The Reviewer's approval was processed.") + print() + print("Done!") + + +if __name__ == "__main__": + asyncio.run(main()) From a302ba8e6ec5c545748bd3a5a235049aca674a08 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Mon, 9 Feb 2026 09:29:59 +0900 Subject: [PATCH 3/3] Improve logging --- python/packages/core/agent_framework/_workflows/_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/packages/core/agent_framework/_workflows/_runner.py b/python/packages/core/agent_framework/_workflows/_runner.py index 41fb2eb197..9dcc6d448c 100644 --- a/python/packages/core/agent_framework/_workflows/_runner.py +++ b/python/packages/core/agent_framework/_workflows/_runner.py @@ -160,7 +160,7 @@ async def run_until_convergence(self) -> AsyncGenerator[WorkflowEvent, None]: if self._ctx.had_request_info_in_superstep(): logger.info( f"Pausing workflow after superstep {self._iteration}: " - "request_info event(s) emitted during fan-out execution" + "request_info event(s) emitted during this superstep" ) break