feat(bigquery-analytics): workflow-node boundary events (#207 core)#8
Open
caohy1988 wants to merge 3 commits into
Open
feat(bigquery-analytics): workflow-node boundary events (#207 core)#8caohy1988 wants to merge 3 commits into
caohy1988 wants to merge 3 commits into
Conversation
Emit WORKFLOW_NODE_STARTING / WORKFLOW_NODE_COMPLETED from event- observation derivation, invocation-segment scoped. - Per-invocation contextvar accumulator (_workflow_nodes_ctx), keyed by (node.path, node.run_id); no global plugin state. - on_event_callback: on first event observed at a node, emit WORKFLOW_NODE_STARTING (carrying the C1 node envelope). Events with an empty node.path (non-workflow) are ignored — no synthetic node. - after_run_callback: pause-aware drain before INVOCATION_COMPLETED. A node is left open (no COMPLETED) when its last event carried long_running_tool_ids (paused leaf) or it is a strict path-ancestor of a paused node. Completed rows carry the node envelope, the node's last-event timestamp (via new EventData.timestamp_override, so STARTING->COMPLETED yields per-node duration), and attributes.adk.workflow_node_status = 'completed'. - Accumulator cleared in the after_run finally alongside the other per-invocation context vars. - Producer _EVENT_VIEW_DEFS gains typed views for both types. Scope: the 'failed' outcome is reserved for a follow-up once a reliable invocation-error signal exists (none is cleanly available today); this drain only ever stamps 'completed'. No WORKFLOW_NODE_PAUSED — open nodes are inferable from latest STARTING without COMPLETED within the query window. Tests: starting-once-per-node, empty-path-skip, completed-on-drain, paused-leaf-open, ancestor-of-paused-open, sibling-no-bleed.
test_envelope_node_with_parent_run_id fires a node-bearing STATE_DELTA event, which now also emits a WORKFLOW_NODE_STARTING row (#207). Switch the single-row capture to row-filtering and assert the C1 envelope on the STATE_DELTA row.
getattr(source_event, 'timestamp', None) is typed Any | None, but _WorkflowNodeProgress.last_event_ts is float. Coerce at the boundary (fall back to time.time() when absent) so construction type-checks, and drop the now-dead None guards on the field.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Staging in the fork. Implements the #207 v4 contract core (event-observation derivation, invocation-segment scope, pause-aware drain), per the design reviewed on GoogleCloudPlatform/BigQuery-Agent-Analytics-SDK#207.
Producer
_workflow_nodes_ctx), keyed by(node.path, node.run_id)— invocation-local, no global plugin state (locked answer Q1).on_event_callback: emitWORKFLOW_NODE_STARTINGon first observation of a node; emptynode.path(non-workflow events) ignored — no synthetic node.after_run_callback: pause-aware drain beforeINVOCATION_COMPLETED(deterministic lifecycle point, Q3). A node stays open if its last event carriedlong_running_tool_ids(paused leaf) or it is a strict path-ancestor of a paused node.COMPLETEDrows carry the node envelope, the node's last-event timestamp (newEventData.timestamp_override→ per-node duration), andattributes.adk.workflow_node_status = 'completed'.finally._EVENT_VIEW_DEFSgains typed views for both types.Scope (Q2):
failedis reserved for a follow-up — there is no reliable invocation-error signal in ADK today (after_run_callbackgets no outcome;end_invocationis set on normal end too; discrete*_ERRORevents ≠ invocation failed). This drain only ever stampscompleted; thestatusfield is present and forward-compatible. NoWORKFLOW_NODE_PAUSED.Tests: starting-once-per-node, empty-path-skip, completed-on-drain, paused-leaf-open, ancestor-of-paused-open, sibling-no-bleed.
Unblocks: google#211
WORKFLOW_NODE_*typed columns (consumer SDK follow-up).Local run blocked by the offline env (stale venv); pyink/isort clean, syntax validated — CI is the test gate.