Skip to content

Commit 284ae30

Browse files
Dandandanclaude
andauthored
Perf: cache primitive sort key in SortPreservingMerge to drop per-comparison bounds checks (#23162)
## Which issue does this PR close? <!-- No dedicated issue; this is a self-contained performance improvement to the sort-preserving merge hot path. Happy to file a tracking issue if preferred. --> - N/A (performance) ## Rationale for this change Two inefficiencies in the hot path: 1. The single-column primitive/array cursor comparison was **not being inlined** (in profiles it appeared as a separate ~21% self-time symbol), and every comparison bounds-checked the underlying `ScalarBuffer` twice. 2. `maybe_poll_stream` was called on **every** output row, even though it is a no-op whenever the winner's cursor is still live (the common case). ``` ┏━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ ┃ Query ┃ HEAD ┃ perf_spm-compare-cache ┃ Change ┃ ┡━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │ Q1 │ 165.13 / 166.08 ±0.75 / 166.98 ms │ 131.46 / 132.49 ±0.79 / 133.86 ms │ +1.25x faster │ │ Q2 │ 141.22 / 141.86 ±0.81 / 143.38 ms │ 118.06 / 120.08 ±1.30 / 121.91 ms │ +1.18x faster │ │ Q3 │ 652.56 / 657.09 ±2.91 / 661.10 ms │ 645.82 / 649.88 ±3.27 / 654.55 ms │ no change │ │ Q4 │ 195.87 / 199.80 ±6.83 / 213.43 ms │ 180.22 / 183.86 ±5.04 / 193.43 ms │ +1.09x faster │ │ Q5 │ 279.14 / 279.91 ±0.50 / 280.71 ms │ 260.15 / 260.70 ±0.48 / 261.54 ms │ +1.07x faster │ │ Q6 │ 292.50 / 293.43 ±0.71 / 294.56 ms │ 273.28 / 274.77 ±1.66 / 277.81 ms │ +1.07x faster │ │ Q7 │ 465.87 / 467.23 ±2.01 / 471.21 ms │ 445.83 / 449.21 ±3.47 / 455.33 ms │ no change │ │ Q8 │ 327.51 / 330.08 ±3.25 / 336.40 ms │ 319.12 / 325.35 ±6.20 / 336.30 ms │ no change │ │ Q9 │ 342.17 / 346.17 ±2.68 / 348.71 ms │ 336.40 / 348.61 ±13.37 / 368.15 ms │ no change │ │ Q10 │ 484.08 / 486.50 ±2.53 / 490.75 ms │ 474.15 / 490.16 ±13.58 / 507.56 ms │ no change │ │ Q11 │ 244.24 / 250.98 ±8.70 / 267.17 ms │ 229.07 / 237.38 ±8.10 / 249.40 ms │ +1.06x faster │ └───────┴───────────────────────────────────┴────────────────────────────────────┴───────────────┘ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓ ┃ Benchmark Summary ┃ ┃ ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩ │ Total Time (HEAD) │ 3619.13ms │ │ Total Time (perf_spm-compare-cache) │ 3472.49ms │ │ Average Time (HEAD) │ 329.01ms │ │ Average Time (perf_spm-compare-cache) │ 315.68ms │ │ Queries Faster │ 6 │ │ Queries Slower │ 0 │ │ Queries with No Change │ 5 │ │ Queries with Failure │ 0 │ └───────────────────────────────────────┴───────────┘ ``` ## What changes are included in this PR? - Inline the lightweight primitive/array cursor comparisons. - Skip the per-row `maybe_poll_stream` call unless the winner's cursor is actually exhausted and needs a fresh `RecordBatch`. - Cache the current (and previous) value of a primitive cursor, refreshed once per `advance()` via a new `CursorValues::set_offset` hook (default no-op; `ArrayValues` forwards to the inner cursor). ## Are these changes tested? Existing tests ## Are there any user-facing changes? No. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent a00f749 commit 284ae30

2 files changed

Lines changed: 104 additions & 11 deletions

File tree

datafusion/physical-plan/src/sorts/cursor.rs

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ pub trait CursorValues {
4444

4545
/// Returns comparison of `l[l_idx]` and `r[r_idx]`
4646
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
47+
48+
/// Notifies the values that the owning [`Cursor`] moved to `offset` (always
49+
/// `< len()`), so caching implementations can refresh the value(s) read by
50+
/// the hot comparisons. Default no-op (e.g. byte/row cursors don't benefit).
51+
#[inline]
52+
fn set_offset(&mut self, offset: usize) {
53+
let _ = offset;
54+
}
4755
}
4856

4957
/// A comparable cursor, used by sort operations
@@ -89,14 +97,22 @@ impl<T: CursorValues> Cursor<T> {
8997
}
9098

9199
/// Returns true if there are no more rows in this cursor
100+
#[inline]
92101
pub fn is_finished(&self) -> bool {
93102
self.offset == self.values.len()
94103
}
95104

96105
/// Advance the cursor, returning the previous row index
106+
#[inline]
97107
pub fn advance(&mut self) -> usize {
98108
let t = self.offset;
99109
self.offset += 1;
110+
// Refresh the cache for the new position. The guard keeps `set_offset`
111+
// in bounds; a finished cursor's stale cache is never read (it is taken
112+
// before the next comparison).
113+
if self.offset < self.values.len() {
114+
self.values.set_offset(self.offset);
115+
}
100116
t
101117
}
102118

@@ -112,6 +128,7 @@ impl<T: CursorValues> Cursor<T> {
112128
}
113129

114130
impl<T: CursorValues> PartialEq for Cursor<T> {
131+
#[inline]
115132
fn eq(&self, other: &Self) -> bool {
116133
T::eq(&self.values, self.offset, &other.values, other.offset)
117134
}
@@ -142,6 +159,7 @@ impl<T: CursorValues> PartialOrd for Cursor<T> {
142159
}
143160

144161
impl<T: CursorValues> Ord for Cursor<T> {
162+
#[inline]
145163
fn cmp(&self, other: &Self) -> Ordering {
146164
T::compare(&self.values, self.offset, &other.values, other.offset)
147165
}
@@ -180,10 +198,14 @@ impl RowValues {
180198
}
181199

182200
impl CursorValues for RowValues {
201+
#[inline]
183202
fn len(&self) -> usize {
184203
self.rows.num_rows()
185204
}
186205

206+
// No inline hint on purpose: for the heavyweight `Rows` byte comparison the
207+
// compiler's own choice wins — both `#[inline]` and `#[inline(never)]`
208+
// measurably regress the multi-column merge path.
187209
fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
188210
l.rows.row(l_idx) == r.rows.row(r_idx)
189211
}
@@ -209,29 +231,74 @@ impl<T: ArrowPrimitiveType> CursorArray for PrimitiveArray<T> {
209231
type Values = PrimitiveValues<T::Native>;
210232

211233
fn values(&self) -> Self::Values {
212-
PrimitiveValues(self.values().clone())
234+
PrimitiveValues::new(self.values().clone())
213235
}
214236
}
215237

238+
/// [`CursorValues`] for a primitive column.
239+
///
240+
/// Caches the value at the current (and previous) offset, refreshed once per
241+
/// [`Cursor::advance`] via [`CursorValues::set_offset`], so the hot loser-tree
242+
/// comparisons read a cached field instead of indexing the buffer each time.
216243
#[derive(Debug)]
217-
pub struct PrimitiveValues<T: ArrowNativeTypeOp>(ScalarBuffer<T>);
244+
pub struct PrimitiveValues<T: ArrowNativeTypeOp> {
245+
values: ScalarBuffer<T>,
246+
/// Cached `values[offset]`.
247+
current: T,
248+
/// Cached `values[offset - 1]` (read by `eq_to_previous`, only past offset 0).
249+
previous: T,
250+
/// Current offset; used only to `debug_assert!` the cache is read in sync.
251+
offset: usize,
252+
}
253+
254+
impl<T: ArrowNativeTypeOp> PrimitiveValues<T> {
255+
fn new(values: ScalarBuffer<T>) -> Self {
256+
// Non-empty in practice; `unwrap_or_default` just avoids a panic.
257+
let first = values.first().copied().unwrap_or_default();
258+
Self {
259+
values,
260+
current: first,
261+
previous: first,
262+
offset: 0,
263+
}
264+
}
265+
}
218266

219267
impl<T: ArrowNativeTypeOp> CursorValues for PrimitiveValues<T> {
268+
#[inline(always)]
220269
fn len(&self) -> usize {
221-
self.0.len()
270+
self.values.len()
222271
}
223272

273+
#[inline(always)]
224274
fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
225-
l.0[l_idx].is_eq(r.0[r_idx])
275+
// Arbitrary indices (cross-batch comparison), so index directly.
276+
l.values[l_idx].is_eq(r.values[r_idx])
226277
}
227278

279+
#[inline(always)]
228280
fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
229281
assert!(idx > 0);
230-
cursor.0[idx].is_eq(cursor.0[idx - 1])
282+
debug_assert_eq!(idx, cursor.offset);
283+
cursor.current.is_eq(cursor.previous)
231284
}
232285

286+
#[inline(always)]
233287
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
234-
l.0[l_idx].compare(r.0[r_idx])
288+
debug_assert_eq!(l_idx, l.offset);
289+
debug_assert_eq!(r_idx, r.offset);
290+
l.current.compare(r.current)
291+
}
292+
293+
#[inline(always)]
294+
fn set_offset(&mut self, offset: usize) {
295+
// The caller (`Cursor::advance`) guarantees `offset < len`; inlined, that
296+
// guard dominates the index below so its bounds check is elided — the
297+
// length is checked once per row, not per comparison. The old `current`
298+
// is `values[offset - 1]`, so it becomes `previous`.
299+
self.previous = self.current;
300+
self.current = self.values[offset];
301+
self.offset = offset;
235302
}
236303
}
237304

