Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ statement
| createTableHeader (LEFT_PAREN tableElementList RIGHT_PAREN)? tableProvider?
createTableClauses
(AS? query)? #createTable
| CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier
| CREATE TABLE (IF errorCapturingNot EXISTS)? target=identifierReference
LIKE source=identifierReference
(tableProvider |
rowFormat |
createFileFormat |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, columns)
r.copy(name = resolvedIdentifier)

case c @ CreateTableLike(UnresolvedIdentifier(nameParts, allowTemp), _, _, _, _, _) =>
val resolvedIdentifier = resolveIdentifier(nameParts, allowTemp, Nil)
c.copy(name = resolvedIdentifier)

case UnresolvedIdentifier(nameParts, allowTemp) =>
resolveIdentifier(nameParts, allowTemp, Nil)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.{SparkIllegalArgumentException, SparkUnsupportedOperatio
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, ResolvedProcedure, TypeCheckResult, UnresolvedAttribute, UnresolvedException, UnresolvedProcedure, ViewSchemaMode}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.catalog.{FunctionResource, RoutineLanguage}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, FunctionResource, RoutineLanguage}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
Expand Down Expand Up @@ -514,6 +514,35 @@ case class CreateTable(
}
}

/**
 * Create a new table with the same schema/partitioning as an existing table or view,
 * for use with a v2 catalog.
 *
 * @param name Target table identifier. Starts as UnresolvedIdentifier, resolved to
 *             ResolvedIdentifier by ResolveCatalogs.
 * @param source Source table or view. Starts as UnresolvedTableOrView, resolved to
 *               ResolvedTable / ResolvedPersistentView / ResolvedTempView by ResolveRelations.
 * @param fileFormat User-specified STORED AS / ROW FORMAT (Hive-style). Empty if not specified.
 * @param provider User-specified USING provider. None if not specified.
 * @param properties User-specified TBLPROPERTIES.
 * @param ifNotExists IF NOT EXISTS flag.
 */
case class CreateTableLike(
    name: LogicalPlan,
    source: LogicalPlan,
    fileFormat: CatalogStorageFormat,
    provider: Option[String],
    properties: Map[String, String],
    ifNotExists: Boolean) extends BinaryCommand {

  // Expose both the target identifier and the source relation as children so the
  // analyzer resolves each of them.
  override def left: LogicalPlan = name
  override def right: LogicalPlan = source

  override protected def withNewChildrenInternal(
      newLeft: LogicalPlan, newRight: LogicalPlan): CreateTableLike =
    copy(name = newLeft, source = newRight)
}

