1 change: 1 addition & 0 deletions datafusion/catalog-listing/src/table.rs
@@ -506,6 +506,7 @@ impl TableProvider for ListingTable {
.with_statistics(statistics)
.with_projection_indices(projection)?
.with_limit(limit)
.with_preserve_order(args.preserve_order())
.with_output_ordering(output_ordering)
.with_expr_adapter(self.expr_adapter_factory.clone())
.build(),
12 changes: 12 additions & 0 deletions datafusion/catalog/src/table.rs
@@ -336,6 +336,7 @@ pub struct ScanArgs<'a> {
filters: Option<&'a [Expr]>,
projection: Option<&'a [usize]>,
limit: Option<usize>,
preserve_order: bool,
}

impl<'a> ScanArgs<'a> {
@@ -397,6 +398,17 @@ impl<'a> ScanArgs<'a> {
pub fn limit(&self) -> Option<usize> {
self.limit
}

/// Set whether the scan should preserve the order of output rows
pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
self.preserve_order = order_sensitive;
self
}

/// Get whether the scan should preserve the order of output rows
pub fn preserve_order(&self) -> bool {
self.preserve_order
}
}

/// Result of a table scan operation from [`TableProvider::scan_with_args`].
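As a quick usage sketch of the new builder pair (only the `ScanArgs` method names come from this diff; the projection and limit values are illustrative):

// Minimal sketch: set and then read back the new flag on ScanArgs.
let projection = vec![0usize, 1];
let args = ScanArgs::default()
    .with_projection(Some(&projection)) // illustrative projection
    .with_limit(Some(10))               // illustrative limit
    .with_preserve_order(true);
assert!(args.preserve_order());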
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_planner.rs
@@ -457,6 +457,7 @@ impl DefaultPhysicalPlanner {
projection,
filters,
fetch,
preserve_order,
..
}) => {
let source = source_as_provider(source)?;
@@ -468,7 +469,8 @@
let opts = ScanArgs::default()
.with_projection(projection.as_deref())
.with_filters(Some(&filters_vec))
.with_limit(*fetch);
.with_limit(*fetch)
.with_preserve_order(*preserve_order);
let res = source.scan_with_args(session_state, opts).await?;
Arc::clone(res.plan())
}
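With the planner now forwarding the flag, a custom provider can branch on it inside `scan_with_args`. A minimal sketch, assuming a hypothetical provider (only `args.preserve_order()` comes from this diff; `ordered_scan_plan` and `unordered_scan_plan` are invented helpers):

// Inside a hypothetical TableProvider::scan_with_args implementation:
let plan = if args.preserve_order() {
    // Keep per-file ordering; avoid order-destroying repartitioning.
    self.ordered_scan_plan(&args)?
} else {
    // Free to reorder rows for parallelism.
    self.unordered_scan_plan(&args)?
};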
77 changes: 62 additions & 15 deletions datafusion/core/tests/parquet/mod.rs
@@ -126,7 +126,7 @@ struct TestOutput {
impl TestOutput {
/// retrieve the value of the named metric, if any
fn metric_value(&self, metric_name: &str) -> Option<usize> {
if let Some((pruned, _matched)) = self.pruning_metric(metric_name) {
if let Some((pruned, _matched, _fully)) = self.pruning_metric(metric_name) {
return Some(pruned);
}

@@ -140,9 +140,10 @@ impl TestOutput {
})
}

fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize)> {
fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize, usize)> {
let mut total_pruned = 0;
let mut total_matched = 0;
let mut total_fully_matched = 0;
let mut found = false;

for metric in self.parquet_metrics.iter() {
@@ -154,13 +155,15 @@
{
total_pruned += pruning_metrics.pruned();
total_matched += pruning_metrics.matched();
total_fully_matched += pruning_metrics.fully_matched();

found = true;
}
}
}

if found {
Some((total_pruned, total_matched))
Some((total_pruned, total_matched, total_fully_matched))
} else {
None
}
@@ -172,27 +175,33 @@ impl TestOutput {
}

/// The number of row_groups pruned / matched by bloom filter
fn row_groups_bloom_filter(&self) -> Option<(usize, usize)> {
fn row_groups_bloom_filter(&self) -> Option<(usize, usize, usize)> {
self.pruning_metric("row_groups_pruned_bloom_filter")
}

/// The number of row_groups matched by statistics
fn row_groups_matched_statistics(&self) -> Option<usize> {
self.pruning_metric("row_groups_pruned_statistics")
.map(|(_pruned, matched)| matched)
.map(|(_pruned, matched, _fully)| matched)
}

/// The number of row_groups fully matched by statistics
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
self.pruning_metric("row_groups_pruned_statistics")
.map(|(_pruned, _, fully)| fully)
}

/// The number of row_groups pruned by statistics
fn row_groups_pruned_statistics(&self) -> Option<usize> {
self.pruning_metric("row_groups_pruned_statistics")
.map(|(pruned, _matched)| pruned)
.map(|(pruned, _matched, _fully)| pruned)
}

/// Metric `files_ranges_pruned_statistics` tracks both pruned and matched counts;
/// for testing purposes, only the `pruned` count is aggregated here.
fn files_ranges_pruned_statistics(&self) -> Option<usize> {
self.pruning_metric("files_ranges_pruned_statistics")
.map(|(pruned, _matched)| pruned)
.map(|(pruned, _matched, _fully)| pruned)
}

/// The number of row_groups matched by bloom filter or statistics
@@ -202,21 +211,27 @@
/// count.
fn row_groups_matched(&self) -> Option<usize> {
self.row_groups_bloom_filter()
.map(|(_pruned, matched)| matched)
.map(|(_pruned, matched, _fully)| matched)
}

/// The number of row_groups pruned
fn row_groups_pruned(&self) -> Option<usize> {
self.row_groups_bloom_filter()
.map(|(pruned, _matched)| pruned)
.map(|(pruned, _matched, _fully)| pruned)
.zip(self.row_groups_pruned_statistics())
.map(|(a, b)| a + b)
}

/// The number of row pages pruned
fn row_pages_pruned(&self) -> Option<usize> {
self.pruning_metric("page_index_rows_pruned")
.map(|(pruned, _matched)| pruned)
.map(|(pruned, _matched, _fully)| pruned)
}

/// The number of row groups pruned by limit pruning
fn limit_pruned_row_groups(&self) -> Option<usize> {
self.pruning_metric("limit_pruned_row_groups")
.map(|(pruned, _, _)| pruned)
}

fn description(&self) -> String {
@@ -232,20 +247,41 @@
/// and the appropriate scenario
impl ContextWithParquet {
async fn new(scenario: Scenario, unit: Unit) -> Self {
Self::with_config(scenario, unit, SessionConfig::new()).await
Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
}

/// Set custom schema and batches for the test
pub async fn with_custom_data(
scenario: Scenario,
unit: Unit,
schema: Arc<Schema>,
batches: Vec<RecordBatch>,
) -> Self {
Self::with_config(
scenario,
unit,
SessionConfig::new(),
Some(schema),
Some(batches),
)
.await
}

async fn with_config(
scenario: Scenario,
unit: Unit,
mut config: SessionConfig,
custom_schema: Option<Arc<Schema>>,
custom_batches: Option<Vec<RecordBatch>>,
) -> Self {
// Use a single partition for deterministic results no matter how many CPUs the host has
config = config.with_target_partitions(1);
let file = match unit {
Unit::RowGroup(row_per_group) => {
config = config.with_parquet_bloom_filter_pruning(true);
make_test_file_rg(scenario, row_per_group).await
config.options_mut().execution.parquet.pushdown_filters = true;
make_test_file_rg(scenario, row_per_group, custom_schema, custom_batches)
.await
}
Unit::Page(row_per_page) => {
config = config.with_parquet_page_index_pruning(true);
@@ -1075,7 +1111,12 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
}

/// Create a test parquet file with various data types
async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
async fn make_test_file_rg(
scenario: Scenario,
row_per_group: usize,
custom_schema: Option<Arc<Schema>>,
custom_batches: Option<Vec<RecordBatch>>,
) -> NamedTempFile {
let mut output_file = tempfile::Builder::new()
.prefix("parquet_pruning")
.suffix(".parquet")
@@ -1088,8 +1129,14 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile
.set_statistics_enabled(EnabledStatistics::Page)
.build();

let batches = create_data_batch(scenario);
let schema = batches[0].schema();
let (batches, schema) =
if let (Some(schema), Some(batches)) = (custom_schema, custom_batches) {
(batches, schema)
} else {
let batches = create_data_batch(scenario);
let schema = batches[0].schema();
(batches, schema)
};

let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();

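Finally, a sketch of how a test might exercise the new `with_custom_data` entry point (the schema, batch, and `Scenario` variant below are illustrative; only the helper's name and argument order come from this diff, and the arrow types are assumed to be in scope):

// Hypothetical test setup using the new custom-data constructor.
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
    Arc::clone(&schema),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)
.unwrap();
let ctx = ContextWithParquet::with_custom_data(
    Scenario::Int,       // illustrative variant
    Unit::RowGroup(2),   // two rows per row group
    schema,
    vec![batch],
)
.await;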