Skip to content

Commit 96b0eb1

Browse files
committed
Support row group limit pruning
1 parent e12666d commit 96b0eb1

File tree

3 files changed

+20
-24
lines changed

3 files changed

+20
-24
lines changed

datafusion/datasource-parquet/src/metrics.rs

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,12 @@ pub struct ParquetFileMetrics {
4545
pub files_ranges_pruned_statistics: PruningMetrics,
4646
/// Number of times the predicate could not be evaluated
4747
pub predicate_evaluation_errors: Count,
48-
/// Number of row groups whose bloom filters were checked and matched (not pruned)
49-
pub row_groups_matched_bloom_filter: Count,
5048
/// Number of row groups pruned by bloom filters
51-
pub row_groups_pruned_bloom_filter: Count,
49+
pub row_groups_pruned_bloom_filter: PruningMetrics,
5250
/// Number of row groups pruned due to limit pruning.
53-
pub limit_pruned_row_groups: Count,
54-
/// Number of row groups whose statistics were checked and fully matched
55-
pub row_groups_fully_matched_statistics: Count,
56-
/// Number of row groups whose statistics were checked and matched (not pruned)
57-
pub row_groups_matched_statistics: Count,
51+
pub limit_pruned_row_groups: PruningMetrics,
5852
/// Number of row groups pruned by statistics
59-
pub row_groups_pruned_statistics: Count,
53+
pub row_groups_pruned_statistics: PruningMetrics,
6054
/// Total number of bytes scanned
6155
pub bytes_scanned: Count,
6256
/// Total rows filtered out by predicates pushed into parquet scan
@@ -98,19 +92,13 @@ impl ParquetFileMetrics {
9892
// -----------------------
9993
let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
10094
.with_new_label("filename", filename.to_string())
101-
.counter("row_groups_pruned_bloom_filter", partition);
95+
.with_type(MetricType::SUMMARY)
96+
.pruning_metrics("row_groups_pruned_bloom_filter", partition);
10297

10398
let limit_pruned_row_groups = MetricBuilder::new(metrics)
10499
.with_new_label("filename", filename.to_string())
105-
.counter("limit_pruned_row_groups", partition);
106-
107-
let row_groups_fully_matched_statistics = MetricBuilder::new(metrics)
108-
.with_new_label("filename", filename.to_string())
109-
.counter("row_groups_fully_matched_statistics", partition);
110-
111-
let row_groups_matched_statistics = MetricBuilder::new(metrics)
112-
.with_new_label("filename", filename.to_string())
113-
.counter("row_groups_matched_statistics", partition);
100+
.with_type(MetricType::SUMMARY)
101+
.pruning_metrics("limit_pruned_row_groups", partition);
114102

115103
let row_groups_pruned_statistics = MetricBuilder::new(metrics)
116104
.with_new_label("filename", filename.to_string())
@@ -185,8 +173,6 @@ impl ParquetFileMetrics {
185173
files_ranges_pruned_statistics,
186174
predicate_evaluation_errors,
187175
row_groups_pruned_bloom_filter,
188-
row_groups_fully_matched_statistics,
189-
row_groups_matched_statistics,
190176
row_groups_pruned_statistics,
191177
limit_pruned_row_groups,
192178
bytes_scanned,

datafusion/datasource-parquet/src/row_group_filter.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl RowGroupAccessPlanFilter {
111111
let new_num_accessible_row_groups = fully_matched_row_group_indexes.len();
112112
let pruned_count = original_num_accessible_row_groups
113113
.saturating_sub(new_num_accessible_row_groups);
114-
metrics.limit_pruned_row_groups.add(pruned_count);
114+
metrics.limit_pruned_row_groups.add_pruned(pruned_count);
115115

116116
let mut new_access_plan = ParquetAccessPlan::new_none(rg_metadata.len());
117117
for &idx in &fully_matched_row_group_indexes {
@@ -194,7 +194,6 @@ impl RowGroupAccessPlanFilter {
194194
} else {
195195
metrics.row_groups_pruned_statistics.add_matched(1);
196196
fully_contained_candidates_original_idx.push(*idx);
197-
metrics.row_groups_matched_statistics.add(1);
198197
}
199198
}
200199

@@ -231,7 +230,9 @@ impl RowGroupAccessPlanFilter {
231230
// it implies that *all* rows in this group satisfy the original predicate.
232231
if !inverted_values[i] {
233232
self.is_fully_matched[original_row_group_idx] = true;
234-
metrics.row_groups_fully_matched_statistics.add(1);
233+
metrics
234+
.row_groups_pruned_statistics
235+
.add_fully_matched(1);
235236
}
236237
}
237238
}

datafusion/physical-plan/src/metrics/value.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,7 @@ impl Drop for ScopedTimerGuard<'_> {
373373
pub struct PruningMetrics {
374374
pruned: Arc<AtomicUsize>,
375375
matched: Arc<AtomicUsize>,
376+
fully_matched: Arc<AtomicUsize>,
376377
}
377378

378379
impl Display for PruningMetrics {
@@ -401,6 +402,7 @@ impl PruningMetrics {
401402
Self {
402403
pruned: Arc::new(AtomicUsize::new(0)),
403404
matched: Arc::new(AtomicUsize::new(0)),
405+
fully_matched: Arc::new(AtomicUsize::new(0)),
404406
}
405407
}
406408

@@ -418,6 +420,13 @@ impl PruningMetrics {
418420
self.matched.fetch_add(n, Ordering::Relaxed);
419421
}
420422

423+
/// Add `n` to the metric's fully matched value
424+
pub fn add_fully_matched(&self, n: usize) {
425+
// relaxed ordering for operations on `value` poses no issues
426+
// we're purely using atomic ops with no associated memory ops
427+
self.fully_matched.fetch_add(n, Ordering::Relaxed);
428+
}
429+
421430
/// Subtract `n` to the metric's matched value.
422431
pub fn subtract_matched(&self, n: usize) {
423432
// relaxed ordering for operations on `value` poses no issues

0 commit comments

Comments
 (0)