Skip to content

Commit 649f04f

Browse files
committed
update
1 parent 70501bd commit 649f04f

4 files changed

Lines changed: 22 additions & 21 deletions

File tree

src/runtime/streaming/api/context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub struct TaskContext {
7878
}
7979

8080
impl TaskContext {
81+
#[allow(clippy::too_many_arguments)]
8182
pub fn new(
8283
job_id: String,
8384
pipeline_id: u32,

src/runtime/streaming/job/job_manager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ impl JobManager {
425425
.collect())
426426
}
427427

428+
#[allow(clippy::too_many_arguments)]
428429
fn build_and_spawn_pipeline(
429430
&self,
430431
job_id: String,

src/runtime/streaming/operators/grouping/incremental_aggregate.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -840,17 +840,16 @@ impl Operator for IncrementalAggregatingFunc {
840840
);
841841
}
842842

843-
if let Some(accs) = self.accumulators.get_mut(&key) {
844-
if let Some(IncrementalState::Batch {
843+
if let Some(accs) = self.accumulators.get_mut(&key)
844+
&& let Some(IncrementalState::Batch {
845845
data,
846846
changed_values,
847847
..
848848
}) = accs.get_mut(acc_idx)
849-
{
850-
let vk = Key(Arc::new(args_row.clone()));
851-
data.insert(vk.clone(), BatchData { count, generation });
852-
changed_values.insert(vk);
853-
}
849+
{
850+
let vk = Key(Arc::new(args_row.clone()));
851+
data.insert(vk.clone(), BatchData { count, generation });
852+
changed_values.insert(vk);
854853
}
855854
}
856855
info!(rows = combined.num_rows(), "Restored batch detail state.");

src/runtime/streaming/state/operator_state.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,10 @@ impl OperatorStateStore {
188188
}
189189

190190
for (table_epoch, table) in self.immutable_tables.lock().iter().rev() {
191-
if let Some(del_ep) = deleted_epoch {
192-
if *table_epoch <= del_ep {
193-
continue;
194-
}
191+
if let Some(del_ep) = deleted_epoch
192+
&& *table_epoch <= del_ep
193+
{
194+
continue;
195195
}
196196
if let Some(batches) = table.get(key) {
197197
out.extend(batches.clone());
@@ -208,10 +208,10 @@ impl OperatorStateStore {
208208
let mut acc = Vec::new();
209209
for path in paths {
210210
let file_epoch = extract_epoch(&path);
211-
if let Some(del_ep) = deleted_epoch {
212-
if file_epoch <= del_ep {
213-
continue;
214-
}
211+
if let Some(del_ep) = deleted_epoch
212+
&& file_epoch <= del_ep
213+
{
214+
continue;
215215
}
216216

217217
// Native Bloom Filter intercepts empty reads here
@@ -387,8 +387,8 @@ impl OperatorStateStore {
387387
for path in &files_to_merge {
388388
let file_epoch = extract_epoch(path);
389389
let file = File::open(path).map_err(StateEngineError::IoError)?;
390-
let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
391-
while let Some(batch) = reader.next() {
390+
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
391+
for batch in reader {
392392
let b = batch?;
393393
if let Some(filtered) =
394394
filter_tombstones_from_batch(&b, &tombstone_snapshot, file_epoch)?
@@ -482,8 +482,8 @@ impl OperatorStateStore {
482482
let mut map = HashMap::new();
483483
for path in tomb_paths {
484484
let file = File::open(&path).map_err(StateEngineError::IoError)?;
485-
let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
486-
while let Some(batch) = reader.next() {
485+
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
486+
for batch in reader {
487487
let batch = batch?;
488488
let key_col = batch
489489
.column(0)
@@ -527,9 +527,9 @@ impl OperatorStateStore {
527527
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
528528
let schema = builder.parquet_schema();
529529
let mask = ProjectionMask::leaves(schema, vec![schema.columns().len() - 1]);
530-
let mut reader = builder.with_projection(mask).build()?;
530+
let reader = builder.with_projection(mask).build()?;
531531

532-
while let Some(batch) = reader.next() {
532+
for batch in reader {
533533
let batch = batch?;
534534
let key_col = batch
535535
.column(0)

0 commit comments

Comments
 (0)