
Commit d96df77

Cleanup shuffle from fallback storage
1 parent 1dfcd8f commit d96df77

4 files changed: +67 -14 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
@@ -641,7 +641,8 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _ui.foreach(_.setAppId(_applicationId))
     _env.blockManager.initialize(_applicationId)
-    FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf)
+    FallbackStorage.registerBlockManagerIfNeeded(
+      _env.blockManager.master, _conf, _hadoopConfiguration)

     // The metrics system for Driver need to be set spark.app.id to app ID.
     // So it should start after we get app ID from the task scheduler and set spark.app.id.
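
Why the extra argument: the fallback endpoint must later resolve the fallback path to a Hadoop FileSystem in order to delete shuffle data, and that resolution needs a Hadoop configuration. A minimal sketch of the resolution step, assuming an illustrative s3a path (the real URI comes from spark.storage.decommission.fallbackStorage.path):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: mapping a fallback URI to a concrete FileSystem requires a
// Hadoop configuration, which is why the registration call now threads one.
val hadoopConf = new Configuration()
val fallbackUri = new Path("s3a://spark-storage/app-123/").toUri
val fs: FileSystem = FileSystem.get(fallbackUri, hadoopConf)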

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 2 deletions
@@ -598,15 +598,19 @@ package object config {
     ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
       .doc("The location for fallback storage during block manager decommissioning. " +
         "For example, `s3a://spark-storage/`. In case of empty, fallback storage is disabled. " +
-        "The storage should be managed by TTL because Spark will not clean it up.")
+        "The storage will not be cleaned up by Spark unless " +
+        "spark.storage.decommission.fallbackStorage.cleanUp is true. " +
+        "Use an external clean up mechanism when false, for instance a TTL.")
       .version("3.1.0")
       .stringConf
       .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
       .createOptional

   private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP =
     ConfigBuilder("spark.storage.decommission.fallbackStorage.cleanUp")
-      .doc("If true, Spark cleans up its fallback storage data during shutting down.")
+      .doc("If true, Spark cleans up its fallback storage data once individual shuffles are " +
+        "freed (interval configured via spark.cleaner.periodicGC.interval), and during " +
+        "shutting down.")
       .version("3.2.0")
       .booleanConf
       .createWithDefault(false)
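
From the user's side, the two options combine as follows. A hypothetical SparkConf enabling decommission fallback plus the new shuffle-level cleanup; the path and interval values are illustrative, and spark.cleaner.periodicGC.interval is the knob the doc string above refers to:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://spark-storage/")
  // true: Spark deletes fallback data for freed shuffles and at shutdown;
  // false (the default): an external mechanism such as a TTL must reclaim it.
  .set("spark.storage.decommission.fallbackStorage.cleanUp", "true")
  .set("spark.cleaner.periodicGC.interval", "30min")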

core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala

Lines changed: 18 additions & 7 deletions
@@ -36,6 +36,7 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.storage.BlockManagerMessages.RemoveShuffle
 import org.apache.spark.util.Utils

 /**
@@ -95,15 +96,21 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
   }
 }

-private[storage] class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
+private[storage] class FallbackStorageRpcEndpointRef(conf: SparkConf, hadoopConf: Configuration)
+  extends RpcEndpointRef(conf) {
   // scalastyle:off executioncontextglobal
   import scala.concurrent.ExecutionContext.Implicits.global
   // scalastyle:on executioncontextglobal
   override def address: RpcAddress = null
   override def name: String = "fallback"
   override def send(message: Any): Unit = {}
   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
-    Future{true.asInstanceOf[T]}
+    message match {
+      case RemoveShuffle(shuffleId) =>
+        FallbackStorage.cleanUp(conf, hadoopConf, Some(shuffleId))
+        Future{true.asInstanceOf[T]}
+      case _ => Future{true.asInstanceOf[T]}
+    }
   }
 }
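
Background: the fallback storage is registered with the BlockManagerMaster as a placeholder block manager, so when the ContextCleaner frees a shuffle the master forwards RemoveShuffle to this endpoint like any other. A self-contained sketch of the intercept-and-ack pattern used above, with RemoveShuffle and the cleanup hook stubbed out (the real types live in Spark):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative stand-ins for Spark's RemoveShuffle message and cleanup hook.
case class RemoveShuffle(shuffleId: Int)
def cleanUp(shuffleId: Option[Int]): Unit =
  println(s"deleting fallback files for shuffle $shuffleId")

// Every message is acknowledged with `true`; RemoveShuffle additionally
// triggers the cleanup side effect before the ack.
def ask(message: Any): Future[Boolean] = message match {
  case RemoveShuffle(id) =>
    cleanUp(Some(id))
    Future(true)
  case _ => Future(true)
}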

@@ -120,20 +127,24 @@ private[spark] object FallbackStorage extends Logging {
   }

   /** Register the fallback block manager and its RPC endpoint. */
-  def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = {
+  def registerBlockManagerIfNeeded(master: BlockManagerMaster,
+      conf: SparkConf,
+      hadoopConf: Configuration): Unit = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
       master.registerBlockManager(
-        FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0, new NoopRpcEndpointRef(conf))
+        FALLBACK_BLOCK_MANAGER_ID, Array.empty[String], 0, 0,
+        new FallbackStorageRpcEndpointRef(conf, hadoopConf))
     }
   }

-  /** Clean up the generated fallback location for this app. */
-  def cleanUp(conf: SparkConf, hadoopConf: Configuration): Unit = {
+  /** Clean up the generated fallback location for this app (and shuffle id if given). */
+  def cleanUp(conf: SparkConf, hadoopConf: Configuration, shuffleId: Option[Int] = None): Unit = {
     if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined &&
         conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP) &&
         conf.contains("spark.app.id")) {
-      val fallbackPath =
+      val fallbackPath = shuffleId.foldLeft(
         new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get, conf.getAppId)
+      ) { case (path, shuffleId) => new Path(path, shuffleId.toString) }
       val fallbackUri = fallbackPath.toUri
       val fallbackFileSystem = FileSystem.get(fallbackUri, hadoopConf)
       // The fallback directory for this app may not be created yet.
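
The foldLeft over the Option is a compact conditional append: with None the fold returns the app's base directory unchanged, while Some(id) adds one more path segment, yielding <fallbackPath>/<appId>/<shuffleId>. A standalone sketch with illustrative values:

import org.apache.hadoop.fs.Path

val base = new Path("s3a://spark-storage/", "app-20210115")
val withShuffle = Some(1).foldLeft(base) { case (p, id) => new Path(p, id.toString) }
val appOnly = (None: Option[Int]).foldLeft(base) { case (p, id) => new Path(p, id.toString) }
println(withShuffle) // s3a://spark-storage/app-20210115/1
println(appOnly)     // s3a://spark-storage/app-20210115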

core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala

Lines changed: 41 additions & 4 deletions
@@ -30,6 +30,7 @@ import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
@@ -67,8 +68,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
     val fallbackStorage = new FallbackStorage(conf)
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)

     val bm = mock(classOf[BlockManager])
     val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -118,8 +121,10 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         "file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)
     val fallbackStorage = new FallbackStorage(conf)
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)

     val bm = mock(classOf[BlockManager])
     val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
@@ -153,7 +158,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     assert(readResult.nioByteBuffer().array().sameElements(content))
   }

-  test("SPARK-34142: fallback storage API - cleanUp") {
+  test("SPARK-34142: fallback storage API - cleanUp app") {
     withTempDir { dir =>
       Seq(true, false).foreach { cleanUp =>
         val appId = s"test$cleanUp"
@@ -165,8 +170,38 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
         val location = new File(dir, appId)
         assert(location.mkdir())
         assert(location.exists())
+        val shuffle = new File(location, "1")
+        assert(shuffle.mkdir())
+        assert(shuffle.exists())
         FallbackStorage.cleanUp(conf, new Configuration())
         assert(location.exists() != cleanUp)
+        assert(shuffle.exists() != cleanUp)
+      }
+    }
+  }
+
+  test("SPARK-34142: fallback storage API - cleanUp shuffle") {
+    withTempDir { dir =>
+      Seq(true, false).foreach { cleanUp =>
+        val appId = s"test$cleanUp"
+        val conf = new SparkConf(false)
+          .set("spark.app.id", appId)
+          .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH, dir.getAbsolutePath + "/")
+          .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, cleanUp)
+
+        val location = new File(dir, appId)
+        assert(location.mkdir())
+        assert(location.exists())
+        val shuffle1 = new File(location, "1")
+        assert(shuffle1.mkdir())
+        assert(shuffle1.exists())
+        val shuffle2 = new File(location, "2")
+        assert(shuffle2.mkdir())
+        assert(shuffle2.exists())
+        FallbackStorage.cleanUp(conf, new Configuration(), Some(1))
+        assert(location.exists())
+        assert(shuffle1.exists() != cleanUp)
+        assert(shuffle2.exists())
       }
     }
   }
@@ -177,6 +212,8 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val rpcEndpointRef = new FallbackStorageRpcEndpointRef(conf, hadoopConf)

     val ids = Set((1, 1L, 1))
     val bm = mock(classOf[BlockManager])
@@ -202,7 +239,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
 
     when(bm.getPeers(mc.any()))
       .thenReturn(Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID))
-    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+    val bmm = new BlockManagerMaster(rpcEndpointRef, null, conf, false)
     when(bm.master).thenReturn(bmm)
     val blockTransferService = mock(classOf[BlockTransferService])
     when(blockTransferService.uploadBlockSync(mc.any(), mc.any(), mc.any(), mc.any(), mc.any(),
