diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs
index 891d62fc3aa6..6b10d7192057 100644
--- a/arrow-select/src/coalesce.rs
+++ b/arrow-select/src/coalesce.rs
@@ -20,7 +20,7 @@
 //!
 //! [`filter`]: crate::filter::filter
 //! [`take`]: crate::take::take
-use crate::filter::filter_record_batch;
+use crate::filter::{compute_filter_plan, FilterPlan};
 use arrow_array::types::{BinaryViewType, StringViewType};
 use arrow_array::{downcast_primitive, Array, ArrayRef, BooleanArray, RecordBatch};
 use arrow_schema::{ArrowError, DataType, SchemaRef};
@@ -198,15 +198,139 @@ impl BatchCoalescer {
     /// let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
     /// assert_eq!(completed_batch, expected_batch);
     /// ```
+    /// Zero-copy implementation using `compute_filter_plan` (no intermediate filtered batch).
     pub fn push_batch_with_filter(
         &mut self,
         batch: RecordBatch,
-        filter: &BooleanArray,
+        predicate: &BooleanArray,
     ) -> Result<(), ArrowError> {
-        // TODO: optimize this to avoid materializing (copying the results
-        // of filter to a new batch)
-        let filtered_batch = filter_record_batch(&batch, filter)?;
-        self.push_batch(filtered_batch)
+        // First compute the filter plan for the predicate
+        // (this calls `FilterBuilder::optimize` internally)
+        let plan = compute_filter_plan(predicate);
+
+        match plan {
+            FilterPlan::None => {
+                // No rows selected
+                Ok(())
+            }
+            FilterPlan::All => {
+                // All rows selected: directly call push_batch (consumes the batch)
+                self.push_batch(batch)
+            }
+            FilterPlan::Slices(slices) => {
+                // Consume the batch and set sources on the in_progress arrays
+                let (_schema, arrays, _nrows) = batch.into_parts();
+                assert_eq!(arrays.len(), self.in_progress_arrays.len());
+
+                self.in_progress_arrays
+                    .iter_mut()
+                    .zip(arrays)
+                    .for_each(|(in_progress, array)| {
+                        in_progress.set_source(Some(array));
+                    });
+
+                // For each contiguous slice, copy rows in chunks that fit within target_batch_size
+                for (mut start, end) in slices {
+                    let mut remaining = end - start;
+                    while remaining > 0 {
+                        let space = self.target_batch_size - self.buffered_rows;
+                        debug_assert!(space > 0);
+                        let to_copy = remaining.min(space);
+
+                        for in_progress in self.in_progress_arrays.iter_mut() {
+                            // copy_rows(offset, length)
+                            in_progress.copy_rows(start, to_copy)?;
+                        }
+
+                        self.buffered_rows += to_copy;
+                        start += to_copy;
+                        remaining -= to_copy;
+
+                        if self.buffered_rows == self.target_batch_size {
+                            self.finish_buffered_batch()?;
+                        }
+                    }
+                }
+
+                // Clear sources to allow memory to be freed
+                for in_progress in self.in_progress_arrays.iter_mut() {
+                    in_progress.set_source(None);
+                }
+
+                Ok(())
+            }
+            FilterPlan::Indices(indices) => {
+                // Consume the batch and set sources (same as the slices path)
+                let (_schema, arrays, _nrows) = batch.into_parts();
+                assert_eq!(arrays.len(), self.in_progress_arrays.len());
+
+                self.in_progress_arrays
+                    .iter_mut()
+                    .zip(arrays)
+                    .for_each(|(in_progress, array)| {
+                        in_progress.set_source(Some(array));
+                    });
+
+                // Merge consecutive indices into runs to reduce copy_rows calls
+                let mut it = indices.into_iter();
+                if let Some(cur) = it.next() {
+                    let mut run_start = cur;
+                    let mut run_end = cur + 1; // exclusive
+                    for idx in it {
+                        if idx == run_end {
+                            // Extend the current run
+                            run_end += 1;
+                        } else {
+                            // Flush the current run [run_start, run_end)
+                            let mut remaining = run_end - run_start;
+                            let mut src_off = run_start;
+                            while remaining > 0 {
+                                let space = self.target_batch_size - self.buffered_rows;
+                                debug_assert!(space > 0);
+                                let to_copy = remaining.min(space);
+                                for in_progress in self.in_progress_arrays.iter_mut() {
+                                    in_progress.copy_rows(src_off, to_copy)?;
+                                }
+                                self.buffered_rows += to_copy;
+                                src_off += to_copy;
+                                remaining -= to_copy;
+                                if self.buffered_rows == self.target_batch_size {
+                                    self.finish_buffered_batch()?;
+                                }
+                            }
+                            // Start a new run
+                            run_start = idx;
+                            run_end = idx + 1;
+                        }
+                    }
+
+                    // Flush the last run
+                    let mut remaining = run_end - run_start;
+                    let mut src_off = run_start;
+                    while remaining > 0 {
+                        let space = self.target_batch_size - self.buffered_rows;
+                        debug_assert!(space > 0);
+                        let to_copy = remaining.min(space);
+                        for in_progress in self.in_progress_arrays.iter_mut() {
+                            in_progress.copy_rows(src_off, to_copy)?;
+                        }
+                        self.buffered_rows += to_copy;
+                        src_off += to_copy;
+                        remaining -= to_copy;
+                        if self.buffered_rows == self.target_batch_size {
+                            self.finish_buffered_batch()?;
+                        }
+                    }
+                }
+
+                // Clear sources
+                for in_progress in self.in_progress_arrays.iter_mut() {
+                    in_progress.set_source(None);
+                }
+
+                Ok(())
+            }
+        }
     }
 
     /// Push all the rows from `batch` into the Coalescer
@@ -400,6 +524,7 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema};
     use rand::{Rng, SeedableRng};
     use std::ops::Range;
+    use crate::filter::filter_record_batch;
 
     #[test]
     fn test_coalesce() {
diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs
index 641599cea641..8e397f2d208e 100644
--- a/arrow-select/src/filter.rs
+++ b/arrow-select/src/filter.rs
@@ -164,6 +164,44 @@ fn multiple_arrays(data_type: &DataType) -> bool {
     }
 }
 
+/// A public, lightweight plan describing how to apply a boolean filter.
+///
+/// Used for zero-copy filtering externally (e.g., in `BatchCoalescer`):
+/// - `None`: no rows selected
+/// - `All`: all rows selected
+/// - `Slices`: list of contiguous ranges `[start, end)` (can be passed directly to `copy_rows`)
+/// - `Indices`: list of single-row indices (can be merged into contiguous ranges externally)
+#[derive(Debug, Clone)]
+pub enum FilterPlan {
+    None,
+    All,
+    Slices(Vec<(usize, usize)>),
+    Indices(Vec<usize>),
+}
+
+/// Compute a filter plan using `FilterBuilder::optimize`.
+///
+/// This function calls `FilterBuilder::new(filter).optimize()`, then
+/// converts the optimized `IterationStrategy` into a `FilterPlan`
+/// so that callers outside this module can execute the filter zero-copy.
+pub fn compute_filter_plan(filter: &BooleanArray) -> FilterPlan {
+    let fb = FilterBuilder::new(filter).optimize();
+    let pred = fb.build();
+
+    match pred.strategy {
+        IterationStrategy::None => FilterPlan::None,
+        IterationStrategy::All => FilterPlan::All,
+        IterationStrategy::Slices(s) => FilterPlan::Slices(s), // moved, not copied
+        IterationStrategy::Indices(i) => FilterPlan::Indices(i), // moved, not copied
+        IterationStrategy::SlicesIterator => {
+            FilterPlan::Slices(SlicesIterator::new(&pred.filter).collect())
+        }
+        IterationStrategy::IndexIterator => {
+            FilterPlan::Indices(IndexIterator::new(&pred.filter, pred.count).collect())
+        }
+    }
+}
+
 /// Returns a filtered [RecordBatch] where the corresponding elements of
 /// `predicate` are true.
 ///
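
A minimal usage sketch of the API surface touched above, assuming the crate is consumed directly as `arrow_select` (matching the file paths in the patch) and that the existing `BatchCoalescer::new`, `finish_buffered_batch`, and `next_completed_batch` methods behave as in the current `coalesce` module. `compute_filter_plan` is called here only to inspect the plan that the reworked `push_batch_with_filter` executes internally.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::coalesce::BatchCoalescer;
use arrow_select::filter::{compute_filter_plan, FilterPlan};

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])) as ArrayRef],
    )?;
    // Keep rows 0, 2, 3, and 5
    let predicate = BooleanArray::from(vec![true, false, true, true, false, true]);

    // Inspect the plan that push_batch_with_filter will execute for this predicate
    match compute_filter_plan(&predicate) {
        FilterPlan::None => println!("no rows selected"),
        FilterPlan::All => println!("all rows selected"),
        FilterPlan::Slices(slices) => println!("contiguous ranges: {slices:?}"),
        FilterPlan::Indices(indices) => println!("individual rows: {indices:?}"),
    }

    // Filter and coalesce without materializing an intermediate filtered batch
    let mut coalescer = BatchCoalescer::new(schema, 4096);
    coalescer.push_batch_with_filter(batch, &predicate)?;
    coalescer.finish_buffered_batch()?;
    if let Some(completed) = coalescer.next_completed_batch() {
        assert_eq!(completed.num_rows(), 4);
    }
    Ok(())
}
```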