
Conversation

EnricoMi (Contributor)

What changes were proposed in this pull request?

Shuffle data of individual shuffles is deleted from the fallback storage during regular shuffle cleanup.

Why are the changes needed?

Currently, shuffle data is only removed from the fallback storage on Spark context shutdown. Long-running Spark jobs therefore accumulate shuffle data in the fallback storage even though Spark no longer uses it. Those shuffles should be cleaned up while the Spark context is still running.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests and a manual test via the reproduction example below.

Run the reproduction example without the <<< "$scala" part. In the Spark shell, execute this code:

import org.apache.spark.sql.SaveMode

val n = 100000000
val j = spark.sparkContext.broadcast(1000)
val x = spark.range(0, n, 1, 100).select($"id".cast("int"))
x.as[Int]
  .mapPartitions { it => if (it.hasNext && it.next < n / 100 * 80) Thread.sleep(2000); it }
  .groupBy($"value" % 1000).as[Int, Int]
  .flatMapSortedGroups($"value") { case (m, it) => if (it.hasNext && it.next == 0) Thread.sleep(10000); it }
  .write.mode(SaveMode.Overwrite).csv("/tmp/spark.csv")

This writes some data of shuffle 0 to the fallback storage.

Invoking System.gc() removes that shuffle directory from the fallback storage. Exiting the Spark shell removes the whole application directory.
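For context, the fallback storage only comes into play when executor decommissioning is enabled and a fallback path is configured. The exact flags of the reproduction example are not part of this PR description, so the following Scala sketch is an assumption of what such a configuration looks like (in the actual repro these would typically be passed as --conf flags to spark-shell):

import org.apache.spark.SparkConf

// Hypothetical configuration, not the repro script itself: enable decommissioning and
// point the fallback storage at a local path that Spark is allowed to clean up.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .set("spark.storage.decommission.fallbackStorage.path", "file:///tmp/fallback/") // must end with a separator
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")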

Was this patch authored or co-authored using generative AI tooling?

No.

github-actions bot added the CORE label on Jun 17, 2025
EnricoMi force-pushed the fallback-storage-cleanup branch from d96df77 to 602689b on September 2, 2025 05:01
EnricoMi (Contributor Author) commented Sep 2, 2025

@dongjoon-hyun @cloud-fan what do you think about this improvement?

    .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
    .createOptional

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
Contributor


This config is already present; how did we clean up shuffles from the fallback storage before?

Contributor Author


Currently, shuffle data is only removed from the fallback storage on application exit (Spark context shutdown).
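To illustrate what that means: the shutdown-time cleanup removes the whole application directory under the fallback path in one recursive delete. A simplified sketch of that pre-existing behavior (names and directory layout approximated, not the literal Spark source):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Simplified illustration: on SparkContext shutdown the entire <fallbackPath>/<appId>/
// directory is deleted recursively, taking all shuffles of the application with it.
def cleanUpApplication(fallbackPath: String, appId: String, hadoopConf: Configuration): Unit = {
  val appDir = new Path(fallbackPath, appId)
  val fs = appDir.getFileSystem(hadoopConf)
  fs.delete(appDir, true) // recursive delete of every shuffle of this application
}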

override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
  Future { true.asInstanceOf[T] }
  message match {
    case RemoveShuffle(shuffleId) =>
Contributor


Where/when do we send this message to this RPC endpoint?

EnricoMi (Contributor Author) commented Sep 3, 2025


When an unused shuffle is garbage collected on the driver:

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  while (!stopped) {
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.foreach { ref =>
          logDebug("Got cleaning task " + ref.task)
          referenceBuffer.remove(ref)
          ref.task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
            case CleanCheckpoint(rddId) =>
              doCleanCheckpoint(rddId)
            case CleanSparkListener(listener) =>
              doCleanSparkListener(listener)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}

/** Perform shuffle cleanup. */
def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
  try {
    if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
      logDebug("Cleaning shuffle " + shuffleId)
      // Shuffle must be removed before it's unregistered from the output tracker
      // to find blocks served by the shuffle service on deallocated executors
      shuffleDriverComponents.removeShuffle(shuffleId, blocking)
      mapOutputTrackerMaster.unregisterShuffle(shuffleId)
      listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
      logDebug("Cleaned shuffle " + shuffleId)
    } else {
      logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)")
    }
  } catch {
    case e: Exception => logError(log"Error cleaning shuffle ${MDC(SHUFFLE_ID, shuffleId)}", e)
  }
}

public void removeShuffle(int shuffleId, boolean blocking) {
  if (blockManagerMaster == null) {
    throw new IllegalStateException("Driver components must be initialized before using");
  }
  blockManagerMaster.removeShuffle(shuffleId, blocking);
}

/** Remove all blocks belonging to the given shuffle. */
def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
  val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
  future.failed.foreach(e =>
    logWarning(log"Failed to remove shuffle ${MDC(SHUFFLE_ID, shuffleId)} - " +
      log"${MDC(ERROR, e.getMessage)}", e)
  )(ThreadUtils.sameThread)
  if (blocking) {
    // the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
    RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
  }
}

Contributor


So FallbackStorageRpcEndpointRef is also attached to the driver block manager master endpoint?

Contributor Author


Yes, all the wiring is already there; all that is missing is calling into the existing delete code* when the RemoveShuffle message is received.

*after extending the delete-all-app-shuffle-data method to delete-only-a-single-shuffle-id
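For illustration, narrowing that deletion down from the whole application to a single shuffle amounts to something like the sketch below; the directory layout <fallbackPath>/<appId>/<shuffleId>/ and the helper name are assumptions, not the literal patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: when the RemoveShuffle(shuffleId) message arrives, delete only that shuffle's
// sub-directory instead of the whole application directory.
def cleanUpShuffle(fallbackPath: String, appId: String, shuffleId: Int,
    hadoopConf: Configuration): Unit = {
  val shuffleDir = new Path(new Path(fallbackPath, appId), shuffleId.toString)
  val fs = shuffleDir.getFileSystem(hadoopConf)
  if (fs.exists(shuffleDir)) {
    fs.delete(shuffleDir, true)
  }
}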

cloud-fan (Contributor)

Waiting for @dongjoon-hyun to do a final signoff.

EnricoMi (Contributor Author) commented Sep 5, 2025

@cloud-fan thanks for moving this along!
