Skip to content

Commit 70501bd

Browse files
committed
update
1 parent d25fb45 commit 70501bd

5 files changed

Lines changed: 35 additions & 64 deletions

File tree

src/runtime/streaming/operators/grouping/incremental_aggregate.rs

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -466,8 +466,7 @@ impl IncrementalAggregatingFunc {
466466
let state = accumulator.state().unwrap_or_else(|_| {
467467
let state = accumulator.state().unwrap();
468468
*accumulator = expr.create_sliding_accumulator().unwrap();
469-
let states: Vec<_> =
470-
state.iter().map(|s| s.to_array()).try_collect().unwrap();
469+
let states: Vec<_> = state.iter().map(|s| s.to_array()).try_collect().unwrap();
471470
accumulator.merge_batch(&states).unwrap();
472471
state
473472
});
@@ -749,19 +748,11 @@ impl Operator for IncrementalAggregatingFunc {
749748

750749
for key in active_keys {
751750
if key == KEY_SLIDING_SNAPSHOT {
752-
sliding_batches.extend(
753-
store
754-
.get_batches(&key)
755-
.await
756-
.map_err(|e| anyhow!("{e}"))?,
757-
);
751+
sliding_batches
752+
.extend(store.get_batches(&key).await.map_err(|e| anyhow!("{e}"))?);
758753
} else if key == KEY_BATCH_SNAPSHOT {
759-
batch_batches.extend(
760-
store
761-
.get_batches(&key)
762-
.await
763-
.map_err(|e| anyhow!("{e}"))?,
764-
);
754+
batch_batches
755+
.extend(store.get_batches(&key).await.map_err(|e| anyhow!("{e}"))?);
765756
}
766757
}
767758

@@ -774,8 +765,7 @@ impl Operator for IncrementalAggregatingFunc {
774765

775766
// Restore sliding (reversible) accumulator state
776767
if !sliding_batches.is_empty() {
777-
let combined =
778-
concat_batches(&self.sliding_state_schema.schema, &sliding_batches)?;
768+
let combined = concat_batches(&self.sliding_state_schema.schema, &sliding_batches)?;
779769
let key_cols: Vec<ArrayRef> = combined.columns()[0..num_keys].to_vec();
780770
let aggregate_states: Vec<Vec<ArrayRef>> = self
781771
.aggregates
@@ -807,8 +797,7 @@ impl Operator for IncrementalAggregatingFunc {
807797

808798
// Restore batch (non-reversible) detail dictionaries
809799
if !batch_batches.is_empty() {
810-
let combined =
811-
concat_batches(&self.batch_state_schema.schema, &batch_batches)?;
800+
let combined = concat_batches(&self.batch_state_schema.schema, &batch_batches)?;
812801
let key_cols: Vec<ArrayRef> = combined.columns()[0..num_keys].to_vec();
813802

814803
let acc_idx_col = combined
@@ -859,21 +848,12 @@ impl Operator for IncrementalAggregatingFunc {
859848
}) = accs.get_mut(acc_idx)
860849
{
861850
let vk = Key(Arc::new(args_row.clone()));
862-
data.insert(
863-
vk.clone(),
864-
BatchData {
865-
count,
866-
generation,
867-
},
868-
);
851+
data.insert(vk.clone(), BatchData { count, generation });
869852
changed_values.insert(vk);
870853
}
871854
}
872855
}
873-
info!(
874-
rows = combined.num_rows(),
875-
"Restored batch detail state."
876-
);
856+
info!(rows = combined.num_rows(), "Restored batch detail state.");
877857
}
878858

879859
info!(
@@ -934,8 +914,7 @@ impl Operator for IncrementalAggregatingFunc {
934914

935915
// Full snapshot of sliding (reversible) accumulator state
936916
if let Some(cols) = self.checkpoint_sliding()? {
937-
let batch =
938-
RecordBatch::try_new(self.sliding_state_schema.schema.clone(), cols)?;
917+
let batch = RecordBatch::try_new(self.sliding_state_schema.schema.clone(), cols)?;
939918
store
940919
.put(KEY_SLIDING_SNAPSHOT.to_vec(), batch)
941920
.await
@@ -944,8 +923,7 @@ impl Operator for IncrementalAggregatingFunc {
944923

945924
// Full snapshot of batch (non-reversible) detail state
946925
if let Some(cols) = self.checkpoint_batch()? {
947-
let batch =
948-
RecordBatch::try_new(self.batch_state_schema.schema.clone(), cols)?;
926+
let batch = RecordBatch::try_new(self.batch_state_schema.schema.clone(), cols)?;
949927
store
950928
.put(KEY_BATCH_SNAPSHOT.to_vec(), batch)
951929
.await
@@ -957,7 +935,10 @@ impl Operator for IncrementalAggregatingFunc {
957935
.snapshot_epoch(barrier.epoch as u64)
958936
.map_err(|e| anyhow!("Snapshot failed: {e}"))?;
959937

960-
info!(epoch = barrier.epoch, "Updating Aggregate snapshotted successfully.");
938+
info!(
939+
epoch = barrier.epoch,
940+
"Updating Aggregate snapshotted successfully."
941+
);
961942

962943
self.updated_keys.clear();
963944

src/runtime/streaming/operators/windows/session_aggregating_window.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -759,10 +759,7 @@ impl Operator for SessionWindowOperator {
759759
.insert(ts);
760760
}
761761

762-
let batches = store
763-
.get_batches(&key)
764-
.await
765-
.map_err(|e| anyhow!("{e}"))?;
762+
let batches = store.get_batches(&key).await.map_err(|e| anyhow!("{e}"))?;
766763
recovered_batches.extend(batches);
767764
}
768765

@@ -836,8 +833,7 @@ impl Operator for SessionWindowOperator {
836833
if let Some(ts_set) = self.pending_timestamps.get_mut(row_key) {
837834
for session_res in session_results {
838835
let start_nanos = to_nanos(session_res.window_start) as u64;
839-
let end_nanos =
840-
to_nanos(session_res.window_end - self.config.gap) as u64;
836+
let end_nanos = to_nanos(session_res.window_end - self.config.gap) as u64;
841837

842838
let expired_ts: Vec<u64> =
843839
ts_set.range(start_nanos..=end_nanos).copied().collect();
@@ -868,7 +864,10 @@ impl Operator for SessionWindowOperator {
868864
.snapshot_epoch(barrier.epoch as u64)
869865
.map_err(|e| anyhow!("Snapshot failed: {e}"))?;
870866

871-
info!(epoch = barrier.epoch, "Session Window Operator snapshotted state.");
867+
info!(
868+
epoch = barrier.epoch,
869+
"Session Window Operator snapshotted state."
870+
);
872871
Ok(())
873872
}
874873

src/runtime/streaming/operators/windows/sliding_aggregating_window.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -363,10 +363,7 @@ impl Operator for SlidingWindowOperator {
363363

364364
for key in active_keys {
365365
if let Some((state_type, ts_nanos)) = parse_state_key(&key) {
366-
let batches = store
367-
.get_batches(&key)
368-
.await
369-
.map_err(|e| anyhow!("{e}"))?;
366+
let batches = store.get_batches(&key).await.map_err(|e| anyhow!("{e}"))?;
370367
if batches.is_empty() {
371368
continue;
372369
}
@@ -444,7 +441,10 @@ impl Operator for SlidingWindowOperator {
444441
let partition_ranges = partition(std::slice::from_ref(&sorted_bins))?.ranges();
445442

446443
let watermark = ctx.current_watermark();
447-
let store = self.state_store.clone().expect("State store not initialized");
444+
let store = self
445+
.state_store
446+
.clone()
447+
.expect("State store not initialized");
448448

449449
for range in partition_ranges {
450450
let bin_start = from_nanos(typed_bin.value(range.start) as u128);
@@ -494,7 +494,10 @@ impl Operator for SlidingWindowOperator {
494494
return Ok(vec![]);
495495
};
496496
let watermark_bin = self.bin_start(current_time);
497-
let store = self.state_store.clone().expect("State store not initialized");
497+
let store = self
498+
.state_store
499+
.clone()
500+
.expect("State store not initialized");
498501

499502
let mut final_outputs = Vec::new();
500503

@@ -537,9 +540,7 @@ impl Operator for SlidingWindowOperator {
537540

538541
// Phase 3: tombstone raw data (Type 0) — no longer needed after partial is saved
539542
let r_key = build_state_key(STATE_TYPE_RAW, bin_start_nanos);
540-
store
541-
.remove_batches(r_key)
542-
.map_err(|e| anyhow!("{e}"))?;
543+
store.remove_batches(r_key).map_err(|e| anyhow!("{e}"))?;
543544
self.pending_raw_bins.remove(&bin_start_nanos);
544545

545546
// Phase 4: compute final sliding window result
@@ -589,9 +590,7 @@ impl Operator for SlidingWindowOperator {
589590

590591
for ts in expired_partials {
591592
let p_key = build_state_key(STATE_TYPE_PARTIAL, ts);
592-
store
593-
.remove_batches(p_key)
594-
.map_err(|e| anyhow!("{e}"))?;
593+
store.remove_batches(p_key).map_err(|e| anyhow!("{e}"))?;
595594
self.pending_partial_bins.remove(&ts);
596595
}
597596
}

src/runtime/streaming/operators/windows/tumbling_aggregating_window.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,7 @@ impl Operator for TumblingWindowOperator {
188188
if let Some(ts_nanos) = Self::extract_timestamp(&key) {
189189
let bin_start = from_nanos(ts_nanos as u128);
190190

191-
let batches = store
192-
.get_batches(&key)
193-
.await
194-
.map_err(|e| anyhow!("{e}"))?;
191+
let batches = store.get_batches(&key).await.map_err(|e| anyhow!("{e}"))?;
195192
if batches.is_empty() {
196193
continue;
197194
}

src/runtime/streaming/operators/windows/window_function.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,7 @@ impl Operator for WindowFunctionOperator {
227227
for ts in expired_ts {
228228
let key = Self::build_state_key(ts);
229229

230-
let batches = store
231-
.get_batches(&key)
232-
.await
233-
.map_err(|e| anyhow!("{e}"))?;
230+
let batches = store.get_batches(&key).await.map_err(|e| anyhow!("{e}"))?;
234231

235232
if !batches.is_empty() {
236233
let (tx, rx) = unbounded_channel();
@@ -252,9 +249,7 @@ impl Operator for WindowFunctionOperator {
252249
}
253250
}
254251

255-
store
256-
.remove_batches(key)
257-
.map_err(|e| anyhow!("{e}"))?;
252+
store.remove_batches(key).map_err(|e| anyhow!("{e}"))?;
258253
self.pending_timestamps.remove(&ts);
259254
}
260255

0 commit comments

Comments
 (0)