Skip to content

Fix: per-index trigger rule evaluation broken for mapped task groups after upstream expansion#68426

Open
bujjibabukatta wants to merge 2 commits into
apache:mainfrom
bujjibabukatta:fix/trigger-rule-placeholder-rewrite-68417
Open

Fix: per-index trigger rule evaluation broken for mapped task groups after upstream expansion#68426
bujjibabukatta wants to merge 2 commits into
apache:mainfrom
bujjibabukatta:fix/trigger-rule-placeholder-rewrite-68417

Conversation

@bujjibabukatta

Copy link
Copy Markdown
Contributor

Closes #68417

Problem

When you use a mapped task group (e.g. divide_and_report.expand(i=gen_examples())),
tasks inside the group should evaluate their trigger rules per index. so if
divide(0) fails, only report_success(0) should become upstream_failed, while
report_success(1), (2), (3) still run successfully.

This was working correctly until PR #59691 landed. That PR fixed a real bug in XCom
resolution (downstream tasks incorrectly referencing a placeholder that no longer
existed after upstream expansion), but it accidentally broke trigger-rule evaluation
as a side effect.

What went wrong ?

When an upstream task expands (its summary placeholder at map_index=-1 gets replaced
by real instances at 0, 1, 2, ...), PR #59691 introduced a rewrite that maps the
downstream placeholder's dependency from -1 to 0. This rewrite is correct for
XCom resolution, but it was also kicking in during trigger-rule evaluation of the
downstream summary task instance (the unexpanded placeholder, also at map_index=-1).

The result: the summary instance of report_success would evaluate its ALL_SUCCESS
trigger rule against only divide(0) instead of all divide instances. Since
divide(0) failed, report_success(-1) got marked upstream_failed before it ever
had a chance to expand. Once the summary is upstream_failed, all its expanded
instances (0, 1, 2, 3) inherit that state so everything ends up
upstream_failed instead of just index 0.

Before the fix:
report_success: {0: upstream_failed, 1: upstream_failed, 2: upstream_failed, 3: upstream_failed} ❌

After the fix:
report_success: {0: upstream_failed, 1: success, 2: success, 3: success} ✅

The Fix

The fix is a small, targeted change in trigger_rule_dep.py.

When evaluating the trigger rule for a summary task instance (map_index < 0)
inside a mapped task group, and the upstream task is not an expansion dependency
(i.e. it lives inside the same group), we now skip the placeholder-to-index-0 rewrite
entirely.

  • For fast-triggered rules (ONE_SUCCESS, ONE_FAILED, ONE_DONE): behavior is
    unchanged we return None so all upstream instances are considered, letting the
    rule fire as soon as one instance meets the condition.

  • For all other rules (ALL_SUCCESS, NONE_FAILED, etc.): we return the summary
    map index (-1) directly. After the upstream has expanded, its -1 placeholder no
    longer exists in the database, so the trigger rule sees zero relevant instances and
    passes. The summary task instance is then free to expand, and each resulting
    per-index instance evaluates its trigger rule independently against the correct
    upstream instance.

The XCom resolution path is completely unaffected because it calls
get_relevant_upstream_map_indexes directly, not through the trigger-rule evaluation
code path.

Tests

The following existing tests cover this fix and were broken before this change:

  • test_one_failed_trigger_rule_in_mapped_task_group_is_per_index verifies that a
    single upstream failure only affects the downstream instance at the same index
  • test_one_failed_trigger_rule_runs_on_indirect_failure_in_mapped_task_group
    verifies that ONE_FAILED still fires correctly before expansion
  • test_downstream_placeholder_handles_upstream_post_expansion verifies that the
    XCom resolution rewrite still works correctly and is unaffected by this change

…turning None vectors

VectorStoreIndex._get_node_with_embedding() calls node.copy() internally
before attaching embeddings, so reading node.embedding from the original
node list after index construction always returned None.

Fix by calling embed_model.get_text_embedding_batch() before building the
index and assigning the results directly to the original node objects.
VectorStoreIndex then skips re-embedding nodes that already carry a vector.

Closes apache#68416
…n for summary TIs

PR apache#59691 added `_should_use_post_expansion_placeholder` to fix XCom
resolution when an upstream placeholder (map_index=-1) is replaced by its
first expanded instance (map_index=0). The rewrite correctly resolves the
downstream dependency for XCom/argument purposes, but also inadvertently
applied during trigger-rule evaluation of not-yet-expanded summary task
instances (map_index < 0) inside mapped task groups.

When a summary TI's trigger rule (e.g. ALL_SUCCESS) is evaluated after the
upstream has expanded but before the summary TI itself has expanded, the
rewrite narrows the relevant upstream set to only the first expanded instance
(index 0). A failure at index 0 then marks the summary TI as upstream_failed,
preventing expansion and causing every subsequent per-index instance to be
incorrectly upstream_failed.

Fix: in `trigger_rule_dep.py`, extend the existing summary-TI guard to also
cover non-fast-triggered rules for non-expansion-dep upstreams. For those
cases return `ti.map_index` (-1) directly, which points to the upstream
placeholder that no longer exists after expansion. The trigger rule therefore
sees no relevant instances and passes, allowing the summary TI to expand so
that each resulting instance is evaluated per-index.

Fast-triggered rules (ONE_SUCCESS / ONE_FAILED / ONE_DONE) continue to return
None (all instances relevant) so they can fire before expansion as before.

Fixes: apache#68417
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Redo #59691: placeholder upstream map-index resolution breaks per-index expansion in mapped task groups

1 participant