Skip to content

Commit 0e302c1

Browse files
committed
update
1 parent 9e42e1e commit 0e302c1

2 files changed

Lines changed: 85 additions & 28 deletions

File tree

src/runtime/streaming/operators/key_operator.rs

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,20 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
//!
13+
//! Key-by over the physical plan output: key column(s) are **values** projected by the plan
14+
//! (e.g. `_key_user_id`); **shuffle / `StreamOutput::Keyed` uses `u64` hashes** computed by
15+
//! [`datafusion_common::hash_utils::create_hashes`] on those columns — same mechanism as
16+
//! [`crate::runtime::streaming::operators::key_by::KeyByOperator`].
1417
1518
use anyhow::{anyhow, Result};
16-
use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
19+
use ahash::RandomState;
1720
use arrow::compute::{sort_to_indices, take};
21+
use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
1822
use async_trait::async_trait;
23+
use datafusion_common::hash_utils::create_hashes;
24+
use datafusion_common::ScalarValue;
1925
use futures::StreamExt;
26+
use tracing::info;
2027

2128
use crate::runtime::streaming::api::context::TaskContext;
2229
use crate::runtime::streaming::api::operator::Operator;
@@ -31,6 +38,7 @@ pub struct KeyExecutionOperator {
3138
name: String,
3239
executor: StatelessPhysicalExecutor,
3340
key_fields: Vec<usize>,
41+
random_state: RandomState
3442
}
3543

3644
impl KeyExecutionOperator {
@@ -39,10 +47,18 @@ impl KeyExecutionOperator {
3947
executor: StatelessPhysicalExecutor,
4048
key_fields: Vec<usize>,
4149
) -> Self {
50+
let deterministic_random_state = RandomState::with_seeds(
51+
0x1234567890ABCDEF,
52+
0x0FEDCBA987654321,
53+
0x1357924680135792,
54+
0x2468013579246801
55+
);
56+
4257
Self {
4358
name,
4459
executor,
4560
key_fields,
61+
random_state: deterministic_random_state,
4662
}
4763
}
4864
}
@@ -70,36 +86,28 @@ impl Operator for KeyExecutionOperator {
7086
continue;
7187
}
7288

73-
let mut final_hashes = vec![0u64; num_rows];
74-
75-
for &col_idx in &self.key_fields {
76-
let col = out_batch.column(col_idx);
77-
let int64_array = col
78-
.as_any()
79-
.downcast_ref::<arrow_array::Int64Array>()
80-
.ok_or_else(|| anyhow!("Column at index {} must be Int64Array", col_idx))?;
81-
82-
for i in 0..num_rows {
83-
let val = int64_array.value(i) as u64;
84-
if self.key_fields.len() == 1 {
85-
final_hashes[i] = val;
86-
} else {
87-
final_hashes[i] ^= val;
88-
}
89-
}
90-
}
89+
let key_arrays: Vec<ArrayRef> = self
90+
.key_fields
91+
.iter()
92+
.map(|&i| out_batch.column(i).clone())
93+
.collect();
94+
95+
let mut hash_buffer = vec![0u64; num_rows];
96+
create_hashes(&key_arrays, &self.random_state, &mut hash_buffer)
97+
.map_err(|e| anyhow!("KeyExecution failed to hash columns: {e}"))?;
98+
99+
let hash_array = UInt64Array::from(hash_buffer);
91100

92-
let hash_array = UInt64Array::from(final_hashes);
93101
let sorted_indices = sort_to_indices(&hash_array, None, None)
94-
.map_err(|e| anyhow!("Failed to sort by key: {e}"))?;
102+
.map_err(|e| anyhow!("Failed to sort by hash: {e}"))?;
95103

96104
let sorted_hashes_ref = take(&hash_array, &sorted_indices, None)?;
97105
let sorted_hashes = sorted_hashes_ref
98106
.as_any()
99107
.downcast_ref::<UInt64Array>()
100108
.unwrap();
101109

102-
let sorted_columns: std::result::Result<Vec<ArrayRef>, _> = out_batch
110+
let sorted_columns: Result<Vec<ArrayRef>, _> = out_batch
103111
.columns()
104112
.iter()
105113
.map(|col| take(col, &sorted_indices, None))
@@ -116,13 +124,11 @@ impl Operator for KeyExecutionOperator {
116124
}
117125

118126
let sub_batch = sorted_batch.slice(start_idx, end_idx - start_idx);
119-
120127
outputs.push(StreamOutput::Keyed(current_hash, sub_batch));
121128

