Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -299,24 +299,22 @@ default Table createTable(Identifier ident, TableInfo tableInfo)
* Create a table in the catalog by copying metadata from an existing source table.
* <p>
* This method is called for {@code CREATE TABLE ... LIKE ...} statements targeting this catalog.
* The {@code userSpecifiedOverrides} parameter contains strictly user-specified overrides:
* TBLPROPERTIES, LOCATION, and the USING provider (only if explicitly specified).
* It does NOT contain schema, partitioning, properties, constraints, or owner from the source
* table. Connectors must read all source metadata directly from {@code sourceTable}, including
* columns ({@link Table#columns()}), partitioning ({@link Table#partitioning()}),
* constraints ({@link Table#constraints()}), and format-specific properties
* ({@link Table#properties()}). Connectors are also responsible for setting the owner of the
* new table (e.g. via {@code org.apache.spark.sql.catalyst.CurrentUserContext#getCurrentUser}).
* The {@code tableInfo} parameter contains all the explicit information for the new table:
* columns and partitioning copied from the source, any constraints copied from the source,
* user-specified TBLPROPERTIES / LOCATION / USING provider (if given), and
* {@link #PROP_OWNER} set to the current user. Source table properties are intentionally
* excluded from {@code tableInfo}; connectors may read {@code sourceTable.properties()} to
* clone additional format-specific or custom state as appropriate for their implementation.
* <p>
* The default implementation throws {@link UnsupportedOperationException}. Connectors that
* support {@code CREATE TABLE ... LIKE ...} must override this method.
*
* @param ident a table identifier for the new table
* @param sourceTable the resolved source table; connectors read schema, partitioning,
* constraints, properties, and any format-specific metadata from this object
* @param userSpecifiedOverrides strictly user-specified overrides: TBLPROPERTIES, LOCATION,
* and USING provider (if explicitly given); source schema,
* partitioning, provider, constraints, and owner are NOT included
* @param tableInfo complete description of the new table: columns, partitioning, constraints,
* explicit properties (user overrides + owner); source table properties
* are NOT included
* @param sourceTable the resolved source table; connectors may read format-specific properties
* or other custom state from this object to clone additional metadata
* @return metadata for the new table
*
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
Expand All @@ -325,7 +323,7 @@ default Table createTable(Identifier ident, TableInfo tableInfo)
* @since 4.2.0
*/
default Table createTableLike(
Identifier ident, Table sourceTable, TableInfo userSpecifiedOverrides)
Identifier ident, TableInfo tableInfo, Table sourceTable)
throws TableAlreadyExistsException, NoSuchNamespaceException {
throw new UnsupportedOperationException(name() + " does not support CREATE TABLE LIKE");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1389,12 +1389,15 @@ class CatalogSuite extends SparkFunSuite {
catalog.createTable(srcIdent, columns, emptyTrans, srcProps)
val sourceTable = catalog.loadTable(srcIdent)

// tableInfo contains only user overrides; schema and partitioning come from sourceTable
// tableInfo has columns/partitions/constraints from source plus user-specified properties
val overrides = Map("user.key" -> "user.value").asJava
val tableInfo = new TableInfo.Builder()
.withColumns(sourceTable.columns())
.withPartitions(sourceTable.partitioning())
.withConstraints(sourceTable.constraints())
.withProperties(overrides)
.build()
catalog.createTableLike(dstIdent, sourceTable, tableInfo)
catalog.createTableLike(dstIdent, tableInfo, sourceTable)

val dst = catalog.loadTable(dstIdent)
assert(dst.properties.asScala("user.key") == "user.value",
Expand All @@ -1410,11 +1413,14 @@ class CatalogSuite extends SparkFunSuite {
catalog.createTable(srcIdent, columns, emptyTrans, srcProps)
val sourceTable = catalog.loadTable(srcIdent)

// tableInfo contains no overrides; connector reads schema and properties from sourceTable
// tableInfo has columns/partitions/constraints from source; no explicit property overrides
val tableInfo = new TableInfo.Builder()
.withColumns(sourceTable.columns())
.withPartitions(sourceTable.partitioning())
.withConstraints(sourceTable.constraints())
.withProperties(emptyProps)
.build()
catalog.createTableLike(dstIdent, sourceTable, tableInfo)
catalog.createTableLike(dstIdent, tableInfo, sourceTable)

val dst = catalog.loadTable(dstIdent)
assert(dst.properties.asScala("format.version") == "2",
Expand All @@ -1432,12 +1438,15 @@ class CatalogSuite extends SparkFunSuite {
catalog.createTable(srcIdent, columns, emptyTrans, srcProps)
val sourceTable = catalog.loadTable(srcIdent)

// user explicitly overrides format.version
// user explicitly overrides format.version; columns/partitions/constraints from source
val overrides = Map("format.version" -> "2").asJava
val tableInfo = new TableInfo.Builder()
.withColumns(sourceTable.columns())
.withPartitions(sourceTable.partitioning())
.withConstraints(sourceTable.constraints())
.withProperties(overrides)
.build()
catalog.createTableLike(dstIdent, sourceTable, tableInfo)
catalog.createTableLike(dstIdent, tableInfo, sourceTable)

val dst = catalog.loadTable(dstIdent)
assert(dst.properties.asScala("format.version") == "2",
Expand All @@ -1460,14 +1469,18 @@ class CatalogSuite extends SparkFunSuite {
catalog.createTable(srcIdent, srcTableInfo)
val sourceTable = catalog.loadTable(srcIdent)

// tableInfo has columns/partitions/constraints from source (Spark's responsibility now)
val tableInfo = new TableInfo.Builder()
.withColumns(sourceTable.columns())
.withPartitions(sourceTable.partitioning())
.withConstraints(sourceTable.constraints())
.withProperties(emptyProps)
.build()
catalog.createTableLike(dstIdent, sourceTable, tableInfo)
catalog.createTableLike(dstIdent, tableInfo, sourceTable)

val dst = catalog.loadTable(dstIdent)
assert(dst.constraints().toSet == constraints.toSet,
"connector should copy source constraints from sourceTable.constraints()")
"constraints from source should be in tableInfo and applied to the target")
}

test("createTableLike: catalog without createTableLike override throws " +
Expand All @@ -1485,10 +1498,13 @@ class CatalogSuite extends SparkFunSuite {
val sourceTable = catalog.loadTable(srcIdent)

val tableInfo = new TableInfo.Builder()
.withColumns(sourceTable.columns())
.withPartitions(sourceTable.partitioning())
.withConstraints(sourceTable.constraints())
.withProperties(Map.empty[String, String].asJava)
.build()
val ex = intercept[UnsupportedOperationException] {
catalog.createTableLike(dstIdent, sourceTable, tableInfo)
catalog.createTableLike(dstIdent, tableInfo, sourceTable)
}
assert(ex.getMessage.contains("basic"),
"Exception should mention the catalog name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.{CurrentUserContext, InternalRow}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.catalog.procedures.{BoundProcedure, ProcedureParameter, UnboundProcedure}
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
Expand Down Expand Up @@ -241,24 +240,15 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp

override def createTableLike(
ident: Identifier,
sourceTable: Table,
userSpecifiedOverrides: TableInfo): Table = {
// Read schema from source. For V1Table sources, apply CharVarcharUtils to preserve
// CHAR/VARCHAR types as declared rather than collapsed to StringType.
val columns = sourceTable match {
case v1: V1Table =>
CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(v1.catalogTable.schema))
case _ =>
sourceTable.columns()
}
// Merge source properties with user overrides (user overrides win), then set current user
// as owner (overrides source owner). Connectors are responsible for setting the owner.
tableInfo: TableInfo,
sourceTable: Table): Table = {
// columns, partitioning, constraints, explicit properties, and owner are all provided in
// tableInfo by Spark. Merge source properties so that connector-specific custom state
// (e.g. format.version, format.feature) is cloned; tableInfo properties win on conflict.
val mergedProps =
(sourceTable.properties().asScala ++
userSpecifiedOverrides.properties().asScala ++
Map(TableCatalog.PROP_OWNER -> CurrentUserContext.getCurrentUser)).asJava
createTable(ident, columns, sourceTable.partitioning(), mergedProps,
Distributions.unspecified(), Array.empty, None, None, sourceTable.constraints())
(sourceTable.properties().asScala ++ tableInfo.properties().asScala).asJava
createTable(ident, tableInfo.columns(), tableInfo.partitions(), mergedProps,
Distributions.unspecified(), Array.empty, None, None, tableInfo.constraints())
}

override def capabilities: java.util.Set[TableCatalogCapability] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableInfo}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, Table, TableCatalog, TableInfo, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.errors.QueryCompilationErrors

Expand All @@ -38,11 +39,11 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
* must override [[TableCatalog.createTableLike]]; the default implementation throws
* [[UnsupportedOperationException]].
*
* The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains strictly user-specified
* overrides: TBLPROPERTIES, LOCATION, and USING provider (only if explicitly given).
* Schema, partitioning, source provider, source TBLPROPERTIES, constraints, and owner are NOT
* pre-populated; connectors read all source metadata directly from [[sourceTable]] and are
* responsible for setting the owner.
* The [[TableInfo]] passed to [[TableCatalog.createTableLike]] contains all explicit information
* for the new table: columns and partitioning copied from the source, constraints copied from
* the source, user-specified TBLPROPERTIES / LOCATION / USING provider (if given), and
* [[TableCatalog.PROP_OWNER]] set to the current user. Source table properties are intentionally
* excluded so that connectors can decide which custom properties to clone via [[sourceTable]].
*/
case class CreateTableLikeExec(
targetCatalog: TableCatalog,
Expand All @@ -57,23 +58,14 @@ case class CreateTableLikeExec(

override protected def run(): Seq[InternalRow] = {
if (!targetCatalog.tableExists(targetIdent)) {
// Build strictly user-specified overrides: explicit TBLPROPERTIES, LOCATION (if given),
// and USING provider (if given). Provider and owner are not included here; connectors
// are responsible for reading PROP_PROVIDER from sourceTable.properties() and for
// setting the owner via CurrentUserContext.getCurrentUser.
val locationProp: Option[(String, String)] =
location.map(uri => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(uri))

val finalProps =
properties ++
provider.map(TableCatalog.PROP_PROVIDER -> _) ++
locationProp

try {
val userSpecifiedOverrides = new TableInfo.Builder()
.withProperties(finalProps.asJava)
val tableInfo = new TableInfo.Builder()
.withColumns(targetColumns)
.withPartitions(sourceTable.partitioning)
.withConstraints(sourceTable.constraints)
.withProperties(targetProperties.asJava)
.build()
targetCatalog.createTableLike(targetIdent, sourceTable, userSpecifiedOverrides)
targetCatalog.createTableLike(targetIdent, tableInfo, sourceTable)
} catch {
case _: TableAlreadyExistsException if ifNotExists =>
logWarning(
Expand All @@ -85,4 +77,22 @@ case class CreateTableLikeExec(

Seq.empty
}

// Columns for the new table are taken from the source table. V1 sources need special
// handling: read the raw catalog schema through CharVarcharUtils so declared CHAR/VARCHAR
// types survive instead of being collapsed to StringType.
private def targetColumns: Array[Column] =
  sourceTable match {
    case v1Source: V1Table =>
      val rawSchema = CharVarcharUtils.getRawSchema(v1Source.catalogTable.schema)
      CatalogV2Util.structTypeToV2Columns(rawSchema)
    case other =>
      other.columns
  }

// Explicit properties for the new table: user TBLPROPERTIES plus optional provider and
// location entries, with default ownership applied by Spark. Source table properties are
// deliberately left out; connectors clone any format-specific or custom state they need
// directly from sourceTable.
private def targetProperties: Map[String, String] = {
  val providerEntry = provider.map(p => TableCatalog.PROP_PROVIDER -> p)
  val locationEntry =
    location.map(loc => TableCatalog.PROP_LOCATION -> CatalogUtils.URIToString(loc))
  CatalogV2Util.withDefaultOwnership(properties ++ providerEntry ++ locationEntry)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.connector

import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, InMemoryCatalog, TableCatalog}
import org.apache.spark.sql.types.{CharType, IntegerType, LongType, StringType, VarcharType}

class CreateTableLikeSuite extends DatasourceV2SQLBase {
Expand Down Expand Up @@ -167,6 +167,33 @@ class CreateTableLikeSuite extends DatasourceV2SQLBase {
}
}

test("PROP_OWNER is set to current user in TableInfo passed to connector") {
  // Ownership is populated by Spark itself in the TableInfo handed to createTableLike,
  // so connectors never need to reach into Catalyst utilities for the current user.
  withTable("testcat.src", "testcat.dst") {
    sql("CREATE TABLE testcat.src (id bigint) USING foo")
    sql("CREATE TABLE testcat.dst LIKE testcat.src")

    val target = testCatalog.loadTable(Identifier.of(Array(), "dst"))
    val targetProps = target.properties
    assert(targetProps.containsKey(TableCatalog.PROP_OWNER),
      "PROP_OWNER should be set in TableInfo so connectors do not need to infer it")
    assert(targetProps.get(TableCatalog.PROP_OWNER).nonEmpty)
  }
}

test("columns and partitioning from source are set in TableInfo passed to connector") {
  // A partitioned V1 parquet source: the target must receive both the column layout and
  // the partitioning through the TableInfo that Spark builds for createTableLike.
  withTable("src", "testcat.dst") {
    sql("CREATE TABLE src (id bigint, data string) USING parquet PARTITIONED BY (data)")
    sql("CREATE TABLE testcat.dst LIKE src")

    val target = testCatalog.loadTable(Identifier.of(Array(), "dst"))
    assert(target.columns().map(_.name) === Array("id", "data"))
    assert(target.partitioning().nonEmpty,
      "partitioning from source should be passed in TableInfo")
  }
}

test("user-specified TBLPROPERTIES are applied on target") {
withTable("src", "testcat.dst") {
sql("CREATE TABLE src (id bigint) USING parquet")
Expand Down