
Commit 909ea96

ueshin authored and cloud-fan committed
[SPARK-51451][SQL] Fix ExtractGenerator to wait for UnresolvedStarWithColumns to be resolved
### What changes were proposed in this pull request?

Fixes `ExtractGenerator` to wait for `UnresolvedStarWithColumns` to be resolved.

### Why are the changes needed?

`df.withColumn` is now analyzed in the analyzer, which causes the `ExtractGenerator` rule to mistakenly treat the generator as nested. This happens more often with Spark Connect because Spark Classic can usually resolve `UnresolvedStarWithColumns` before the `ExtractGenerator` rule runs, whereas Spark Connect sometimes needs several iterations of the resolution rules.

```py
from pyspark.sql.functions import *

df = spark.createDataFrame([("082017",)], ['dt'])
df_dt = df.select(date_format(to_date(col("dt"), "MMyyyy"), "MM/dd/yyyy").alias("dt"))
monthArray = [lit(x) for x in range(0, 12)]
df_month_y = df_dt.withColumn("month_y", explode(array(monthArray)))
df_month_y.show()
```

```
pyspark.errors.exceptions.connect.AnalysisException: [UNSUPPORTED_GENERATOR.NESTED_IN_EXPRESSIONS] The generator is not supported: nested in expressions "unresolvedstarwithcolumns(explode(array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)))". SQLSTATE: 42K0E
```

Its parsed plan is:

```
== Parsed Logical Plan ==
'Project [unresolvedstarwithcolumns(month_y, 'explode('array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)), Some(List({})))]
+- 'Project ['date_format('to_date('dt, MMyyyy), MM/dd/yyyy) AS dt#5]
   +- 'UnresolvedSubqueryColumnAliases [dt]
      +- LocalRelation [dt#4]
```

Here `explode` is nested in `unresolvedstarwithcolumns`.

### Does this PR introduce _any_ user-facing change?

Yes, `df.withColumn` with generators will be available again.

### How was this patch tested?

Added the related tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50286 from ueshin/issues/SPARK-51451/with_column_generator.

Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 0de6f29)
Signed-off-by: Wenchen Fan <[email protected]>
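For reference, a sketch of what the repro above is expected to produce once this fix is in place, assuming an active `spark` session as in the snippet. The expected rows (`dt="08/01/2017"`, `month_y` 0 through 11) are taken from the assertions in the new test below; the rendered table is illustrative rather than captured output.

```python
# Same repro as above; with the fix, the generator is extracted only after the
# UnresolvedStarWithColumns placeholder is resolved, so explode() fans the single
# input row out into 12 rows instead of failing analysis.
from pyspark.sql.functions import array, col, date_format, explode, lit, to_date

df = spark.createDataFrame([("082017",)], ["dt"])
df_dt = df.select(date_format(to_date(col("dt"), "MMyyyy"), "MM/dd/yyyy").alias("dt"))
df_month_y = df_dt.withColumn("month_y", explode(array([lit(x) for x in range(12)])))
df_month_y.show(3)
# Expected shape of the output (month_y runs from 0 through 11):
# +----------+-------+
# |        dt|month_y|
# +----------+-------+
# |08/01/2017|      0|
# |08/01/2017|      1|
# |08/01/2017|      2|
# +----------+-------+
# only showing top 3 rows
```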
1 parent f96ae0c · commit 909ea96

File tree: 3 files changed, +35 -1 lines changed

python/pyspark/sql/tests/test_dataframe.py

Lines changed: 27 additions & 1 deletion
@@ -26,7 +26,7 @@
 from contextlib import redirect_stdout
 
 from pyspark.sql import Row, functions, DataFrame
-from pyspark.sql.functions import col, lit, count, struct
+from pyspark.sql.functions import col, lit, count, struct, date_format, to_date, array, explode
 from pyspark.sql.types import (
     StringType,
     IntegerType,
@@ -1076,6 +1076,32 @@ def test_metadata_column(self):
             [Row(0), Row(0), Row(0)],
         )
 
+    def test_with_column_and_generator(self):
+        # SPARK-51451: Generators should be available with withColumn
+        df = self.spark.createDataFrame([("082017",)], ["dt"]).select(
+            to_date(col("dt"), "MMyyyy").alias("dt")
+        )
+        df_dt = df.withColumn("dt", date_format(col("dt"), "MM/dd/yyyy"))
+        monthArray = [lit(x) for x in range(0, 12)]
+        df_month_y = df_dt.withColumn("month_y", explode(array(monthArray)))
+
+        assertDataFrameEqual(
+            df_month_y,
+            [Row(dt="08/01/2017", month_y=i) for i in range(12)],
+        )
+
+        df_dt_month_y = df.withColumns(
+            {
+                "dt": date_format(col("dt"), "MM/dd/yyyy"),
+                "month_y": explode(array(monthArray)),
+            }
+        )
+
+        assertDataFrameEqual(
+            df_dt_month_y,
+            [Row(dt="08/01/2017", month_y=i) for i in range(12)],
+        )
+
 
 class DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase):
     def test_query_execution_unsupported_in_classic(self):
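The second half of the new test covers `withColumns` with a generator in the column map. A minimal standalone sketch of that usage outside the test harness (the `SparkSession` setup and app name here are illustrative, not part of the patch):

```python
# Standalone sketch of the withColumns-with-generator case exercised by the new test.
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, col, date_format, explode, lit, to_date

spark = SparkSession.builder.appName("spark-51451-sketch").getOrCreate()

df = spark.createDataFrame([("082017",)], ["dt"]).select(
    to_date(col("dt"), "MMyyyy").alias("dt")
)
month_array = [lit(x) for x in range(12)]

# Both the reformatted date and the exploded generator are added in one withColumns call.
df_dt_month_y = df.withColumns(
    {
        "dt": date_format(col("dt"), "MM/dd/yyyy"),
        "month_y": explode(array(month_array)),
    }
)

assert df_dt_month_y.count() == 12  # one output row per generated month index
spark.stop()
```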

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 4 additions & 0 deletions
@@ -2999,6 +2999,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
       _.containsPattern(GENERATOR), ruleId) {
+      case p @ Project(Seq(UnresolvedStarWithColumns(_, _, _)), _) =>
+        // UnresolvedStarWithColumns should be resolved before extracting.
+        p
+
       case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
         val nestedGenerator = projectList.find(hasNestedGenerator).get
         throw QueryCompilationErrors.nestedGeneratorError(trimAlias(nestedGenerator))
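To see why the early-return case helps, recall that analyzer rules are applied repeatedly until the plan stops changing. Below is a toy Python model of that interaction, using simplified stand-in classes rather than Spark's real analyzer: the extraction rule simply leaves a still-unresolved placeholder alone, and a later iteration, once the star has been expanded, performs the extraction.

```python
# Toy fixed-point model of the rule interplay addressed by this patch (not Spark code).
from dataclasses import dataclass
from typing import Any, Tuple


@dataclass
class UnresolvedStarWithCols:      # placeholder that still wraps the generator
    generator: str


@dataclass
class ResolvedColumns:             # what the placeholder becomes once expanded
    columns: Tuple[str, ...]
    generator: str


@dataclass
class ExtractedGenerate:           # the generator pulled out into its own node
    generator: str
    child: Any


def resolve_star(node):
    # With Spark Connect, this expansion may only happen on a later iteration.
    if isinstance(node, UnresolvedStarWithCols):
        return ResolvedColumns(("dt",), node.generator)
    return node


def extract_generator(node):
    if isinstance(node, UnresolvedStarWithCols):
        return node  # guard: wait for resolution, mirroring the new Analyzer case
    if isinstance(node, ResolvedColumns):
        return ExtractedGenerate(node.generator, node)
    return node


plan = UnresolvedStarWithCols("explode")
while True:
    # Apply extraction before resolution to mimic the problematic ordering.
    new_plan = resolve_star(extract_generator(plan))
    if new_plan == plan:
        break
    plan = new_plan

print(plan)  # ExtractedGenerate wrapping the resolved columns
```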

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 4 additions & 0 deletions
@@ -776,6 +776,8 @@ case class UnresolvedStarWithColumns(
 
     replacedAndExistingColumns ++ newColumns
   }
+
+  override def toString: String = super[Expression].toString
 }
 
 /**
@@ -812,6 +814,8 @@ case class UnresolvedStarWithColumnsRenames(
       )
     }
   }
+
+  override def toString: String = super[LeafExpression].toString
 }
 
 /**
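The `toString` overrides above use Scala's qualified `super[Parent]` call, which statically selects one parent's implementation when several parents in the hierarchy provide their own. As a rough cross-language illustration of the same idea (hypothetical classes, not Spark code), Python can pin a specific base class's string rendering by calling it explicitly:

```python
# Illustrative only: picking one parent's string rendering under multiple inheritance,
# analogous to `override def toString: String = super[Expression].toString` in the patch.
class PlainRender:
    def __repr__(self) -> str:
        return "plain-rendering"


class VerboseRender:
    def __repr__(self) -> str:
        return "verbose-rendering"


class Node(VerboseRender, PlainRender):
    def __repr__(self) -> str:
        # The MRO would pick VerboseRender's __repr__; delegate to PlainRender's instead.
        return PlainRender.__repr__(self)


print(repr(Node()))  # plain-rendering
```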
