Skip to content

Commit d075c86

Browse files
committed
Add fetch_order_sensitive during limit pushdown to decide whether to use limit pruning
1 parent 96b0eb1 commit d075c86

File tree

18 files changed

+188
-66
lines changed

18 files changed

+188
-66
lines changed

datafusion/catalog-listing/src/table.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ impl TableProvider for ListingTable {
506506
.with_statistics(statistics)
507507
.with_projection_indices(projection)
508508
.with_limit(limit)
509+
.with_limit_order_sensitive(args.limit_order_sensitive())
509510
.with_output_ordering(output_ordering)
510511
.with_expr_adapter(self.expr_adapter_factory.clone())
511512
.build(),

datafusion/catalog/src/table.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ pub struct ScanArgs<'a> {
336336
filters: Option<&'a [Expr]>,
337337
projection: Option<&'a [usize]>,
338338
limit: Option<usize>,
339+
limit_order_sensitive: bool,
339340
}
340341

341342
impl<'a> ScanArgs<'a> {
@@ -397,6 +398,25 @@ impl<'a> ScanArgs<'a> {
397398
pub fn limit(&self) -> Option<usize> {
398399
self.limit
399400
}
401+
402+
/// Set whether the scan's limit should be order-sensitive.
403+
///
404+
/// If set, the scan must return the limited rows in a specific order,
405+
/// so limit pruning cannot be applied; otherwise limit pruning can be used to optimize the scan.
406+
///
407+
/// # Arguments
408+
/// * `order_sensitive` - Whether the scan's limit should be order-sensitive
409+
pub fn with_limit_order_sensitive(mut self, order_sensitive: bool) -> Self {
410+
self.limit_order_sensitive = order_sensitive;
411+
self
412+
}
413+
414+
/// Get whether the scan's limit should be order-sensitive.
415+
///
416+
/// Returns `true` if the scan's limit should be order-sensitive, or `false` if not.
417+
pub fn limit_order_sensitive(&self) -> bool {
418+
self.limit_order_sensitive
419+
}
400420
}
401421

402422
/// Result of a table scan operation from [`TableProvider::scan_with_args`].

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,7 @@ impl DefaultPhysicalPlanner {
457457
projection,
458458
filters,
459459
fetch,
460+
fetch_order_sensitive,
460461
..
461462
}) => {
462463
let source = source_as_provider(source)?;
@@ -468,7 +469,8 @@ impl DefaultPhysicalPlanner {
468469
let opts = ScanArgs::default()
469470
.with_projection(projection.as_deref())
470471
.with_filters(Some(&filters_vec))
471-
.with_limit(*fetch);
472+
.with_limit(*fetch)
473+
.with_limit_order_sensitive(*fetch_order_sensitive);
472474
let res = source.scan_with_args(session_state, opts).await?;
473475
Arc::clone(res.plan())
474476
}

datafusion/core/tests/parquet/mod.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ struct TestOutput {
126126
impl TestOutput {
127127
/// retrieve the value of the named metric, if any
128128
fn metric_value(&self, metric_name: &str) -> Option<usize> {
129-
if let Some((pruned, _matched)) = self.pruning_metric(metric_name) {
129+
if let Some((pruned, _matched, _fully)) = self.pruning_metric(metric_name) {
130130
return Some(pruned);
131131
}
132132

@@ -140,9 +140,10 @@ impl TestOutput {
140140
})
141141
}
142142

143-
fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize)> {
143+
fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize, usize)> {
144144
let mut total_pruned = 0;
145145
let mut total_matched = 0;
146+
let mut total_fully_matched = 0;
146147
let mut found = false;
147148

148149
for metric in self.parquet_metrics.iter() {
@@ -154,13 +155,15 @@ impl TestOutput {
154155
{
155156
total_pruned += pruning_metrics.pruned();
156157
total_matched += pruning_metrics.matched();
158+
total_fully_matched += pruning_metrics.fully_matched();
159+
157160
found = true;
158161
}
159162
}
160163
}
161164

162165
if found {
163-
Some((total_pruned, total_matched))
166+
Some((total_pruned, total_matched, total_fully_matched))
164167
} else {
165168
None
166169
}
@@ -172,32 +175,33 @@ impl TestOutput {
172175
}
173176

174177
/// The number of row_groups pruned / matched by bloom filter
175-
fn row_groups_bloom_filter(&self) -> Option<(usize, usize)> {
178+
fn row_groups_bloom_filter(&self) -> Option<(usize, usize, usize)> {
176179
self.pruning_metric("row_groups_pruned_bloom_filter")
177180
}
178181

179182
/// The number of row_groups matched by statistics
180183
fn row_groups_matched_statistics(&self) -> Option<usize> {
181184
self.pruning_metric("row_groups_pruned_statistics")
182-
.map(|(_pruned, matched)| matched)
185+
.map(|(_pruned, matched, _fully)| matched)
183186
}
184187

185188
/// The number of row_groups fully matched by statistics
186189
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
187-
self.metric_value("row_groups_fully_matched_statistics")
190+
self.pruning_metric("row_groups_pruned_statistics")
191+
.map(|(_pruned, _, fully)| fully)
188192
}
189193

190194
/// The number of row_groups pruned by statistics
191195
fn row_groups_pruned_statistics(&self) -> Option<usize> {
192196
self.pruning_metric("row_groups_pruned_statistics")
193-
.map(|(pruned, _matched)| pruned)
197+
.map(|(pruned, _matched, _fully)| pruned)
194198
}
195199

196200
/// Metric `files_ranges_pruned_statistics` tracks both pruned and matched count,
197201
/// for testing purpose, here it only aggregate the `pruned` count.
198202
fn files_ranges_pruned_statistics(&self) -> Option<usize> {
199203
self.pruning_metric("files_ranges_pruned_statistics")
200-
.map(|(pruned, _matched)| pruned)
204+
.map(|(pruned, _matched, _fully)| pruned)
201205
}
202206

203207
/// The number of row_groups matched by bloom filter or statistics
@@ -207,26 +211,27 @@ impl TestOutput {
207211
/// count.
208212
fn row_groups_matched(&self) -> Option<usize> {
209213
self.row_groups_bloom_filter()
210-
.map(|(_pruned, matched)| matched)
214+
.map(|(_pruned, matched, _fully)| matched)
211215
}
212216

213217
/// The number of row_groups pruned
214218
fn row_groups_pruned(&self) -> Option<usize> {
215219
self.row_groups_bloom_filter()
216-
.map(|(pruned, _matched)| pruned)
220+
.map(|(pruned, _matched, _fully)| pruned)
217221
.zip(self.row_groups_pruned_statistics())
218222
.map(|(a, b)| a + b)
219223
}
220224

221225
/// The number of row pages pruned
222226
fn row_pages_pruned(&self) -> Option<usize> {
223227
self.pruning_metric("page_index_rows_pruned")
224-
.map(|(pruned, _matched)| pruned)
228+
.map(|(pruned, _matched, _fully)| pruned)
225229
}
226230

227231
/// The number of row groups pruned by limit pruning
228232
fn limit_pruned_row_groups(&self) -> Option<usize> {
229-
self.metric_value("limit_pruned_row_groups")
233+
self.pruning_metric("limit_pruned_row_groups")
234+
.map(|(pruned, _, _)| pruned)
230235
}
231236

232237
fn description(&self) -> String {

datafusion/core/tests/parquet/row_group_pruning.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,12 @@ impl RowGroupPruningTest {
157157
);
158158
let bloom_filter_metrics = output.row_groups_bloom_filter();
159159
assert_eq!(
160-
bloom_filter_metrics.map(|(_pruned, matched)| matched),
160+
bloom_filter_metrics.map(|(_pruned, matched, _)| matched),
161161
self.expected_row_group_matched_by_bloom_filter,
162162
"mismatched row_groups_matched_bloom_filter",
163163
);
164164
assert_eq!(
165-
bloom_filter_metrics.map(|(pruned, _matched)| pruned),
165+
bloom_filter_metrics.map(|(pruned, _matched, _)| pruned),
166166
self.expected_row_group_pruned_by_bloom_filter,
167167
"mismatched row_groups_pruned_bloom_filter",
168168
);

datafusion/datasource-parquet/src/opener.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ pub(super) struct ParquetOpener {
6868
pub batch_size: usize,
6969
/// Optional limit on the number of rows to read
7070
pub limit: Option<usize>,
71+
/// Whether the scan's limit is order-sensitive
72+
pub limit_order_sensitive: bool,
7173
/// Optional predicate to apply during the scan
7274
pub predicate: Option<Arc<dyn PhysicalExpr>>,
7375
/// Schema of the output table without partition columns.
@@ -163,6 +165,8 @@ impl FileOpener for ParquetOpener {
163165
let encryption_context = self.get_encryption_context();
164166
let max_predicate_cache_size = self.max_predicate_cache_size;
165167

168+
let limit_order_sensitive = self.limit_order_sensitive;
169+
166170
Ok(Box::pin(async move {
167171
#[cfg(feature = "parquet_encryption")]
168172
let file_decryption_properties = encryption_context
@@ -407,8 +411,8 @@ impl FileOpener for ParquetOpener {
407411
.add_matched(n_remaining_row_groups);
408412
}
409413

410-
// Prune by limit
411-
if let Some(limit) = limit {
414+
// Prune by limit if limit is set and limit order is not sensitive
415+
if let (Some(limit), false) = (limit, limit_order_sensitive) {
412416
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
413417
}
414418

@@ -881,6 +885,7 @@ mod test {
881885
projection: Arc::new([0, 1]),
882886
batch_size: 1024,
883887
limit: None,
888+
limit_order_sensitive: false,
884889
predicate: Some(predicate),
885890
logical_file_schema: schema.clone(),
886891
metadata_size_hint: None,
@@ -950,6 +955,7 @@ mod test {
950955
projection: Arc::new([0]),
951956
batch_size: 1024,
952957
limit: None,
958+
limit_order_sensitive: false,
953959
predicate: Some(predicate),
954960
logical_file_schema: file_schema.clone(),
955961
metadata_size_hint: None,
@@ -1039,6 +1045,7 @@ mod test {
10391045
projection: Arc::new([0]),
10401046
batch_size: 1024,
10411047
limit: None,
1048+
limit_order_sensitive: false,
10421049
predicate: Some(predicate),
10431050
logical_file_schema: file_schema.clone(),
10441051
metadata_size_hint: None,
@@ -1131,6 +1138,7 @@ mod test {
11311138
projection: Arc::new([0]),
11321139
batch_size: 1024,
11331140
limit: None,
1141+
limit_order_sensitive: false,
11341142
predicate: Some(predicate),
11351143
logical_file_schema: file_schema.clone(),
11361144
metadata_size_hint: None,
@@ -1223,6 +1231,7 @@ mod test {
12231231
projection: Arc::new([0]),
12241232
batch_size: 1024,
12251233
limit: None,
1234+
limit_order_sensitive: false,
12261235
predicate: Some(predicate),
12271236
logical_file_schema: file_schema.clone(),
12281237
metadata_size_hint: None,
@@ -1377,6 +1386,7 @@ mod test {
13771386
projection: Arc::new([0, 1]),
13781387
batch_size: 1024,
13791388
limit: None,
1389+
limit_order_sensitive: false,
13801390
predicate: Some(predicate),
13811391
logical_file_schema: Arc::clone(&table_schema),
13821392
metadata_size_hint: None,

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,7 @@ impl FileSource for ParquetSource {
583583
.batch_size
584584
.expect("Batch size must set before creating ParquetOpener"),
585585
limit: base_config.limit,
586+
limit_order_sensitive: base_config.limit_order_sensitive,
586587
predicate: self.predicate.clone(),
587588
logical_file_schema: Arc::clone(base_config.file_schema()),
588589
partition_fields: base_config.table_partition_cols().clone(),

datafusion/datasource/src/file_scan_config.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ pub struct FileScanConfig {
175175
/// The maximum number of records to read from this plan. If `None`,
176176
/// all records after filtering are returned.
177177
pub limit: Option<usize>,
178+
/// Whether the scan's limit is order sensitive
179+
pub limit_order_sensitive: bool,
178180
/// All equivalent lexicographical orderings that describe the schema.
179181
pub output_ordering: Vec<LexOrdering>,
180182
/// File compression type
@@ -256,6 +258,7 @@ pub struct FileScanConfigBuilder {
256258
object_store_url: ObjectStoreUrl,
257259
file_source: Arc<dyn FileSource>,
258260
limit: Option<usize>,
261+
limit_order_sensitive: bool,
259262
projection_indices: Option<Vec<usize>>,
260263
constraints: Option<Constraints>,
261264
file_groups: Vec<FileGroup>,
@@ -287,6 +290,7 @@ impl FileScanConfigBuilder {
287290
file_compression_type: None,
288291
new_lines_in_values: None,
289292
limit: None,
293+
limit_order_sensitive: false,
290294
projection_indices: None,
291295
constraints: None,
292296
batch_size: None,
@@ -301,6 +305,12 @@ impl FileScanConfigBuilder {
301305
self
302306
}
303307

308+
/// Set whether the limit should be order-sensitive.
309+
pub fn with_limit_order_sensitive(mut self, order_sensitive: bool) -> Self {
310+
self.limit_order_sensitive = order_sensitive;
311+
self
312+
}
313+
304314
/// Set the file source for scanning files.
305315
///
306316
/// This method allows you to change the file source implementation (e.g. ParquetSource, CsvSource, etc.)
@@ -428,6 +438,7 @@ impl FileScanConfigBuilder {
428438
object_store_url,
429439
file_source,
430440
limit,
441+
limit_order_sensitive,
431442
projection_indices,
432443
constraints,
433444
file_groups,
@@ -461,6 +472,7 @@ impl FileScanConfigBuilder {
461472
object_store_url,
462473
file_source,
463474
limit,
475+
limit_order_sensitive,
464476
projection_exprs,
465477
constraints,
466478
file_groups,
@@ -485,6 +497,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
485497
file_compression_type: Some(config.file_compression_type),
486498
new_lines_in_values: Some(config.new_lines_in_values),
487499
limit: config.limit,
500+
limit_order_sensitive: config.limit_order_sensitive,
488501
projection_indices: config
489502
.projection_exprs
490503
.map(|p| p.ordered_column_indices()),

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2708,12 +2708,12 @@ mod tests {
27082708

27092709
assert_snapshot!(plan, @r"
27102710
Union
2711-
Cross Join:
2711+
Cross Join:
27122712
SubqueryAlias: left
27132713
Values: (Int32(1))
27142714
SubqueryAlias: right
27152715
Values: (Int32(1))
2716-
Cross Join:
2716+
Cross Join:
27172717
SubqueryAlias: left
27182718
Values: (Int32(1))
27192719
SubqueryAlias: right

0 commit comments

Comments
 (0)