@@ -37,7 +37,7 @@ import org.apache.spark.util.Utils
3737import org .apache .hadoop .hive .ql .io .orc .OrcInputFormat
3838import org .apache .hadoop .hive .ql .io .parquet .MapredParquetInputFormat
3939import org .apache .hadoop .hive .ql .plan .TableDesc
40- import org .apache .hadoop .mapred .TextInputFormat
40+ import org .apache .hadoop .mapred .{ InputFormat , TextInputFormat }
4141
4242import java .net .URI
4343
@@ -76,17 +76,46 @@ case class HiveTableScanExecTransformer(
/** Returns `partitionWithReadFileFormats`: each partition paired with its [[ReadFileFormat]]. */
override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = {
  partitionWithReadFileFormats
}
7878
/**
 * Returns the distinct [[ReadFileFormat]]s found among the pruned partitions.
 *
 * Only used for file format validation on the driver side. Must not trigger subquery execution.
 *
 * A non-partitioned relation, or a partitioned one with no pruned partitions, yields just the
 * table-level `fileFormat`.
 */
override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = {
  if (!relation.isPartitioned) {
    Set(fileFormat)
  } else {
    val tableInputFormatClass = tableDesc.getInputFileFormatClass
    // Deduplicates by (InputFormatClass, Option[serdeClass]) so that
    // HiveClientImpl.fromHivePartition is called at most once per distinct format combination.
    val formatCache =
      collection.mutable.Map[(Class[_ <: InputFormat[_, _]], Option[String]), ReadFileFormat]()
    // Local mutable flag: keeps the classification to a single pass over the partitions.
    var hasTableFormatPartitions = false

    basePrunedPartitions.foreach {
      partition =>
        // getInputFormatClass is cheap compared with converting the whole partition.
        val inputFormatClass = partition.getInputFormatClass
        val isTextFormat = TEXT_INPUT_FORMAT_CLASS.isAssignableFrom(inputFormatClass)
        // A partition may be assumed to share the table-level `fileFormat` only when its
        // InputFormat class matches the table's AND the format does not depend on the serde.
        // For TextInputFormat the ReadFileFormat depends on both the InputFormat and the serde
        // (e.g. JsonSerDe -> JsonReadFormat), so text partitions always go through the
        // serde-keyed cache, even when their InputFormat class equals the table's. Otherwise a
        // text partition whose serde differs from the table's would be reported as `fileFormat`.
        if (inputFormatClass == tableInputFormatClass && !isTextFormat) {
          hasTableFormatPartitions = true
        } else {
          val serdeKey =
            if (isTextFormat) {
              Option(partition.getTPartition.getSd.getSerdeInfo.getSerializationLib)
            } else {
              None
            }
          formatCache.getOrElseUpdate(
            (inputFormatClass, serdeKey),
            getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage))
        }
    }

    val partitionFormats = formatCache.values.toSet
    // The table-level format is part of the answer when some partition was classified as using
    // it, or when nothing else was found (e.g. no pruned partitions at all).
    if (hasTableFormatPartitions || partitionFormats.isEmpty) {
      partitionFormats + fileFormat
    } else {
      partitionFormats
    }
  }
}
90119
/** Returns the partition schema recorded in the relation's table metadata. */
override def getPartitionSchema: StructType = {
  val tableMetadata = relation.tableMeta
  tableMetadata.partitionSchema
}
92121
0 commit comments