Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,12 @@ class Analyzer(
// to resolve column "DEFAULT" in the child plans so that they must be unresolved.
case s: SetVariable => resolveColumnDefaultInCommandInputQuery(s)

// SPARK-43752: resolve column "DEFAULT" in V2 write commands before the
// query is fully resolved, matching the InsertIntoStatement behavior above.
case v2: V2WriteCommand
if v2.table.resolved && v2.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
resolveColumnDefaultInCommandInputQuery(v2)

// Skip FetchCursor - let ResolveFetchCursor handle variable resolution
// This prevents ResolveReferences from trying to resolve target variables as columns
case s: SingleStatement if s.parsedPlan.isInstanceOf[FetchCursor] => s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import org.apache.spark.sql.types.StructField
class ResolveColumnDefaultInCommandInputQuery(val catalogManager: CatalogManager)
extends SQLConfHelper with ColumnResolutionHelper {

// TODO (SPARK-43752): support v2 write commands as well.
def apply(plan: LogicalPlan): LogicalPlan = plan match {
case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
Expand Down Expand Up @@ -90,6 +89,12 @@ class ResolveColumnDefaultInCommandInputQuery(val catalogManager: CatalogManager
// defined for the n-th variable of the SET.
s.withNewChildren(Seq(resolveColumnDefault(s.sourceQuery, expectedQuerySchema)))

// SPARK-43752: resolve column "DEFAULT" in V2 write commands.
case v2: V2WriteCommand if conf.enableDefaultColumns && v2.table.resolved &&
v2.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
val expectedQuerySchema = v2.table.schema
v2.withNewQuery(resolveColumnDefault(v2.query, expectedQuerySchema))

case _ => plan
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,28 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(0, user)))
}
}

test("SPARK-43752: column DEFAULT in V2 write commands") {
  // Register an in-memory V2 catalog so the INSERTs below are planned as V2
  // write commands (AppendData / OverwriteByExpression) rather than the V1
  // InsertIntoStatement path, exercising the new DEFAULT resolution rule.
  withSQLConf(
    "spark.sql.catalog.testcat" ->
      classOf[connector.catalog.InMemoryTableCatalog].getName) {
    sql("CREATE TABLE testcat.t(c1 INT DEFAULT 42, c2 STRING DEFAULT 'hello')")
    try {
      // INSERT INTO (AppendData) with DEFAULT in one column position.
      sql("INSERT INTO testcat.t VALUES (1, DEFAULT)")
      checkAnswer(
        sql("SELECT * FROM testcat.t"),
        Row(1, "hello"))
      // INSERT INTO with DEFAULT in every column position.
      sql("INSERT INTO testcat.t VALUES (DEFAULT, DEFAULT)")
      checkAnswer(
        sql("SELECT * FROM testcat.t ORDER BY c1"),
        Seq(Row(1, "hello"), Row(42, "hello")))
      // INSERT OVERWRITE (OverwriteByExpression) with DEFAULT.
      sql("INSERT OVERWRITE testcat.t VALUES (100, DEFAULT)")
      checkAnswer(
        sql("SELECT * FROM testcat.t"),
        Row(100, "hello"))
    } finally {
      // Drop the table even when an assertion fails: the session (and its
      // catalog state) is shared across tests, and a leaked table would
      // break unrelated tests. IF EXISTS guards against an earlier failure
      // in CREATE itself.
      sql("DROP TABLE IF EXISTS testcat.t")
    }
  }
}
}