
Spark: RemoveOrphanFiles prefix_listing enumerates the whole table serially on the driver #16932

Description

@arifazmidd

Feature Request / Improvement

remove_orphan_files (via DeleteOrphanFilesSparkAction) supports two listing strategies, but only one of them parallelizes across executors. The prefix_listing (FileIO) path enumerates the entire table on a single driver thread, which makes it unusable for large object-store tables.

Background: the two listing paths

  • prefix_listing = false (Hadoop / listStatus): FileSystemWalker.listDirRecursivelyWithHadoop walks the directory tree using delimiter-based listStatus, and the Spark action already fans deep sub-directories out across executors via parallelize(...).mapPartitions(...). It parallelizes, but it issues roughly one LIST call per directory. On a table with hundreds of thousands of partition directories, that is an enormous number of latency-bound round trips; even spread across executors, the listing can fail to finish (in our case it timed out after 8h, having listed ~38% of the table).

  • prefix_listing = true (FileIO / listPrefix): FileSystemWalker.listDirRecursivelyWithFileIO calls SupportsPrefixOperations.listPrefix(location), a flat recursive listing (~1000 keys per page, no delimiter) that needs roughly an order of magnitude fewer LIST calls for the same table. However, the listing is iterated serially on the driver, and the results are then handed to parallelize(matchingFiles, 1), i.e. a single partition. For a table with tens of millions of files, the driver never finishes listing, and remove_orphan_files hangs before it ever reaches the deletion phase.
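To make the call-count gap between the two paths concrete, here is a back-of-envelope estimator under a deliberately simplified model: the delimiter walk costs one LIST per directory (plus one per extra page when a directory exceeds the page size), and the flat listing costs one LIST per 1000-key page over all files. Intermediate directories are ignored. The class and method names are hypothetical, and the numbers in `main` are illustrative inputs, not measurements from the table described here.

```java
import java.util.Collections;
import java.util.List;

// Back-of-envelope LIST-call estimate for the two listing strategies.
// Simplified, hypothetical model (not Iceberg code):
//  - delimiter walk: one LIST per directory page, empty dirs still cost one call;
//  - flat prefix listing: one LIST per page of pageSize keys across all files.
public class ListCallEstimate {

    // Ceiling division for non-negative longs.
    public static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    // LIST calls for a delimiter walk, given the number of entries in each directory.
    public static long delimiterWalkCalls(List<Long> entriesPerDir, long pageSize) {
        long calls = 0;
        for (long entries : entriesPerDir) {
            calls += Math.max(1, ceilDiv(entries, pageSize));
        }
        return calls;
    }

    // LIST calls for one flat, delimiter-free listing of totalFiles keys.
    public static long flatListingCalls(long totalFiles, long pageSize) {
        return Math.max(1, ceilDiv(totalFiles, pageSize));
    }

    public static void main(String[] args) {
        // Hypothetical table: 1,000 leaf directories with 100 files each.
        List<Long> dirs = Collections.nCopies(1000, 100L);
        long pageSize = 1000;
        System.out.println("delimiter walk: " + delimiterWalkCalls(dirs, pageSize)); // 1000
        System.out.println("flat listing:   " + flatListingCalls(100_000, pageSize)); // 100
    }
}
```

With about 100 files per leaf directory, the flat listing needs roughly a tenth of the calls, in line with the order-of-magnitude gap described in the issue; the ratio narrows as directories grow past one page.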

So today neither mode works well at scale: the Hadoop path parallelizes but makes too many calls, and the cheaper prefix path makes few calls but doesn't parallelize.

Proposal

Give the prefix-listing path the same executor fan-out that the Hadoop path already has, so that it gets both the low call count of listPrefix and cluster parallelism:

  1. On the driver, discover shallow sub-prefixes (depth-limited), reusing the listDirRecursivelyWithHadoop discovery step that the Hadoop path already uses. (SupportsPrefixOperations.listPrefix is recursive-only and cannot enumerate a single directory level, so discovery needs a delimiter-capable step.)
  2. Distribute those sub-prefixes across executors with parallelize(subDirs).mapPartitions(...), each task running the existing listDirRecursivelyWithFileIO on its assigned sub-prefix, with the FileIO obtained from a broadcast SerializableTable.
  3. Gate behind a new parallel-prefix-listing option (default true), with the current serial path as the fallback.
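The three steps can be sketched outside Spark as follows. This is a minimal, self-contained model under stated assumptions, not the proposed Iceberg implementation: `PrefixStore` and `InMemoryStore` are hypothetical stand-ins for a delimiter-capable listing plus `listPrefix`, a fixed thread pool stands in for `parallelize(subDirs).mapPartitions(...)` on executors, and the `parallel` flag plays the role of the proposed option, with one serial flat listing as the fallback.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class ParallelPrefixListing {

    // Hypothetical object-store abstraction (NOT Iceberg's FileIO API).
    public interface PrefixStore {
        List<String> listPrefix(String prefix); // flat, recursive, no delimiter
        List<String> listLevel(String prefix);  // one level; child dirs end in "/"
    }

    // In-memory stand-in for an object store, for demonstration only.
    public static class InMemoryStore implements PrefixStore {
        private final TreeSet<String> keys;

        public InMemoryStore(Collection<String> keys) {
            this.keys = new TreeSet<>(keys);
        }

        public List<String> listPrefix(String prefix) {
            return keys.stream().filter(k -> k.startsWith(prefix)).collect(Collectors.toList());
        }

        public List<String> listLevel(String prefix) {
            SortedSet<String> children = new TreeSet<>();
            for (String k : keys) {
                if (!k.startsWith(prefix)) continue;
                String rest = k.substring(prefix.length());
                int slash = rest.indexOf('/');
                // No further "/" means a file at this level; otherwise record the child dir.
                children.add(slash < 0 ? k : prefix + rest.substring(0, slash + 1));
            }
            return new ArrayList<>(children);
        }
    }

    // Step 1 (driver): depth-limited discovery. Files met on the way are kept;
    // directories reached at maxDepth become work items for the parallel step.
    static void discover(PrefixStore store, String prefix, int depth, int maxDepth,
                         List<String> files, List<String> workPrefixes) {
        for (String child : store.listLevel(prefix)) {
            if (!child.endsWith("/")) {
                files.add(child);
            } else if (depth + 1 >= maxDepth) {
                workPrefixes.add(child);
            } else {
                discover(store, child, depth + 1, maxDepth, files, workPrefixes);
            }
        }
    }

    // Steps 2 and 3: fan work prefixes out to a thread pool (standing in for
    // Spark executors), or fall back to a single serial flat listing.
    public static List<String> listAll(PrefixStore store, String root, int maxDepth,
                                       boolean parallel, int workers) {
        List<String> files = new ArrayList<>();
        if (!parallel) {
            files.addAll(store.listPrefix(root));
        } else {
            List<String> workPrefixes = new ArrayList<>();
            discover(store, root, 0, maxDepth, files, workPrefixes);
            ExecutorService pool = Executors.newFixedThreadPool(workers);
            try {
                List<Future<List<String>>> futures = new ArrayList<>();
                for (String p : workPrefixes) {
                    futures.add(pool.submit(() -> store.listPrefix(p)));
                }
                for (Future<List<String>> f : futures) {
                    files.addAll(f.get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException("prefix listing task failed", e.getCause());
            } finally {
                pool.shutdown();
            }
        }
        Collections.sort(files);
        return files;
    }

    public static void main(String[] args) {
        PrefixStore store = new InMemoryStore(List.of(
                "tbl/data/p=1/a.parquet", "tbl/data/p=1/b.parquet",
                "tbl/data/p=2/c.parquet", "tbl/metadata/v1.json", "tbl/README"));
        List<String> parallel = listAll(store, "tbl/", 2, true, 4);
        List<String> serial = listAll(store, "tbl/", 2, false, 4);
        System.out.println(parallel.equals(serial)); // true
        System.out.println(parallel.size());         // 5
    }
}
```

Two design points the sketch tries to capture: files found at shallow levels during discovery must be kept, or they would be missed by the worker tasks; and work prefixes end in "/", so a flat prefix match on `p=1/` cannot accidentally pick up keys under `p=10/`. The discovery depth trades extra delimiter calls on the driver against the number and size of parallel tasks.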

This mirrors the Hadoop path's proven driver-discovery + executor-dispatch structure, but performs the heavy deep listing with the S3-native flat listPrefix.

Real-world motivation

On an S3-backed table with ~40M files / hundreds of thousands of partitions, remove_orphan_files with prefix_listing => true hung indefinitely on the driver in listDirRecursivelyWithFileIO. With the proposed parallel listing it completed: ~10k sub-prefix listing tasks across 30 executors finished in minutes instead of hanging, and the job went on to delete the orphan files. (Note: heavily concurrent listPrefix can trigger S3 503 throttling, so raising s3.retry.num-retries / s3.retry.max-wait-ms is advisable; worth documenting alongside the feature.)
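Why raising the retry limits helps under 503 throttling can be seen with a small arithmetic model. The sketch below assumes a generic capped exponential backoff, where retry `i` waits `min(maxWaitMs, minWaitMs * 2^i)`; this is an illustrative policy, not a claim about the exact algorithm behind `s3.retry.num-retries` / `s3.retry.max-wait-ms` in Iceberg's S3 client. The class name and the settings in `main` are hypothetical.

```java
// Generic capped exponential backoff (illustrative model only; not claimed to be
// the exact retry policy used by Iceberg's S3FileIO).
public class RetryBudget {

    // Wait before retry number `attempt` (0-based), capped at maxWaitMs.
    public static long waitMs(int attempt, long minWaitMs, long maxWaitMs) {
        // Bound the shift so the multiplication cannot overflow a long.
        long factor = 1L << Math.min(attempt, 30);
        return Math.min(maxWaitMs, minWaitMs * factor);
    }

    // Total time one request can spend backing off before it gives up.
    public static long totalBackoffMs(int numRetries, long minWaitMs, long maxWaitMs) {
        long total = 0;
        for (int i = 0; i < numRetries; i++) {
            total += waitMs(i, minWaitMs, maxWaitMs);
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical settings, chosen only to show the effect of raising both limits.
        System.out.println(totalBackoffMs(5, 2_000, 20_000));  // 50000
        System.out.println(totalBackoffMs(10, 2_000, 60_000)); // 362000
    }
}
```

Under this model, doubling the retry count and raising the cap lets a throttled listing task ride out roughly seven times as much throttling before failing, which is the intuition behind tuning both settings together.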

I have a Spark 3.5 implementation ready and would open a PR.