/**
* Create a new table from a select query with a v2 catalog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c
}

// For CREATE TABLE LIKE, use the v1 command if both the target and source are in the session
// catalog (or a V1-compatible catalog extension). If source is in a different catalog, fall
// through to the V2 execution path (CreateTableLikeExec via DataSourceV2Strategy).
case CreateTableLike(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean for DSv2 connectors that override the session catalog?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An example is Iceberg session catalog.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, agree, we should add a test for sessionCatalog

ResolvedV1Identifier(targetIdent),
ResolvedV1TableOrViewIdentifier(sourceIdent),
fileFormat, provider, properties, ifNotExists) =>
CreateTableLikeCommand(
targetIdent, sourceIdent, fileFormat, provider, properties, ifNotExists)

case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command =>
DropTableCommand(ident, ifExists, isView = false, purge = purge)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,6 @@ class SparkSqlAstBuilder extends AstBuilder {
* }}}
*/
override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) {
val targetTable = visitTableIdentifier(ctx.target)
val sourceTable = visitTableIdentifier(ctx.source)
checkDuplicateClauses(ctx.tableProvider, "PROVIDER", ctx)
checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
Expand Down Expand Up @@ -1188,8 +1186,13 @@ class SparkSqlAstBuilder extends AstBuilder {
val storage = toStorageFormat(location, serdeInfo, ctx)
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val cleanedProperties = cleanTableProperties(ctx, properties)
CreateTableLikeCommand(
targetTable, sourceTable, storage, provider, cleanedProperties, ctx.EXISTS != null)
CreateTableLike(
name = withIdentClause(ctx.target, UnresolvedIdentifier(_)),
source = createUnresolvedTableOrView(ctx.source, "CREATE TABLE LIKE", allowTempView = true),
fileFormat = storage,
provider = provider,
properties = cleanedProperties,
ifNotExists = ctx.EXISTS != null)
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import scala.jdk.CollectionConverters._

import org.apache.spark.internal.LogKeys.TABLE_NAME
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableInfo, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.errors.QueryCompilationErrors

/**
 * Physical plan node for CREATE TABLE ... LIKE ... targeting a v2 catalog.
 *
 * Copies schema (columns) and partitioning from `sourceTable`. The following properties of the
 * source table are intentionally NOT copied (matching v1 behavior):
 *  - Table-level comments
 *  - Source table's TBLPROPERTIES (user-specified `properties` are used instead)
 *  - Statistics, owner, create time
 *
 * @param targetCatalog Catalog that will own the new table.
 * @param targetIdent   Identifier of the table to create.
 * @param sourceTable   Resolved source table; may be a V1Table wrapping a CatalogTable when
 *                      the source lives in the session catalog (or is a view).
 * @param fileFormat    User-specified storage format; only its locationUri is applied here.
 * @param provider      User-specified USING provider; overrides the source's provider.
 * @param properties    User-specified TBLPROPERTIES (source properties are not copied).
 * @param ifNotExists   IF NOT EXISTS flag.
 */
case class CreateTableLikeExec(
    targetCatalog: TableCatalog,
    targetIdent: Identifier,
    sourceTable: Table,
    fileFormat: CatalogStorageFormat,
    provider: Option[String],
    properties: Map[String, String],
    ifNotExists: Boolean) extends LeafV2CommandExec {

  override def output: Seq[Attribute] = Seq.empty

  override protected def run(): Seq[InternalRow] = {
    if (!targetCatalog.tableExists(targetIdent)) {
      // 1. Extract columns from source. For V1Table sources use the raw schema so that
      // CHAR/VARCHAR types are preserved as declared (without internal metadata expansion).
      val columns = sourceTable match {
        case v1: V1Table =>
          val rawSchema = CharVarcharUtils.getRawSchema(v1.catalogTable.schema)
          CatalogV2Util.structTypeToV2Columns(rawSchema)
        case _ =>
          sourceTable.columns()
      }

      // 2. Extract partitioning from source (includes both partition columns and bucket spec
      // for V1Table, as V1Table.partitioning encodes both).
      val partitioning = sourceTable.partitioning

      // 3. Resolve provider: USING clause overrides, else copy from source.
      val resolvedProvider = provider.orElse {
        sourceTable match {
          case v1: V1Table if v1.catalogTable.tableType == CatalogTableType.VIEW =>
            // When the source is a view, default to the session's default data source.
            // This matches V1 CreateTableLikeCommand behavior.
            Some(session.sessionState.conf.defaultDataSourceName)
          case v1: V1Table =>
            v1.catalogTable.provider
          case _ =>
            Option(sourceTable.properties.get(TableCatalog.PROP_PROVIDER))
        }
      }

      // 4. Build final properties. User-specified TBLPROPERTIES are used as-is; source table
      // properties are NOT copied. Provider and location are added if determined above.
      val locationProp: Option[(String, String)] =
        fileFormat.locationUri.map(uri =>
          TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri))

      val finalProps = properties ++
        resolvedProvider.map(TableCatalog.PROP_PROVIDER -> _) ++
        locationProp

      try {
        // Constraints from the source are intentionally NOT copied: V1 sources carry no
        // constraint objects (parity with CreateTableLikeCommand), FOREIGN KEY references
        // would dangle across catalogs, constraint names could collide in the target
        // namespace, and validation status is meaningless on an empty table. NOT NULL is
        // already captured via Column.nullable(). This also matches PostgreSQL, where LIKE
        // requires an explicit INCLUDING CONSTRAINTS opt-in; a future INCLUDING CONSTRAINTS
        // clause would be the right way to support copying.
        val tableInfo = new TableInfo.Builder()
          .withColumns(columns)
          .withPartitions(partitioning)
          .withProperties(finalProps.asJava)
          .build()
        targetCatalog.createTable(targetIdent, tableInfo)
      } catch {
        case _: TableAlreadyExistsException if ifNotExists =>
          logWarning(
            log"Table ${MDC(TABLE_NAME, targetIdent.quoted)} was created concurrently. Ignoring.")
      }
    } else if (!ifNotExists) {
      throw QueryCompilationErrors.tableAlreadyExistsError(targetIdent)
    }

    Seq.empty
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.EXPR
import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedPersistentView, ResolvedTable, ResolvedTempView}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder}
import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable}
import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable, V1Table}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
Expand Down Expand Up @@ -240,6 +240,33 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil
}

// CREATE TABLE ... LIKE ... for a v2 catalog target.
// Source is an already-resolved Table object; no extra catalog round-trip is needed.
case CreateTableLike(
ResolvedIdentifier(catalog, ident),
ResolvedTable(_, _, table, _),
fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) =>
CreateTableLikeExec(
catalog.asTableCatalog, ident, table, fileFormat, provider, properties, ifNotExists) :: Nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The three CreateTableLike match cases (for ResolvedTable, ResolvedPersistentView, ResolvedTempView) are nearly identical. Consider consolidating into a single pattern:

    case CreateTableLike(
        ResolvedIdentifier(catalog, ident), source,
        fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) =>
      val table = source match {
        case ResolvedTable(_, _, t, _) => t
        case ResolvedPersistentView(_, _, meta) => V1Table(meta)
        case ResolvedTempView(_, meta) => V1Table(meta)
      }
      CreateTableLikeExec(
        catalog.asTableCatalog, ident, table, fileFormat, provider, properties, ifNotExists) :: Nil


// Source is a persistent or temporary view; wrap its CatalogTable in V1Table so the
// exec can extract schema and provider uniformly.
case CreateTableLike(
ResolvedIdentifier(catalog, ident),
ResolvedPersistentView(_, _, meta),
fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) =>
CreateTableLikeExec(
catalog.asTableCatalog, ident, V1Table(meta),
fileFormat, provider, properties, ifNotExists) :: Nil

case CreateTableLike(
ResolvedIdentifier(catalog, ident),
ResolvedTempView(_, meta),
fileFormat: CatalogStorageFormat, provider, properties, ifNotExists) =>
CreateTableLikeExec(
catalog.asTableCatalog, ident, V1Table(meta),
fileFormat, provider, properties, ifNotExists) :: Nil

case RefreshTable(r: ResolvedTable) =>
RefreshTableExec(r.catalog, r.identifier, recacheTable(r, includeTimeTravel = true)) :: Nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ DescribeColumnCommand `spark_catalog`.`default`.`char_tbl2`, [spark_catalog, def
-- !query
create table char_tbl3 like char_tbl
-- !query analysis
CreateTableLikeCommand `char_tbl3`, `char_tbl`, Storage(), false
CreateTableLikeCommand `spark_catalog`.`default`.`char_tbl3`, `spark_catalog`.`default`.`char_tbl`, Storage(), false


-- !query
Expand Down
Loading