//!
//! Consult [TopKPlan] documentation for details.

+use std::collections::HashMap;
+
use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::ArrangeBySelf;
@@ -19,6 +21,8 @@ use differential_dataflow::operators::Consolidate;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::AsCollection;
use differential_dataflow::Collection;
+use timely::dataflow::channels::pact::Pipeline;
+use timely::dataflow::operators::Operator;
use timely::dataflow::Scope;

use mz_compute_client::plan::top_k::{
…
                arity,
                limit,
            }) => {
+                let mut datum_vec = mz_repr::DatumVec::new();
+                let collection = ok_input.map(move |row| {
+                    let group_row = {
+                        let datums = datum_vec.borrow_with(&row);
+                        let iterator = group_key.iter().map(|i| datums[*i]);
+                        let total_size = mz_repr::datums_size(iterator.clone());
+                        let mut group_row = Row::with_capacity(total_size);
+                        group_row.packer().extend(iterator);
+                        group_row
+                    };
+                    (group_row, row)
+                });
+
+                // For monotonic inputs, we are able to thin the input relation in two stages:
+                // 1. First, we can do an intra-timestamp thinning which has the advantage of
+                //    being computed in a streaming fashion, even for the initial snapshot.
+                // 2. Then, we can do inter-timestamp thinning by feeding back negations for
+                //    any records that have been invalidated.
+                let collection = if let Some(limit) = limit {
+                    render_intra_ts_thinning(collection, order_key.clone(), limit)
+                } else {
+                    collection
+                };
+
+                let collection =
+                    collection.map(|(group_row, row)| ((group_row, row.hashed()), row));
+
                // For monotonic inputs, we are able to retract inputs that can no longer be produced
                // as outputs. Any inputs beyond `offset + limit` will never again be produced as
                // outputs, and can be removed. The simplest form of this is when `offset == 0` and
@@ -65,17 +96,24 @@ where
                // of `offset` and `limit`, discarding only the records not produced in the intermediate
                // stage.
                use differential_dataflow::operators::iterate::Variable;
-                let delay = std::time::Duration::from_nanos(10_000_000_000);
+                let delay = std::time::Duration::from_secs(10);
                let retractions = Variable::new(
                    &mut ok_input.scope(),
                    <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                        delay.try_into().expect("must fit"),
                    ),
                );
-                let thinned = ok_input.concat(&retractions.negate());
-                let result = build_topk(thinned, group_key, order_key, 0, limit, arity);
-                retractions.set(&ok_input.concat(&result.negate()));
-                result
+                let thinned = collection.concat(&retractions.negate());
+
+                // As an additional optimization, we can skip creating the full topk hierarchy
+                // here since we now have an upper bound on the number of records due to the
+                // intra-ts thinning. The maximum number of records per timestamp is
+                // (num_workers * limit), which we expect to be a small number and so we render
+                // a single topk stage.
+                let result = build_topk_stage(thinned, order_key, 1u64, 0, limit, arity);
+                retractions.set(&collection.concat(&result.negate()));
+
+                result.map(|((_key, _hash), row)| row)
            }
            TopKPlan::Basic(BasicTopKPlan {
                group_key,
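
As a note on the intra-timestamp stage introduced above (and implemented in the next hunk), the sketch below shows, outside of any dataflow, roughly what the thinning computes for the records of a single timestamp: every row is folded into one `TopKBatch` per group, so at most `limit` copies of rows per group survive into the cross-timestamp machinery. The helper `thin_one_time`, the `u64` group key, and the generic `R: Ord` row type are illustrative stand-ins; the real operator keys its batches by `(group_row, record_time)` and wraps each row in a `monoids::Top1Monoid` so that `Ord` reflects `order_key`.

```rust
use std::collections::HashMap;

// Illustrative sketch only; assumes the `topk_agg` module added later in this
// commit is in scope. This is not the operator itself.
fn thin_one_time<R: Ord>(
    records: Vec<(u64 /* group */, R /* row */, i64 /* diff */)>,
    limit: i64,
) -> Vec<(u64, R, i64)> {
    let mut per_group: HashMap<u64, topk_agg::TopKBatch<R>> = HashMap::new();
    for (group, row, diff) in records {
        per_group
            .entry(group)
            .or_insert_with(|| topk_agg::TopKBatch::new(limit))
            .update(row, diff);
    }
    // Emitting the batches yields at most `limit` copies per group for this timestamp.
    per_group
        .into_iter()
        .flat_map(|(group, batch)| batch.into_iter().map(move |(row, diff)| (group, row, diff)))
        .collect()
}
```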
@@ -317,6 +355,148 @@ where
            // TODO(#7331): Here we discard the arranged output.
            result.as_collection(|_k, v| v.clone())
        }
+
+        fn render_intra_ts_thinning<G>(
+            collection: Collection<G, (Row, Row), Diff>,
+            order_key: Vec<mz_expr::ColumnOrder>,
+            limit: usize,
+        ) -> Collection<G, (Row, Row), Diff>
+        where
+            G: Scope,
+            G::Timestamp: Lattice,
+        {
+            let mut aggregates = HashMap::new();
+            let mut vector = Vec::new();
+            collection
+                .inner
+                .unary_notify(
+                    Pipeline,
+                    "TopKIntraTimeThinning",
+                    [],
+                    move |input, output, notificator| {
+                        while let Some((time, data)) = input.next() {
+                            data.swap(&mut vector);
+                            let agg_time = aggregates
+                                .entry(time.time().clone())
+                                .or_insert_with(HashMap::new);
+                            for ((grp_row, row), record_time, diff) in vector.drain(..) {
+                                let monoid = monoids::Top1Monoid {
+                                    row,
+                                    order_key: order_key.clone(),
+                                };
+                                let topk = agg_time.entry((grp_row, record_time)).or_insert_with(
+                                    move || {
+                                        topk_agg::TopKBatch::new(
+                                            limit.try_into().expect("must fit"),
+                                        )
+                                    },
+                                );
+                                topk.update(monoid, diff);
+                            }
+                            notificator.notify_at(time.retain());
+                        }
+
+                        notificator.for_each(|time, _, _| {
+                            if let Some(aggs) = aggregates.remove(time.time()) {
+                                let mut session = output.session(&time);
+                                for ((grp_row, record_time), topk) in aggs {
+                                    session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
+                                        ((grp_row.clone(), monoid.row), record_time.clone(), diff)
+                                    }))
+                                }
+                            }
+                        });
+                    },
+                )
+                .as_collection()
+        }
+    }
+}
+
+/// Types for in-place intra-ts aggregation of monotonic streams.
+pub mod topk_agg {
+    use differential_dataflow::consolidation;
+    use smallvec::SmallVec;
+
+    pub struct TopKBatch<T> {
+        updates: SmallVec<[(T, i64); 16]>,
+        clean: usize,
+        limit: i64,
+    }
+
+    impl<T: Ord> TopKBatch<T> {
+        pub fn new(limit: i64) -> Self {
+            Self {
+                updates: SmallVec::new(),
+                clean: 0,
+                limit,
+            }
+        }
+
+        /// Adds a new update, for `item` with `value`.
+        ///
+        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
+        /// half the length of the list, which would keep the total footprint within reasonable bounds
+        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
+        /// is worth paying without some experimentation.
+        #[inline]
+        pub fn update(&mut self, item: T, value: i64) {
+            self.updates.push((item, value));
+            self.maintain_bounds();
+        }
+
+        /// Compact the internal representation.
+        ///
+        /// This method sorts `self.updates` and consolidates elements with equal item, discarding
+        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
+        /// elements is non-zero.
+        #[inline]
+        pub fn compact(&mut self) {
+            if self.clean < self.updates.len() && self.updates.len() > 1 {
+                let len = consolidation::consolidate_slice(&mut self.updates);
+                self.updates.truncate(len);
+
+                // We can now retain only the first K records and throw away everything else
+                let mut limit = self.limit;
+                self.updates.retain(|x| {
+                    if limit > 0 {
+                        limit -= x.1;
+                        true
+                    } else {
+                        false
+                    }
+                });
+                // By the end of the loop above `limit` will be less than or equal to zero. The
+                // case where it goes negative is when the last record we retained had more copies
+                // than necessary. For this reason we need to do one final adjustment of the diff
+                // field of the last record so that the total sum of the diffs in the batch is K.
+                if let Some(item) = self.updates.last_mut() {
+                    // We are subtracting the limit *negated*, therefore we are subtracting a value
+                    // that is *greater* than or equal to zero, which represents the excess.
+                    item.1 -= -limit;
+                }
+            }
+            self.clean = self.updates.len();
+        }
+
+        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
+        /// This function tries to minimize work by only compacting if enough work has accumulated.
+        fn maintain_bounds(&mut self) {
+            // if we have more than 32 elements and at least half of them are not clean, compact
+            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
+                self.compact()
+            }
+        }
+    }
+
+    impl<T: Ord> IntoIterator for TopKBatch<T> {
+        type Item = (T, i64);
+        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
+
+        fn into_iter(mut self) -> Self::IntoIter {
+            self.compact();
+            self.updates.into_iter()
+        }
    }
}
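
To make the `compact` arithmetic concrete, here is a hedged usage example, assuming the `topk_agg` module above is in scope and using illustrative values: with `limit = 3` and two items of two copies each, consolidation orders them as `[("a", 2), ("b", 2)]`, the retain loop drives the running `limit` from 3 to 1 to -1, and the final adjustment subtracts the excess of 1 from the last retained record so that the diffs sum to exactly K.

```rust
let mut batch = topk_agg::TopKBatch::new(3);
batch.update("b", 2);
batch.update("a", 2);
// `into_iter` compacts first: items are consolidated in `Ord` order and the
// last retained record's diff is trimmed so the total equals the limit.
let kept: Vec<_> = batch.into_iter().collect();
assert_eq!(kept, vec![("a", 2), ("b", 1)]);
```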