122129
start_idx = end_idx;
123130
}
124131
}
125-
126132
Ok(outputs)
127133
}
128134

@@ -146,4 +152,3 @@ impl Operator for KeyExecutionOperator {
146152
Ok(vec![])
147153
}
148154
}
149-

src/runtime/streaming/operators/windows/session_aggregating_window.rs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ use arrow::compute::{
1818
use arrow::row::{RowConverter, SortField};
1919
use arrow_array::types::TimestampNanosecondType;
2020
use arrow_array::{
21-
Array, BooleanArray, PrimitiveArray, RecordBatch, StructArray, TimestampNanosecondArray,
21+
Array, ArrayRef, BooleanArray, PrimitiveArray, RecordBatch, StructArray,
22+
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
2223
};
23-
use arrow_schema::{DataType, Field, FieldRef, Schema};
24+
use arrow_schema::{DataType, Field, FieldRef, Schema, TimeUnit};
2425
use datafusion::execution::context::SessionContext;
2526
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
2627
use datafusion::execution::SendableRecordBatchStream;
@@ -30,6 +31,7 @@ use datafusion_proto::protobuf::PhysicalPlanNode;
3031
use futures::StreamExt;
3132
use prost::Message;
3233
use std::collections::{BTreeMap, HashMap, HashSet};
34+
use tracing::info;
3335
use std::sync::{Arc, RwLock};
3436
use std::time::{Duration, SystemTime};
3537
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@@ -315,6 +317,53 @@ fn start_time_for_sorted_batch(batch: &RecordBatch, schema: &FsSchema) -> System
315317
from_nanos(timestamp_array.value(0) as u128)
316318
}
317319

320+
/// Appends the stream `_timestamp` column (see [`build_session_output_schema`]) using each
321+
/// session's `window_end` as the row event time.
322+
fn append_output_timestamp_column(
323+
columns: &mut Vec<ArrayRef>,
324+
session_results: &[SessionWindowResult],
325+
ts_field: &Field,
326+
) -> Result<()> {
327+
let nanos = |r: &SessionWindowResult| to_nanos(r.window_end)as i64 - 1;
328+
match ts_field.data_type() {
329+
DataType::Timestamp(TimeUnit::Second, tz) => {
330+
let v: Vec<i64> = session_results
331+
.iter()
332+
.map(|r| (nanos(r) / 1_000_000_000))
333+
.collect();
334+
columns.push(Arc::new(
335+
TimestampSecondArray::from(v).with_timezone_opt(tz.clone()),
336+
));
337+
}
338+
DataType::Timestamp(TimeUnit::Millisecond, tz) => {
339+
let v: Vec<i64> = session_results
340+
.iter()
341+
.map(|r| (nanos(r) / 1_000_000))
342+
.collect();
343+
columns.push(Arc::new(
344+
TimestampMillisecondArray::from(v).with_timezone_opt(tz.clone()),
345+
));
346+
}
347+
DataType::Timestamp(TimeUnit::Microsecond, tz) => {
348+
let v: Vec<i64> = session_results
349+
.iter()
350+
.map(|r| (nanos(r) / 1000))
351+
.collect();
352+
columns.push(Arc::new(
353+
TimestampMicrosecondArray::from(v).with_timezone_opt(tz.clone()),
354+
));
355+
}
356+
DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
357+
let v: Vec<i64> = session_results.iter().map(|r| nanos(r)).collect();
358+
columns.push(Arc::new(
359+
TimestampNanosecondArray::from(v).with_timezone_opt(tz.clone()),
360+
));
361+
}
362+
dt => bail!("unsupported timestamp type for session window output: {dt}"),
363+
}
364+
Ok(())
365+
}
366+
318367
fn build_session_output_schema(
319368
input: &FsSchema,
320369
window_field: FieldRef,
@@ -590,6 +639,9 @@ impl SessionWindowOperator {
590639
columns.insert(self.config.window_index, Arc::new(window_struct_array));
591640
columns.extend_from_slice(merged_batch.columns());
592641

642+
let ts_field = self.config.input_schema_ref.schema.field(self.config.input_schema_ref.timestamp_index);
643+
append_output_timestamp_column(&mut columns, &session_results, ts_field)?;
644+
593645
RecordBatch::try_new(self.config.output_schema.clone(), columns)
594646
.context("failed to create session window output batch")
595647
}

0 commit comments

Comments
 (0)