From 7f35ead0f4adaa556b6760f4d25ef14c018eb4f1 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Wed, 6 May 2026 11:47:29 +0800 Subject: [PATCH] [AURON #2238] Add support for auron.never.convert.reason in Iceberg scan scenarios --- .../sql/auron/AuronConvertProvider.scala | 2 +- .../spark/sql/auron/AuronConverters.scala | 41 +++++++++------ .../sql/auron/hudi/HudiConvertProvider.scala | 27 ++++++---- thirdparty/auron-iceberg/pom.xml | 5 ++ .../iceberg/IcebergConvertProvider.scala | 20 ++++---- .../auron/iceberg/IcebergScanSupport.scala | 51 +++++++++---------- .../AuronIcebergIntegrationSuite.scala | 31 +++++++++++ .../auron/paimon/PaimonConvertProvider.scala | 9 +++- 8 files changed, 120 insertions(+), 66 deletions(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertProvider.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertProvider.scala index 91a26508d..8ac7e9fb8 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertProvider.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertProvider.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.auron import org.apache.spark.sql.execution.SparkPlan trait AuronConvertProvider { - def isEnabled: Boolean + def isEnabled(exec: SparkPlan): Boolean def isSupported(exec: SparkPlan): Boolean diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index cc12a176a..1d721657b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -161,7 +161,7 @@ object AuronConverters extends Logging { case e: BroadcastExchangeExec if enableBroadcastExchange => tryConvert(e, convertBroadcastExchangeExec) case e: FileSourceScanExec if enableScan => // scan - extConvertProviders.find(p => 
p.isEnabled && p.isSupported(e)) match { + extConvertProviders.find(p => p.isEnabled(e) && p.isSupported(e)) match { case Some(provider) => tryConvert(e, provider.convert) case None => tryConvert(e, convertFileSourceScanExec) } @@ -239,23 +239,34 @@ object AuronConverters extends Logging { case exec: ForceNativeExecutionWrapperBase => exec case exec => - extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match { - case Some(provider) => tryConvert(exec, provider.convert) - case None => - Shims.get.convertMoreSparkPlan(exec) match { - case Some(exec) => - exec.setTagValue(convertibleTag, true) - exec.setTagValue(convertStrategyTag, AlwaysConvert) - exec - case None => - if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader + try { + extConvertProviders.find(h => h.isEnabled(exec) && h.isSupported(exec)) match { + case Some(provider) => tryConvert(exec, provider.convert) + case None => + Shims.get.convertMoreSparkPlan(exec) match { + case Some(exec) => exec.setTagValue(convertibleTag, true) exec.setTagValue(convertStrategyTag, AlwaysConvert) exec - } else { - addNeverConvertReasonTag(exec) - } - } + case None => + if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader + exec.setTagValue(convertibleTag, true) + exec.setTagValue(convertStrategyTag, AlwaysConvert) + exec + } else { + addNeverConvertReasonTag(exec) + } + } + } + } catch { + case e @ (_: NotImplementedError | _: AssertionError | _: Exception) => + exec.setTagValue(convertToNonNativeTag, true) + exec.setTagValue(convertibleTag, false) + exec.setTagValue(convertStrategyTag, NeverConvert) + // getMessage may be null (e.g. a bare NullPointerException); fall back to toString. + val reason = Option(e.getMessage).getOrElse(e.toString) + exec.setTagValue(neverConvertReasonTag, reason.replaceFirst("^assertion failed: ?", "")) + exec } } } diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala index da96d5644..19f0caf1f 100644 --- 
a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala @@ -27,17 +27,22 @@ import org.apache.auron.spark.configuration.SparkAuronConfiguration class HudiConvertProvider extends AuronConvertProvider with Logging { - override def isEnabled: Boolean = { - val sparkVersion = org.apache.spark.SPARK_VERSION - val versionParts = sparkVersion.split("[\\.-]", 3) - val maybeMajor = versionParts.headOption.flatMap(part => Try(part.toInt).toOption) - val maybeMinor = - if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else None - val supported = (for { - major <- maybeMajor - minor <- maybeMinor - } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false) - SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported + override def isEnabled(exec: SparkPlan): Boolean = { + exec match { + case _: FileSourceScanExec => + // Only handle Hudi-backed file scans; other scans fall through. 
+ val sparkVersion = org.apache.spark.SPARK_VERSION + val versionParts = sparkVersion.split("[\\.-]", 3) + val maybeMajor = versionParts.headOption.flatMap(part => Try(part.toInt).toOption) + val maybeMinor = + if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else None + val supported = (for { + major <- maybeMajor + minor <- maybeMinor + } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false) + SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported + case _ => false + } } override def isSupported(exec: SparkPlan): Boolean = { diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml index bb686f481..ec5b3db23 100644 --- a/thirdparty/auron-iceberg/pom.xml +++ b/thirdparty/auron-iceberg/pom.xml @@ -81,6 +81,11 @@ ${project.version} test + + org.scala-lang + scala-library + provided + diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index aae3f576e..a2ef9e3aa 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -28,18 +28,18 @@ import org.apache.auron.util.SemanticVersion class IcebergConvertProvider extends AuronConvertProvider with Logging { - override def isEnabled: Boolean = { - val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get() - if (!enabled) { - return false - } - if (!sparkCompatible) { - logWarning( - s"Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. 
" + + override def isEnabled(exec: SparkPlan): Boolean = { + exec match { + case _: BatchScanExec => + val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get() + assert(enabled, "Conversion disabled: auron.enable.iceberg.scan=false.") + assert( + sparkCompatible, s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).") - return false + enabled + case _ => false } - true + } override def isSupported(exec: SparkPlan): Boolean = { diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 1d9efbc3b..78f42cb10 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -47,36 +47,33 @@ object IcebergScanSupport extends Logging { val scan = exec.scan val scanClassName = scan.getClass.getName // Only handle Iceberg scans; other sources must stay on Spark's path. - if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) { - return None - } + assert(scanClassName.startsWith("org.apache.iceberg.spark.source."), "Not iceberg scans.") // Changelog scan carries row-level changes; not supported by native COW-only path. - if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") { - return None - } + assert( + !(scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan"), + "Not iceberg cow table.") val readSchema = scan.readSchema val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema) // Native scan can project file-level metadata columns such as _file via partition values. // Metadata columns that require per-row materialization (for example _pos) still fallback. 
- if (unsupportedMetadataColumns.nonEmpty) { - return None - } + assert( + !(unsupportedMetadataColumns.nonEmpty), + "Has per-row materialization (for example _pos).") val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn)) // Supported metadata columns are materialized via per-file constant values rather than // read from the Iceberg data file payload. val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn)) - if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { - return None - } + assert( + fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), + "Has unsupported data type in iceberg data file schema.") - if (!partitionSchema.fields.forall(field => - NativeConverters.isTypeSupported(field.dataType))) { - return None - } + assert( + partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), + "Has unsupported schema type.") val partitions = inputPartitions(exec) // Empty scan (e.g. empty table) should still build a plan to return no rows. @@ -94,27 +91,25 @@ object IcebergScanSupport extends Logging { val icebergPartitions = partitions.flatMap(icebergPartition) // All partitions must be Iceberg SparkInputPartition; otherwise fallback. - if (icebergPartitions.size != partitions.size) { - return None - } + assert( + icebergPartitions.size == partitions.size, + "All partitions must be Iceberg SparkInputPartition.") val fileTasks = icebergPartitions.flatMap(_.fileTasks) // Native scan does not apply delete files; only allow pure data files (COW). - if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) { - return None - } + assert( + fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty), + "Not iceberg cow table.") // Native scan handles a single file format; mixed formats must fallback. 
val formats = fileTasks.map(_.file().format()).distinct - if (formats.size > 1) { - return None - } + assert(!(formats.size > 1), "Not all data file format is a single file format.") val format = formats.headOption.getOrElse(FileFormat.PARQUET) - if (format != FileFormat.PARQUET && format != FileFormat.ORC) { - return None - } + assert( + !(format != FileFormat.PARQUET && format != FileFormat.ORC), + "Only support parquet or orc.") val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema) Some( diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 0c5b752a8..6565f2332 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -26,12 +26,43 @@ import org.apache.iceberg.deletes.PositionDelete import org.apache.iceberg.spark.Spark3Util import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.auron.iceberg.IcebergScanSupport +import org.apache.spark.sql.catalyst.trees.TreeNodeTag +import org.apache.spark.sql.execution.ExplainUtils.collectFirst import org.apache.spark.sql.execution.datasources.v2.BatchScanExec class AuronIcebergIntegrationSuite extends org.apache.spark.sql.QueryTest with BaseAuronIcebergSuite { + test("iceberg native scan with auron.enable.iceberg.scan=false") { + withTable("local.db.t2") { + withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "false") { + sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v") + val df = sql("select * from local.db.t2") + df.collect() + val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason") + assert(collectFirst(df.queryExecution.executedPlan) { case batchScanExec: 
BatchScanExec => + batchScanExec.getTagValue(neverConvertReasonTag) + }.get.get.equals("Conversion disabled: auron.enable.iceberg.scan=false.")) + } + } + } + + test( + "iceberg scan falls back when reading unsupported metadata columns and check never convert reason") { + withTable("local.db.t4_pos") { + sql("create table local.db.t4_pos using iceberg as select 1 as id, 'a' as v") + withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") { + val df = sql("select _pos from local.db.t4_pos") + df.collect() + val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason") + assert(collectFirst(df.queryExecution.executedPlan) { case batchScanExec: BatchScanExec => + batchScanExec.getTagValue(neverConvertReasonTag) + }.get.get.equals("Has per-row materialization (for example _pos).")) + } + } + } + test("test iceberg integrate ") { withTable("local.db.t1") { sql( diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala index f9d75c439..21b2ed03a 100644 --- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala +++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala @@ -28,7 +28,14 @@ import org.apache.auron.spark.configuration.SparkAuronConfiguration class PaimonConvertProvider extends AuronConvertProvider with Logging { - override def isEnabled: Boolean = SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get() + override def isEnabled(exec: SparkPlan): Boolean = { + exec match { + case _: HiveTableScanExec => + SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get() + case _ => false + } + + } override def isSupported(exec: SparkPlan): Boolean = { exec match {