Skip to content

Commit 2f507a5

Browse files
committed
update
1 parent 5b85ad6 commit 2f507a5

2 files changed

Lines changed: 3 additions & 7 deletions

File tree

src/runtime/streaming/operators/key_operator.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@ use arrow::compute::{sort_to_indices, take};
2121
use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
2222
use async_trait::async_trait;
2323
use datafusion_common::hash_utils::create_hashes;
24-
use datafusion_common::ScalarValue;
2524
use futures::StreamExt;
26-
use tracing::info;
27-
2825
use crate::runtime::streaming::api::context::TaskContext;
2926
use crate::runtime::streaming::api::operator::Operator;
3027
use crate::runtime::streaming::operators::StatelessPhysicalExecutor;

src/runtime/streaming/operators/windows/session_aggregating_window.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use datafusion_proto::protobuf::PhysicalPlanNode;
3131
use futures::StreamExt;
3232
use prost::Message;
3333
use std::collections::{BTreeMap, HashMap, HashSet};
34-
use tracing::info;
3534
use std::sync::{Arc, RwLock};
3635
use std::time::{Duration, SystemTime};
3736
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@@ -329,7 +328,7 @@ fn append_output_timestamp_column(
329328
DataType::Timestamp(TimeUnit::Second, tz) => {
330329
let v: Vec<i64> = session_results
331330
.iter()
332-
.map(|r| (nanos(r) / 1_000_000_000))
331+
.map(|r| nanos(r) / 1_000_000_000)
333332
.collect();
334333
columns.push(Arc::new(
335334
TimestampSecondArray::from(v).with_timezone_opt(tz.clone()),
@@ -338,7 +337,7 @@ fn append_output_timestamp_column(
338337
DataType::Timestamp(TimeUnit::Millisecond, tz) => {
339338
let v: Vec<i64> = session_results
340339
.iter()
341-
.map(|r| (nanos(r) / 1_000_000))
340+
.map(|r| nanos(r) / 1_000_000)
342341
.collect();
343342
columns.push(Arc::new(
344343
TimestampMillisecondArray::from(v).with_timezone_opt(tz.clone()),
@@ -347,7 +346,7 @@ fn append_output_timestamp_column(
347346
DataType::Timestamp(TimeUnit::Microsecond, tz) => {
348347
let v: Vec<i64> = session_results
349348
.iter()
350-
.map(|r| (nanos(r) / 1000))
349+
.map(|r| nanos(r) / 1000)
351350
.collect();
352351
columns.push(Arc::new(
353352
TimestampMicrosecondArray::from(v).with_timezone_opt(tz.clone()),

0 commit comments

Comments
 (0)