33 changes: 33 additions & 0 deletions SPARK-56170-static-partition-overwrite-jira.md
### Title

[SQL] Support static partition overwrite for V2 file tables (SupportsOverwriteV2)

### Description

#### Background

After SPARK-56171 removed the V2 file write gate, `INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...` on file tables goes through the V2 write path. This creates an `OverwriteByExpression` with a partition filter predicate (e.g., `p <=> 1`), which requires the WriteBuilder to implement `SupportsOverwriteV2` and the table to declare `OVERWRITE_BY_FILTER` capability.

Currently, FileTable's WriteBuilder only implements `SupportsTruncate` (full table overwrite) and `SupportsDynamicOverwrite`, so static partition overwrite fails with:

```
[UNSUPPORTED_OVERWRITE.TABLE] Can't overwrite the target table ... by a filter other than "true".
```

#### Proposed Changes

1. **FileTable.createFileWriteBuilder**: Add `SupportsOverwriteV2` to the WriteBuilder. Implement `canOverwrite()` to accept partition-column equality predicates, and `overwrite()` to store the predicates for use at write time.

2. **FileWrite.toBatch()**: Extend overwrite logic to support partition-level directory deletion. Parse the stored predicates to extract the static partition spec, compute the matching partition path (e.g., `/table/p=1`), and delete only that subdirectory before writing — matching V1's `InsertIntoHadoopFsRelationCommand.deleteMatchingPartitions` behavior.

3. **FileTable.CAPABILITIES**: Add `OVERWRITE_BY_FILTER`.

4. **Format Write classes**: Plumb the overwrite predicates parameter through all 6 format Write classes (Parquet, ORC, CSV, JSON, Text, Avro), following the same pattern as `bucketSpec` in SPARK-56177.
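Conceptually, step 2 maps the stored predicates to a single partition directory. A minimal sketch of that path computation, with column order taken from `partitionSchema` as in V1's `deleteMatchingPartitions` (all names here are illustrative, not actual Spark internals):

```scala
// Sketch only: compute the directory to delete for a static partition
// overwrite. Column order follows the table's partitionSchema; names
// are illustrative, not the actual Spark API.
def staticPartitionPath(
    tableRoot: String,
    partitionColumns: Seq[String], // order from partitionSchema
    staticSpec: Map[String, Any]): String = {
  val segments = partitionColumns.flatMap { col =>
    staticSpec.get(col).map(v => s"$col=$v")
  }
  (tableRoot +: segments).mkString("/")
}
```

For the example above, `staticPartitionPath("/table", Seq("p"), Map("p" -> 1))` yields `/table/p=1`, the subdirectory deleted before the write.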

#### Why

This is a prerequisite for SPARK-56304 (V2 ifPartitionNotExists for INSERT INTO), and completes the V2 static partition overwrite support that was missing after removing the file source V2 gate (SPARK-56170).

#### Scope

~4-6 source files changed, ~100-150 lines added. Low risk — follows existing V1 patterns and V2 SupportsOverwriteV2 contract.
146 changes: 146 additions & 0 deletions SPARK-56170-status.md
# SPARK-56170: Remove File Source V2 Gate — Status & TODOs

## Current Branch State

Worktree: `.claude/worktrees/spark-56171-combined`
Branch: `SPARK-56231`

```
16837d37a68 [SPARK-56231][SQL] Bucket pruning and bucket join optimization for V2 file read path
8ea17ebebf8 [SPARK-56304][SQL] V2 ifPartitionNotExists support for file table INSERT INTO
d43b2694f8f [SPARK-56316][SQL] Support static partition overwrite for V2 file tables
a1742f5445c [SPARK-56178][SQL] MSCK REPAIR TABLE for V2 file tables
0a28b6fb9c7 [SPARK-56177][SQL] V2 file bucketing write support
ccd72729c13 [SPARK-56176][SQL] V2-native ANALYZE TABLE/COLUMN with stats propagation to FileScan
ef5bc4b4ccf [SPARK-56174][SQL] Complete V2 file write path for DataFrame API
6f858eee283 [SPARK-56175][SQL] FileTable implements SupportsPartitionManagement, catalog table loading, and gate removal
677a482be7f [SPARK-56171][SQL] Enable V2 file write path for non-partitioned DataFrame API writes and delete FallBackFileSourceV2
```

---

## Completed Subtasks

| Ticket | Description | Commit |
|--------|-------------|--------|
| SPARK-56171 | V2 file write foundation (FileWrite, createFileWriteBuilder, capabilities) | `677a482` |
| SPARK-56175 | SupportsPartitionManagement, V2 catalog table loading, saveAsTable, syncNewPartitionsToCatalog, V2-native partition DDL | `6f858ee` |
| SPARK-56174 | ErrorIfExists/Ignore modes, partitioned writes, INSERT INTO format.path, aggregate pushdown fix | `ef5bc4b` |
| SPARK-56176 | V2-native AnalyzeTableExec/AnalyzeColumnExec, stats propagation to FileScan via NUM_ROWS_KEY, auto-update totalSize after write | `ccd7272` |
| SPARK-56177 | V2 file bucketing write support via catalogTable.bucketSpec, DynamicPartitionDataConcurrentWriter for hash-based bucketing | `0a28b6f` |
| SPARK-56178 | MSCK REPAIR TABLE / ALTER TABLE RECOVER PARTITIONS for V2 file tables via RepairTableExec | `a1742f5` |
| SPARK-56316 | Static partition overwrite (SupportsOverwriteV2) for V2 file tables | `d43b269` |
| SPARK-56304 | V2 ifPartitionNotExists for file table INSERT INTO | `8ea17eb` |
| SPARK-56231 | Bucket pruning and bucket join optimization for V2 file read path | `16837d3` |

### Merged into other tickets
| Original Ticket | Merged Into | Description |
|-----------------|-------------|-------------|
| SPARK-56232 | SPARK-56176 | Stats propagation to FileScan via NUM_ROWS_KEY |
| SPARK-56230 | SPARK-56175 | V2 saveAsTable for file sources |
| SPARK-56231 (partition DDL) | SPARK-56175 | V2-native partition DDL without V1 fallback |

---

## Remaining TODOs

### Codebase TODOs
```
sql/core/.../DataSourceStrategy.scala:363:
// TODO(SPARK-56233): Add MICRO_BATCH_READ capability to FileTable
```

### Open Subtasks Under SPARK-56170

Phase 2 — Write path completeness:

| Ticket | Description | Status | Complexity |
|--------|-------------|--------|------------|
| SPARK-56316 | Static partition overwrite (SupportsOverwriteV2) for V2 file tables | ✅ Done (`d43b269`) | M |
| SPARK-56304 | V2 ifPartitionNotExists for file table INSERT INTO | ✅ Done (`8ea17eb`) | M |
| SPARK-56230 / SPARK-43752 | DEFAULT column value support for V2 write commands | PR submitted: LuciferYang/spark#SPARK-43752 | M (catalyst-level, separate PR) |

Phase 3 — Read path enhancements:

| Ticket | Description | Status | Complexity |
|--------|-------------|--------|------------|
| SPARK-56231 | Bucket pruning and bucket join optimization for V2 file read path | ✅ Done (`16837d3`) | L — 26 files, 603 lines |

Phase 4 — Streaming support (~700-1200 LOC, no dsv2-8patches reference):

| Ticket | Description | Status | Complexity |
|--------|-------------|--------|------------|
| SPARK-56232 | V2 streaming read for FileTable (MICRO_BATCH_READ) | Not started | L |
| SPARK-56233 | V2 streaming write for FileTable (STREAMING_WRITE) | Not started | L (depends on SPARK-56232) |

---

## Deferred Improvements

### Pending reviewer feedback
- **SPARK-56175**: Refactor mutable `var` fields on `FileTable` (`catalogTable`, `useCatalogFileIndex`, `userSpecifiedPartitioning`) to constructor injection or builder pattern

### Out of scope for SPARK-56170
- **SPARK-56230 / SPARK-43752**: DEFAULT column value support for V2 write commands. PR submitted as separate change against upstream master. Once merged, SPARK-56170 branch inherits it after rebase.

---

## PR Status

| PR | Ticket | URL | Status |
|----|--------|-----|--------|
| #55034 | SPARK-56175 | https://github.com/apache/spark/pull/55034 | Draft |
| — | SPARK-56174 | — | Not submitted |
| — | SPARK-56176 | — | Not submitted |
| — | SPARK-56177 | — | Not submitted |
| — | SPARK-56178 | — | Not submitted |
| #55138 | SPARK-56316 | https://github.com/apache/spark/pull/55138 | Waiting CI |
| — | SPARK-56304 | — | Depends on SPARK-56316 |
| #55127 | SPARK-43752 | https://github.com/apache/spark/pull/55127 | Submitted (separate PR, upstream master) |

---

## Key Design Decisions

1. **V2-native partition DDL** — No V1 fallbacks in ResolveSessionCatalog for partition commands. FileTable's SupportsPartitionManagement handles ADD/DROP/SHOW/TRUNCATE/RENAME PARTITION natively via DataSourceV2Strategy.

2. **saveAsTable Overwrite** — Uses `OverwriteByExpression(Literal(true))` for existing file tables instead of `ReplaceTableAsSelect` (which requires StagingTableCatalog). Creates table via `CreateTableAsSelect` when table doesn't exist.

3. **Stats propagation** — ANALYZE TABLE stores stats via `TableCatalog.alterTable(TableChange.setProperty(...))`. Row count injected into FileScan via `FileTable.NUM_ROWS_KEY` option. Table size auto-updated after every V2 file write.

4. **syncNewPartitionsToCatalog** — After V2 file writes, auto-discovers new partitions on disk and registers them in catalog metastore (best-effort with warning logging).

5. **Streaming fallback** — FileTable lacks MICRO_BATCH_READ/STREAMING_WRITE. Streaming reads fall back to V1 via FindDataSourceTable. Estimated 700-1200 LOC to implement V2-native streaming (SPARK-56232/56233).

6. **Bucketed writes** — Uses DynamicPartitionDataConcurrentWriter because V2's RequiresDistributionAndOrdering cannot express hash-based ordering needed by DynamicPartitionDataSingleWriter. Bucket pruning/join optimization (read path) deferred to SPARK-56231.

7. **MSCK REPAIR TABLE** — RepairTableExec scans filesystem partitions via FileTable.listPartitionIdentifiers(), diffs with catalog, and adds/drops as needed. Also handles ALTER TABLE RECOVER PARTITIONS (add-only, no drop).
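The repair flow in decision 7 reduces to a set diff between filesystem and catalog partitions. A self-contained toy model of the planning step (simplified types — the real exec works with partition identifiers from `listPartitionIdentifiers()`, not string maps):

```scala
// Toy model of RepairTableExec's planning: partitions found on the
// filesystem but missing from the catalog are added; catalog entries
// with no backing directory are dropped (MSCK REPAIR) or kept
// (RECOVER PARTITIONS, which is add-only). Not Spark's actual API.
def planRepair(
    fsParts: Set[Map[String, String]],
    catalogParts: Set[Map[String, String]],
    dropOrphans: Boolean): (Set[Map[String, String]], Set[Map[String, String]]) = {
  val toAdd = fsParts.diff(catalogParts)
  val toDrop =
    if (dropOrphans) catalogParts.diff(fsParts)
    else Set.empty[Map[String, String]]
  (toAdd, toDrop)
}
```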

## Investigation Notes

### SPARK-56231: Bucket pruning/join for V2 read path

**Implementation plan (two phases):**

**Phase A — Bucket pruning (filter files by bucket ID):**
1. Add `bucketSpec: Option[BucketSpec]` to `FileScan` trait
2. Expose `bucketSpec` accessor on `FileTable`
3. Thread `bucketSpec` through `FileScanBuilder` to all 6 format Scan classes
4. Extract `getExpressionBuckets`/`genBucketSet` from `FileSourceStrategy` to `BucketingUtils` (shared utility)
5. Add `bucketedScan`, `disableBucketedScan`, `optionalBucketSet` fields to `FileScan`
6. Implement bucket pruning in `FileScan.partitions` — filter files by bucket ID, group by bucket

**Phase B — Bucket join (HashPartitioning + optimizer rules):**
7. Override `outputPartitioning` in `BatchScanExec` to return `HashPartitioning` for bucketed scans
8. Group files by bucket ID in `FileScan.partitions` (one `FilePartition` per bucket)
9. Extend `DisableUnnecessaryBucketedScan` to handle `BatchScanExec` with bucketed `FileScan`
10. Extend `CoalesceBucketsInJoin` to handle V2 bucketed scans
11. Add `withDisableBucketedScan`/`withNumCoalescedBuckets` methods to `FileScan` + all concrete Scans

**Key files:** FileScan.scala, FileTable.scala, BatchScanExec.scala, BucketingUtils.scala, DisableUnnecessaryBucketedScan.scala, CoalesceBucketsInJoin.scala, + all 6 format ScanBuilder/Scan classes
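Phase A's pruning hinges on recovering a bucket id from each file name and testing it against the optional bucket set. A sketch of that filter, assuming V1's bucket-file naming convention (the `_NNNNN` suffix that `BucketingUtils.getBucketId` parses); the regex is illustrative rather than the exact Spark implementation:

```scala
// Sketch: extract the bucket id encoded in a bucketed file's name and
// keep only files whose bucket survives pruning. Regex mirrors the V1
// naming convention; treat as illustrative.
val bucketIdPattern = """.*_(\d+)(?:\..*)?$""".r

def bucketIdFromName(fileName: String): Option[Int] = fileName match {
  case bucketIdPattern(id) => Some(id.toInt)
  case _ => None // not a bucketed file: keep it
}

def pruneByBucket(files: Seq[String], keepBucket: Int => Boolean): Seq[String] =
  files.filter(f => bucketIdFromName(f).forall(keepBucket))
```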

**Branch:** `SPARK-56231` in worktree `spark-56171-combined`, on top of SPARK-56304. Implemented in `16837d3`.

### SPARK-56230: DEFAULT column values for V2 writes
- **Gap**: `ResolveColumnDefaultInCommandInputQuery` (catalyst module) handles `InsertIntoStatement` but not V2 write commands (`AppendData`, `OverwriteByExpression`, `OverwritePartitionsDynamic`). Has existing TODO: SPARK-43752.
- **Decision**: Deferred — catalyst-level change affecting all V2 sources, not file-table-specific.
31 changes: 31 additions & 0 deletions SPARK-56177-pr.md
### Title

[SPARK-56177][SQL] V2 file bucketing write support

### Description

#### What changes were proposed in this pull request?

Enable bucketed writes for V2 file tables via catalog `BucketSpec`.

Changes:
- `FileWrite`: add `bucketSpec` field, use `V1WritesUtils.getWriterBucketSpec()` instead of hardcoded `None`
- `FileTable.createFileWriteBuilder`: extract `catalogTable.bucketSpec` and pass to the write pipeline
- `FileDataSourceV2.getTable`: use `collect` to skip `BucketTransform` (handled via `catalogTable.bucketSpec`)
- `FileWriterFactory`: use `DynamicPartitionDataConcurrentWriter` for bucketed writes since V2's `RequiresDistributionAndOrdering` cannot express hash-based ordering
- All 6 format Write/Table classes (Parquet, ORC, CSV, JSON, Text, Avro) updated with `BucketSpec` parameter

#### Why are the changes needed?

After SPARK-56171 removed the V2 file write gate, `INSERT INTO` a bucketed file table goes through the V2 write path. Without this change, `WriteJobDescription.bucketSpec` is always `None`, so bucketed tables produce non-bucketed files.
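The reason a `None` bucketSpec silently loses bucketing: without it the writer never routes rows by bucket id. Conceptually, bucket assignment is a hash of the bucket-column values modulo `numBuckets`; Spark uses its Murmur3-based `HashPartitioning` hash, so the plain `hashCode` below is only a stand-in to keep the sketch self-contained:

```scala
// Conceptual sketch of bucket assignment. Spark hashes bucket-column
// values with its Murmur3-based expression hash; Seq.hashCode is a
// stand-in, not the real function.
def bucketId(bucketValues: Seq[Any], numBuckets: Int): Int = {
  val h = bucketValues.hashCode
  ((h % numBuckets) + numBuckets) % numBuckets // force non-negative
}
```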

#### Does this PR introduce _any_ user-facing change?

No. Bucketed file tables now correctly produce bucketed files via the V2 write path, matching V1 behavior.

#### How was this patch tested?

Added tests in `FileDataSourceV2WriteSuite`:
- Bucketed write with bucket ID verification via `BucketingUtils.getBucketId`
- Partitioned + bucketed write with partition directory and bucket ID verification
- All existing `BucketedWriteWithoutHiveSupportSuite` tests pass (15 total)
26 changes: 26 additions & 0 deletions SPARK-56178-pr.md
### Title

[SPARK-56178][SQL] MSCK REPAIR TABLE for V2 file tables

### Description

#### What changes were proposed in this pull request?

Implement `RepairTableExec` for V2 file tables to sync filesystem partition directories with the catalog metastore.

Changes:
- New `RepairTableExec`: scans filesystem partitions via `FileTable.listPartitionIdentifiers()`, compares with catalog, registers missing partitions and drops orphaned entries
- `DataSourceV2Strategy`: route `RepairTable` and `RecoverPartitions` for `FileTable` to the new V2 exec node (non-FileTable V2 tables still throw)

#### Why are the changes needed?

After SPARK-56175 changed `V2SessionCatalog.loadTable` to return `FileTable` instead of `V1Table`, `MSCK REPAIR TABLE` and `ALTER TABLE RECOVER PARTITIONS` on file tables hit the V2 error path (`repairTableNotSupportedForV2TablesError`). A V2-native implementation is needed.

#### Does this PR introduce _any_ user-facing change?

No. `MSCK REPAIR TABLE` and `ALTER TABLE RECOVER PARTITIONS` continue to work on file tables, now via the V2 path.

#### How was this patch tested?

Added test in `FileDataSourceV2WriteSuite`:
- Create partitioned table, write data directly to filesystem partition directories, verify catalog has no partitions before repair, run `MSCK REPAIR TABLE`, verify 3 partitions registered in catalog and data is readable
30 changes: 30 additions & 0 deletions SPARK-56316-pr.md
### Title

[SPARK-56316][SQL] Support static partition overwrite for V2 file tables

### Description

#### What changes were proposed in this pull request?

Implement `SupportsOverwriteV2` for V2 file tables to support static partition overwrite (`INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...`).

Changes:
- `FileTable.createFileWriteBuilder`: replace `SupportsTruncate` with `SupportsOverwriteV2`, implement `overwrite(predicates)` to store overwrite predicates
- `FileWrite.toBatch()`: extend overwrite logic to delete only the matching partition directory (ordered by `partitionSchema`) instead of the entire table
- `FileTable.CAPABILITIES`: add `OVERWRITE_BY_FILTER`
- All 6 format Write/Table classes: plumb `overwritePredicates` parameter

#### Why are the changes needed?

After SPARK-56171 removed the V2 file write gate, `INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...` goes through the V2 path, which creates an `OverwriteByExpression` with a partition filter. This requires `OVERWRITE_BY_FILTER` capability and `SupportsOverwriteV2` on the write builder, neither of which FileTable previously had.

This is also a prerequisite for SPARK-56304 (V2 ifPartitionNotExists for INSERT INTO).
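For `PARTITION(p=1)`, the analyzer emits null-safe equality predicates (`p <=> 1`). The write-side step that recovers a static partition spec from them can be modeled as follows (simplified case class standing in for the connector-level `Predicate` objects the real code walks):

```scala
// Toy model: recover a static partition spec from the null-safe
// equality predicates carried by OverwriteByExpression. Simplified
// representation, not the connector Predicate API.
case class EqualNullSafe(column: String, value: Any)

def toStaticSpec(predicates: Seq[EqualNullSafe]): Map[String, Any] =
  predicates.map(p => p.column -> p.value).toMap
```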

#### Does this PR introduce _any_ user-facing change?

No. Static partition overwrite on file tables now works via the V2 path, matching V1 behavior.

#### How was this patch tested?

Added test in `FileDataSourceV2WriteSuite`:
- Insert data into two partitions, overwrite one partition with new data, verify only the target partition is replaced while the other remains unchanged
`AvroTable.scala`:

```diff
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.avro.AvroUtils
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.FileTable
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -43,13 +43,14 @@ case class AvroTable(
     AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    new WriteBuilder {
-      override def build(): Write =
-        AvroWrite(paths, formatName, supportsDataType, mergedWriteInfo(info))
+    createFileWriteBuilder(info) {
+      (mergedInfo, partSchema, customLocs, dynamicOverwrite, truncate) =>
+        AvroWrite(paths, formatName, supportsDataType, mergedInfo, partSchema, customLocs,
+          dynamicOverwrite, truncate)
     }
   }
 
   override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)
 
-  override def formatName: String = "AVRO"
+  override def formatName: String = "Avro"
 }
```
`AvroWrite.scala`:

```diff
@@ -29,7 +29,11 @@ case class AvroWrite(
     paths: Seq[String],
     formatName: String,
     supportsDataType: DataType => Boolean,
-    info: LogicalWriteInfo) extends FileWrite {
+    info: LogicalWriteInfo,
+    partitionSchema: StructType,
+    override val customPartitionLocations: Map[Map[String, String], String] = Map.empty,
+    override val dynamicPartitionOverwrite: Boolean,
+    override val isTruncate: Boolean) extends FileWrite {
   override def prepareWrite(
       sqlConf: SQLConf,
       job: Job,
```