[GLUTEN-11980][CORE] For decimal-key joins, if one side falls back to Spark, force fallback the other side #12000
beliefer wants to merge 3 commits into apache:main
Conversation
 * AdaptiveSparkPlanExec is handled by descending into its `initialPlan`; all other non-join nodes
 * are handled recursively through their children.
 */
private def validateJoin(plan: SparkPlan): Unit =
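A minimal sketch of what such a traversal could look like, based only on the names quoted in this diff; the `BaseJoinExec` match and the call to `setFallbackTagForOtherSide` are assumptions for illustration, and the PR's actual body may differ:

private def validateJoin(plan: SparkPlan): Unit = plan match {
  // Adaptive plans wrap the real plan; descend into the initial plan.
  case adaptive: AdaptiveSparkPlanExec => validateJoin(adaptive.initialPlan)
  // For joins, keep the two sides consistent, then keep descending.
  case join: BaseJoinExec =>
    setFallbackTagForOtherSide(join.left, join.right)
    join.children.foreach(validateJoin)
  // All other non-join nodes are handled recursively through their children.
  case other => other.children.foreach(validateJoin)
}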
Could we move this to a separate rule?
I created another rule, AddFallbackTagsForJoin, modeled on AddFallbackTags, and changed the code of WithRewrites like this:
case class WithRewrites(
    validator: Validator,
    rewriteRules: Seq[RewriteSingleNode],
    offloadRules: Seq[OffloadSingleNode])
  extends Rule[SparkPlan] {
  private val validate = AddFallbackTags(validator)
  private val validateJoin = AddFallbackTagsForJoin(validator)
  private val rewrite = RewriteSparkPlanRulesManager(validate, validateJoin, rewriteRules)
  private val offload = LegacyOffload(offloadRules)

  override def apply(plan: SparkPlan): SparkPlan = {
    Seq(rewrite, validate, validateJoin, offload).foldLeft(plan) {
      case (plan, stage) => stage(plan)
    }
  }
}
Then we would have to change the constructor of RewriteSparkPlanRulesManager to accept AddFallbackTagsForJoin and adjust its logic. I don't think that is worth doing, so I merged the code into AddFallbackTags instead.
  }
}

testGluten(
Why do we need this test here? It looks like FallbackSuite already covers it. Also, please update the PR description: this problem occurs because one side of the join is native while the other side falls back, so we need to fall back both sides; it is not related to decimal, right?
No, it is related to the decimal join key. We do not need to fall back the other side when one side falls back if the join key type has exact equality semantics (e.g. string, int).
FallbackSuite cannot cover all the code paths, so we need to add these test cases.
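A hypothetical shape for such a test case (table names, formats, and the assertion are assumptions for illustration, not the PR's actual code): join a native-scanned table with a fallback-scanned table on a decimal key and verify that both scans end up as vanilla Spark scans.

testGluten("force fallback the other side of a decimal-key join") {
  withTable("native_t", "fallback_t") {
    sql("CREATE TABLE native_t(dec_key DECIMAL(18, 2)) USING parquet")
    sql("CREATE TABLE fallback_t(dec_key DECIMAL(18, 2)) USING csv")
    val df = sql(
      "SELECT * FROM native_t t1 JOIN fallback_t t2 ON t1.dec_key = t2.dec_key")
    df.collect()
    // Both sides should be vanilla Spark scans after the forced fallback.
    val vanillaScans = df.queryExecution.executedPlan.collect {
      case scan: FileSourceScanExec => scan
    }
    assert(vanillaScans.length == 2)
  }
}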
Could you describe in more detail in the PR description why decimal is different and needs special handling?
 *
 * When the join key is a decimal type, a native (Velox) scan and a vanilla Spark scan
 * ([[FileSourceScanExec]] or `HiveTableScanExec`) may produce different representations of the
 * same decimal value: the native reader may surface raw uncoerced int128_t values while the
Can we update the native side to support this case?
If one side falls back, that side should insert a ColumnarToRow transition, so why does this representation issue arise?
The decimal implementations differ in precision by design, and that is a difficult problem for me to solve right now.
We should find the root cause and fix the result mismatch on the native side. Someone else in the community may fix this issue, so please keep it open for now.
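As a standalone illustration of the representation gap under discussion (this uses Spark's Decimal API directly, not Gluten code): two decimals can be numerically equal while their unscaled representations differ, so hashing raw unscaled values on one side breaks key matching.

import org.apache.spark.sql.types.Decimal

// Numerically equal values carrying different scale metadata.
val a = Decimal(BigDecimal("1.10"), precision = 10, scale = 2)
val b = Decimal(BigDecimal("1.1"), precision = 10, scale = 1)

println(a == b)                               // true: value-level comparison
println(a.toUnscaledLong == b.toUnscaledLong) // false: 110 vs 11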
 * the right subtree of the join
 */
private def setFallbackTagForOtherSide(leftChild: SparkPlan, rightChild: SparkPlan): Unit = {
  val leftHasFallbackScan = hasFallbackScan(leftChild)
Isn't it the case that not only a scan fallback causes this issue? It may also occur after a filter, for example?
In any case, we should ensure the two sides are either both offloaded or both fall back.
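A minimal sketch of the symmetric tagging this implies, built from the signature quoted in the diff above; `hasFallbackScan` and the `FallbackTags.add` call are assumptions for illustration, and the PR's actual body may differ:

private def setFallbackTagForOtherSide(leftChild: SparkPlan, rightChild: SparkPlan): Unit = {
  val leftHasFallbackScan = hasFallbackScan(leftChild)
  val rightHasFallbackScan = hasFallbackScan(rightChild)
  // If exactly one side contains a fallback scan, tag every node on the
  // other side so both join inputs run on vanilla Spark.
  if (leftHasFallbackScan && !rightHasFallbackScan) {
    rightChild.foreach(FallbackTags.add(_, "the other side of the join falls back"))
  } else if (rightHasFallbackScan && !leftHasFallbackScan) {
    leftChild.foreach(FallbackTags.add(_, "the other side of the join falls back"))
  }
}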
… Spark, force fallback the other side

# Conflicts:
#	backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
What changes are proposed in this pull request?
This PR fixes a bug: a join loses data on a decimal join key when one side is a vanilla Spark scan and the other side is a native scan.
For a join, one side's scan (FileSourceScanExec or HiveTableScanExec) may fail to offload to the native engine while the other side's scan offloads. If the join key contains a decimal type, rows may fail to match because decimal precision handling in Spark differs from Velox; many rows, or even all rows, can then go unmatched.
Why do Spark and Gluten have different precision processing for decimal types?
Spark prioritizes consistent accuracy and is willing to sacrifice some performance for it; Gluten pursues maximum performance, which can introduce subtle accuracy differences in certain calculations.
There are many related issues, e.g. #4652.
Fixes #11980
How was this patch tested?
Manual tests in our production environment.
UT.
Was this patch authored or co-authored using generative AI tooling?
No. I only used AI to generate the comments.