[SPARK-33902][SQL] Support CREATE TABLE LIKE for V2 #54809
@@ -293,6 +293,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
        c
      }

    // For CREATE TABLE LIKE, use the v1 command if both the target and source are in the session
    // catalog (or a V1-compatible catalog extension). If source is in a different catalog, fall
    // through to the V2 execution path (CreateTableLikeExec via DataSourceV2Strategy).
    case CreateTableLike(
Contributor
What does this mean for DSv2 connectors that override the session catalog?

Contributor
An example is the Iceberg session catalog.

Member
Yeah, agreed. We should add a test for sessionCatalog.
        ResolvedV1Identifier(targetIdent),
        ResolvedV1TableOrViewIdentifier(sourceIdent),
        fileFormat, provider, properties, ifNotExists) =>
      CreateTableLikeCommand(
        targetIdent, sourceIdent, fileFormat, provider, properties, ifNotExists)

    case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command =>
      DropTableCommand(ident, ifExists, isView = false, purge = purge)
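The routing rule in this hunk (v1 command only when both sides live in the session catalog, otherwise fall through to the V2 path) can be modeled as a small decision function. This is an illustrative Python sketch, not Spark code; the function name and the `spark_catalog` constant are assumptions for the sketch.

```python
# Hypothetical model of the CREATE TABLE LIKE routing decision.
# "spark_catalog" stands in for the session catalog name.

def route_create_table_like(target_catalog: str, source_catalog: str) -> str:
    """Return which execution path handles CREATE TABLE ... LIKE ...

    v1 path: both target and source resolve to the session catalog, so the
    legacy CreateTableLikeCommand can handle the statement.
    v2 path: either side is in a different catalog, so planning falls
    through to CreateTableLikeExec via DataSourceV2Strategy.
    """
    session = "spark_catalog"
    if target_catalog == session and source_catalog == session:
        return "v1: CreateTableLikeCommand"
    return "v2: CreateTableLikeExec"
```

For example, `CREATE TABLE testcat.dst LIKE default.src` routes to the v2 path because the target is outside the session catalog, even though the source is inside it.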
@@ -0,0 +1,127 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.v2

import scala.jdk.CollectionConverters._

import org.apache.spark.internal.LogKeys.TABLE_NAME
import org.apache.spark.internal.MDC
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableInfo, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.errors.QueryCompilationErrors

/**
 * Physical plan node for CREATE TABLE ... LIKE ... targeting a v2 catalog.
 *
 * Copies the schema (columns) and partitioning from `sourceTable`. The following properties of
 * the source table are intentionally NOT copied (matching v1 behavior):
 *  - Table-level comments
 *  - Source table's TBLPROPERTIES (user-specified `properties` are used instead)
 *  - Statistics, owner, create time
 */
case class CreateTableLikeExec(
Contributor
Do we have V1 -> V2 tests within as well as across catalogs?

Member (Author)
Yes. "v2 target, v1 source: schema and partitioning are copied" tests a V1 source (default.src in the session catalog) against a V2 target (testcat.dst). The "cross-catalog" and "3-part name" tests cover V2 -> V2 across catalogs.
    targetCatalog: TableCatalog,
    targetIdent: Identifier,
    sourceTable: Table,
Contributor
Does this mean it would only work for creating a V2 table from another V2 table?

Contributor
Oh, this can be

Member (Author)
Correct on both. sourceTable: Table is the V2 Table interface, which can be any implementation. For session catalog sources, ResolveRelations wraps the CatalogTable in a V1Table, which implements Table. So V1 -> V2 works: the source is a V1Table and we handle it explicitly in the match block at line 57 to preserve CHAR/VARCHAR types.
    fileFormat: CatalogStorageFormat,
    provider: Option[String],
    properties: Map[String, String],
    ifNotExists: Boolean) extends LeafV2CommandExec {
  override def output: Seq[Attribute] = Seq.empty

  override protected def run(): Seq[InternalRow] = {
    if (!targetCatalog.tableExists(targetIdent)) {
      // 1. Extract columns from source. For V1Table sources use the raw schema so that
      // CHAR/VARCHAR types are preserved as declared (without internal metadata expansion).
      val columns = sourceTable match {
        case v1: V1Table =>
          val rawSchema = CharVarcharUtils.getRawSchema(v1.catalogTable.schema)
Contributor
Do we have tests for this?

Member (Author)
Yes. The test "CHAR and VARCHAR types are preserved from v1 source to v2 target" in CreateTableLikeSuite covers this. It creates a V1 source with CHAR(10) and VARCHAR(20), runs CREATE TABLE testcat.dst LIKE src, and asserts schema("name").dataType === CharType(10) and schema("tag").dataType === VarcharType(20).
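Why the V1Table branch matters: Spark's analyzer stores declared CHAR(n)/VARCHAR(n) types as STRING plus a metadata annotation, and getRawSchema restores the declared type from that annotation. Below is a minimal Python model of the round trip; it is illustrative only, and the metadata key name `rawType` is made up for the sketch (Spark uses its own internal key).

```python
# Illustrative model of CHAR/VARCHAR metadata expansion and restoration.
# Fields are modeled as dicts; "rawType" is a hypothetical metadata key.

RAW_TYPE_KEY = "rawType"

def expand(field):
    """Model of what the analyzer does: CHAR(10) -> string + metadata note."""
    if field["type"].startswith(("char(", "varchar(")):
        return {"name": field["name"], "type": "string",
                "metadata": {RAW_TYPE_KEY: field["type"]}}
    return field

def raw_schema(field):
    """Model of getRawSchema: restore the declared type from metadata."""
    raw = field.get("metadata", {}).get(RAW_TYPE_KEY)
    if raw is not None:
        return {"name": field["name"], "type": raw, "metadata": {}}
    return field
```

Passing the raw (restored) schema to structTypeToV2Columns is what lets the target table declare CHAR(10) instead of a plain STRING.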
          CatalogV2Util.structTypeToV2Columns(rawSchema)
        case _ =>
          sourceTable.columns()
      }

      // 2. Extract partitioning from source (includes both partition columns and bucket spec
      // for V1Table, as V1Table.partitioning encodes both).
      val partitioning = sourceTable.partitioning

      // 3. Resolve provider: USING clause overrides, else copy from source.
      val resolvedProvider = provider.orElse {
Contributor
Isn't this the source provider rather than the target's? Can we actually populate this?

Contributor
What does DSv1 do, and is it applicable?

Member (Author)
Yes, this is the source provider being copied to the target, which is exactly the semantics of CREATE TABLE LIKE: the target inherits the source's format unless overridden by a USING clause. This matches V1 CreateTableLikeCommand behavior, which also copies the source provider. The copied provider goes into PROP_PROVIDER in finalProps and is passed to catalog.createTable. Whether the target catalog uses it is catalog-specific: InMemoryCatalog stores it as-is; V2SessionCatalog validates it via DataSource.lookupDataSource.
        sourceTable match {
          case v1: V1Table if v1.catalogTable.tableType == CatalogTableType.VIEW =>
            // When the source is a view, default to the session's default data source.
            // This matches V1 CreateTableLikeCommand behavior.
            Some(session.sessionState.conf.defaultDataSourceName)
          case v1: V1Table =>
            v1.catalogTable.provider
          case _ =>
            Option(sourceTable.properties.get(TableCatalog.PROP_PROVIDER))
        }
      }

      // 4. Build final properties. User-specified TBLPROPERTIES are used as-is; source table
      // properties are NOT copied. Provider and location are added if determined above.
      val locationProp: Option[(String, String)] =
        fileFormat.locationUri.map(uri =>
          TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri))

      val finalProps = properties ++
        resolvedProvider.map(TableCatalog.PROP_PROVIDER -> _) ++
        locationProp

      try {
        // Constraints from the source table are intentionally NOT copied for several reasons:
Contributor
This comment is too long to be included here; let's shorten it.

Member (Author)
Okay, I'll shorten it.
        // 1. V1 tables (CatalogTable) have no constraint objects — CHECK, PRIMARY KEY,
        //    UNIQUE and FOREIGN KEY are V2-only concepts (introduced in Spark 4.1.0).
        //    CreateTableLikeCommand had nothing to copy, so not copying here preserves
        //    behavioral parity with the V1 path.
        // 2. ForeignKey constraints carry a refTable Identifier bound to the source catalog;
        //    copying them to a different catalog creates dangling cross-catalog references.
        // 3. Constraint names must be unique within a namespace; blindly copying them risks
        //    collisions with existing constraints in the target namespace.
        // 4. Validation status (VALID/INVALID/UNVALIDATED) is tied to the source table's
        //    existing data and is meaningless on a newly created empty target table.
        // 5. NOT NULL is already captured in Column.nullable() and copied via withColumns.
        // 6. Consistent with PostgreSQL semantics: CREATE TABLE LIKE does not include
        //    constraints by default; users must explicitly opt in via INCLUDING CONSTRAINTS.
        // If constraint copying is desired, use ALTER TABLE ADD CONSTRAINT after creation.
        // If we wanted to support them in the future, the right approach would be to add an
        // INCLUDING CONSTRAINTS clause (as PostgreSQL does) rather than copying blindly.
        val tableInfo = new TableInfo.Builder()
Contributor
Owner?
          .withColumns(columns)
          .withPartitions(partitioning)
          .withProperties(finalProps.asJava)
          .build()
        targetCatalog.createTable(targetIdent, tableInfo)
      } catch {
        case _: TableAlreadyExistsException if ifNotExists =>
          logWarning(
            log"Table ${MDC(TABLE_NAME, targetIdent.quoted)} was created concurrently. Ignoring.")
      }
    } else if (!ifNotExists) {
      throw QueryCompilationErrors.tableAlreadyExistsError(targetIdent)
    }

    Seq.empty
  }
}
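The provider and property resolution order implemented in run() above can be summarized in a short sketch. This is illustrative Python, not Spark code: dicts stand in for catalog metadata, the key names are made up for the sketch, and `parquet` stands in for the session's default data source.

```python
# Hypothetical model of provider/property resolution in CreateTableLikeExec.
DEFAULT_SOURCE = "parquet"  # stand-in for spark.sql.sources.default

def resolve_provider(using_clause, source):
    # USING clause wins; otherwise copy the source's provider.
    if using_clause is not None:
        return using_clause
    if source.get("is_view"):       # view source: session default data source
        return DEFAULT_SOURCE
    return source.get("provider")   # may be None for some v2 sources

def build_final_props(user_props, using_clause, source, location=None):
    # User TBLPROPERTIES are used as-is; source properties are NOT copied.
    props = dict(user_props)
    provider = resolve_provider(using_clause, source)
    if provider is not None:
        props["provider"] = provider
    if location is not None:
        props["location"] = location
    return props
```

For example, with a non-view ORC source and no USING clause, the target gets `provider = orc` plus only the user-specified properties; a USING clause replaces the inherited provider.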
@@ -24,16 +24,16 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.{SparkException, SparkIllegalArgumentException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.EXPR
-import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
-import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedPersistentView, ResolvedTable, ResolvedTempView}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder}
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable, V1Table}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}

@@ -240,6 +240,33 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
        qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil
    }
    // CREATE TABLE ... LIKE ... for a v2 catalog target.
    // Source is an already-resolved Table object; no extra catalog round-trip is needed.
    case CreateTableLike(
        ResolvedIdentifier(catalog, ident),
        ResolvedTable(_, _, table, _),
        fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) =>
      CreateTableLikeExec(
        catalog.asTableCatalog, ident, table, fileFormat, provider, properties, ifNotExists) :: Nil
Member
The three CreateTableLike cases could be merged into one, e.g.:

case CreateTableLike(
    ResolvedIdentifier(catalog, ident), source,
    fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) =>
  val table = source match {
    case ResolvedTable(_, _, t, _) => t
    case ResolvedPersistentView(_, _, meta) => V1Table(meta)
    case ResolvedTempView(_, meta) => V1Table(meta)
  }
  CreateTableLikeExec(
    catalog.asTableCatalog, ident, table, fileFormat, provider, properties, ifNotExists) :: Nil
    // Source is a persistent or temporary view; wrap its CatalogTable in V1Table so the
    // exec can extract schema and provider uniformly.
    case CreateTableLike(
        ResolvedIdentifier(catalog, ident),
        ResolvedPersistentView(_, _, meta),
        fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) =>
      CreateTableLikeExec(
        catalog.asTableCatalog, ident, V1Table(meta),
        fileFormat, provider, properties, ifNotExists) :: Nil

    case CreateTableLike(
        ResolvedIdentifier(catalog, ident),
        ResolvedTempView(_, meta),
        fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) =>
      CreateTableLikeExec(
        catalog.asTableCatalog, ident, V1Table(meta),
        fileFormat, provider, properties, ifNotExists) :: Nil

    case RefreshTable(r: ResolvedTable) =>
      RefreshTableExec(r.catalog, r.identifier, recacheTable(r, includeTimeTravel = true)) :: Nil
Can we have one single command (UnaryRunnableCommand)? I thought that's the preferred way now to reduce plan complexity in the different stages.