From 60c6f2e209b9ec43a36e288fb3d02377199b9c15 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Wed, 1 Apr 2026 11:32:27 +0800
Subject: [PATCH] [SPARK-43752][SQL] Resolve column DEFAULT in V2 write commands

---
 .../sql/catalyst/analysis/Analyzer.scala      |  8 +++++
 ...olveColumnDefaultInCommandInputQuery.scala |  8 ++++-
 .../sql/ResolveDefaultColumnsSuite.scala      | 29 +++++++++++++++++++
 3 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8ef7e8bdc083f..147fadcb46e3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1408,6 +1408,14 @@ class Analyzer(
       // to resolve column "DEFAULT" in the child plans so that they must be unresolved.
       case s: SetVariable => resolveColumnDefaultInCommandInputQuery(s)
 
+      // SPARK-43752: resolve column "DEFAULT" in V2 write commands before the
+      // query is fully resolved, matching the InsertIntoStatement behavior above.
+      // The guard limits this case to commands whose table is resolved and whose query still
+      // has unresolved attributes; every other V2 write command falls through to later cases.
+      case v2: V2WriteCommand
+          if v2.table.resolved && v2.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+        resolveColumnDefaultInCommandInputQuery(v2)
+
       // Skip FetchCursor - let ResolveFetchCursor handle variable resolution
       // This prevents ResolveReferences from trying to resolve target variables as columns
       case s: SingleStatement if s.parsedPlan.isInstanceOf[FetchCursor] => s
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInCommandInputQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInCommandInputQuery.scala
index 47cf817c2a862..03db3fae2fc85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInCommandInputQuery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInCommandInputQuery.scala
@@ -43,7 +43,6 @@ import org.apache.spark.sql.types.StructField
 class ResolveColumnDefaultInCommandInputQuery(val catalogManager: CatalogManager)
   extends SQLConfHelper with ColumnResolutionHelper {
 
-  // TODO (SPARK-43752): support v2 write commands as well.
   def apply(plan: LogicalPlan): LogicalPlan = plan match {
     case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
         i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
@@ -90,6 +89,13 @@ class ResolveColumnDefaultInCommandInputQuery(val catalogManager: CatalogManager
       // defined for the n-th variable of the SET.
       s.withNewChildren(Seq(resolveColumnDefault(s.sourceQuery, expectedQuerySchema)))
 
+    // SPARK-43752: resolve column "DEFAULT" in V2 write commands, using the schema of the
+    // (already resolved) target table as the expected schema of the input query.
+    case v2: V2WriteCommand if conf.enableDefaultColumns && v2.table.resolved &&
+        v2.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val expectedQuerySchema = v2.table.schema
+      v2.withNewQuery(resolveColumnDefault(v2.query, expectedQuerySchema))
+
     case _ => plan
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
index cb9d0909554b4..5ad945dd75b82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
@@ -308,4 +308,33 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
       checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(0, user)))
     }
   }
+
+  test("SPARK-43752: column DEFAULT in V2 write commands") {
+    // NOTE(review): these SQL inserts presumably reach the analyzer as InsertIntoStatement,
+    // whose DEFAULT handling already exists -- confirm the new V2WriteCommand case is hit.
+    withSQLConf(
+      "spark.sql.catalog.testcat" ->
+        classOf[connector.catalog.InMemoryTableCatalog].getName) {
+      // `withTable` drops the table even when an assertion below fails. It is nested inside
+      // `withSQLConf` so that the `testcat` catalog is still configured when the drop runs.
+      withTable("testcat.t") {
+        sql("CREATE TABLE testcat.t(c1 INT DEFAULT 42, c2 STRING DEFAULT 'hello')")
+        // INSERT INTO (AppendData) with DEFAULT
+        sql("INSERT INTO testcat.t VALUES (1, DEFAULT)")
+        checkAnswer(
+          sql("SELECT * FROM testcat.t"),
+          Row(1, "hello"))
+        // INSERT INTO with all DEFAULT
+        sql("INSERT INTO testcat.t VALUES (DEFAULT, DEFAULT)")
+        checkAnswer(
+          sql("SELECT * FROM testcat.t ORDER BY c1"),
+          Seq(Row(1, "hello"), Row(42, "hello")))
+        // INSERT OVERWRITE (OverwriteByExpression) with DEFAULT
+        sql("INSERT OVERWRITE testcat.t VALUES (100, DEFAULT)")
+        checkAnswer(
+          sql("SELECT * FROM testcat.t"),
+          Row(100, "hello"))
+      }
+    }
+  }
 }