Added s3 offloading technique for RDD #51042

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
Status: Open. Wants to merge 1 commit into master.
224 changes: 221 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -18,15 +18,21 @@
package org.apache.spark.rdd

import java.util.Random
import java.util.UUID
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
import scala.io.Codec
import scala.language.implicitConversions
import scala.ref.WeakReference
import scala.reflect.{classTag, ClassTag}
import scala.util.Try
import scala.util.control.NonFatal

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.TextOutputFormat
@@ -35,6 +41,7 @@ import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
@@ -45,8 +52,10 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.serializer.SerializationStream
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
Utils => collectionUtils}
@@ -1048,17 +1057,226 @@ abstract class RDD[T: ClassTag](
}

/**
* Return an array that contains all of the elements in this RDD.
* Return an array that contains all the elements in this RDD.
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
* This implementation includes an S3 offloading mechanism to reduce driver memory pressure
* for large result sets. It is enabled via spark.rdd.collect.offloadToS3.enabled and requires
* spark.rdd.collect.s3.path to point to a writable S3 (or S3-compatible) location.
*
* @note This method should only be used if the resulting array is expected to be small when
* S3 offloading is disabled, as all the data is loaded into the driver's memory.
* With S3 offloading, driver memory pressure during collection is reduced, but the
* final array still resides in driver memory.
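*
* Example (an illustrative sketch; the bucket path below is a placeholder, not a default):
* {{{
*   val conf = new SparkConf()
*     .setAppName("collect-with-s3-offload")
*     .setMaster("local[*]")
*     .set("spark.rdd.collect.offloadToS3.enabled", "true")
*     .set("spark.rdd.collect.s3.path", "s3a://my-bucket/tmp-spark-offload")
*   val sc = new SparkContext(conf)
*   val collected = sc.parallelize(1 to 1000000).collect()
* }}}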
*/
def collect(): Array[T] = withScope {
  val offloadEnabled = sc.getConf
    .getBoolean("spark.rdd.collect.offloadToS3.enabled", defaultValue = false)
  if (offloadEnabled) {
    try {
      logInfo(log"S3 offload: Offloading enabled, attempting S3 " +
        log"collect for RDD ID ${MDC(RDD_ID, id)}")
      collectOffloadS3()
    } catch {
      case NonFatal(e) =>
        logError(log"S3 offload: S3 offloading failed for RDD ID ${MDC(RDD_ID, id)}, " +
          log"falling back to standard collect(). Reason: ${MDC(ERROR, e.getMessage)}", e)
        collectInternal()
    }
  } else {
    logInfo(log"S3 offload: Offloading disabled, using standard collect for " +
      log"RDD ID ${MDC(RDD_ID, id)}")
    collectInternal()
  }
}

/**
* The standard RDD collection mechanism, loading all data into the driver's memory.
*/
private def collectInternal(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results.toImmutableArraySeq: _*)
}

/**
* Collects the RDD elements by offloading intermediate data to S3.
* This is an alternative to the standard collect() for large result sets
* to reduce driver memory pressure during the collection phase.
*/
private def collectOffloadS3(): Array[T] = withScope {
val serializableBroadcastConf = sc
.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

val s3PathConfigKey = "spark.rdd.collect.s3.path"
val rawS3Root = sc.getConf.getOption(s3PathConfigKey)
.getOrElse(throw new IllegalArgumentException(
s"S3 offload path '$s3PathConfigKey' must be configured for S3 offloading."))
logInfo(s"S3 offload: configured S3 root path is $rawS3Root")

val s3RootNormalized = if (rawS3Root.endsWith("/")) rawS3Root.dropRight(1) else rawS3Root
val cleanup = sc.getConf.getBoolean("spark.rdd.collect.s3.cleanup", defaultValue = true)

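// Each collect() call writes under its own session directory so concurrent collects cannot collide.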
val sessionId = UUID.randomUUID().toString
val sessionPathString = s"$s3RootNormalized/session-$sessionId"
val sessionHadoopPath = new Path(sessionPathString)
logInfo(s"S3 offload: session path for this collect is $sessionHadoopPath")
val fs = sessionHadoopPath.getFileSystem(sc.hadoopConfiguration)

try {
fs.mkdirs(sessionHadoopPath)
val s3FilePaths = writeFilesToBucket(sessionPathString, serializableBroadcastConf)

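// readAllFilesFromBucket returns a lazy iterator: files are opened and deserialized only as it is consumed below.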
val lazyIterator =
readAllFilesFromBucket(sessionPathString, serializableBroadcastConf, s3FilePaths, cleanup)

val collectedDataArray = lazyIterator.toArray

if (collectedDataArray.nonEmpty) {
logInfo(s"S3 offload: Preview first element: ${collectedDataArray.head}")
}

collectedDataArray
} catch {
case NonFatal(e) =>
if (cleanup && fs.exists(sessionHadoopPath)) {
Try(fs.delete(sessionHadoopPath, true)).recover {
case cleanupError => logWarning(s"S3 offload: Failed to cleanup session " +
s"path $sessionPathString during error handling", cleanupError)
}
}
throw e
}
}

/**
* Writes RDD partitions to separate files under a specified S3 base path.
* This method is executed by tasks on executors.
*
* @param basePathStr The S3 session path (e.g., s3a://bucket/prefix/session-id)
* under which partition files will be written.
* @param conf A serializable Hadoop configuration.
* @return An array of S3 file paths, one for each partition written.
*/
private def writeFilesToBucket(basePathStr: String,
conf: Broadcast[SerializableConfiguration]): Array[String] = {
val pathsRDD: RDD[String] = mapPartitionsWithIndex {
case (partitionIndex, iterator) =>
val fileName = f"part-$partitionIndex%05d.bin"
val filePathStr = s"$basePathStr/$fileName"
val fsPath = new Path(filePathStr)
val fs = fsPath.getFileSystem(conf.value.value)

var out: FSDataOutputStream = null
var serStream: SerializationStream = null
try {
  out = fs.create(fsPath, true)
  val gzipOut = new GZIPOutputStream(out)
  val serInstance = SparkEnv.get.serializer.newInstance()
  serStream = serInstance.serializeStream(gzipOut)

  var recordCount = 0L
  iterator.foreach { record =>
    serStream.writeObject(record.asInstanceOf[AnyRef])
    recordCount += 1
  }

  logInfo(s"S3 offload: Executor task wrote $recordCount records for partition " +
    s"$partitionIndex to $filePathStr")
} catch {
  case NonFatal(e) =>
    logError(s"S3 offload: Executor failed to write " +
      s"partition $partitionIndex to $filePathStr", e)
    throw e
} finally {
  if (serStream != null) {
    try {
      serStream.close()
    } catch {
      case NonFatal(e) =>
        logError(s"S3 offload: Error closing serialization stream for $filePathStr", e)
    }
  } else if (out != null) {
    // The serialization stream was never created; close the raw output stream directly.
    try {
      out.close()
    } catch {
      case NonFatal(e) =>
        logError(s"S3 offload: Error closing raw output stream for $filePathStr", e)
    }
  }
}
Iterator(filePathStr)
}

pathsRDD.collectInternal()
}

/**
* Reads all partition files from a specified S3 session path and assembles the data.
* This method is executed on the driver.
*
* @param basePathStr The S3 session path (e.g., s3a://bucket/prefix/session-id) from which to read.
* @param conf A serializable Hadoop configuration (used for the FileSystem instance on the driver).
* @param s3FilePaths An array of S3 file paths for the individual partition files.
* @param cleanup A boolean flag indicating whether to delete the S3 session path after reading.
* @return An iterator over all elements of the RDD, read back lazily from S3 in partition order.
*/
private def readAllFilesFromBucket(basePathStr: String,
conf: Broadcast[SerializableConfiguration],
s3FilePaths: Array[String],
cleanup: Boolean): Iterator[T] = {
val fs = new Path(basePathStr).getFileSystem(conf.value.value)

val sortedPaths = s3FilePaths.sortBy { path =>
val name = new Path(path).getName
Try(name.stripPrefix("part-").stripSuffix(".bin").toInt).getOrElse(0)
}

// Lazily deserialize each partition file, in partition order, as the iterator is consumed.
val allIterators = sortedPaths.iterator.flatMap { path =>
  val in = fs.open(new Path(path))
  val gzipIn = new GZIPInputStream(in)
  val deserializeStream = SparkEnv.get.serializer.newInstance().deserializeStream(gzipIn)

  new Iterator[T] {
    private var finished = false
    private var nextElement: Option[T] = None

    override def hasNext: Boolean = {
      if (finished) return false
      // Buffer one element so that repeated hasNext calls do not skip records.
      if (nextElement.isDefined) return true
      try {
        nextElement = Some(deserializeStream.readObject[T]())
        true
      } catch {
        case _: java.io.EOFException =>
          finished = true
          deserializeStream.close()
          false
      }
    }

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException("End of S3 offload stream")
      val element = nextElement.get
      nextElement = None
      element
    }
  }
}

new Iterator[T] {
  private var exhausted = false

  override def hasNext: Boolean = {
    val hn = allIterators.hasNext
    if (!hn && !exhausted) {
      exhausted = true
      if (cleanup) {
        Try(fs.delete(new Path(basePathStr), true)).recover {
          case e => logWarning(s"S3 offload: cleanup failed for $basePathStr", e)
        }
      }
    }
    hn
  }

  override def next(): T = allIterators.next()
}
}

/**
* Return an iterator that contains all of the elements in this RDD.
*
31 changes: 31 additions & 0 deletions minio/docker-compose.yml
@@ -0,0 +1,31 @@
version: "3.8"
name: spark-minio

services:
  minio:
    image: minio/minio
    container_name: minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio-data:/data
      - ./minio-init.sh:/minio-init.sh
    entrypoint: >
      sh -c "
      /usr/bin/docker-entrypoint.sh minio server /data --console-address ':9001' &
      echo 'Waiting for MinIO to start...';
      until curl -s http://localhost:9000/minio/health/ready; do
      sleep 1;
      done;
      echo 'MinIO is ready.';
      sh /minio-init.sh;
      wait
      "

volumes:
  minio-data:
47 changes: 47 additions & 0 deletions minio/minio-init.sh
@@ -0,0 +1,47 @@
#!/bin/sh

set -e

MC_ALIAS=minio

# Setup alias
mc alias set $MC_ALIAS http://localhost:9000 minioadmin minioadmin

# Create user
mc admin user add $MC_ALIAS spark-user spark-password || true

# Create policy
cat > /tmp/spark-policy.json <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": ["s3:GetBucketLocation", "s3:ListBucket"],
"Effect": "Allow",
"Resource": ["arn:aws:s3:::spark"]
},
{
"Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
"Effect": "Allow",
"Resource": ["arn:aws:s3:::spark/*"]
}
]
}
EOF

mc admin policy create $MC_ALIAS spark-policy /tmp/spark-policy.json || true
mc admin policy attach $MC_ALIAS spark-policy --user spark-user || true

# Check/create bucket
if mc ls $MC_ALIAS/spark >/dev/null 2>&1; then
echo "Bucket 'spark' already exists"
else
mc mb $MC_ALIAS/spark
fi

# Check/create folder placeholder
if mc stat $MC_ALIAS/spark/tmp-spark-offload/.keep >/dev/null 2>&1; then
echo "Folder placeholder '.keep' exists, skipping creation"
else
echo "" | mc pipe $MC_ALIAS/spark/tmp-spark-offload/.keep
fi
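
To exercise the offloading path against this local MinIO instance, the Spark side could be configured roughly as follows. This is a sketch, not part of the patch: it assumes the hadoop-aws and AWS SDK bundle jars are on the classpath and a local master, and it reuses the endpoint, credentials, bucket, and prefix created by docker-compose.yml and minio-init.sh above.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: point s3a:// at the local MinIO endpoint and enable collect() offloading.
val conf = new SparkConf()
  .setAppName("s3-offload-local-test")
  .setMaster("local[4]")
  // s3a connector settings for the MinIO container started by docker-compose.yml.
  .set("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
  .set("spark.hadoop.fs.s3a.access.key", "spark-user")
  .set("spark.hadoop.fs.s3a.secret.key", "spark-password")
  .set("spark.hadoop.fs.s3a.path.style.access", "true")
  .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
  // Offloading settings introduced by this patch.
  .set("spark.rdd.collect.offloadToS3.enabled", "true")
  .set("spark.rdd.collect.s3.path", "s3a://spark/tmp-spark-offload")

val sc = new SparkContext(conf)
val collected = sc.parallelize(1 to 1000000, numSlices = 8).collect()
assert(collected.length == 1000000)
sc.stop()

With spark.rdd.collect.s3.cleanup left at its default of true, the session-<uuid> directory written under the prefix is removed once the results have been read back.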