
[GLUTEN-11980][CORE] For decimal-key joins, if one side falls back to Spark, force fallback the other side#12000

Open
beliefer wants to merge 3 commits into apache:main from beliefer:11980

Conversation

Contributor

@beliefer beliefer commented Apr 28, 2026

What changes are proposed in this pull request?

This PR fixes a bug: the join operator loses data on a decimal join key when one side is a vanilla Spark scan and the other side is a native scan.

For a given join, one side's scan (FileSourceScanExec or HiveTableScanExec) may fail to offload to the native engine while the other side's scan offloads successfully. If the join key contains a decimal type, rows may fail to match because decimal precision handling in Spark differs from Velox.

As a result, many rows fail to match, and in some cases zero rows match.

Why do Spark and Gluten process decimal precision differently?
Spark prioritizes consistent accuracy and is willing to sacrifice some performance for it; Gluten pursues maximum performance, which may introduce subtle accuracy differences in certain calculations.
There are many related issues, for example #4652.
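To make the failure mode concrete, here is a toy Scala illustration (not Gluten or Velox code): the same numeric value can carry different unscaled representations depending on its scale, so an engine that hashes or compares raw unscaled integers will not match keys that another engine, comparing at a normalized precision, considers equal.

```scala
// Toy illustration, not Gluten code: numerically equal decimals can have
// different raw (unscaled) representations when their scales differ. An
// engine comparing raw unscaled values would fail to match such join keys.
object DecimalKeyMismatch {
  def main(args: Array[String]): Unit = {
    val a = new java.math.BigDecimal("1.50") // scale 2, unscaled value 150
    val b = new java.math.BigDecimal("1.5")  // scale 1, unscaled value 15
    assert(a.compareTo(b) == 0)                // numerically equal
    assert(a.unscaledValue != b.unscaledValue) // raw representations differ
    println("same value, different raw representation")
  }
}
```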

Fixes #11980

How was this patch tested?

Manual tests in our production environment.
Unit tests.

Was this patch authored or co-authored using generative AI tooling?

No.
I only used AI to generate the comments.

@github-actions bot added the CORE (works for Gluten Core) and VELOX labels Apr 28, 2026
@github-actions

Run Gluten Clickhouse CI on x86

@beliefer beliefer marked this pull request as draft April 28, 2026 09:37
@beliefer beliefer marked this pull request as ready for review April 30, 2026 08:49

* AdaptiveSparkPlanExec is handled by descending into its `initialPlan`; all other non-join nodes
* are handled recursively through their children.
*/
private def validateJoin(plan: SparkPlan): Unit =
Contributor

Could we move this to a separate rule?

Contributor Author
@beliefer beliefer May 1, 2026

I created another rule, AddFallbackTagsForJoin, modeled on AddFallbackTags, and changed the code of WithRewrites like:

case class WithRewrites(
    validator: Validator,
    rewriteRules: Seq[RewriteSingleNode],
    offloadRules: Seq[OffloadSingleNode])
  extends Rule[SparkPlan] {
  private val validate = AddFallbackTags(validator)
  private val validateJoin = AddFallbackTagsForJoin(validator)
  private val rewrite = RewriteSparkPlanRulesManager(validate, validateJoin, rewriteRules)
  private val offload = LegacyOffload(offloadRules)

  override def apply(plan: SparkPlan): SparkPlan = {
    Seq(rewrite, validate, validateJoin, offload).foldLeft(plan) {
      case (plan, stage) => stage(plan)
    }
  }
}

Then we would have to change the constructor of RewriteSparkPlanRulesManager to accept AddFallbackTagsForJoin and change the logic of RewriteSparkPlanRulesManager.
I don't think it's worth doing this, so I merged the code into AddFallbackTags.

}
}

testGluten(
Contributor

Why do we need this test here? It looks like FallbackSuite already covers it. Also, please update the PR description: this problem occurs because one join side is native while the other falls back, so we need to fall back both sides; it is not related to decimal, right?

Contributor Author
@beliefer beliefer May 1, 2026

No, it is related to the decimal join key. We do not need to fall back the other side when one side falls back if the join key type has exact equality semantics (e.g. string, int).

FallbackSuite cannot cover all the code paths, so we need to add these test cases.

Contributor
@jinchengchenghh jinchengchenghh May 1, 2026

Could you describe in more detail in the PR description why decimal is different and needs to be handled differently?

Contributor Author

Updated.

@beliefer beliefer requested a review from jinchengchenghh May 1, 2026 04:43
*
* When the join key is a decimal type, a native (Velox) scan and a vanilla Spark scan
* ([[FileSourceScanExec]] or `HiveTableScanExec`) may produce different representations of the
* same decimal value: the native reader may surface raw uncoerced int128_t values while the
Contributor

Can we update the native side to support this case?

Contributor

If one side falls back, that side should insert ColumnarToRow, so why does this representation issue still occur?

Contributor Author


The underlying decimal implementations determine their different accuracies, and it is a difficult problem for me to solve right now.

Contributor


We should find the root cause and fix the result-mismatch issue on the native side. Someone else in the community may fix it, so please keep the issue open for now.

* the right subtree of the join
*/
private def setFallbackTagForOtherSide(leftChild: SparkPlan, rightChild: SparkPlan): Unit = {
val leftHasFallbackScan = hasFallbackScan(leftChild)
Contributor

It's not only a scan fallback that causes this issue; could it also occur when a filter falls back?

Contributor Author

Either way, we should ensure the two sides are either both offloaded or both fallen back.
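The invariant discussed here (both join sides offloaded together, or both fallen back) can be sketched with a toy plan model. `Plan`, `Scan`, `Node`, and `alignJoinSides` below are hypothetical names for illustration, not Gluten's actual SparkPlan API:

```scala
// Hypothetical sketch of the tagging idea (toy model, not Gluten's API):
// if exactly one join subtree contains a fallen-back scan, tag every scan
// on the other subtree so both sides run in the same engine.
sealed trait Plan {
  var fallback: Boolean = false
  def children: Seq[Plan]
}
case class Scan(table: String) extends Plan { def children = Nil }
case class Node(children: Plan*) extends Plan

object TagOtherSide {
  def hasFallbackScan(p: Plan): Boolean = p match {
    case s: Scan => s.fallback
    case _       => p.children.exists(hasFallbackScan)
  }

  def tagScans(p: Plan): Unit = p match {
    case s: Scan => s.fallback = true
    case _       => p.children.foreach(tagScans)
  }

  // Keep the two join sides aligned: if only one side contains a
  // fallen-back scan, force the scans on the other side to fall back too.
  def alignJoinSides(left: Plan, right: Plan): Unit = {
    val (l, r) = (hasFallbackScan(left), hasFallbackScan(right))
    if (l && !r) tagScans(right)
    if (r && !l) tagScans(left)
  }
}
```

The real rule additionally has to recognize FileSourceScanExec/HiveTableScanExec and Gluten's fallback tags, but the alignment logic has this shape.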

@beliefer beliefer requested a review from jinchengchenghh May 4, 2026 13:12
beliefer added 3 commits May 6, 2026 12:39
… Spark, force fallback the other side

# Conflicts:
#	backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@github-actions

github-actions Bot commented May 6, 2026

Run Gluten Clickhouse CI on x86

