Skip to content

Commit fd41ecf

Browse files
committed
remove two O(m) passes
1 parent 902effe commit fd41ecf

2 files changed

Lines changed: 23 additions & 31 deletions

File tree

  • datafusion

datafusion/physical-expr-common/src/binary_map.rs

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -517,16 +517,21 @@ where
517517
// Copy only the surviving bytes into the fresh builder.
518518
self.buffer.append_slice(&frozen[end..]);
519519

520-
// Only surviving entries (offset >= end) need rebasing; emitted entries
521-
// (offset < end) are removed by drain_emitted and must not be touched.
522-
if end > 0 {
523-
for entry in self.map.iter_mut() {
524-
if entry.len.as_usize() > SHORT_VALUE_LEN && entry.offset_or_inline >= end
525-
{
526-
entry.offset_or_inline -= end;
527-
}
520+
// Single pass: drop emitted entries, rebase buffer offsets and payloads.
521+
let threshold = V::from(n);
522+
self.map.retain(|entry| {
523+
if entry.payload < threshold {
524+
return false;
528525
}
529-
}
526+
if end > 0
527+
&& entry.len.as_usize() > SHORT_VALUE_LEN
528+
&& entry.offset_or_inline >= end
529+
{
530+
entry.offset_or_inline -= end;
531+
}
532+
entry.payload = entry.payload - threshold;
533+
true
534+
});
530535

531536
let nulls = self.null.and_then(|(_, null_idx)| {
532537
if null_idx >= cursor && null_idx < cursor + n {
@@ -542,8 +547,11 @@ where
542547
None
543548
}
544549
});
545-
546-
self.drain_emitted(n);
550+
// Rebase null payload. If self.null survived the and_then above,
551+
// null_idx >= cursor + n, which by invariant means payload >= threshold.
552+
if let Some((ref mut payload, _)) = self.null {
553+
*payload = *payload - threshold;
554+
}
547555

548556
match self.output_type {
549557
OutputType::Binary => Arc::new(unsafe {
@@ -607,25 +615,6 @@ where
607615
}
608616
}
609617

610-
#[inline]
611-
fn drain_emitted(&mut self, n: usize)
612-
where
613-
V: std::ops::Sub<Output = V> + PartialOrd + From<usize>,
614-
{
615-
let threshold = V::from(n);
616-
self.map.retain(|entry| entry.payload >= threshold);
617-
for entry in self.map.iter_mut() {
618-
entry.payload = entry.payload - threshold;
619-
}
620-
match &mut self.null {
621-
Some((payload, _)) if *payload >= threshold => {
622-
*payload = *payload - threshold;
623-
}
624-
Some(_) => self.null = None, // null was in the emitted window
625-
None => {}
626-
}
627-
}
628-
629618
/// Total number of entries (including null, if present)
630619
pub fn len(&self) -> usize {
631620
self.non_null_len() + self.null.map(|_| 1).unwrap_or(0)

datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,11 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
9090
self.num_groups = 0;
9191
self.map.take().into_state()
9292
}
93+
EmitTo::First(n) if n >= self.num_groups => {
94+
self.num_groups = 0;
95+
self.map.take().into_state()
96+
}
9397
EmitTo::First(n) => {
94-
let n = n.min(self.num_groups);
9598
let group_values = self.map.emit(n);
9699
self.num_groups -= n;
97100
group_values

0 commit comments

Comments
 (0)