Row group limit pruning #18868
base: main
Conversation
Force-pushed from 52f012f to d075c86
```rust
/// which row groups should be accessed
access_plan: ParquetAccessPlan,
/// which row groups are fully contained within the pruning predicate
is_fully_matched: Vec<bool>,
```
The field tracks the row groups and marks whether each one is fully matched
I recommend also defining "fully contained" (aka it is known for sure that ALL rows within the RowGroup pass the filter -- or conversely, that no rows are filtered out)
Do you mean to rename is_fully_matched to fully_contained?
I'd also like to construct some well-suited cases to show benchmark results.
Force-pushed from aaf76c0 to cb5711d
I plan to review this PR carefully tomorrow.
alamb left a comment:
Thank you very much for this PR @xudong963
The idea is very cool, and a nicely done PR; I have several suggestions, but I think it is very close.
Suggestions:
1. Rename the flag to preserve_order
2. Make this new flag on TableScan visible in EXPLAIN plans so we have a better chance of understanding when it is being applied / not applied
3. Implement an end-to-end test for the behavior you want to see (specifically, a test that shows a query on a Parquet file and demonstrates that only a single row group is fetched with this flag, and more than one is fetched without it)

In my mind, 3 is the most important.
datafusion/catalog/src/table.rs (Outdated)
```rust
///
/// # Arguments
/// * `order_sensitive` - Whether the scan's limit should be order-sensitive
pub fn with_limit_order_sensitive(mut self, order_sensitive: bool) -> Self {
```
I was pretty confused by this name for a while. After reading more of the PR, I understand now that this flag basically means that the order of the rows in the file is important for the query. When the order is not required for the query, it gives the datasource the freedom to reorder the rows if there is some more efficient way to apply filters / limits.

I think this is a nice general concept (it doesn't only apply to limits -- it could be used to parallelize decode within a CSV reader, for example).

What do you think about renaming this field / other references to preserve_order instead of limit_order_sensitive?

The semantics would be that the scan must preserve the order of rows (basically the same practical effect the flag has now).
Yes, exactly, I like preserve_order
done in 24ae747
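For illustration, the renamed option could look roughly like this (a hypothetical, simplified sketch; the PR's actual struct and signature may differ):

```rust
// Hypothetical, simplified scan configuration (not the PR's exact code).
struct ScanConfig {
    preserve_order: bool,
}

impl ScanConfig {
    /// When `preserve_order` is true the scan must emit rows in file
    /// order; when false the datasource may reorder rows if that makes
    /// filters / limits cheaper to apply.
    fn with_preserve_order(mut self, preserve_order: bool) -> Self {
        self.preserve_order = preserve_order;
        self
    }
}
```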
```rust
)
.map(LogicalPlan::TableScan)
.map(Transformed::yes);
let mut new_scan =
```
I worry people will start to miss these fields. I wonder if we should make a TableScanBuilder or something (as a follow-on PR) and deprecate TableScan::try_new to make it less likely that these fields will get forgotten.
yeah, I agree
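As a rough sketch of that follow-on idea (hypothetical, simplified types; DataFusion's real TableScan has more fields):

```rust
// Hypothetical TableScanBuilder sketch: every optional field has a
// default, so a field added later cannot be silently forgotten the
// way an extra positional argument on try_new() can.
#[derive(Default)]
struct TableScanBuilder {
    table_name: String,
    fetch: Option<usize>,
    preserve_order: bool,
}

impl TableScanBuilder {
    fn new(table_name: impl Into<String>) -> Self {
        Self {
            table_name: table_name.into(),
            ..Default::default()
        }
    }

    fn fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    fn preserve_order(mut self, preserve_order: bool) -> Self {
        self.preserve_order = preserve_order;
        self
    }

    fn build(self) -> TableScan {
        TableScan {
            table_name: self.table_name,
            fetch: self.fetch,
            preserve_order: self.preserve_order,
        }
    }
}

struct TableScan {
    table_name: String,
    fetch: Option<usize>,
    preserve_order: bool,
}
```

Usage would be, e.g., `TableScanBuilder::new("t").fetch(Some(10)).preserve_order(true).build()`.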
```rust
})),

LogicalPlan::Sort(mut sort) => {
    let marked_input =
```
Since this optimizer is already doing a top-down traversal, I think we can avoid this call (which will rewrite all inputs a second time) and just set a flag on the PushDownLimit structure itself and reuse the existing tree rewrite.
Makes sense; I refactored it in e65f5a3.
```rust
LogicalPlan::Sort(mut sort) => {
    let marked_input =
        mark_fetch_order_sensitive(Arc::unwrap_or_clone(sort.input))?;
    sort.input = Arc::new(marked_input);
```
this is probably a conservative choice, but I think it will over-eagerly mark some inputs -- for example, I think it will mark this scan even when it doesn't matter:

```sql
SELECT COUNT(*) as cnt, x
FROM t
GROUP BY x
ORDER BY cnt DESC
LIMIT 10;
```

I think if you used a flag approach, you could probably avoid this (by resetting the flag when you hit an aggregate, for example).
yes, I added a test for it in commit e65f5a3.
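For illustration, the flag approach might look like this on a toy plan type (hypothetical, self-contained; the real change in e65f5a3 operates on LogicalPlan inside PushDownLimit):

```rust
// Toy plan tree for illustration only.
enum Plan {
    Scan { order_sensitive_limit: bool },
    Aggregate(Box<Plan>),
    Sort(Box<Plan>),
}

// Top-down traversal carrying an order-sensitivity flag: a Sort sets
// it for its input, and an Aggregate clears it, because an aggregate's
// output order is unrelated to its input order. This is why the scan
// under the GROUP BY in the example above would not be marked.
fn mark(plan: &mut Plan, order_sensitive: bool) {
    match plan {
        Plan::Scan { order_sensitive_limit } => {
            *order_sensitive_limit = order_sensitive;
        }
        Plan::Aggregate(input) => mark(input, false),
        Plan::Sort(input) => mark(input, true),
    }
}
```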
```rust
for (i, &original_row_group_idx) in
    fully_contained_candidates_original_idx.iter().enumerate()
{
    // If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false),
```
This is a very clever idea -- basically, if not(predicate) would prune the row group, it means that predicate evaluated to (known) true for all rows.

I tried thinking of counterexamples, but I couldn't come up with any, and the reasoning is sound.
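A concrete illustration with made-up statistics (a hypothetical helper, not the PR's PruningPredicate machinery): for the predicate x > 10, a row group with min = 20 and max = 30 lets the inverted predicate x <= 10 be pruned, which proves every row matches.

```rust
// Decide from min/max stats (ignoring nulls, for simplicity) whether a
// row group is fully contained by the predicate `x > threshold`. The
// inverted predicate `x <= threshold` can only match a row if
// min <= threshold; if even that is impossible, every row in the
// group must satisfy `x > threshold`.
fn fully_contained_gt(min: i64, _max: i64, threshold: i64) -> bool {
    let inverted_can_match = min <= threshold;
    !inverted_can_match
}

fn main() {
    assert!(fully_contained_gt(20, 30, 10)); // all rows satisfy x > 10
    assert!(!fully_contained_gt(5, 30, 10)); // some rows may not
}
```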
```rust
use crate::expressions::{lit, BinaryExpr, Literal, NotExpr};
use crate::PhysicalExpr;

/// Attempts to simplify NOT expressions
```
I recommend we pull this code into its own PR for easier review
Agree
```rust
type Node = Arc<dyn PhysicalExpr>;

fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
    // Apply NOT expression simplification first
```
I think we should pull the NOT simplification into its own PR
I opened a PR for it: #18970
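For context, the kind of rewrite being split out looks roughly like this on a toy expression type (a hypothetical sketch; #18970 operates on Arc<dyn PhysicalExpr>):

```rust
// Toy expression type for illustration only.
#[derive(Debug, PartialEq)]
enum Expr {
    Gt(&'static str, i64),   // col > literal
    LtEq(&'static str, i64), // col <= literal
    Not(Box<Expr>),
}

// Push NOT through comparisons and cancel double negation. Negating a
// comparison this way is sound under SQL three-valued logic: both
// forms evaluate to NULL on NULL input.
fn simplify_not(e: Expr) -> Expr {
    match e {
        Expr::Not(inner) => match simplify_not(*inner) {
            // NOT(c > v)  =>  c <= v
            Expr::Gt(c, v) => Expr::LtEq(c, v),
            // NOT(c <= v) =>  c > v
            Expr::LtEq(c, v) => Expr::Gt(c, v),
            // A NOT the recursion could not eliminate (unreachable with
            // these variants; kept for exhaustiveness).
            still_not => Expr::Not(Box::new(still_not)),
        },
        other => other,
    }
}

fn main() {
    let e = Expr::Not(Box::new(Expr::Not(Box::new(Expr::Gt("x", 5)))));
    assert_eq!(simplify_not(e), Expr::Gt("x", 5)); // NOT(NOT(x > 5)) => x > 5
}
```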
```rust
    &self.is_fully_matched
}

/// Prunes the access plan based on the limit and fully contained row groups.
```
I think we should explain the rationale here and leave a link to the paper -- I think you can adapt the very nice description on #18860.
Basically:
- Why is this optimization important (it can reduce the number of RowGroups read when there is a known limit)
- Why is it correct (the rationale on Support limit pruning #18860)
done in fdef903
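For illustration, the limit-pruning step could look roughly like this (a hypothetical, simplified sketch; the PR's version operates on ParquetAccessPlan, with the rationale from #18860):

```rust
// Row groups are considered in file order. Once the fully contained
// ("fully matched") groups seen so far already guarantee at least
// `limit` matching rows, every later row group can be skipped: the
// first `limit` matching rows in file order must live in the prefix.
fn prune_by_limit(
    row_counts: &[usize],
    fully_contained: &[bool],
    limit: usize,
) -> Vec<bool> {
    let mut scan = vec![true; row_counts.len()];
    let mut guaranteed_rows = 0;
    for i in 0..row_counts.len() {
        if guaranteed_rows >= limit {
            scan[i] = false; // the limit is already satisfied upstream
            continue;
        }
        if fully_contained[i] {
            // Every row in this group passes the predicate, so all of
            // its rows are guaranteed output rows.
            guaranteed_rows += row_counts[i];
        }
        // Partially matched groups are still scanned, but contribute
        // no guaranteed rows.
    }
    scan
}

fn main() {
    // Three groups of 100 rows; only the first is fully contained.
    let scan = prune_by_limit(&[100, 100, 100], &[true, false, false], 50);
    assert_eq!(scan, vec![true, false, false]); // groups 1 and 2 skipped
}
```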
🤖: Benchmark completed
Force-pushed from fdef903 to 7e1fc4f
Which issue does this PR close?
Rationale for this change
See #18860 (comment)
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?
No; no new configs or API changes.