diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2e61773251b67..9e20a8bc1feef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -641,7 +641,8 @@ class SparkContext(config: SparkConf) extends Logging { } _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) - FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf) + FallbackStorage.registerBlockManagerIfNeeded( + _env.blockManager.master, _conf, _hadoopConfiguration) // The metrics system for Driver need to be set spark.app.id to app ID. // So it should start after we get app ID from the task scheduler and set spark.app.id. diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7cb3d068b676f..54f38cbccea73 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -598,7 +598,9 @@ package object config { ConfigBuilder("spark.storage.decommission.fallbackStorage.path") .doc("The location for fallback storage during block manager decommissioning. " + "For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. " + - "The storage should be managed by TTL because Spark will not clean it up.") + "The storage will not be cleaned up by Spark unless " + + "spark.storage.decommission.fallbackStorage.cleanUp is true. " + + "Use an external clean up mechanism when false, for instance a TTL.") .version("3.1.0") .stringConf .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.") @@ -606,7 +608,9 @@ package object config { private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP = ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp") - .doc("If true, Spark cleans up its fallback storage data during shutting down.") + .doc("If true, Spark cleans up its fallback storage data once individual shuffles are " + + "freed (interval configured via spark.cleaner.periodicGC.interval), and during " + + "shutting down.") .version("3.2.0") .booleanConf .createWithDefault(false) diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala index 0f2bfaede4454..9d9f63dc672bb 100644 --- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala +++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala @@ -36,6 +36,7 @@ import org.apache.spark.network.util.JavaUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout} import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo} import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID +import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle import org.apache.spark.util.Utils /** @@ -95,7 +96,8 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging { } } -private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) { +private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf: Configuration) + extends RpcEndpointRef(conf) { // scalastyle:off executioncontextglobal import scala.concurrent.ExecutionContext.Implicits.global // scalastyle:on executioncontextglobal @@ -103,7 +105,12 @@ private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRe override def name: String = "fallback" override def send(message: Any): Unit = {} override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { - Future{true.asInstanceOf[T]} + message match { + case RemoveShuffle(shuffleId) => + FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId)) + Future{true.asInstanceOf[T]} + case _ => Future{true.asInstanceOf[T]} + } } } @@ -120,20 +127,24 @@ private[spark] object FallbackStorage extends Logging { } /** Register the fallback block manager and its RPC endpoint. */ - def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = { + def registerBlockManagerIfNeeded(master: BlockManagerMaster, + conf: SparkConf, + hadoopConf: Configuration): Unit = { if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) { master.registerBlockManager( - FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0, new NoopRpcEndpointRef(conf)) + FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0, + new FallbackStorageRpcEndpointRef(conf, hadoopConf)) } } - /** Clean up the generated fallback location for this app. */ - def cleanUp(conf: SparkConf, hadoopConf: Configuration): Unit = { + /** Clean up the generated fallback location for this app (and shuffle id if given). */ + def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = { if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) && conf.contains("spark.app.id")) { - val fallbackPath = + val fallbackPath = shuffleId.foldLeft( new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, conf.getAppId) + ) { case (path, shuffleId) => new Path(path, shuffleId.toString) } val fallbackUri = fallbackPath.toUri val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf) // The fallback directory for this app may not be created yet. diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala index 6c51bd4ff2e2f..6df8bc85b5104 100644 --- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala @@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils} import org.apache.spark.LocalSparkContext.withSpark +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.config._ import org.apache.spark.io.CompressionCodec import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER} @@ -67,8 +68,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/") + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf) val fallbackStorage = new FallbackStorage(conf) - val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false) + val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false) val bm = mock(classOf[BlockManager]) val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false) @@ -118,8 +121,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, "file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/") + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf) val fallbackStorage = new FallbackStorage(conf) - val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false) + val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false) val bm = mock(classOf[BlockManager]) val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false) @@ -153,7 +158,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { assert(readResult.nioByteBuffer().array().sameElements(content)) } - test("SPARK-34142: fallback storage API - cleanUp") { + test("SPARK-34142: fallback storage API - cleanUp app") { withTempDir { dir => Seq(true, false).foreach { cleanUp => val appId = s"test$cleanUp" @@ -165,8 +170,38 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { val location = new File(dir, appId) assert(location.mkdir()) assert(location.exists()) + val shuffle = new File(location, "1") + assert(shuffle.mkdir()) + assert(shuffle.exists()) FallbackStorage.cleanUp(conf, new Configuration()) assert(location.exists() != cleanUp) + assert(shuffle.exists() != cleanUp) + } + } + } + + test("SPARK-34142: fallback storage API - cleanUp shuffle") { + withTempDir { dir => + Seq(true, false).foreach { cleanUp => + val appId = s"test$cleanUp" + val conf = new SparkConf(false) + .set("spark.app.id", appId) + .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, dir.getAbsolutePath + "/") + .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, cleanUp) + + val location = new File(dir, appId) + assert(location.mkdir()) + assert(location.exists()) + val shuffle1 = new File(location, "1") + assert(shuffle1.mkdir()) + assert(shuffle1.exists()) + val shuffle2 = new File(location, "2") + assert(shuffle2.mkdir()) + assert(shuffle2.exists()) + FallbackStorage.cleanUp(conf, new Configuration(), Some(1)) + assert(location.exists()) + assert(shuffle1.exists() != cleanUp) + assert(shuffle2.exists()) } } } @@ -177,6 +212,8 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/") + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf) val ids = Set((1, 1L, 1)) val bm = mock(classOf[BlockManager]) @@ -202,7 +239,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext { when(bm.getPeers(mc.any())) .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)) - val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false) + val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false) when(bm.master).thenReturn(bmm) val blockTransferService = mock(classOf[BlockTransferService]) when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),