From 747ed4606631d138312967ef7a93b8c2eb239ac9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Mar 2026 18:55:14 -0700 Subject: [PATCH 1/5] Revert "[SPARK-56203][SQL] Fix race condition in `SortExec.rowSorter`" This reverts commit 1e52a02861c8232b0ba8b16cfeb3ad40563d0804. --- .../apache/spark/sql/execution/SortExec.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 34ff34c26eb4b..11fde41aae9e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -62,13 +62,9 @@ case class SortExec( "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) - // Each task thread has its own UnsafeExternalRowSorter instance stored here. - // Using a stable lazy val (rather than a reassigned var) ensures that the ThreadLocal - // object itself is never replaced: concurrent tasks on different threads each get their - // own independent slot in the same ThreadLocal, so one task can never observe or clobber - // another task's sorter reference. - @transient private[sql] lazy val rowSorter: ThreadLocal[UnsafeExternalRowSorter] = - new ThreadLocal[UnsafeExternalRowSorter]() + // Each task has its own instance of UnsafeExternalRowSorter. It is created in the + // createSorter method and stored in a ThreadLocal variable. + private[sql] var rowSorter: ThreadLocal[UnsafeExternalRowSorter] = _ /** * This method gets invoked only once for each SortExec instance to initialize an @@ -77,6 +73,8 @@ case class SortExec( * should make it public. */ def createSorter(): UnsafeExternalRowSorter = { + rowSorter = new ThreadLocal[UnsafeExternalRowSorter]() + val ordering = RowOrdering.create(sortOrder, output) // The comparator for comparing prefix @@ -198,13 +196,13 @@ case class SortExec( } /** - * In SortExec, we overwrite cleanupResources to close UnsafeExternalRowSorter. - * There's possible for rowSorter to be null here, for example, in the scenario of empty iterator - * in the current task, the downstream physical node (like SortMergeJoinExec) will trigger - * cleanupResources before rowSorter is initialized in createSorter. + * In SortExec, we overwrites cleanupResources to close UnsafeExternalRowSorter. */ override protected[sql] def cleanupResources(): Unit = { - if (rowSorter.get() != null) { + if (rowSorter != null && rowSorter.get() != null) { + // There's possible for rowSorter is null here, for example, in the scenario of empty + // iterator in the current task, the downstream physical node(like SortMergeJoinExec) will + // trigger cleanupResources before rowSorter initialized in createSorter. rowSorter.get().cleanupResources() } super.cleanupResources() From 8dadbb5a55687f5e34cb7220305fbdffc3be2a21 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Mar 2026 18:55:36 -0700 Subject: [PATCH 2/5] Revert "[SPARK-52609][SQL] Make sure row sorter from each task won't be overwritten by other tasks in parallel" This reverts commit 26a9ba6481c4700a6ffbb83e82cde6d49d8160da. --- .../apache/spark/sql/execution/SortExec.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 11fde41aae9e4..5abc6f3ed5769 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -62,9 +62,7 @@ case class SortExec( "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) - // Each task has its own instance of UnsafeExternalRowSorter. It is created in the - // createSorter method and stored in a ThreadLocal variable. - private[sql] var rowSorter: ThreadLocal[UnsafeExternalRowSorter] = _ + private[sql] var rowSorter: UnsafeExternalRowSorter = _ /** * This method gets invoked only once for each SortExec instance to initialize an @@ -73,8 +71,6 @@ case class SortExec( * should make it public. */ def createSorter(): UnsafeExternalRowSorter = { - rowSorter = new ThreadLocal[UnsafeExternalRowSorter]() - val ordering = RowOrdering.create(sortOrder, output) // The comparator for comparing prefix @@ -99,14 +95,13 @@ case class SortExec( } val pageSize = SparkEnv.get.memoryManager.pageSizeBytes - val newRowSorter = UnsafeExternalRowSorter.create( + rowSorter = UnsafeExternalRowSorter.create( schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort) if (testSpillFrequency > 0) { - newRowSorter.setTestSpillFrequency(testSpillFrequency) + rowSorter.setTestSpillFrequency(testSpillFrequency) } - rowSorter.set(newRowSorter) - rowSorter.get() + rowSorter } protected override def doExecute(): RDD[InternalRow] = { @@ -199,11 +194,11 @@ case class SortExec( * In SortExec, we overwrites cleanupResources to close UnsafeExternalRowSorter. */ override protected[sql] def cleanupResources(): Unit = { - if (rowSorter != null && rowSorter.get() != null) { + if (rowSorter != null) { // There's possible for rowSorter is null here, for example, in the scenario of empty // iterator in the current task, the downstream physical node(like SortMergeJoinExec) will // trigger cleanupResources before rowSorter initialized in createSorter. - rowSorter.get().cleanupResources() + rowSorter.cleanupResources() } super.cleanupResources() } From a776ecf556c8f02ac7ac229e23a7777862cebf16 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Mar 2026 19:07:34 -0700 Subject: [PATCH 3/5] [SQL] Add warning comment to SortExec.rowSorter and fix cleanupResources comment Co-Authored-By: Claude Sonnet 4.6 --- .../org/apache/spark/sql/execution/SortExec.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 5abc6f3ed5769..1e9d09cc05175 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -62,6 +62,9 @@ case class SortExec( "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) + // WARNING: This is a shared mutable var on the SortExec instance. Do not access it from + // multiple threads concurrently — Spark operators are not thread-safe and one task's sorter + // could overwrite another's, causing a race condition. private[sql] var rowSorter: UnsafeExternalRowSorter = _ /** @@ -191,13 +194,13 @@ case class SortExec( } /** - * In SortExec, we overwrites cleanupResources to close UnsafeExternalRowSorter. + * In SortExec, we overwrite cleanupResources to close UnsafeExternalRowSorter. + * There's possible for rowSorter to be null here, for example, in the scenario of empty iterator + * in the current task, the downstream physical node (like SortMergeJoinExec) will trigger + * cleanupResources before rowSorter is initialized in createSorter. */ override protected[sql] def cleanupResources(): Unit = { if (rowSorter != null) { - // There's possible for rowSorter is null here, for example, in the scenario of empty - // iterator in the current task, the downstream physical node(like SortMergeJoinExec) will - // trigger cleanupResources before rowSorter initialized in createSorter. rowSorter.cleanupResources() } super.cleanupResources() From a317af229b4c1ea47bbcce1ae9f7bed258907795 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Mar 2026 19:19:59 -0700 Subject: [PATCH 4/5] [SQL] Soften thread-safety wording in SortExec.rowSorter warning comment Co-Authored-By: Claude Sonnet 4.6 --- .../main/scala/org/apache/spark/sql/execution/SortExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 1e9d09cc05175..72b3df10c78e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -63,8 +63,8 @@ case class SortExec( "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) // WARNING: This is a shared mutable var on the SortExec instance. Do not access it from - // multiple threads concurrently — Spark operators are not thread-safe and one task's sorter - // could overwrite another's, causing a race condition. + // multiple threads concurrently — Spark operators do not guarantee thread-safety and one + // task's sorter could overwrite another's, causing a race condition. private[sql] var rowSorter: UnsafeExternalRowSorter = _ /** From 323c4f6b41c5bebba3ba4f67fe996e1ecc6fd47c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 26 Mar 2026 23:07:27 -0700 Subject: [PATCH 5/5] [SQL] Fix non-ASCII em dash in SortExec.rowSorter warning comment Co-Authored-By: Claude Sonnet 4.6 --- .../main/scala/org/apache/spark/sql/execution/SortExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 72b3df10c78e8..46b2cde2eed95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -63,7 +63,7 @@ case class SortExec( "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) // WARNING: This is a shared mutable var on the SortExec instance. Do not access it from - // multiple threads concurrently — Spark operators do not guarantee thread-safety and one + // multiple threads concurrently - Spark operators do not guarantee thread-safety and one // task's sorter could overwrite another's, causing a race condition. private[sql] var rowSorter: UnsafeExternalRowSorter = _