3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -641,7 +641,8 @@ class SparkContext(config: SparkConf) extends Logging {
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf)
FallbackStorage.registerBlockManagerIfNeeded(
_env.blockManager.master, _conf, _hadoopConfiguration)

// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
20 changes: 12 additions & 8 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -604,23 +604,27 @@ package object config {
"cache block replication should be positive.")
.createWithDefaultString("30s")

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
.doc("If true, Spark cleans up its fallback storage data once individual shuffles are " +
"freed (interval configured via spark.cleaner.periodicGC.interval), and during " +
"shutting down.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =
ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
.doc("The location for fallback storage during block manager decommissioning. " +
"For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. " +
"The storage should be managed by TTL because Spark will not clean it up.")
"The storage will not be cleaned up by Spark unless " +
s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP.key} is true. " +
"Use an external clean up mechanism when false, for instance a TTL.")
.version("3.1.0")
.stringConf
.checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
.createOptional

private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
.doc("If true, Spark cleans up its fallback storage data during shutting down.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)
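
Taken together, a minimal sketch of how these settings might be combined on a SparkConf; the decommissioning flags, the s3a bucket, and the GC interval shown here are illustrative assumptions, not part of this diff:

import org.apache.spark.SparkConf

// Sketch only: enable block manager decommissioning with fallback storage and let
// Spark remove fallback data for freed shuffles and at shutdown.
val conf = new SparkConf()
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://spark-storage/") // must end with "/"
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
  // Freed shuffles are cleaned at the ContextCleaner's periodic GC interval.
  .set("spark.cleaner.periodicGC.interval", "30min")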

private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE =
ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize")
.doc("Maximum disk space to use to store shuffle blocks before rejecting remote " +
25 changes: 18 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -36,6 +36,7 @@ import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
import org.apache.spark.util.Utils

/**
@@ -95,15 +96,21 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
}
}

private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf: Configuration)
extends RpcEndpointRef(conf) {
// scalastyle:off executioncontextglobal
import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
override def address: RpcAddress = null
override def name: String = "fallback"
override def send(message: Any): Unit = {}
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
Future{true.asInstanceOf[T]}
message match {
case RemoveShuffle(shuffleId) =>
Contributor:

Where/when do we send this message to this RPC endpoint?

Contributor Author (@EnricoMi, Sep 3, 2025):

When an unused shuffle is garbage collected on the driver:

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
case CleanSparkListener(listener) =>
doCleanSparkListener(listener)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}

/** Perform shuffle cleanup. */
def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
try {
if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
logDebug("Cleaning shuffle " + shuffleId)
// Shuffle must be removed before it's unregistered from the output tracker
// to find blocks served by the shuffle service on deallocated executors
shuffleDriverComponents.removeShuffle(shuffleId, blocking)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
logDebug("Cleaned shuffle " + shuffleId)
} else {
logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)")
}
} catch {
case e: Exception => logError(log"Error cleaning shuffle ${MDC(SHUFFLE_ID, shuffleId)}", e)
}
}

public void removeShuffle(int shuffleId, boolean blocking) {
if (blockManagerMaster == null) {
throw new IllegalStateException("Driver components must be initialized before using");
}
blockManagerMaster.removeShuffle(shuffleId, blocking);
}

/** Remove all blocks belonging to the given shuffle. */
def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.failed.foreach(e =>
logWarning(log"Failed to remove shuffle ${MDC(SHUFFLE_ID, shuffleId)} - " +
log"${MDC(ERROR, e.getMessage)}", e)
)(ThreadUtils.sameThread)
if (blocking) {
// the underlying Futures will timeout anyway, so it's safe to use infinite timeout here
RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
}
}
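
Condensed, the path this reply traces from the context cleaner down to the fallback storage is roughly the following; a sketch of the call chain, with the fan-out step inside the block manager master endpoint stated as an assumption:

// Rough driver-side call chain for cleaning a single shuffle (sketch, not verbatim code):
// ContextCleaner.keepCleaning()                         // weak reference for an unused shuffle fires
//   -> ContextCleaner.doCleanupShuffle(shuffleId)
//     -> shuffleDriverComponents.removeShuffle(shuffleId, blocking)
//       -> BlockManagerMaster.removeShuffle(shuffleId)  // sends RemoveShuffle to the master endpoint
//         -> master endpoint asks every registered block manager ref, including the fallback one
//           -> FallbackStorageRpcEndpointRef.ask(RemoveShuffle(shuffleId))
//             -> FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId))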

Contributor:

So FallbackStorageRpcEndpointRef is also attached to the driver block manager master endpoint?

Contributor Author (@EnricoMi):

Yes, all the wiring is there already; all that is missing is calling into the existing delete code* when the RemoveShuffle message is received.

*after extending the delete-all-app-shuffle-data method to delete only a single shuffle id

FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId))
Future{true.asInstanceOf[T]}
case _ => Future{true.asInstanceOf[T]}
}
}
}

@@ -120,20 +127,24 @@ private[spark] object FallbackStorage extends Logging {
}

/** Register the fallback block manager and its RPC endpoint. */
def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = {
def registerBlockManagerIfNeeded(master: BlockManagerMaster,
conf: SparkConf,
hadoopConf: Configuration): Unit = {
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
master.registerBlockManager(
FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0, new NoopRpcEndpointRef(conf))
FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0,
new FallbackStorageRpcEndpointRef(conf, hadoopConf))
}
}

/** Clean up the generated fallback location for this app. */
def cleanUp(conf: SparkConf, hadoopConf: Configuration): Unit = {
/** Clean up the generated fallback location for this app (and shuffle id if given). */
def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
conf.contains("spark.app.id")) {
val fallbackPath =
val fallbackPath = shuffleId.foldLeft(
new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, conf.getAppId)
) { case (path, shuffleId) => new Path(path, shuffleId.toString) }
val fallbackUri = fallbackPath.toUri
val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
// The fallback directory for this app may not be created yet.
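
The Option fold in cleanUp above simply appends the shuffle id as one more path segment when it is given; a small illustration, where the bucket and application id are placeholders:

import org.apache.hadoop.fs.Path

// Sketch: how the fold resolves the directory that will be deleted.
def target(base: Path, shuffleId: Option[Int]): Path =
  shuffleId.foldLeft(base) { case (path, id) => new Path(path, id.toString) }

val base = new Path("s3a://spark-storage/", "app-20250903120000-0001")
target(base, None)    // s3a://spark-storage/app-20250903120000-0001    -> whole-app cleanup
target(base, Some(1)) // s3a://spark-storage/app-20250903120000-0001/1  -> only shuffle 1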
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.LocalSparkContext.withSpark
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
@@ -67,8 +68,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
val fallbackStorage = new FallbackStorage(conf)
val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)

val bm = mock(classOf[BlockManager])
val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -118,8 +121,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
"file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
val fallbackStorage = new FallbackStorage(conf)
val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)

val bm = mock(classOf[BlockManager])
val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -153,7 +158,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
assert(readResult.nioByteBuffer().array().sameElements(content))
}

test("SPARK-34142: fallback storage API - cleanUp") {
test("SPARK-34142: fallback storage API - cleanUp app") {
withTempDir { dir =>
Seq(true, false).foreach { cleanUp =>
val appId = s"test$cleanUp"
@@ -165,8 +170,38 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
val location = new File(dir, appId)
assert(location.mkdir())
assert(location.exists())
val shuffle = new File(location, "1")
assert(shuffle.mkdir())
assert(shuffle.exists())
FallbackStorage.cleanUp(conf, new Configuration())
assert(location.exists() != cleanUp)
assert(shuffle.exists() != cleanUp)
}
}
}

test("SPARK-34142: fallback storage API - cleanUp shuffle") {
withTempDir { dir =>
Seq(true, false).foreach { cleanUp =>
val appId = s"test$cleanUp"
val conf = new SparkConf(false)
.set("spark.app.id", appId)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, dir.getAbsolutePath + "/")
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, cleanUp)

val location = new File(dir, appId)
assert(location.mkdir())
assert(location.exists())
val shuffle1 = new File(location, "1")
assert(shuffle1.mkdir())
assert(shuffle1.exists())
val shuffle2 = new File(location, "2")
assert(shuffle2.mkdir())
assert(shuffle2.exists())
FallbackStorage.cleanUp(conf, new Configuration(), Some(1))
assert(location.exists())
assert(shuffle1.exists() != cleanUp)
assert(shuffle2.exists())
}
}
}
@@ -177,6 +212,8 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)

val ids = Set((1, 1L, 1))
val bm = mock(classOf[BlockManager])
@@ -202,7 +239,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {

when(bm.getPeers(mc.any()))
.thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
when(bm.master).thenReturn(bmm)
val blockTransferService = mock(classOf[BlockTransferService])
when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),