diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index 221741a56fe09..d5260cad9c6ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -191,6 +191,16 @@ case class GroupPartitionsExec( super.outputOrdering } } + + override def simpleString(maxFields: Int): String = { + val joinKeyPositionsString = + joinKeyPositions.map(p => s" JoinKeyPositions: ${p.mkString("[", ",", "]")}").getOrElse("") + val expectedPartitionKeysString = + expectedPartitionKeys.map(ks => s" ExpectedPartitionKeys: ${ks.size}").getOrElse("") + val reducersString = reducers.map(r => s" Reducers: ${r.count(_.isDefined)}").getOrElse("") + s"$nodeName$joinKeyPositionsString$expectedPartitionKeysString$reducersString " + + s"DistributePartitions: $distributePartitions" + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index ace0040049efe..f2eb02e03846e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -20,7 +20,7 @@ import java.sql.Timestamp import java.util.Collections import org.apache.spark.SparkConf -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} import org.apache.spark.sql.catalyst.plans.physical @@ -38,7 +38,7 @@ import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.types._ -class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { +class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with ExplainSuiteHelper { private val functions = Seq( UnboundYearsFunction, UnboundDaysFunction, @@ -3216,4 +3216,30 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase { } } } + + test("SPARK-55992: GroupPartitions textual representation in plans") { + val items_partitions = Array(bucket(4, "id"), years("arrive_time")) + createTable(items, itemsColumns, items_partitions) + + sql(s"INSERT INTO testcat.ns.$items VALUES (1, 'aa', 10.0, cast('2021-01-01' as timestamp))") + + val purchases_partitions = Array(bucket(6, "item_id"), years("time")) + createTable(purchases, purchasesColumns, purchases_partitions) + + sql(s"INSERT INTO testcat.ns.$purchases VALUES (2, 10.0, cast('2021-01-01' as timestamp))") + + withSQLConf( + SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false", + SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true", + SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") { + val df = sql( + s""" + |SELECT * + |FROM testcat.ns.$items i + |JOIN testcat.ns.$purchases p ON p.item_id = i.id + |""".stripMargin) + checkKeywordsExistsInExplain(df, keywords = "GroupPartitions JoinKeyPositions: [0] " + + "ExpectedPartitionKeys: 2 Reducers: 1 DistributePartitions: false") + } + } }