Skip to content

Commit 3b1a092

Browse files
vinooganesh authored and bulldozer-bot[bot] committed
[SPARK-25998][CORE] Change TorrentBroadcast to hold weak reference of broadcast object (apache-spark-on-k8s#469)
## What changes were proposed in this pull request?

This PR changes the broadcast object in TorrentBroadcast from a strong reference to a weak reference (implemented as a `java.lang.ref.SoftReference`, as the diff below shows). This allows it to be garbage collected even if the Dataset is held in memory. This is OK, because the broadcast object can always be re-read.

## How was this patch tested?

Tested in the Spark shell by taking a heap dump; full repro steps are listed in https://issues.apache.org/jira/browse/SPARK-25998.

Closes apache#22995 from bkrieger/bk/torrent-broadcast-weak.

Authored-by: Brandon Krieger <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 5f701bf commit 3b1a092

File tree

1 file changed

+16
-6
lines changed

1 file changed

+16
-6
lines changed

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.broadcast
1919

2020
import java.io._
21+
import java.lang.ref.SoftReference
2122
import java.nio.ByteBuffer
2223
import java.util.zip.Adler32
2324

@@ -63,9 +64,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
6364
* Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
6465
* which builds this value by reading blocks from the driver and/or other executors.
6566
*
66-
* On the driver, if the value is required, it is read lazily from the block manager.
67+
* On the driver, if the value is required, it is read lazily from the block manager. We hold
68+
* a soft reference so that it can be garbage collected if required, as we can always reconstruct
69+
* in the future.
6770
*/
68-
@transient private lazy val _value: T = readBroadcastBlock()
71+
@transient private var _value: SoftReference[T] = _
6972

7073
/** The compression codec to use, or None if compression is disabled */
7174
@transient private var compressionCodec: Option[CompressionCodec] = _
@@ -94,8 +97,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
9497
/** The checksum for all the blocks. */
9598
private var checksums: Array[Int] = _
9699

97-
override protected def getValue() = {
98-
_value
100+
override protected def getValue() = synchronized {
101+
val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
102+
if (memoized != null) {
103+
memoized
104+
} else {
105+
val newlyRead = readBroadcastBlock()
106+
_value = new SoftReference[T](newlyRead)
107+
newlyRead
108+
}
99109
}
100110

101111
private def calcChecksum(block: ByteBuffer): Int = {
@@ -209,8 +219,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
209219
}
210220

211221
private def readBroadcastBlock(): T = Utils.tryOrIOException {
212-
TorrentBroadcast.synchronized {
213-
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
222+
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
223+
broadcastCache.synchronized {
214224

215225
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
216226
setConf(SparkEnv.get.conf)

0 commit comments

Comments
 (0)