@@ -19,7 +19,7 @@ package org.apache.spark.sql.auron
 import org.apache.spark.sql.execution.SparkPlan

 trait AuronConvertProvider {
-  def isEnabled: Boolean
+  def isEnabled(exec: SparkPlan): Boolean

   def isSupported(exec: SparkPlan): Boolean

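The `exec` parameter lets a provider gate itself on the plan-node type it owns instead of answering a global yes/no. A minimal sketch of a third-party provider against the new contract; the class name and the support check are hypothetical, and `convert` is elided:

```scala
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

// Hypothetical provider: enabled only for the node type it knows how to convert.
class ExampleConvertProvider extends AuronConvertProvider {
  override def isEnabled(exec: SparkPlan): Boolean = exec match {
    case _: FileSourceScanExec => true // a real provider would also consult its config flag
    case _ => false // every other node falls through to the built-in conversion path
  }

  override def isSupported(exec: SparkPlan): Boolean =
    exec.schema.nonEmpty // placeholder; a real provider inspects the scan in detail

  // def convert(exec: SparkPlan): SparkPlan = ... (omitted here)
}
```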
@@ -161,7 +161,7 @@ object AuronConverters extends Logging {
       case e: BroadcastExchangeExec if enableBroadcastExchange =>
         tryConvert(e, convertBroadcastExchangeExec)
       case e: FileSourceScanExec if enableScan => // scan
-        extConvertProviders.find(p => p.isEnabled && p.isSupported(e)) match {
+        extConvertProviders.find(p => p.isEnabled(e) && p.isSupported(e)) match {
           case Some(provider) => tryConvert(e, provider.convert)
           case None => tryConvert(e, convertFileSourceScanExec)
         }
@@ -239,23 +239,34 @@ object AuronConverters extends Logging {

       case exec: ForceNativeExecutionWrapperBase => exec
       case exec =>
-        extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match {
-          case Some(provider) => tryConvert(exec, provider.convert)
-          case None =>
-            Shims.get.convertMoreSparkPlan(exec) match {
-              case Some(exec) =>
-                exec.setTagValue(convertibleTag, true)
-                exec.setTagValue(convertStrategyTag, AlwaysConvert)
-                exec
-              case None =>
-                if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader
-                  exec.setTagValue(convertibleTag, true)
-                  exec.setTagValue(convertStrategyTag, AlwaysConvert)
-                  exec
-                } else {
-                  addNeverConvertReasonTag(exec)
-                }
-            }
-        }
+        try {
+          extConvertProviders.find(h => h.isEnabled(exec) && h.isSupported(exec)) match {
+            case Some(provider) => tryConvert(exec, provider.convert)
+            case None =>
+              Shims.get.convertMoreSparkPlan(exec) match {
+                case Some(exec) =>
+                  exec.setTagValue(convertibleTag, true)
+                  exec.setTagValue(convertStrategyTag, AlwaysConvert)
+                  exec
+                case None =>
+                  if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader
+                    exec.setTagValue(convertibleTag, true)
+                    exec.setTagValue(convertStrategyTag, AlwaysConvert)
+                    exec
+                  } else {
+                    addNeverConvertReasonTag(exec)
+                  }
+              }
+          }
+        } catch {
+          case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
+            exec.setTagValue(convertToNonNativeTag, true)
+            exec.setTagValue(convertibleTag, false)
+            exec.setTagValue(convertStrategyTag, NeverConvert)
+            exec.setTagValue(
+              neverConvertReasonTag,
+              s"${e.getMessage.replaceFirst("^assertion failed: ?", "")}")
+            exec
+        }
     }
   }
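This catch block is what makes the assert-style guards introduced in the providers below safe: a failed `assert` aborts conversion for that node only, and its message, minus the `assertion failed: ` prefix that Scala's `assert` prepends, becomes the never-convert reason. A standalone sketch of that message round-trip:

```scala
object AssertReasonDemo extends App {
  // Scala's assert(cond, msg) throws AssertionError("assertion failed: " + msg).
  val reason =
    try {
      assert(false, "Conversion disabled: auron.enable.iceberg.scan=false.")
      "converted"
    } catch {
      case e: AssertionError =>
        // Same normalization AuronConverters applies before tagging the plan node.
        e.getMessage.replaceFirst("^assertion failed: ?", "")
    }
  println(reason) // Conversion disabled: auron.enable.iceberg.scan=false.
}
```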
@@ -27,17 +27,22 @@ import org.apache.auron.spark.configuration.SparkAuronConfiguration

 class HudiConvertProvider extends AuronConvertProvider with Logging {

-  override def isEnabled: Boolean = {
-    val sparkVersion = org.apache.spark.SPARK_VERSION
-    val versionParts = sparkVersion.split("[\\.-]", 3)
-    val maybeMajor = versionParts.headOption.flatMap(part => Try(part.toInt).toOption)
-    val maybeMinor =
-      if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else None
-    val supported = (for {
-      major <- maybeMajor
-      minor <- maybeMinor
-    } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false)
-    SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported
+  override def isEnabled(exec: SparkPlan): Boolean = {
+    exec match {
+      case _: FileSourceScanExec =>
+        // Only handle Hudi-backed file scans; other scans fall through.
+        val sparkVersion = org.apache.spark.SPARK_VERSION
+        val versionParts = sparkVersion.split("[\\.-]", 3)
+        val maybeMajor = versionParts.headOption.flatMap(part => Try(part.toInt).toOption)
+        val maybeMinor =
+          if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else None
+        val supported = (for {
+          major <- maybeMajor
+          minor <- maybeMinor
+        } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false)
+        SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported
+      case _ => false
+    }
   }

   override def isSupported(exec: SparkPlan): Boolean = {
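The version gate accepts Spark 3.0 through 3.5 and fails closed on anything it cannot parse; the limit-3 split also tolerates suffixed versions such as `3.4.0-SNAPSHOT`. A standalone sketch of the same parsing (inputs are illustrative):

```scala
import scala.util.Try

object VersionGateDemo extends App {
  // Mirrors HudiConvertProvider's check: Spark 3.0 through 3.5, else disabled.
  def hudiSparkSupported(sparkVersion: String): Boolean = {
    val parts = sparkVersion.split("[\\.-]", 3) // "3.5.1" -> Array("3", "5", "1")
    val major = parts.headOption.flatMap(p => Try(p.toInt).toOption)
    val minor = if (parts.length >= 2) Try(parts(1).toInt).toOption else None
    (for { ma <- major; mi <- minor } yield ma == 3 && mi >= 0 && mi <= 5).getOrElse(false)
  }

  println(hudiSparkSupported("3.5.1")) // true
  println(hudiSparkSupported("3.4.0-SNAPSHOT")) // true
  println(hudiSparkSupported("4.0.0")) // false
  println(hudiSparkSupported("not-a-version")) // false: parse failure disables the scan
}
```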
thirdparty/auron-iceberg/pom.xml (5 additions, 0 deletions)
@@ -81,6 +81,11 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>

   <build>
@@ -28,18 +28,18 @@ import org.apache.auron.util.SemanticVersion

 class IcebergConvertProvider extends AuronConvertProvider with Logging {

-  override def isEnabled: Boolean = {
-    val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get()
-    if (!enabled) {
-      return false
-    }
-    if (!sparkCompatible) {
-      logWarning(
-        s"Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. " +
-          s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).")
-      return false
-    }
-    true
+  override def isEnabled(exec: SparkPlan): Boolean = {
+    exec match {
+      case _: BatchScanExec =>
+        val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get()
+        assert(enabled, "Conversion disabled: auron.enable.iceberg.scan=false.")
+        assert(
+          sparkCompatible,
+          s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).")
+        enabled
+      case _ => false
+    }
   }

   override def isSupported(exec: SparkPlan): Boolean = {
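With the assert-based gate, a disabled or incompatible setup no longer just logs a warning; the reason lands on the plan node under the `auron.never.convert.reason` tag via the catch block in AuronConverters shown above. A hedged sketch of reading it back from an executed DataFrame (the helper name is ours, not an Auron API):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

// Returns the recorded fallback reason, if the scan fell back and was tagged.
def neverConvertReason(df: DataFrame): Option[String] = {
  val tag = TreeNodeTag[String]("auron.never.convert.reason")
  df.queryExecution.executedPlan.collectFirst { case scan: BatchScanExec =>
    scan.getTagValue(tag)
  }.flatten
}
```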
@@ -47,36 +47,33 @@ object IcebergScanSupport extends Logging {
     val scan = exec.scan
     val scanClassName = scan.getClass.getName
     // Only handle Iceberg scans; other sources must stay on Spark's path.
-    if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) {
-      return None
-    }
+    assert(
+      scanClassName.startsWith("org.apache.iceberg.spark.source."),
+      "Not an Iceberg scan.")

     // Changelog scan carries row-level changes; not supported by native COW-only path.
-    if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") {
-      return None
-    }
+    assert(
+      scanClassName != "org.apache.iceberg.spark.source.SparkChangelogScan",
+      "Changelog scans are not supported.")

     val readSchema = scan.readSchema
     val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
     // Native scan can project file-level metadata columns such as _file via partition values.
     // Metadata columns that require per-row materialization (for example _pos) still fallback.
-    if (unsupportedMetadataColumns.nonEmpty) {
-      return None
-    }
+    assert(
+      unsupportedMetadataColumns.isEmpty,
+      "Has per-row materialization (for example _pos).")

     val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
     // Supported metadata columns are materialized via per-file constant values rather than
     // read from the Iceberg data file payload.
     val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))

-    if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
-      return None
-    }
+    assert(
+      fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)),
+      "Unsupported data type in file schema.")

-    if (!partitionSchema.fields.forall(field =>
-        NativeConverters.isTypeSupported(field.dataType))) {
-      return None
-    }
+    assert(
+      partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)),
+      "Unsupported data type in partition schema.")

     val partitions = inputPartitions(exec)
     // Empty scan (e.g. empty table) should still build a plan to return no rows.
@@ -94,27 +91,25 @@

     val icebergPartitions = partitions.flatMap(icebergPartition)
     // All partitions must be Iceberg SparkInputPartition; otherwise fallback.
-    if (icebergPartitions.size != partitions.size) {
-      return None
-    }
+    assert(
+      icebergPartitions.size == partitions.size,
+      "All partitions must be Iceberg SparkInputPartition.")

     val fileTasks = icebergPartitions.flatMap(_.fileTasks)

     // Native scan does not apply delete files; only allow pure data files (COW).
-    if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) {
-      return None
-    }
+    assert(
+      fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty),
+      "Not an Iceberg COW table: delete files present.")

     // Native scan handles a single file format; mixed formats must fallback.
     val formats = fileTasks.map(_.file().format()).distinct
-    if (formats.size > 1) {
-      return None
-    }
+    assert(formats.size <= 1, "Mixed data file formats are not supported.")

     val format = formats.headOption.getOrElse(FileFormat.PARQUET)
-    if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
-      return None
-    }
+    assert(
+      format == FileFormat.PARQUET || format == FileFormat.ORC,
+      "Only Parquet and ORC formats are supported.")

     val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema)
     Some(
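The file/partition schema split above is what lets supported metadata columns ride along as per-file constants instead of being read from the data file. A small self-contained illustration, with `isSupportedMetadataColumn` simplified to recognizing `_file`:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaSplitDemo extends App {
  // Simplified stand-in for IcebergScanSupport.isSupportedMetadataColumn.
  def isSupportedMetadataColumn(f: StructField): Boolean = f.name == "_file"

  val readSchema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("v", StringType),
    StructField("_file", StringType))) // file-level metadata column

  // Data columns are read from the file payload...
  val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
  // ...while _file is materialized as a per-file constant, like a partition value.
  val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))

  println(fileSchema.fieldNames.mkString(", ")) // id, v
  println(partitionSchema.fieldNames.mkString(", ")) // _file
}
```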
@@ -26,12 +26,43 @@ import org.apache.iceberg.deletes.PositionDelete
 import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.auron.iceberg.IcebergScanSupport
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.ExplainUtils.collectFirst
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

 class AuronIcebergIntegrationSuite
     extends org.apache.spark.sql.QueryTest
     with BaseAuronIcebergSuite {

+  test("iceberg native scan with auron.enable.iceberg.scan=false") {
+    withTable("local.db.t2") {
+      withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "false") {
+        sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v")
+        val df = sql("select * from local.db.t2")
+        df.collect()
+        val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason")
+        assert(collectFirst(df.queryExecution.executedPlan) { case batchScanExec: BatchScanExec =>
+          batchScanExec.getTagValue(neverConvertReasonTag)
+        }.get.get.equals("Conversion disabled: auron.enable.iceberg.scan=false."))
+      }
+    }
+  }
+
+  test(
+    "iceberg scan falls back when reading unsupported metadata columns and check never convert reason") {
+    withTable("local.db.t4_pos") {
+      sql("create table local.db.t4_pos using iceberg as select 1 as id, 'a' as v")
+      withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") {
+        val df = sql("select _pos from local.db.t4_pos")
+        df.collect()
+        val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason")
+        assert(collectFirst(df.queryExecution.executedPlan) { case batchScanExec: BatchScanExec =>
+          batchScanExec.getTagValue(neverConvertReasonTag)
+        }.get.get.equals("Has per-row materialization (for example _pos)."))
+      }
+    }
+  }
+
   test("test iceberg integrate ") {
     withTable("local.db.t1") {
       sql(
@@ -28,7 +28,14 @@ import org.apache.auron.spark.configuration.SparkAuronConfiguration

 class PaimonConvertProvider extends AuronConvertProvider with Logging {

-  override def isEnabled: Boolean = SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
+  override def isEnabled(exec: SparkPlan): Boolean = {
+    exec match {
+      case _: HiveTableScanExec =>
+        SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
+      case _ => false
+    }
+  }

   override def isSupported(exec: SparkPlan): Boolean = {
     exec match {
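Taken together with the Hudi and Iceberg providers, each extension now self-selects on the node type it owns, and the dispatcher in AuronConverters takes the first provider that is both enabled for and supports the node, so registration order decides ties. A toy model of that dispatch (all names here are illustrative, not Auron APIs):

```scala
object DispatchDemo extends App {
  // Toy plan nodes standing in for FileSourceScanExec / HiveTableScanExec.
  sealed trait Plan
  final case class FileScan(format: String) extends Plan
  final case class HiveScan(table: String) extends Plan

  trait Provider { def name: String; def isEnabled(p: Plan): Boolean }

  object HudiLike extends Provider {
    val name = "hudi"
    def isEnabled(p: Plan): Boolean = p match { case _: FileScan => true; case _ => false }
  }
  object PaimonLike extends Provider {
    val name = "paimon"
    def isEnabled(p: Plan): Boolean = p match { case _: HiveScan => true; case _ => false }
  }

  val providers = Seq(HudiLike, PaimonLike)
  println(providers.find(_.isEnabled(HiveScan("t"))).map(_.name)) // Some(paimon)
  println(providers.find(_.isEnabled(FileScan("parquet"))).map(_.name)) // Some(hudi)
}
```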