@@ -241,6 +308,7 @@ pub struct ByteArrayValues<T: OffsetSizeTrait> {
241308
}
242309

243310
impl<T: OffsetSizeTrait> ByteArrayValues<T> {
311+
#[inline]
244312
fn value(&self, idx: usize) -> &[u8] {
245313
assert!(idx < self.len());
246314
// Safety: offsets are valid and checked bounds above
@@ -253,19 +321,23 @@ impl<T: OffsetSizeTrait> ByteArrayValues<T> {
253321
}
254322

255323
impl<T: OffsetSizeTrait> CursorValues for ByteArrayValues<T> {
324+
#[inline]
256325
fn len(&self) -> usize {
257326
self.offsets.len() - 1
258327
}
259328

329+
#[inline]
260330
fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
261331
l.value(l_idx) == r.value(r_idx)
262332
}
263333

334+
#[inline]
264335
fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
265336
assert!(idx > 0);
266337
cursor.value(idx) == cursor.value(idx - 1)
267338
}
268339

340+
#[inline]
269341
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
270342
l.value(l_idx).cmp(r.value(r_idx))
271343
}
@@ -394,16 +466,19 @@ impl<T: CursorValues> ArrayValues<T> {
394466
}
395467
}
396468

469+
#[inline(always)]
397470
fn is_null(&self, idx: usize) -> bool {
398471
(idx < self.null_threshold) == self.options.nulls_first
399472
}
400473
}
401474

