compute: intra-ts thinning for monotonic topk #16813

Merged: 1 commit, Jan 6, 2023
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/compute/Cargo.toml
@@ -32,6 +32,7 @@ once_cell = "1.16.0"
prometheus = { version = "0.13.3", default-features = false }
scopeguard = "1.1.0"
serde = { version = "1.0.152", features = ["derive"] }
smallvec = { version = "1.10.0", features = ["serde", "union"] }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] }
tokio = { version = "1.23.0", features = ["fs", "rt", "sync", "net"] }
tracing = "0.1.37"
196 changes: 191 additions & 5 deletions src/compute/src/render/top_k.rs
@@ -11,6 +11,8 @@
//!
//! Consult [TopKPlan] documentation for details.

use std::collections::HashMap;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::ArrangeBySelf;
@@ -19,6 +21,8 @@ use differential_dataflow::operators::Consolidate;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::AsCollection;
use differential_dataflow::Collection;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::Scope;

use mz_compute_client::plan::top_k::{
@@ -56,6 +60,33 @@
arity,
limit,
}) => {
let mut datum_vec = mz_repr::DatumVec::new();
let collection = ok_input.map(move |row| {
let group_row = {
let datums = datum_vec.borrow_with(&row);
let iterator = group_key.iter().map(|i| datums[*i]);
let total_size = mz_repr::datums_size(iterator.clone());
let mut group_row = Row::with_capacity(total_size);
group_row.packer().extend(iterator);
group_row
};
(group_row, row)
});

// For monotonic inputs, we are able to thin the input relation in two stages:
// 1. First, we can do an intra-timestamp thinning which has the advantage of
// being computed in a streaming fashion, even for the initial snapshot.
// 2. Then, we can do inter-timestamp thinning by feeding back negations for
// any records that have been invalidated.
let collection = if let Some(limit) = limit {
render_intra_ts_thinning(collection, order_key.clone(), limit)
} else {
collection
};

let collection =
collection.map(|(group_row, row)| ((group_row, row.hashed()), row));

// For monotonic inputs, we are able to retract inputs that can no longer be produced
// as outputs. Any inputs beyond `offset + limit` will never again be produced as
// outputs, and can be removed. The simplest form of this is when `offset == 0` and
@@ -65,17 +96,24 @@
// of `offset` and `limit`, discarding only the records not produced in the intermediate
// stage.
use differential_dataflow::operators::iterate::Variable;
let delay = std::time::Duration::from_nanos(10_000_000_000);
let delay = std::time::Duration::from_secs(10);
let retractions = Variable::new(
&mut ok_input.scope(),
<G::Timestamp as crate::render::RenderTimestamp>::system_delay(
delay.try_into().expect("must fit"),
),
);
let thinned = ok_input.concat(&retractions.negate());
let result = build_topk(thinned, group_key, order_key, 0, limit, arity);
retractions.set(&ok_input.concat(&result.negate()));
result
let thinned = collection.concat(&retractions.negate());

// As an additional optimization, we can skip creating the full topk hierarchy
// here since we now have an upper bound on the number of records due to the
// intra-ts thinning. The maximum number of records per timestamp is
// (num_workers * limit), which we expect to be a small number and so we render
// a single topk stage.
let result = build_topk_stage(thinned, order_key, 1u64, 0, limit, arity);
retractions.set(&collection.concat(&result.negate()));

result.map(|((_key, _hash), row)| row)
}
TopKPlan::Basic(BasicTopKPlan {
group_key,
@@ -317,6 +355,154 @@
// TODO(#7331): Here we discard the arranged output.
result.as_collection(|_k, v| v.clone())
}

fn render_intra_ts_thinning<G>(
collection: Collection<G, (Row, Row), Diff>,
order_key: Vec<mz_expr::ColumnOrder>,
limit: usize,
) -> Collection<G, (Row, Row), Diff>
where
G: Scope,
G::Timestamp: Lattice,
{
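// State: for each outstanding input timestamp, the pending updates grouped by
// (group key, original record time); `vector` is a reusable buffer for draining
// input batches.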
let mut aggregates = HashMap::new();
let mut vector = Vec::new();
collection
.inner
.unary_notify(
Pipeline,
"TopKIntraTimeThinning",
[],
move |input, output, notificator| {
while let Some((time, data)) = input.next() {
data.swap(&mut vector);
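// Stash each update under the capability's time, grouped by (group key,
// record time); each entry accumulates updates in a TopKBatch that compacts
// down to at most `limit` rows.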
let agg_time = aggregates
.entry(time.time().clone())
.or_insert_with(HashMap::new);
for ((grp_row, row), record_time, diff) in vector.drain(..) {
let monoid = monoids::Top1Monoid {
row,
order_key: order_key.clone(),
};
let topk = agg_time.entry((grp_row, record_time)).or_insert_with(
move || {
topk_agg::TopKBatch::new(
limit.try_into().expect("must fit"),
)
},
);
topk.update(monoid, diff);
}
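// Request a notification for when this input timestamp is complete so the
// thinned updates can be emitted below.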
notificator.notify_at(time.retain());
}

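// Once a timestamp is complete, emit the thinned per-group batches and drop
// the associated state.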
notificator.for_each(|time, _, _| {
if let Some(aggs) = aggregates.remove(time.time()) {
let mut session = output.session(&time);
for ((grp_row, record_time), topk) in aggs {
session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
((grp_row.clone(), monoid.row), record_time.clone(), diff)
}))
}
}
});
},
)
.as_collection()
}
}
}

/// Types for in-place intra-ts aggregation of monotonic streams.
pub mod topk_agg {
use differential_dataflow::consolidation;
use smallvec::SmallVec;

// TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
// that. It would be nice to find a way to reuse some or all of the code from there.
//
// Additionally, because we're calling into DD's consolidate method we are forced to work with
// the `Ord` trait, which for the usage above means that we need to clone the `order_key`
// for each record. It would be nice to also remove the need for cloning that piece of data.
pub struct TopKBatch<T> {
updates: SmallVec<[(T, i64); 16]>,
clean: usize,
limit: i64,
}
Comment on lines +427 to +431
Contributor:
This looks a lot like a ChangeBatch; is there a specific reason it isn't that (plus the limit and a method to prune based on the limit)?

Contributor Author:
It's not an accident, I did copy the methods from ChangeBatch. The thing I couldn't do with ChangeBatch is remove the records that I don't need anymore every time compaction happens. I couldn't think of an API that would make sense to add to ChangeBatch to allow for that customization but maybe you have an idea?

Contributor Author:
@frankmcsherry this is the only outstanding comment. I pulled in differential_dataflow::consolidation to do the consolidation but that locks us in with the Ord trait. I can put the manual consolidation back here and sort based on a shared Vec<ColumnOrder>. What do you prefer?

The Top1 plan suffers from the same problem if that makes you feel any better 😅

Contributor:
It does not! :D

We should either have an issue open, or a // TODO, or a fix. This seems like a fine place to eventually do the work for both cases. You needn't do it here right now, but we should open up an issue if we leave it undone!
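For illustration only (not code from this PR): the manual-consolidation alternative mentioned above might look roughly like the sketch below, sorting against a shared `&[ColumnOrder]` so the key is not cloned into every record. The helper name is hypothetical, and it assumes `mz_expr::compare_columns` and `Row::unpack` behave as usual.

// Hypothetical sketch: consolidate (Row, diff) updates with a shared comparator
// instead of requiring `Ord` (and hence a cloned `order_key`) on every record.
fn consolidate_with_shared_key(
    updates: &mut Vec<(mz_repr::Row, i64)>,
    order_key: &[mz_expr::ColumnOrder],
) {
    // Sort with the shared comparator so identical rows become adjacent.
    updates.sort_by(|(left, _), (right, _)| {
        mz_expr::compare_columns(order_key, &left.unpack(), &right.unpack(), || {
            left.cmp(right)
        })
    });
    // Sum the diffs of runs of identical rows, dropping rows that accumulate to zero.
    let (mut write, mut read) = (0, 0);
    while read < updates.len() {
        let mut run_end = read + 1;
        while run_end < updates.len() && updates[run_end].0 == updates[read].0 {
            let diff = updates[run_end].1;
            updates[read].1 += diff;
            run_end += 1;
        }
        if updates[read].1 != 0 {
            updates.swap(write, read);
            write += 1;
        }
        read = run_end;
    }
    updates.truncate(write);
}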


impl<T: Ord> TopKBatch<T> {
pub fn new(limit: i64) -> Self {
Self {
updates: SmallVec::new(),
clean: 0,
limit,
}
}

/// Adds a new update, for `item` with `value`.
///
/// This could be optimized to perform compaction when the number of "dirty" elements exceeds
/// half the length of the list, which would keep the total footprint within reasonable bounds
/// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
/// is worth paying without some experimentation.
#[inline]
pub fn update(&mut self, item: T, value: i64) {
self.updates.push((item, value));
self.maintain_bounds();
}

/// Compact the internal representation.
///
/// This method sorts `self.updates` and consolidates elements with equal items, discarding
/// any whose accumulation is zero. It is optimized to only do this if the number of dirty
/// elements is non-zero.
#[inline]
pub fn compact(&mut self) {
if self.clean < self.updates.len() && self.updates.len() > 1 {
let len = consolidation::consolidate_slice(&mut self.updates);
self.updates.truncate(len);

// We can now retain only the first K records and throw away everything else
let mut limit = self.limit;
self.updates.retain(|x| {
if limit > 0 {
limit -= x.1;
true
} else {
false
}
});
// By the end of the loop above `limit` will be less than or equal to zero. The
// case where it goes negative is when the last record we retained had more copies
// than necessary. For this reason we need to do one final adjustment of the diff
// field of the last record so that the total sum of the diffs in the batch is K.
if let Some(item) = self.updates.last_mut() {
// We are subtracting the limit *negated*, therefore we are subtracting a value
// that is *greater* than or equal to zero, which represents the excess.
item.1 -= -limit;
}
}
self.clean = self.updates.len();
}
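// Worked example (illustrative): with `limit = 3` and consolidated updates
// [(a, 2), (b, 2), (c, 1)], the retain pass keeps `a` (limit 3 -> 1) and `b`
// (limit 1 -> -1) and drops `c`. The pass ends with `limit == -1`, i.e. one
// excess copy, so the last retained record is adjusted from (b, 2) to (b, 1)
// and the batch sums to exactly 3.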

/// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
/// This function tries to minimize work by only compacting if enough work has accumulated.
fn maintain_bounds(&mut self) {
// if we have more than 32 elements and at least half of them are not clean, compact
if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
self.compact()
}
}
}

impl<T: Ord> IntoIterator for TopKBatch<T> {
type Item = (T, i64);
type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;

fn into_iter(mut self) -> Self::IntoIter {
self.compact();
self.updates.into_iter()
}
}
}
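A rough usage sketch (illustrative, not part of the diff): with `limit = 2`, a `TopKBatch` retains only the two smallest items once it is compacted on iteration.

// Illustrative: the batch accepts arbitrary updates and thins them on compaction.
let mut batch = topk_agg::TopKBatch::new(2);
batch.update("b", 1);
batch.update("a", 1);
batch.update("c", 1);
// `into_iter` compacts first: updates are consolidated, sorted, and truncated to
// the limit, so only ("a", 1) and ("b", 1) survive.
let kept: Vec<_> = batch.into_iter().collect();
assert_eq!(kept, vec![("a", 1), ("b", 1)]);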

53 changes: 53 additions & 0 deletions test/testdrive/top-k-monotonic.td
@@ -0,0 +1,53 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Test monotonicity analyses which derive from ENVELOPE NONE sources.
# Note that these only test the implementation for monotonic sources;
# they do not test that the analysis doesn't have false positives on
# non-monotonic sources.

$ set non-dbz-schema={
"type": "record",
"name": "cpx",
"fields": [
{"name": "a", "type": "long"},
{"name": "b", "type": "long"}
]
}

$ kafka-create-topic topic=non-dbz-data

$ kafka-ingest format=avro topic=non-dbz-data schema=${non-dbz-schema} timestamp=1
{"a": 1, "b": 1}
{"a": 1, "b": 2}
{"a": 1, "b": 3}
{"a": 1, "b": 4}
{"a": 1, "b": 5}
{"a": 2, "b": 1000}
{"a": 2, "b": 1001}
{"a": 2, "b": 1002}
{"a": 2, "b": 1003}
{"a": 2, "b": 1004}

> CREATE CONNECTION kafka_conn
TO KAFKA (BROKER '${testdrive.kafka-addr}');

> CREATE SOURCE non_dbz_data
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-non-dbz-data-${testdrive.seed}')
FORMAT AVRO USING SCHEMA '${non-dbz-schema}'
ENVELOPE NONE

# Create a monotonic topk plan that has both a limit and a group to test that thinning works as expected
> SELECT * FROM (SELECT DISTINCT a FROM non_dbz_data) grp, LATERAL (SELECT b FROM non_dbz_data WHERE a = grp.a ORDER BY b LIMIT 2);
a b
---------
1 1
1 2
2 1000
2 1001