diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 36f4fb5ac970e..805ec68a89476 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -57,6 +57,7 @@ import org.apache.spark.status.api.v1.ThreadStackTrace import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util._ import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.io.ChunkedByteBuffer private[spark] object IsolatedSessionState { // Authoritative store for all isolated sessions. Sessions are put here when created @@ -883,7 +884,7 @@ private[spark] class Executor( val resources = taskDescription.resources.map { case (rName, addressesAmounts) => rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray) } - val value = Utils.tryWithSafeFinally { + var value: Any = Utils.tryWithSafeFinally { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, @@ -938,7 +939,9 @@ private[spark] class Executor( val resultSer = env.serializer.newInstance() val beforeSerializationNs = System.nanoTime() - val valueByteBuffer = SerializerHelper.serializeToChunkedBuffer(resultSer, value) + var valueByteBuffer: ChunkedByteBuffer = SerializerHelper.serializeToChunkedBuffer( + resultSer, value) + value = null // Allow GC to reclaim the raw task result val afterSerializationNs = System.nanoTime() // Deserialization happens in two parts: first, we deserialize a Task object, which @@ -982,10 +985,15 @@ private[spark] class Executor( val accumUpdates = task.collectAccumulatorUpdates() val metricPeaks = metricsPoller.getTaskMetricPeaks(taskId) // TODO: do not serialize value twice - val directResult = new DirectTaskResult(valueByteBuffer, accumUpdates, metricPeaks) + var directResult: DirectTaskResult[Any] = new DirectTaskResult( + valueByteBuffer, accumUpdates, metricPeaks) // try to estimate a reasonable upper bound of DirectTaskResult serialization val serializedDirectResult = SerializerHelper.serializeToChunkedBuffer(ser, directResult, valueByteBuffer.size + accumUpdates.size * 32 + metricPeaks.length * 8) + // Allow GC to reclaim the first serialization buffer. Both references must be + // nulled: the local var and the field inside directResult point to the same object. + valueByteBuffer = null + directResult = null val resultSize = serializedDirectResult.size executorSource.METRIC_RESULT_SIZE.inc(resultSize)