Draft POC: Push batch with filter without copy #8103

Draft · zhuqi-lucas wants to merge 6 commits into main
137 changes: 131 additions & 6 deletions arrow-select/src/coalesce.rs

@@ -20,7 +20,7 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
-use crate::filter::filter_record_batch;
+use crate::filter::{compute_filter_plan, FilterPlan};
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{downcast_primitive, Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_schema::{ArrowError, DataType, SchemaRef};

@@ -198,15 +198,139 @@ impl BatchCoalescer {
    /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
    /// assert_eq!(completed_batch, expected_batch);
    /// ```
+    /// Zero-copy implementation using `compute_filter_plan`.
    pub fn push_batch_with_filter(
        &mut self,
        batch: RecordBatch,
-        filter: &BooleanArray,
+        predicate: &BooleanArray,
    ) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // First compute the filter plan based on the predicate
+        // (this calls `FilterBuilder::optimize` internally).
+        let plan = compute_filter_plan(predicate);
+
+        match plan {
+            FilterPlan::None => {
+                // No rows selected
+                Ok(())
+            }
+            FilterPlan::All => {
+                // All rows selected: directly call push_batch (consumes the batch)
+                self.push_batch(batch)
+            }
+            FilterPlan::Slices(slices) => {
+                // Consume the batch and set sources on the in_progress arrays
+                let (_schema, arrays, _nrows) = batch.into_parts();
+                assert_eq!(arrays.len(), self.in_progress_arrays.len());
+
+                self.in_progress_arrays
+                    .iter_mut()
+                    .zip(arrays)
+                    .for_each(|(in_progress, array)| {
+                        in_progress.set_source(Some(array));
+                    });
+
+                // For each contiguous slice, copy rows in chunks that fit target_batch_size
+                for (mut start, end) in slices {
> alamb (Contributor) commented:
>
> I suspect that to really make this fast it will need specialized implementations for the different array types (rather than mutable array data).
>
> I think we could yoink / reuse some of the existing code from the filter kernel:
>
>     _ => downcast_primitive_array! {
>
> zhuqi-lucas (Author) replied on Aug 11, 2025:
>
> Thank you @alamb for the review and the good suggestion.
>
> I found that the hot path in the profile is `copy_rows`, especially the null-handling code:
>
>         // add nulls if necessary
>         if let Some(nulls) = s.nulls().as_ref() {
>             let nulls = nulls.slice(offset, len);
>             self.nulls.append_buffer(&nulls);
>         } else {
>             self.nulls.append_n_non_nulls(len);
>         };
>
> This may be because we call `copy_rows` more times here, whereas the original logic filters and concatenates into a batch and then calls `push_batch`, which is SIMD-friendly; the larger batches also make `copy_rows` itself SIMD-friendly. So the original code is faster in most cases.
>
> Maybe we should use this logic only below some selectivity threshold, 0.005 for example, but it's hard for me to decide.

+                    let mut remaining = end - start;
+                    while remaining > 0 {
+                        let space = self.target_batch_size - self.buffered_rows;
+                        debug_assert!(space > 0);
+                        let to_copy = remaining.min(space);
+
+                        for in_progress in self.in_progress_arrays.iter_mut() {
+                            // copy_rows(offset, length)
+                            in_progress.copy_rows(start, to_copy)?;
+                        }
+
+                        self.buffered_rows += to_copy;
+                        start += to_copy;
+                        remaining -= to_copy;
+
+                        if self.buffered_rows == self.target_batch_size {
+                            self.finish_buffered_batch()?;
+                        }
+                    }
+                }
+
+                // Clear sources to allow the memory to be freed
+                for in_progress in self.in_progress_arrays.iter_mut() {
+                    in_progress.set_source(None);
+                }
+
+                Ok(())
+            }
+            FilterPlan::Indices(indices) => {
+                // Consume the batch and set sources (same as the slices path)
+                let (_schema, arrays, _nrows) = batch.into_parts();
+                assert_eq!(arrays.len(), self.in_progress_arrays.len());
+
+                self.in_progress_arrays
+                    .iter_mut()
+                    .zip(arrays)
+                    .for_each(|(in_progress, array)| {
+                        in_progress.set_source(Some(array));
+                    });
+
+                // Merge consecutive indices into ranges to reduce copy_rows calls
+                let mut it = indices.into_iter();
+                if let Some(cur) = it.next() {
+                    let mut run_start = cur;
+                    let mut run_end = cur + 1; // exclusive
+                    for idx in it {
+                        if idx == run_end {
+                            // Extend the current run
+                            run_end += 1;
+                        } else {
+                            // Flush the current run [run_start, run_end)
+                            let mut remaining = run_end - run_start;
+                            let mut src_off = run_start;
+                            while remaining > 0 {
+                                let space = self.target_batch_size - self.buffered_rows;
+                                debug_assert!(space > 0);
+                                let to_copy = remaining.min(space);
+                                for in_progress in self.in_progress_arrays.iter_mut() {
+                                    in_progress.copy_rows(src_off, to_copy)?;
+                                }
+                                self.buffered_rows += to_copy;
+                                src_off += to_copy;
+                                remaining -= to_copy;
+                                if self.buffered_rows == self.target_batch_size {
+                                    self.finish_buffered_batch()?;
+                                }
+                            }
+                            // Start a new run
+                            run_start = idx;
+                            run_end = idx + 1;
+                        }
+                    }
+
+                    // Flush the last run
+                    let mut remaining = run_end - run_start;
+                    let mut src_off = run_start;
+                    while remaining > 0 {
+                        let space = self.target_batch_size - self.buffered_rows;
+                        debug_assert!(space > 0);
+                        let to_copy = remaining.min(space);
+                        for in_progress in self.in_progress_arrays.iter_mut() {
+                            in_progress.copy_rows(src_off, to_copy)?;
+                        }
+                        self.buffered_rows += to_copy;
+                        src_off += to_copy;
+                        remaining -= to_copy;
+                        if self.buffered_rows == self.target_batch_size {
+                            self.finish_buffered_batch()?;
+                        }
+                    }
+                }
+
+                // Clear sources
+                for in_progress in self.in_progress_arrays.iter_mut() {
+                    in_progress.set_source(None);
+                }
+
+                Ok(())
+            }
+        }
    }
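A possible follow-up to the selectivity question in the review thread above: dispatch on the predicate's selectivity before choosing a path. The sketch below is illustrative only and not part of this PR; `SELECTIVITY_THRESHOLD` and `is_highly_selective` are hypothetical names, and the 0.005 figure is the author's rough guess rather than a measured cutoff.

use arrow_array::{Array, BooleanArray};

/// Illustrative constant; a real value would come from benchmarking.
const SELECTIVITY_THRESHOLD: f64 = 0.005;

/// Returns true when the predicate keeps so few rows that the chunked
/// `copy_rows` path is likely to beat materializing the filter.
fn is_highly_selective(predicate: &BooleanArray) -> bool {
    let total = predicate.len();
    total > 0 && (predicate.true_count() as f64 / total as f64) < SELECTIVITY_THRESHOLD
}

`push_batch_with_filter` could consult such a check first and fall back to `filter_record_batch` plus `push_batch` when the predicate keeps most rows, preserving the SIMD-friendly bulk copy in the common case.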

    /// Push all the rows from `batch` into the Coalescer

@@ -400,6 +524,7 @@ mod tests {
    use arrow_schema::{DataType, Field, Schema};
    use rand::{Rng, SeedableRng};
    use std::ops::Range;
+    use crate::filter::filter_record_batch;

    #[test]
    fn test_coalesce() {
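The `filter_record_batch` import added to the test module suggests tests that compare the zero-copy path against the materializing path. A minimal sketch of such a round-trip test, assuming the `BatchCoalescer::new`, `finish_buffered_batch`, and `next_completed_batch` APIs shown in this crate's docs (the data values are arbitrary):

    #[test]
    fn test_push_batch_with_filter_matches_materialized_filter() {
        // Both paths should yield identical rows for the same predicate.
        let batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
        let predicate =
            BooleanArray::from(vec![true, false, true, true, false, true]);

        let mut coalescer = BatchCoalescer::new(batch.schema(), 1024);
        coalescer
            .push_batch_with_filter(batch.clone(), &predicate)
            .unwrap();
        coalescer.finish_buffered_batch().unwrap();
        let zero_copy = coalescer.next_completed_batch().unwrap();

        let expected = filter_record_batch(&batch, &predicate).unwrap();
        assert_eq!(zero_copy, expected);
    }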
38 changes: 38 additions & 0 deletions arrow-select/src/filter.rs

@@ -164,6 +164,44 @@ fn multiple_arrays(data_type: &DataType) -> bool {
}
}

/// A public, lightweight plan describing how to apply a Boolean filter.
///
/// Used for zero-copy filtering externally (e.g., in `BatchCoalescer`):
/// - `None`: no rows selected
/// - `All`: all rows selected
/// - `Slices`: list of contiguous ranges `[start, end)` (can be passed directly to `copy_rows`)
/// - `Indices`: list of single-row indices (can be merged into contiguous ranges externally)
#[derive(Debug, Clone)]
pub enum FilterPlan {
    None,
    All,
    Slices(Vec<(usize, usize)>),
    Indices(Vec<usize>),
}

/// Compute a filter plan based on `FilterBuilder::optimize`.
///
/// This function calls `FilterBuilder::new(filter).optimize()`, then
/// converts the optimized `IterationStrategy` into the `FilterPlan` above
/// to enable zero-copy execution externally.
pub fn compute_filter_plan(filter: &BooleanArray) -> FilterPlan {
    let fb = FilterBuilder::new(filter).optimize();
    let pred = fb.build();

    match pred.strategy {
        IterationStrategy::None => FilterPlan::None,
        IterationStrategy::All => FilterPlan::All,
        IterationStrategy::Slices(s) => FilterPlan::Slices(s), // moved directly
        IterationStrategy::Indices(i) => FilterPlan::Indices(i), // moved directly
        IterationStrategy::SlicesIterator => {
            FilterPlan::Slices(SlicesIterator::new(&pred.filter).collect())
> alamb (Contributor) commented:
>
> Avoiding this allocation will likely help.
>
> zhuqi-lucas (Author) replied on Aug 11, 2025:
>
> Thank you @alamb, I tried that now, but it did not improve the regression; `compute_filter_plan` costs almost nothing in the benchmark profile.

        }
        IterationStrategy::IndexIterator => {
            FilterPlan::Indices(IndexIterator::new(&pred.filter, pred.count).collect())
        }
    }
}
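A short sketch of how a caller might consume the new API, dispatching on the returned variant (the predicate values and printouts are illustrative):

use arrow_array::BooleanArray;
use arrow_select::filter::{compute_filter_plan, FilterPlan};

fn main() {
    let predicate = BooleanArray::from(vec![true, true, false, true]);
    match compute_filter_plan(&predicate) {
        FilterPlan::None => println!("no rows selected: skip the batch entirely"),
        FilterPlan::All => println!("all rows selected: push the batch unchanged"),
        FilterPlan::Slices(slices) => {
            // Contiguous [start, end) ranges, e.g. [(0, 2), (3, 4)] here
            // if the slices strategy is chosen.
            for (start, end) in slices {
                println!("copy rows {start}..{end}");
            }
        }
        FilterPlan::Indices(indices) => {
            // Individual row indices, used for very sparse selections.
            for idx in indices {
                println!("copy row {idx}");
            }
        }
    }
}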

/// Returns a filtered [RecordBatch] where the corresponding elements of
/// `predicate` are true.
///