diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java index e96777f43df76..d5a36cd8bfb86 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java @@ -299,24 +299,22 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * Create a table in the catalog by copying metadata from an existing source table. *
* This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog. - * The {@code userSpecifiedOverrides} parameter contains strictly user-specified overrides: - * TBLPROPERTIES, LOCATION, and the USING provider (only if explicitly specified). - * It does NOT contain schema, partitioning, properties, constraints, or owner from the source - * table. Connectors must read all source metadata directly from {@code sourceTable}, including - * columns ({@link Table#columns()}), partitioning ({@link Table#partitioning()}), - * constraints ({@link Table#constraints()}), and format-specific properties - * ({@link Table#properties()}). Connectors are also responsible for setting the owner of the - * new table (e.g. via {@code org.apache.spark.sql.catalyst.CurrentUserContext#getCurrentUser}). + * The {@code tableInfo} parameter contains all the explicit information for the new table: + * columns and partitioning copied from the source, any constraints copied from the source, + * user-specified TBLPROPERTIES / LOCATION / USING provider (if given), and + * {@link #PROP_OWNER} set to the current user. Source table properties are intentionally + * excluded from {@code tableInfo}; connectors may read {@code sourceTable.properties()} to + * clone additional format-specific or custom state as appropriate for their implementation. *
* The default implementation throws {@link UnsupportedOperationException}. Connectors that * support {@code CREATE TABLE ... LIKE ...} must override this method. * * @param ident a table identifier for the new table - * @param sourceTable the resolved source table; connectors read schema, partitioning, - * constraints, properties, and any format-specific metadata from this object - * @param userSpecifiedOverrides strictly user-specified overrides: TBLPROPERTIES, LOCATION, - * and USING provider (if explicitly given); source schema, - * partitioning, provider, constraints, and owner are NOT included + * @param tableInfo complete description of the new table: columns, partitioning, constraints, + * explicit properties (user overrides + owner); source table properties + * are NOT included + * @param sourceTable the resolved source table; connectors may read format-specific properties + * or other custom state from this object to clone additional metadata * @return metadata for the new table * * @throws TableAlreadyExistsException If a table or view already exists for the identifier @@ -325,7 +323,7 @@ default Table createTable(Identifier ident, TableInfo tableInfo) * @since 4.2.0 */ default Table createTableLike( - Identifier ident, Table sourceTable, TableInfo userSpecifiedOverrides) + Identifier ident, TableInfo tableInfo, Table sourceTable) throws TableAlreadyExistsException, NoSuchNamespaceException { throw new UnsupportedOperationException(name() + " does not support CREATE TABLE LIKE"); } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala index 4fd9f6f6da913..ad9c2655023fc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala @@ -1389,12 +1389,15 @@ class CatalogSuite extends SparkFunSuite { catalog.createTable(srcIdent, columns, emptyTrans, srcProps) val sourceTable = catalog.loadTable(srcIdent) - // tableInfo contains only user overrides; schema and partitioning come from sourceTable + // tableInfo has columns/partitions/constraints from source plus user-specified properties val overrides = Map("user.key" -> "user.value").asJava val tableInfo = new TableInfo.Builder() + .withColumns(sourceTable.columns()) + .withPartitions(sourceTable.partitioning()) + .withConstraints(sourceTable.constraints()) .withProperties(overrides) .build() - catalog.createTableLike(dstIdent, sourceTable, tableInfo) + catalog.createTableLike(dstIdent, tableInfo, sourceTable) val dst = catalog.loadTable(dstIdent) assert(dst.properties.asScala("user.key") == "user.value", @@ -1410,11 +1413,14 @@ class CatalogSuite extends SparkFunSuite { catalog.createTable(srcIdent, columns, emptyTrans, srcProps) val sourceTable = catalog.loadTable(srcIdent) - // tableInfo contains no overrides; connector reads schema and properties from sourceTable + // tableInfo has columns/partitions/constraints from source; no explicit property overrides val tableInfo = new TableInfo.Builder() + .withColumns(sourceTable.columns()) + .withPartitions(sourceTable.partitioning()) + .withConstraints(sourceTable.constraints()) .withProperties(emptyProps) .build() - catalog.createTableLike(dstIdent, sourceTable, tableInfo) + catalog.createTableLike(dstIdent, tableInfo, sourceTable) val dst = catalog.loadTable(dstIdent) assert(dst.properties.asScala("format.version") == "2", @@ -1432,12 +1438,15 @@ class CatalogSuite extends SparkFunSuite { catalog.createTable(srcIdent, columns, emptyTrans, srcProps) val sourceTable = catalog.loadTable(srcIdent) - // user explicitly overrides format.version + // user explicitly overrides format.version; columns/partitions/constraints from source val overrides = Map("format.version" -> "2").asJava val tableInfo = new TableInfo.Builder() + .withColumns(sourceTable.columns()) + .withPartitions(sourceTable.partitioning()) + .withConstraints(sourceTable.constraints()) .withProperties(overrides) .build() - catalog.createTableLike(dstIdent, sourceTable, tableInfo) + catalog.createTableLike(dstIdent, tableInfo, sourceTable) val dst = catalog.loadTable(dstIdent) assert(dst.properties.asScala("format.version") == "2", @@ -1460,14 +1469,18 @@ class CatalogSuite extends SparkFunSuite { catalog.createTable(srcIdent, srcTableInfo) val sourceTable = catalog.loadTable(srcIdent) + // tableInfo has columns/partitions/constraints from source (Spark's responsibility now) val tableInfo = new TableInfo.Builder() + .withColumns(sourceTable.columns()) + .withPartitions(sourceTable.partitioning()) + .withConstraints(sourceTable.constraints()) .withProperties(emptyProps) .build() - catalog.createTableLike(dstIdent, sourceTable, tableInfo) + catalog.createTableLike(dstIdent, tableInfo, sourceTable) val dst = catalog.loadTable(dstIdent) assert(dst.constraints().toSet == constraints.toSet, - "connector should copy source constraints from sourceTable.constraints()") + "constraints from source should be in tableInfo and applied to the target") } test("createTableLike: catalog without createTableLike override throws " + @@ -1485,10 +1498,13 @@ class CatalogSuite extends SparkFunSuite { val sourceTable = catalog.loadTable(srcIdent) val tableInfo = new TableInfo.Builder() + .withColumns(sourceTable.columns()) + .withPartitions(sourceTable.partitioning()) + .withConstraints(sourceTable.constraints()) .withProperties(Map.empty[String, String].asJava) .build() val ex = intercept[UnsupportedOperationException] { - catalog.createTableLike(dstIdent, sourceTable, tableInfo) + catalog.createTableLike(dstIdent, tableInfo, sourceTable) } assert(ex.getMessage.contains("basic"), "Exception should mention the catalog name") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala index 81f4717fc4dfc..0e44a2ac50dd7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala @@ -24,9 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger import scala.jdk.CollectionConverters._ -import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow} +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure} import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} @@ -241,24 +240,15 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp override def createTableLike( ident: Identifier, - sourceTable: Table, - userSpecifiedOverrides: TableInfo): Table = { - // Read schema from source. For V1Table sources, apply CharVarcharUtils to preserve - // CHAR/VARCHAR types as declared rather than collapsed to StringType. - val columns = sourceTable match { - case v1: V1Table => - CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(v1.catalogTable.schema)) - case _ => - sourceTable.columns() - } - // Merge source properties with user overrides (user overrides win), then set current user - // as owner (overrides source owner). Connectors are responsible for setting the owner. + tableInfo: TableInfo, + sourceTable: Table): Table = { + // columns, partitioning, constraints, explicit properties, and owner are all provided in + // tableInfo by Spark. Merge source properties so that connector-specific custom state + // (e.g. format.version, format.feature) is cloned; tableInfo properties win on conflict. val mergedProps = - (sourceTable.properties().asScala ++ - userSpecifiedOverrides.properties().asScala ++ - Map(TableCatalog.PROP_OWNER -> CurrentUserContext.getCurrentUser)).asJava - createTable(ident, columns, sourceTable.partitioning(), mergedProps, - Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints()) + (sourceTable.properties().asScala ++ tableInfo.properties().asScala).asJava + createTable(ident, tableInfo.columns(), tableInfo.partitions(), mergedProps, + Distributions.unspecified(), Array.empty, None, None, tableInfo.constraints()) } override def capabilities: java.util.Set[TableCatalogCapability] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala index c4930b45375b4..7a815d7f418e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableLikeExec.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableInfo} +import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, Table, TableCatalog, TableInfo, V1Table} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.errors.QueryCompilationErrors @@ -38,11 +39,11 @@ import org.apache.spark.sql.errors.QueryCompilationErrors * must override [[TableCatalog.createTableLike]]; the default implementation throws * [[UnsupportedOperationException]]. * - * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains strictly user-specified - * overrides: TBLPROPERTIES, LOCATION, and USING provider (only if explicitly given). - * Schema, partitioning, source provider, source TBLPROPERTIES, constraints, and owner are NOT - * pre-populated; connectors read all source metadata directly from [[sourceTable]] and are - * responsible for setting the owner. + * The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains all explicit information + * for the new table: columns and partitioning copied from the source, constraints copied from + * the source, user-specified TBLPROPERTIES / LOCATION / USING provider (if given), and + * [[TableCatalog.PROP_OWNER]] set to the current user. Source table properties are intentionally + * excluded so that connectors can decide which custom properties to clone via [[sourceTable]]. */ case class CreateTableLikeExec( targetCatalog: TableCatalog, @@ -57,23 +58,14 @@ case class CreateTableLikeExec( override protected def run(): Seq[InternalRow] = { if (!targetCatalog.tableExists(targetIdent)) { - // Build strictly user-specified overrides: explicit TBLPROPERTIES, LOCATION (if given), - // and USING provider (if given). Provider and owner are not included here; connectors - // are responsible for reading PROP_PROVIDER from sourceTable.properties() and for - // setting the owner via CurrentUserContext.getCurrentUser. - val locationProp: Option[(String, String)] = - location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri)) - - val finalProps = - properties ++ - provider.map(TableCatalog.PROP_PROVIDER -> _) ++ - locationProp - try { - val userSpecifiedOverrides = new TableInfo.Builder() - .withProperties(finalProps.asJava) + val tableInfo = new TableInfo.Builder() + .withColumns(targetColumns) + .withPartitions(sourceTable.partitioning) + .withConstraints(sourceTable.constraints) + .withProperties(targetProperties.asJava) .build() - targetCatalog.createTableLike(targetIdent, sourceTable, userSpecifiedOverrides) + targetCatalog.createTableLike(targetIdent, tableInfo, sourceTable) } catch { case _: TableAlreadyExistsException if ifNotExists => logWarning( @@ -85,4 +77,22 @@ case class CreateTableLikeExec( Seq.empty } + + // Derive target columns from source; for V1Table sources apply CharVarcharUtils to preserve + // CHAR/VARCHAR types as declared rather than collapsed to StringType. + private def targetColumns: Array[Column] = + sourceTable match { + case v1: V1Table => + CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(v1.catalogTable.schema)) + case _ => + sourceTable.columns + } + + // Source table properties are intentionally excluded; connectors read sourceTable + // to clone any additional format-specific or custom state they need. + private def targetProperties: Map[String, String] = + CatalogV2Util.withDefaultOwnership( + properties ++ + provider.map(TableCatalog.PROP_PROVIDER -> _) ++ + location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala index a67831cf63503..77028e25d2f4c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/CreateTableLikeSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog, TableCatalog} import org.apache.spark.sql.types.{CharType, IntegerType, LongType, StringType, VarcharType} class CreateTableLikeSuite extends DatasourceV2SQLBase { @@ -167,6 +167,33 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase { } } + test("PROP_OWNER is set to current user in TableInfo passed to connector") { + // Spark sets PROP_OWNER in the TableInfo it passes to createTableLike so that + // connectors do not need to call a Catalyst utility to determine the owner. + withTable("testcat.src", "testcat.dst") { + sql("CREATE TABLE testcat.src (id bigint) USING foo") + sql("CREATE TABLE testcat.dst LIKE testcat.src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + assert(dst.properties.containsKey(TableCatalog.PROP_OWNER), + "PROP_OWNER should be set in TableInfo so connectors do not need to infer it") + assert(dst.properties.get(TableCatalog.PROP_OWNER).nonEmpty) + } + } + + test("columns and partitioning from source are set in TableInfo passed to connector") { + withTable("src", "testcat.dst") { + sql("CREATE TABLE src (id bigint, data string) USING parquet PARTITIONED BY (data)") + sql("CREATE TABLE testcat.dst LIKE src") + + val dst = testCatalog.loadTable(Identifier.of(Array(), "dst")) + val columnNames = dst.columns().map(_.name) + assert(columnNames === Array("id", "data")) + assert(dst.partitioning().nonEmpty, + "partitioning from source should be passed in TableInfo") + } + } + test("user-specified TBLPROPERTIES are applied on target") { withTable("src", "testcat.dst") { sql("CREATE TABLE src (id bigint) USING parquet")