compute: intra-ts thinning for monotonic topk #16813


Merged

merged 1 commit into MaterializeInc:main on Jan 6, 2023

Conversation

@petrosagg petrosagg commented Dec 24, 2022

Motivation

This PR implements a pre-arrangement thinning of monotonic collections that are on their way to a topk computation. The thinning can be performed in a streaming fashion, even for a single timestamp that contains a lot of data.

With this change, a monotonic collection flowing into a top 3 operator can be processed on machines with very little RAM even if its snapshot is 10GB, since we incrementally discard records that cannot possibly be in the top 3 as rows flow in.
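The core idea can be sketched as a standalone loop (illustrative only; the PR's real operator works on `(row, diff)` updates within timestamps): keep a buffer of at most ~2k candidates and compact whenever it fills, so peak memory is proportional to k rather than to the snapshot.

```rust
// Illustrative sketch of intra-timestamp thinning (not the PR's actual code):
// stream rows through a buffer of at most 2*k entries, discarding rows that
// cannot be in the top k (here: the k smallest values) at each compaction.
fn streaming_top_k(input: impl Iterator<Item = u64>, k: usize) -> Vec<u64> {
    let mut buffer: Vec<u64> = Vec::with_capacity(2 * k);
    for row in input {
        buffer.push(row);
        if buffer.len() >= 2 * k {
            buffer.sort_unstable();
            buffer.truncate(k); // rows beyond position k can never rejoin the top k
        }
    }
    buffer.sort_unstable();
    buffer.truncate(k);
    buffer
}
```

Peak memory here is O(k) regardless of input size, which is what makes a 10GB snapshot feasible on a small machine.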

Checklist

@petrosagg petrosagg force-pushed the topk-optimization branch 2 times, most recently from 6741bfe to 8d89f0c Compare December 27, 2022 13:18
@petrosagg petrosagg changed the title compute: optimize memory usage monotonic topk plans compute: intra-ts thinning for monotonic topk Dec 27, 2022
@petrosagg petrosagg marked this pull request as ready for review January 4, 2023 16:32
@frankmcsherry frankmcsherry left a comment

Generally good, though I think there is at least one issue to fix with the finishing of the multiplicity for the last element. There is also work to do, either here or as follow-up, to reduce the number of Vec<ColumnOrder> allocations.

Comment on lines +394 to +431
pub struct TopKBatch<T> {
    updates: SmallVec<[(T, i64); 16]>,
    clean: usize,
    limit: i64,
}
Contributor

This looks a lot like a ChangeBatch; is there a specific reason it isn't that (plus the limit and a method to prune based on it)?

Contributor Author

It's not an accident; I did copy the methods from ChangeBatch. The thing I couldn't do with ChangeBatch is remove the records I no longer need every time compaction happens. I couldn't think of an API that would make sense to add to ChangeBatch to allow for that customization, but maybe you have an idea?

Contributor Author

@frankmcsherry this is the only outstanding comment. I pulled in differential_dataflow::consolidation to do the consolidation but that locks us in with the Ord trait. I can put the manual consolidation back here and sort based on a shared Vec<ColumnOrder>. What do you prefer?

The Top1 plan suffers from the same problem if that makes you feel any better 😅

Contributor

It does not! :D

We should either have an issue open, or a // TODO, or a fix. This seems like a fine place to eventually do the work for both cases. You needn't do it here right now, but we should open up an issue if we leave it undone!

Comment on lines 429 to 435
self.updates.sort_by(|x, y| x.0.cmp(&y.0));
for i in 0..self.updates.len() - 1 {
    if self.updates[i].0 == self.updates[i + 1].0 {
        self.updates[i + 1].1 += self.updates[i].1;
        self.updates[i].1 = 0;
    }
}
Contributor

This looks a lot like consolidation. Should we just do that?

Contributor Author

Good point, I'll change it! I got the code from timely, which has it inlined, probably because importing consolidation would create a circular dependency with DD.
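For reference, the consolidation being imported is morally the following (a self-contained stand-in for `differential_dataflow::consolidation::consolidate`, assuming plain `i64` diffs):

```rust
// Self-contained stand-in for differential_dataflow::consolidation::consolidate:
// sort updates by data, merge multiplicities of equal entries, and drop zeros.
fn consolidate<T: Ord>(updates: &mut Vec<(T, i64)>) {
    updates.sort_by(|x, y| x.0.cmp(&y.0));
    for i in 0..updates.len().saturating_sub(1) {
        if updates[i].0 == updates[i + 1].0 {
            updates[i + 1].1 += updates[i].1;
            updates[i].1 = 0;
        }
    }
    updates.retain(|x| x.1 != 0);
}
```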

Comment on lines 437 to 445
self.updates.retain(|x| {
    if limit > 0 {
        limit -= x.1;
        true
    } else {
        false
    }
});
// Adjust the diff of the last record that was retained so that we have exactly K
Contributor

I think we might end up happier if this struct owns the Vec<ColumnOrder> rather than having T own that and having T::cmp run the logic. At least, it reduces the number of allocations from "one per record" down to "one per worker".

Contributor Author

I agree, the monotonic top1 rendering has the same problem. The easiest fix that will improve the situation is to change it to an Arc<Vec<ColumnOrder>> but that won't survive across exchanges. To go further we'd have to either not rely on pure Ord traits or do some kind of data interning.

What I wanted to experiment with is a data interning scheme where each worker deterministically builds a thread local map of (dataflow_id, item_id) -> Rc<dyn Any> so that operators can associate any piece of data that is common across dataflows at dataflow construction with a small amount of data that is what gets exchanged over the network. Has this pattern been explored before?
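The interning scheme described above could look roughly like this (a hedged sketch of the idea only; `intern`, `lookup`, and `INTERNED` are hypothetical names, and none of this is an existing timely/differential API):

```rust
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

// Sketch: each worker deterministically registers shared data under a small
// (dataflow_id, item_id) key at dataflow construction time, so that operators
// only need to exchange the small key over the network, not the data itself.
thread_local! {
    static INTERNED: RefCell<HashMap<(usize, usize), Rc<dyn Any>>> =
        RefCell::new(HashMap::new());
}

fn intern(dataflow_id: usize, item_id: usize, value: Rc<dyn Any>) {
    INTERNED.with(|map| {
        map.borrow_mut().insert((dataflow_id, item_id), value);
    });
}

fn lookup(dataflow_id: usize, item_id: usize) -> Option<Rc<dyn Any>> {
    INTERNED.with(|map| map.borrow().get(&(dataflow_id, item_id)).cloned())
}
```

An operator would then exchange only the key and call `lookup` on the receiving worker, downcasting the `Rc<dyn Any>` back to the concrete type (e.g. a `Vec<ColumnOrder>`).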

Contributor

I think elsewhere we don't rely on Ord, but that seems fine. There is a mz_expr::compare_columns we use to sort things elsewhere, so anywhere it says sort we could use sort_by(that) instead.
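The suggestion amounts to something like the following (a simplified sketch: `ColumnOrder` here is a stand-in for `mz_expr::ColumnOrder`, and this `compare_columns` only mirrors the spirit of the real `mz_expr::compare_columns` over plain integer columns):

```rust
use std::cmp::Ordering;

// Simplified stand-in for mz_expr::ColumnOrder: which column to compare,
// and whether to reverse the comparison.
struct ColumnOrder {
    column: usize,
    desc: bool,
}

// Compare two rows column by column under the given order specification.
fn compare_columns(order: &[ColumnOrder], left: &[i64], right: &[i64]) -> Ordering {
    for o in order {
        let cmp = left[o.column].cmp(&right[o.column]);
        let cmp = if o.desc { cmp.reverse() } else { cmp };
        if cmp != Ordering::Equal {
            return cmp;
        }
    }
    Ordering::Equal
}
```

One shared `Vec<ColumnOrder>` per worker then serves every comparison, e.g. `rows.sort_by(|a, b| compare_columns(&order, a, b))`, with no per-record allocation and no reliance on the row type's `Ord`.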

Comment on lines 447 to 483
if let Some(item) = self.updates.last_mut() {
    item.1 -= -limit;
}
Contributor

I don't really follow this. It looks like up above limit could go negative if records have multiplicity greater than one, in which case this will increment the count? I assume that isn't what you want, but instead to subtract some trailing counts?

Contributor Author

This is confusing; maybe I should add more comments. I am subtracting the negated limit, so if limit goes negative I'm subtracting a positive number, i.e. subtracting the trailing counts. I had it as item.1 += limit originally, but that seemed equally confusing to me.
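A worked example of the arithmetic, as a standalone function mirroring the retain-then-adjust logic under discussion (illustrative types and numbers, not the PR's code): with limit = 3 and updates [(a, 2), (b, 2), (c, 5)], the retain loop keeps a and b and drives `limit` to -1, so the last record's multiplicity is reduced by `-limit = 1`, leaving exactly 3 copies.

```rust
// Keep records until `limit` copies are accounted for, then trim the last
// retained record's multiplicity by the overshoot (`-limit` once limit < 0).
fn thin_to_limit(updates: &mut Vec<(char, i64)>, mut limit: i64) {
    updates.retain(|x| {
        if limit > 0 {
            limit -= x.1;
            true
        } else {
            false
        }
    });
    if limit < 0 {
        if let Some(item) = updates.last_mut() {
            item.1 -= -limit; // subtract the (positive) overshoot
        }
    }
}
```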

@vmarcos vmarcos left a comment

I have a comment regarding the lack of partitioning by group_key in each worker. IIUC, each worker should compute the top-k elements by group_key and then these will be merged appropriately by the downstream top-k stages.

Also, we should change the call to build_topk to be a single-stage call instead (with an appropriate comment) as per our discussion that after thinning, we can bound the size of each top-k group in the final stage by limit * num_workers * 10s_of_timestamps. Should this be part of this PR or a follow-up PR?

Comment on lines 337 to 362
collection: Collection<G, Row, Diff>,
order_key: Vec<mz_expr::ColumnOrder>,
limit: usize,
Contributor

I am afraid that we cannot ignore the group_key here, i.e., we probably need to map in a similar style to what is done in render_top1_monotonic so that we form groups by group_key and not groups ignoring the group_key. In other words, the data structure for aggregates should have another layer partitioning the top-k by group_key. Otherwise, we will risk thinning too much, no?
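The fix direction being suggested can be sketched as keying the thinning state by group (illustrative stand-in: integer group keys and row values instead of the real expressions), so that one group's rows can never evict another group's rows:

```rust
use std::collections::HashMap;

// Thin to the top k (here: k smallest) within each group independently.
// Thinning without this partitioning would keep only k rows globally.
fn top_k_per_group(input: &[(u32, u64)], k: usize) -> HashMap<u32, Vec<u64>> {
    let mut groups: HashMap<u32, Vec<u64>> = HashMap::new();
    for &(group, row) in input {
        let buf = groups.entry(group).or_default();
        buf.push(row);
        if buf.len() >= 2 * k {
            buf.sort_unstable();
            buf.truncate(k); // thin within this group only
        }
    }
    for buf in groups.values_mut() {
        buf.sort_unstable();
        buf.truncate(k);
    }
    groups
}
```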

Contributor

Here are some test queries confirming that the implementation has the problem pointed out above:

materialize=> SELECT n_nationkey, COUNT(*)
FROM (
    SELECT n_nationkey, l_orderkey, l_linenumber, l_discount
    FROM (SELECT n_nationkey FROM nation) grp,
         LATERAL (SELECT l_orderkey, l_linenumber, l_discount 
                  FROM lineitem
                  WHERE l_orderkey IN (SELECT o_orderkey
                                       FROM orders JOIN customer ON o_custkey = c_custkey
                                       WHERE c_nationkey = grp.n_nationkey)
                 )
)
GROUP BY n_nationkey;
 n_nationkey | count  
-------------+--------
           0 | 239603
           1 | 238446
           2 | 241107
           3 | 242006
           4 | 235952
           5 | 238987
           6 | 246415
           7 | 239064
           8 | 238967
           9 | 246133
          10 | 238622
          11 | 235800
          12 | 237770
          13 | 244155
          14 | 235347
          15 | 237850
          16 | 244548
          17 | 236521
          18 | 242526
          19 | 243962
          20 | 233321
          21 | 241118
          22 | 245236
          23 | 237400
          24 | 240359
(25 rows)

Time: 899.775 ms
materialize=> SELECT n_nationkey, COUNT(*)
FROM (
    SELECT n_nationkey, l_orderkey, l_linenumber, l_discount
    FROM (SELECT n_nationkey FROM nation) grp,
         LATERAL (SELECT l_orderkey, l_linenumber, l_discount 
                  FROM lineitem
                  WHERE l_orderkey IN (SELECT o_orderkey
                                       FROM orders JOIN customer ON o_custkey = c_custkey
                                       WHERE c_nationkey = grp.n_nationkey)
                  ORDER BY l_discount DESC LIMIT 10
                 )
)
GROUP BY n_nationkey;
 n_nationkey | count 
-------------+-------
           0 |    10
(1 row)

Time: 1731.416 ms (00:01.731)

Contributor Author

Great catch! I fixed it and also added a test case.

@petrosagg petrosagg force-pushed the topk-optimization branch 3 times, most recently from f26f7b9 to d5e8ad5 Compare January 5, 2023 15:18
@vmarcos vmarcos left a comment

Thanks a lot! This looks great!

@petrosagg petrosagg merged commit 03c945f into MaterializeInc:main Jan 6, 2023
@petrosagg petrosagg deleted the topk-optimization branch January 6, 2023 17:12
vmarcos added a commit that referenced this pull request Jan 11, 2023
This commit retrospectively migrates and refines an internal document into a design document for recent changes to monotonic top-k rendering introduced in PR #16813.

3 participants