Spark: Parallelize RemoveOrphanFiles prefix listing across executors#16933
Open
arifazmidd wants to merge 1 commit into
The prefix_listing path enumerated the entire table on a single driver thread via one listPrefix iterator, so remove_orphan_files could hang on large object-store tables before ever reaching the deletion phase. Distribute the listing across executors the way the Hadoop path already does: discover shallow sub-prefixes on the driver, then list each sub-prefix in parallel with listPrefix using a broadcast SerializableTable. Gated by a new parallel-prefix-listing option (default true), with the prior serial path as the fallback.
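The fan-out described above can be sketched without Spark or Iceberg. This is a minimal illustration under stated assumptions, not the code in this PR: the in-memory `store`, the `listPrefix` stand-in, the `listInParallel` helper, and the thread pool (standing in for executors) are all hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PrefixFanOutSketch {

  // Hypothetical stand-in for a recursive-only prefix listing:
  // returns every key under the prefix, at any depth.
  static List<String> listPrefix(List<String> store, String prefix) {
    List<String> out = new ArrayList<>();
    for (String key : store) {
      if (key.startsWith(prefix)) {
        out.add(key);
      }
    }
    return out;
  }

  // Step 1 ("driver"): one shallow pass collects files directly under root
  // and the distinct first-level sub-prefixes.
  // Step 2 ("executors"): each sub-prefix is listed recursively in its own task.
  static List<String> listInParallel(List<String> store, String root, int threads) {
    List<String> files = new ArrayList<>();
    Set<String> subPrefixes = new TreeSet<>();
    for (String key : store) {
      if (!key.startsWith(root)) {
        continue;
      }
      int slash = key.indexOf('/', root.length());
      if (slash < 0) {
        files.add(key); // file at the shallow level, no fan-out needed
      } else {
        subPrefixes.add(key.substring(0, slash + 1));
      }
    }

    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<List<String>>> futures = new ArrayList<>();
      for (String sub : subPrefixes) {
        Callable<List<String>> task = () -> listPrefix(store, sub);
        futures.add(pool.submit(task));
      }
      for (Future<List<String>> future : futures) {
        files.addAll(future.get());
      }
    } catch (Exception e) {
      throw new RuntimeException("listing failed", e);
    } finally {
      pool.shutdown();
    }
    return files;
  }

  public static void main(String[] args) {
    List<String> store = List.of(
        "tbl/metadata.json",
        "tbl/data/p=1/a.parquet",
        "tbl/data/p=2/b.parquet",
        "tbl/metadata/v1.avro");
    System.out.println(listInParallel(store, "tbl/", 4).size() + " files listed");
  }
}
```

Because the sub-prefixes are disjoint, each key is returned by exactly one task, so the combined result needs no de-duplication.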
Closes #16932
Description
`DeleteOrphanFilesSparkAction` has two listing strategies, but only the Hadoop one parallelizes:

- `prefix_listing = false` (Hadoop / `listStatus`) is depth-limited on the driver and fans deep sub-directories out across executors. It parallelizes, but issues roughly one LIST call per directory. On tables with hundreds of thousands of partition directories that is too many round-trips to finish in a reasonable time, even when distributed.
- `prefix_listing = true` (FileIO / `listPrefix`) uses a flat recursive listing that needs roughly an order of magnitude fewer LIST calls, but it is iterated serially on the driver and then passed to `parallelize(matchingFiles, 1)` (a single partition). For tables with tens of millions of files the driver never finishes listing, so `remove_orphan_files` hangs before reaching the deletion phase.

This change gives the prefix-listing path the same executor fan-out the Hadoop path already has, so it gets both the low call count of `listPrefix` and cluster parallelism.

Changes

- `usePrefixListing` branch of `listedFileDS()`:
  - `FileSystemWalker.listDirRecursivelyWithHadoop` depth-limited discovery (`SupportsPrefixOperations.listPrefix` is recursive-only and cannot enumerate a single level, so discovery needs a delimiter-capable step).
  - `parallelize(subDirs).mapPartitions(...)`; each task runs the existing `FileSystemWalker.listDirRecursivelyWithFileIO` on its sub-prefix, with `FileIO` obtained from a broadcast `SerializableTableWithSize`.
- `parallel-prefix-listing` option (default `true`); `false` restores the prior serial driver-side path.
- No `FileSystemWalker` changes; reuses the existing walker methods.

Testing

- `TestRemoveOrphanFilesAction` is already parameterized over `usePrefixListing`, so the full suite exercises the parallel path (186 tests, all passing, against current `main`).
- `prefix_listing => true` previously hung indefinitely on the driver in `listDirRecursivelyWithFileIO`. With this change the listing completed (~10k sub-prefix tasks across 30 executors, finishing in minutes) and the job proceeded to delete the orphan files.

Notes

- `listPrefix` can trigger S3 503 throttling; raising `s3.retry.num-retries` / `s3.retry.max-wait-ms` mitigates it. Could add a docs note if useful.
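
To illustrate why a higher retry count and a longer wait ceiling help under throttling, here is a rough sketch of capped exponential backoff. It is not the retry logic used by Iceberg or the AWS SDK: `ThrottledException`, `backoffMs`, and `withRetries` are hypothetical names, and treating the wait limit as a per-retry ceiling is an assumption of this sketch.

```java
import java.util.function.Supplier;

class RetryBackoffSketch {

  // Hypothetical marker for a throttled request (e.g. an HTTP 503 response).
  static class ThrottledException extends RuntimeException {
    ThrottledException(String message) {
      super(message);
    }
  }

  // Delay before retry number `attempt` (0-based): baseMs * 2^attempt,
  // capped at maxWaitMs. The shift is bounded to avoid overflow.
  static long backoffMs(int attempt, long baseMs, long maxWaitMs) {
    long delay = baseMs << Math.min(attempt, 20);
    return Math.min(delay, maxWaitMs);
  }

  // Runs the call, retrying up to numRetries times on throttling.
  // The last failure is rethrown once the retry budget is exhausted.
  static <T> T withRetries(Supplier<T> call, int numRetries, long baseMs, long maxWaitMs) {
    for (int attempt = 0; ; attempt++) {
      try {
        return call.get();
      } catch (ThrottledException e) {
        if (attempt >= numRetries) {
          throw e;
        }
        try {
          Thread.sleep(backoffMs(attempt, baseMs, maxWaitMs));
        } catch (InterruptedException interrupted) {
          Thread.currentThread().interrupt();
          throw e;
        }
      }
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    String result = withRetries(() -> {
      if (calls[0]++ < 2) {
        throw new ThrottledException("503 Slow Down");
      }
      return "listed";
    }, 3, 1, 4);
    System.out.println(result + " after " + calls[0] + " calls");
  }
}
```

With many sub-prefix tasks listing concurrently, a larger retry budget lets each task ride out a throttling burst instead of failing the job.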