402475
impl<T: CursorValues> CursorValues for ArrayValues<T> {
476+
#[inline(always)]
403477
fn len(&self) -> usize {
404478
self.values.len()
405479
}
406480

481+
#[inline(always)]
407482
fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool {
408483
match (l.is_null(l_idx), r.is_null(r_idx)) {
409484
(true, true) => true,
@@ -412,15 +487,19 @@ impl<T: CursorValues> CursorValues for ArrayValues<T> {
412487
}
413488
}
414489

490+
#[inline(always)]
415491
fn eq_to_previous(cursor: &Self, idx: usize) -> bool {
416492
assert!(idx > 0);
417493
match (cursor.is_null(idx), cursor.is_null(idx - 1)) {
418494
(true, true) => true,
419-
(false, false) => T::eq(&cursor.values, idx, &cursor.values, idx - 1),
495+
// Delegate to inner `eq_to_previous` so a caching cursor can answer
496+
// without indexing.
497+
(false, false) => T::eq_to_previous(&cursor.values, idx),
420498
_ => false,
421499
}
422500
}
423501

502+
#[inline(always)]
424503
fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
425504
match (l.is_null(l_idx), r.is_null(r_idx)) {
426505
(true, true) => Ordering::Equal,
@@ -438,6 +517,12 @@ impl<T: CursorValues> CursorValues for ArrayValues<T> {
438517
},
439518
}
440519
}
520+
521+
#[inline(always)]
522+
fn set_offset(&mut self, offset: usize) {
523+
// Forward to the wrapped values (e.g. caching `PrimitiveValues`).
524+
self.values.set_offset(offset);
525+
}
441526
}
442527

443528
#[cfg(test)]
@@ -463,7 +548,7 @@ mod tests {
463548
let reservation = consumer.register(&memory_pool);
464549

465550
let values = ArrayValues {
466-
values: PrimitiveValues(values),
551+
values: PrimitiveValues::new(values),
467552
null_threshold,
468553
options,
469554
_reservation: reservation,

datafusion/physical-plan/src/sorts/merge.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,17 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
294294
// Adjust the loser tree if necessary, returning control if needed
295295
if !self.loser_tree_adjusted {
296296
let winner = self.loser_tree[0];
297-
if let Err(e) = ready!(self.maybe_poll_stream(cx, winner)) {
298-
self.done = true;
299-
return Poll::Ready(Some(Err(e)));
297+
// Fast path: skip the `maybe_poll_stream` call (and its `Poll`
298+
// plumbing) unless the winner's cursor is exhausted and needs a
299+
// fresh batch — it is live for almost every row.
300+
if self.cursors[winner].is_none() {
301+
match ready!(self.maybe_poll_stream(cx, winner)) {
302+
Ok(()) => {}
303+
Err(e) => {
304+
self.done = true;
305+
return Poll::Ready(Some(Err(e)));
306+
}
307+
}
300308
}
301309
self.update_loser_tree();
302310
}

0 commit comments

Comments
 (0)