
[SPARK-56174][SQL] Complete V2 file write path for DataFrame API #55091

Draft
LuciferYang wants to merge 3 commits into apache:master from LuciferYang:SPARK-56174

Conversation

@LuciferYang
Contributor

What changes were proposed in this pull request?

This PR is part of SPARK-56170. It removes the remaining V1 fallbacks in DataFrameWriter.lookupV2Provider() so that all save modes (ErrorIfExists, Ignore, Append, Overwrite) and partitioned writes go through the V2 file write path, and enables INSERT INTO format.`path` syntax via V2.

1. ErrorIfExists/Ignore via V2

Routes ErrorIfExists and Ignore save modes into the V2 write path with a pre-write path-existence check matching V1 semantics (InsertIntoHadoopFsRelationCommand):

  • ErrorIfExists + path exists: throws PATH_ALREADY_EXISTS
  • Ignore + path exists: returns LocalRelation (no-op)
  • Path doesn't exist: writes via AppendData
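
The three outcomes above can be modeled with a small dispatch function. This is an illustrative sketch only; names such as `WriteAction` and `planV2Write` are hypothetical and do not appear in the PR, which performs this check inside the DataFrameWriter V2 path:

```scala
// Hypothetical model of the pre-write path-existence check described above.
sealed trait WriteAction
case object ThrowPathAlreadyExists extends WriteAction // ErrorIfExists + path exists
case object NoOp extends WriteAction                   // Ignore + path exists: LocalRelation
case object AppendDataWrite extends WriteAction        // path absent: write via AppendData

sealed trait SaveMode
case object ErrorIfExists extends SaveMode
case object Ignore extends SaveMode

def planV2Write(mode: SaveMode, pathExists: Boolean): WriteAction =
  (mode, pathExists) match {
    case (ErrorIfExists, true) => ThrowPathAlreadyExists
    case (Ignore, true)        => NoOp
    case (_, false)            => AppendDataWrite
  }
```

This mirrors the V1 semantics of InsertIntoHadoopFsRelationCommand: the mode only matters when the target path already exists.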

2. Partitioned writes via V2

  • FileDataSourceV2.getTable: extracts partition column names from Transform array and sets FileTable.userSpecifiedPartitioning
  • FileTable.createFileWriteBuilder: case-insensitive partition column lookup with .copy(name = c) to preserve partitionBy argument case for directory names
  • FileWrite.createWriteJobDescription: uses partition column names from partitionSchema (not allColumns) to ensure directory names match partitionBy case
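
The case-preserving lookup in the second bullet can be sketched as follows. This is a simplified model (a plain `Field` case class standing in for StructField); the `.copy(name = c)` step is the one the PR describes:

```scala
// Sketch: resolve partitionBy columns against the schema case-insensitively,
// then keep the user-provided case so directory names match the partitionBy
// arguments rather than the schema's spelling.
case class Field(name: String, dataType: String)

def resolvePartitionColumns(schema: Seq[Field], partitionBy: Seq[String]): Seq[Field] =
  partitionBy.map { c =>
    val field = schema
      .find(_.name.equalsIgnoreCase(c))
      .getOrElse(throw new IllegalArgumentException(s"Partition column $c not found"))
    field.copy(name = c) // preserve partitionBy argument case for directory names
  }
```

For example, with a schema column `Year` and `partitionBy("YEAR")`, the resolved field is named `YEAR`, so the output directories are `YEAR=...`.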

3. FileWrite improvements

  • Implements RequiresDistributionAndOrdering with ascending sort by partition columns, ensuring DynamicPartitionDataSingleWriter sees contiguous partition values
  • Skips partition columns in data type validation (partition columns are directory names, not data values)
  • Name-based column filtering (partitionSet.contains(col.name)) instead of object identity
  • Evaluates description before setupJob (fixes Parquet summary configuration ordering)
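
The first bullet rests on a simple invariant: if rows arrive sorted by partition columns, every partition's rows are contiguous, so a single dynamic-partition writer only ever has one output file open at a time. A minimal illustration (not Spark code; `Row` and `contiguousPartitions` are invented for the sketch):

```scala
// Sorting by the partition column makes each partition's rows contiguous,
// which is the precondition DynamicPartitionDataSingleWriter relies on.
case class Row(partValue: String, data: Int)

def contiguousPartitions(rows: Seq[Row]): Boolean = {
  val seen = scala.collection.mutable.Set.empty[String]
  var prev: Option[String] = None
  rows.forall { r =>
    // A partition value may only reappear immediately after itself.
    val ok = prev.contains(r.partValue) || !seen.contains(r.partValue)
    seen += r.partValue
    prev = Some(r.partValue)
    ok
  }
}

val unsorted = Seq(Row("a", 1), Row("b", 2), Row("a", 3))
val sorted   = unsorted.sortBy(_.partValue) // ascending sort by partition column
```

Without the sort, partition "a" above appears, disappears, and reappears, forcing the writer to either keep multiple files open or reopen files.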

4. INSERT INTO format.`path` via V2

Updates ResolveSQLOnFile to resolve INSERT INTO parquet.`/path` and SELECT FROM format.`path` via V2 when a FileDataSourceV2 provider is available.
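
For intuition, a `format.`path`` reference splits into a provider name and a path. The sketch below is purely illustrative string handling; the actual rule matches on the analyzer's unresolved relation, not on raw strings:

```scala
// Hypothetical sketch: split a format.`path` table reference into
// (provider, path), e.g. parquet.`/tmp/t` -> ("parquet", "/tmp/t").
def parseFileReference(ref: String): Option[(String, String)] = {
  val pattern = """(\w+)\.`([^`]+)`""".r
  ref match {
    case pattern(format, path) => Some((format, path))
    case _                     => None
  }
}
```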

5. Aggregate pushdown case sensitivity fix

Fixes AggregatePushDownUtils.isPartitionCol and getStructFieldForCol to use case-insensitive matching, resolving aggregate pushdown failures when partition directory names differ in case from query column references.
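
The shape of the fix, in a simplified model (a plain `Field` case class stands in for StructField; the real AggregatePushDownUtils operates on Spark expressions and StructType):

```scala
// Case-insensitive matching so that e.g. a query referencing YEAR still
// recognizes a partition directory named year=... for aggregate pushdown.
case class Field(name: String, dataType: String)

def isPartitionCol(colName: String, partitionSchema: Seq[Field]): Boolean =
  partitionSchema.exists(_.name.equalsIgnoreCase(colName))

def getStructFieldForCol(colName: String, schema: Seq[Field]): Field =
  schema
    .find(_.name.equalsIgnoreCase(colName))
    .getOrElse(throw new IllegalArgumentException(s"Column $colName not found"))
```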

6. CSV-specific fixes

  • CSVTable.allowDuplicatedColumnNames = true: allows CSV writes with duplicate column names (matching V1 CSVFileFormat.allowDuplicatedColumnNames)
  • CSVTable.supportsWriteDataType: rejects VariantType for writes while allowing it for reads (matching V1's supportDataType vs supportReadDataType distinction)
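
The read/write asymmetry in the second bullet can be modeled minimally. The names below are invented for the sketch; the real distinction lives in CSVTable and mirrors V1's supportDataType vs supportReadDataType split:

```scala
// Minimal model: VariantType is accepted on the CSV read path but
// rejected on the write path, as described above.
sealed trait DataType
case object StringType extends DataType
case object VariantType extends DataType

def supportsReadDataType(dt: DataType): Boolean = dt match {
  case StringType | VariantType => true
}

def supportsWriteDataType(dt: DataType): Boolean = dt match {
  case VariantType => false // write path rejects variants, matching V1
  case _           => true
}
```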

7. Infrastructure fixes

  • DataSourceV2Strategy.refreshCache: propagates data source options to Hadoop config via r.options, guards against empty rootPaths
  • FileDataSourceV2.getTable: reuses cached table from inferSchema() when available for correct MetadataLogFileIndex behavior (streaming sink output)
  • FileTable.schema: skips column name duplication check for formats with allowDuplicatedColumnNames; validates data types for non-partition columns only

Why are the changes needed?

This completes the V2 file write path for the DataFrame API, eliminating the last V1 fallbacks in DataFrameWriter.lookupV2Provider(). All file source writes now go through the V2 infrastructure, enabling consistent behavior and paving the way for future V2 enhancements (streaming writes, bucketing, etc.).

Does this PR introduce any user-facing change?

No. The behavior is functionally equivalent to V1 for all save modes and partitioned writes. Directory names now use the partitionBy argument case (matching V1 behavior).

How was this patch tested?

  • New tests: ErrorIfExists mode, Ignore mode, INSERT INTO format.`path`, SELECT FROM format.`path`

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code 4.6

…Frame API writes and delete FallBackFileSourceV2

Key changes:
- FileWrite: added partitionSchema, customPartitionLocations,
  dynamicPartitionOverwrite, isTruncate; path creation and truncate
  logic; dynamic partition overwrite via FileCommitProtocol
- FileTable: createFileWriteBuilder with SupportsDynamicOverwrite
  and SupportsTruncate; capabilities now include TRUNCATE and
  OVERWRITE_DYNAMIC; fileIndex skips file existence checks when
  userSpecifiedSchema is provided (write path)
- All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use
  createFileWriteBuilder with partition/truncate/overwrite support
- DataFrameWriter.lookupV2Provider: enabled FileDataSourceV2 for
  non-partitioned Append and Overwrite via df.write.save(path)
- DataFrameWriter.insertInto: V1 fallback for file sources
  (TODO: SPARK-56175)
- DataFrameWriter.saveAsTable: V1 fallback for file sources
  (TODO: SPARK-56230, needs StagingTableCatalog)
- DataSourceV2Utils.getTableProvider: V1 fallback for file sources
  (TODO: SPARK-56175)
- Removed FallBackFileSourceV2 rule
- V2SessionCatalog.createTable: V1 FileFormat data type validation
@LuciferYang LuciferYang marked this pull request as draft March 30, 2026 09:11
@LuciferYang
Contributor Author

This is the third patch, built on top of #54998 and #55034. The changes specific to this patch are in commit 6f4b9f3.

@LuciferYang
Contributor Author

#55034 can be improved. Let me go back and revise it first, then come back to update this PR.

…catalog table loading, and gate removal

Key changes:
- FileTable extends SupportsPartitionManagement with createPartition,
  dropPartition, listPartitionIdentifiers, partitionSchema
- Partition operations sync to catalog metastore (best-effort)
- V2SessionCatalog.loadTable returns FileTable instead of V1Table,
  sets catalogTable and useCatalogFileIndex on FileTable
- V2SessionCatalog.getDataSourceOptions includes storage.properties
  for proper option propagation (header, ORC bloom filter, etc.)
- V2SessionCatalog.createTable validates data types via FileTable
- FileTable.columns() restores NOT NULL constraints from catalogTable
- FileTable.partitioning() falls back to userSpecifiedPartitioning
  or catalog partition columns
- FileTable.fileIndex uses CatalogFileIndex when catalog has
  registered partitions (custom partition locations)
- FileTable.schema checks column name duplication for non-catalog
  tables only
- DataSourceV2Utils.getTableProvider: removed FileDataSourceV2 gate
- DataFrameWriter.insertInto: enabled V2 for file sources
- DataFrameWriter.saveAsTable: V1 fallback (TODO: SPARK-56230)
- ResolveSessionCatalog: V1 fallback for FileTable-backed commands
  (AnalyzeTable, AnalyzeColumn, TruncateTable, TruncatePartition,
  ShowPartitions, RecoverPartitions, AddPartitions, RenamePartitions,
  DropPartitions, SetTableLocation, CREATE TABLE validation,
  REPLACE TABLE blocking)
- FindDataSourceTable: streaming V1 fallback for FileTable
  (TODO: SPARK-56233)
- DataSource.planForWritingFileFormat: graceful V2 handling
