Skip to content

Commit fdef903

Browse files
committed
extract some logic into identify_fully_matched_row_groups
1 parent e65f5a3 commit fdef903

File tree

1 file changed

+74
-41
lines changed

1 file changed

+74
-41
lines changed

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 74 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ impl RowGroupAccessPlanFilter {
8484
}
8585

8686
/// Prunes the access plan based on the limit and fully contained row groups.
87+
/// See the [description](https://github.com/apache/datafusion/issues/18860#issuecomment-3563442093)
88+
/// for how the pruning works and improves performance.
89+
/// For more information, see the [paper](https://arxiv.org/pdf/2504.11540)'s "Pruning for LIMIT Queries" part
8790
pub fn prune_by_limit(
8891
&mut self,
8992
limit: usize,
@@ -197,47 +200,15 @@ impl RowGroupAccessPlanFilter {
197200
}
198201
}
199202

200-
// Note: this part of code shouldn't be expensive with a limited number of row groups
201-
// If we do find it's expensive, we can consider optimizing it further.
202-
if !fully_contained_candidates_original_idx.is_empty() {
203-
// Use NotExpr to create the inverted predicate
204-
let inverted_expr =
205-
Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
206-
// Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
207-
// before building the pruning predicate
208-
let mut simplifier = PhysicalExprSimplifier::new(arrow_schema);
209-
let inverted_expr = simplifier.simplify(inverted_expr).unwrap();
210-
if let Ok(inverted_predicate) = PruningPredicate::try_new(
211-
inverted_expr,
212-
Arc::clone(predicate.schema()),
213-
) {
214-
let inverted_pruning_stats = RowGroupPruningStatistics {
215-
parquet_schema,
216-
row_group_metadatas: fully_contained_candidates_original_idx
217-
.iter()
218-
.map(|&i| &groups[i])
219-
.collect::<Vec<_>>(),
220-
arrow_schema,
221-
};
222-
223-
if let Ok(inverted_values) =
224-
inverted_predicate.prune(&inverted_pruning_stats)
225-
{
226-
for (i, &original_row_group_idx) in
227-
fully_contained_candidates_original_idx.iter().enumerate()
228-
{
229-
// If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false),
230-
// it implies that *all* rows in this group satisfy the original predicate.
231-
if !inverted_values[i] {
232-
self.is_fully_matched[original_row_group_idx] = true;
233-
metrics
234-
.row_groups_pruned_statistics
235-
.add_fully_matched(1);
236-
}
237-
}
238-
}
239-
}
240-
}
203+
// Check if any of the matched row groups are fully contained by the predicate
204+
self.identify_fully_matched_row_groups(
205+
fully_contained_candidates_original_idx,
206+
arrow_schema,
207+
parquet_schema,
208+
groups,
209+
predicate,
210+
metrics,
211+
);
241212
}
242213
// stats filter array could not be built, so we can't prune
243214
Err(e) => {
@@ -247,6 +218,68 @@ impl RowGroupAccessPlanFilter {
247218
}
248219
}
249220

221+
/// Identifies row groups that are fully matched by the predicate.
222+
///
223+
/// This optimization checks whether all rows in a row group satisfy the predicate
224+
/// by inverting the predicate and checking if it prunes the row group. If the
225+
/// inverted predicate prunes a row group, it means no rows match the inverted
226+
/// predicate, which implies all rows match the original predicate.
227+
///
228+
/// Note: This optimization is relatively inexpensive for a limited number of row groups.
229+
fn identify_fully_matched_row_groups(
230+
&mut self,
231+
candidate_row_group_indices: Vec<usize>,
232+
arrow_schema: &Schema,
233+
parquet_schema: &SchemaDescriptor,
234+
groups: &[RowGroupMetaData],
235+
predicate: &PruningPredicate,
236+
metrics: &ParquetFileMetrics,
237+
) {
238+
if candidate_row_group_indices.is_empty() {
239+
return;
240+
}
241+
242+
// Use NotExpr to create the inverted predicate
243+
let inverted_expr = Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
244+
245+
// Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
246+
// before building the pruning predicate
247+
let mut simplifier = PhysicalExprSimplifier::new(arrow_schema);
248+
let Ok(inverted_expr) = simplifier.simplify(inverted_expr) else {
249+
return;
250+
};
251+
252+
let Ok(inverted_predicate) =
253+
PruningPredicate::try_new(inverted_expr, Arc::clone(predicate.schema()))
254+
else {
255+
return;
256+
};
257+
258+
let inverted_pruning_stats = RowGroupPruningStatistics {
259+
parquet_schema,
260+
row_group_metadatas: candidate_row_group_indices
261+
.iter()
262+
.map(|&i| &groups[i])
263+
.collect::<Vec<_>>(),
264+
arrow_schema,
265+
};
266+
267+
let Ok(inverted_values) = inverted_predicate.prune(&inverted_pruning_stats)
268+
else {
269+
return;
270+
};
271+
272+
for (i, &original_row_group_idx) in candidate_row_group_indices.iter().enumerate()
273+
{
274+
// If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false),
275+
// it implies that *all* rows in this group satisfy the original predicate.
276+
if !inverted_values[i] {
277+
self.is_fully_matched[original_row_group_idx] = true;
278+
metrics.row_groups_pruned_statistics.add_fully_matched(1);
279+
}
280+
}
281+
}
282+
250283
/// Prune remaining row groups using available bloom filters and the
251284
/// [`PruningPredicate`].
252285
///

0 commit comments

Comments
 (0)