Skip to content

Commit 55a5026

Browse files
committed
update
1 parent 7b1f959 commit 55a5026

3 files changed

Lines changed: 136 additions & 31 deletions

File tree

src/runtime/streaming/format/deserializer.rs

Lines changed: 119 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,84 +12,176 @@
1212

1313

1414
use anyhow::{anyhow, Result};
15-
use arrow_array::builder::StringBuilder;
16-
use arrow_array::RecordBatch;
15+
use arrow_array::builder::{BinaryBuilder, StringBuilder, TimestampNanosecondBuilder};
16+
use arrow_array::{ArrayRef, RecordBatch};
1717
use arrow_json::reader::ReaderBuilder;
18-
use arrow_schema::SchemaRef;
18+
use arrow_schema::{Schema, SchemaRef};
19+
use std::collections::HashMap;
1920
use std::sync::Arc;
2021

22+
use crate::sql::common::TIMESTAMP_FIELD;
23+
2124
use super::config::{BadDataPolicy, Format};
2225

2326
pub struct DataDeserializer {
2427
format: Format,
25-
schema: SchemaRef,
28+
final_schema: SchemaRef,
29+
decoder_schema: SchemaRef,
2630
bad_data_policy: BadDataPolicy,
2731
}
2832

2933
impl DataDeserializer {
3034
pub fn new(format: Format, schema: SchemaRef, bad_data_policy: BadDataPolicy) -> Self {
35+
let decoder_schema = schema_without_timestamp(schema.as_ref());
3136
Self {
3237
format,
33-
schema,
38+
final_schema: schema,
39+
decoder_schema,
3440
bad_data_policy,
3541
}
3642
}
3743

3844
pub fn deserialize_batch(&self, messages: &[&[u8]]) -> Result<RecordBatch> {
45+
self.deserialize_batch_with_kafka_timestamps(messages, &[])
46+
}
47+
48+
pub fn deserialize_batch_with_kafka_timestamps(
49+
&self,
50+
messages: &[&[u8]],
51+
kafka_timestamps_ms: &[u64],
52+
) -> Result<RecordBatch> {
3953
match &self.format {
40-
Format::Json(_) => self.deserialize_json(messages),
41-
Format::RawString => self.deserialize_raw_string(messages),
42-
Format::RawBytes => self.deserialize_raw_bytes(messages),
54+
Format::Json(_) => self.deserialize_json(messages, kafka_timestamps_ms),
55+
Format::RawString => self.deserialize_raw_string(messages, kafka_timestamps_ms),
56+
Format::RawBytes => self.deserialize_raw_bytes(messages, kafka_timestamps_ms),
4357
}
4458
}
4559

46-
fn deserialize_json(&self, messages: &[&[u8]]) -> Result<RecordBatch> {
60+
fn deserialize_json(&self, messages: &[&[u8]], kafka_timestamps_ms: &[u64]) -> Result<RecordBatch> {
4761
let mut buffer = Vec::with_capacity(messages.len() * 256);
4862
for msg in messages {
4963
buffer.extend_from_slice(msg);
5064
buffer.push(b'\n');
5165
}
5266

5367
let allow_bad_data = self.bad_data_policy == BadDataPolicy::Drop;
54-
let mut decoder = ReaderBuilder::new(self.schema.clone())
55-
.with_strict_mode(!allow_bad_data)
68+
let mut decoder = ReaderBuilder::new(self.decoder_schema.clone())
69+
.with_strict_mode(false)
5670
.build_decoder()?;
5771

5872
decoder.decode(&buffer)?;
5973

60-
let batch = if allow_bad_data {
61-
let (batch, _mask, _, _errors) = decoder.flush_with_bad_data()?.unwrap();
62-
batch
74+
let (batch, valid_indices) = if allow_bad_data {
75+
let Some((batch, mask, _, _errors)) = decoder.flush_with_bad_data()? else {
76+
return Ok(RecordBatch::new_empty(self.final_schema.clone()));
77+
};
78+
let mut indices = Vec::with_capacity(batch.num_rows());
79+
for i in 0..mask.len() {
80+
if mask.value(i) {
81+
indices.push(i);
82+
}
83+
}
84+
(batch, indices)
6385
} else {
64-
decoder
86+
let batch = decoder
6587
.flush()?
66-
.ok_or_else(|| anyhow!("JSON decoder returned no batch"))?
88+
.unwrap_or_else(|| RecordBatch::new_empty(self.decoder_schema.clone()));
89+
let indices: Vec<usize> = (0..batch.num_rows()).collect();
90+
(batch, indices)
6791
};
6892

69-
Ok(batch)
93+
self.rebuild_with_timestamp(batch, kafka_timestamps_ms, &valid_indices)
7094
}
7195

72-
fn deserialize_raw_string(&self, messages: &[&[u8]]) -> Result<RecordBatch> {
96+
fn deserialize_raw_string(&self, messages: &[&[u8]], kafka_timestamps_ms: &[u64]) -> Result<RecordBatch> {
97+
let value_idx = self
98+
.decoder_schema
99+
.index_of("value")
100+
.map_err(|_| anyhow!("RawString format requires a 'value' column"))?;
101+
73102
let mut builder = StringBuilder::with_capacity(messages.len(), messages.len() * 64);
74103
for msg in messages {
75104
builder.append_value(String::from_utf8_lossy(msg));
76105
}
77106

78-
let array = Arc::new(builder.finish());
79-
RecordBatch::try_new(self.schema.clone(), vec![array])
80-
.map_err(|e| anyhow!("build RawString batch: {e}"))
107+
let mut columns = vec![None; self.decoder_schema.fields().len()];
108+
columns[value_idx] = Some(Arc::new(builder.finish()) as ArrayRef);
109+
let decoded_columns = columns
110+
.into_iter()
111+
.map(|c| c.ok_or_else(|| anyhow!("missing RawString decoded column")))
112+
.collect::<Result<Vec<_>>>()?;
113+
let decoded_batch = RecordBatch::try_new(self.decoder_schema.clone(), decoded_columns)
114+
.map_err(|e| anyhow!("build RawString decoded batch: {e}"))?;
115+
let valid_indices: Vec<usize> = (0..decoded_batch.num_rows()).collect();
116+
self.rebuild_with_timestamp(decoded_batch, kafka_timestamps_ms, &valid_indices)
81117
}
82118

83-
fn deserialize_raw_bytes(&self, messages: &[&[u8]]) -> Result<RecordBatch> {
84-
use arrow_array::builder::BinaryBuilder;
85-
119+
fn deserialize_raw_bytes(&self, messages: &[&[u8]], kafka_timestamps_ms: &[u64]) -> Result<RecordBatch> {
120+
let value_idx = self
121+
.decoder_schema
122+
.index_of("value")
123+
.map_err(|_| anyhow!("RawBytes format requires a 'value' column"))?;
86124
let mut builder = BinaryBuilder::with_capacity(messages.len(), messages.len() * 64);
87125
for msg in messages {
88126
builder.append_value(msg);
89127
}
90128

91-
let array = Arc::new(builder.finish());
92-
RecordBatch::try_new(self.schema.clone(), vec![array])
93-
.map_err(|e| anyhow!("build RawBytes batch: {e}"))
129+
let mut columns = vec![None; self.decoder_schema.fields().len()];
130+
columns[value_idx] = Some(Arc::new(builder.finish()) as ArrayRef);
131+
let decoded_columns = columns
132+
.into_iter()
133+
.map(|c| c.ok_or_else(|| anyhow!("missing RawBytes decoded column")))
134+
.collect::<Result<Vec<_>>>()?;
135+
let decoded_batch = RecordBatch::try_new(self.decoder_schema.clone(), decoded_columns)
136+
.map_err(|e| anyhow!("build RawBytes decoded batch: {e}"))?;
137+
let valid_indices: Vec<usize> = (0..decoded_batch.num_rows()).collect();
138+
self.rebuild_with_timestamp(decoded_batch, kafka_timestamps_ms, &valid_indices)
139+
}
140+
141+
fn rebuild_with_timestamp(
142+
&self,
143+
decoded_batch: RecordBatch,
144+
kafka_timestamps_ms: &[u64],
145+
valid_indices: &[usize],
146+
) -> Result<RecordBatch> {
147+
let mut by_name: HashMap<String, ArrayRef> = decoded_batch
148+
.schema()
149+
.fields()
150+
.iter()
151+
.zip(decoded_batch.columns().iter())
152+
.map(|(f, a)| (f.name().to_string(), a.clone()))
153+
.collect();
154+
155+
let mut ts_builder = TimestampNanosecondBuilder::with_capacity(valid_indices.len());
156+
for idx in valid_indices {
157+
let ms = kafka_timestamps_ms.get(*idx).copied().unwrap_or(0);
158+
ts_builder.append_value((ms as i64).saturating_mul(1_000_000));
159+
}
160+
let timestamp_col: ArrayRef = Arc::new(ts_builder.finish());
161+
162+
let mut columns = Vec::with_capacity(self.final_schema.fields().len());
163+
for field in self.final_schema.fields() {
164+
if field.name() == TIMESTAMP_FIELD {
165+
columns.push(timestamp_col.clone());
166+
} else {
167+
let array = by_name
168+
.remove(field.name())
169+
.ok_or_else(|| anyhow!("decoded JSON missing field '{}'", field.name()))?;
170+
columns.push(array);
171+
}
172+
}
173+
174+
RecordBatch::try_new(self.final_schema.clone(), columns)
175+
.map_err(|e| anyhow!("build JSON batch with _timestamp: {e}"))
94176
}
95177
}
178+
179+
fn schema_without_timestamp(schema: &Schema) -> SchemaRef {
180+
let fields = schema
181+
.fields()
182+
.iter()
183+
.filter(|f| f.name() != TIMESTAMP_FIELD)
184+
.cloned()
185+
.collect::<Vec<_>>();
186+
Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
187+
}

src/runtime/streaming/format/serializer.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use arrow_json::EncoderOptions;
1818
use arrow_schema::{DataType, Field, SchemaRef};
1919
use std::sync::Arc;
2020

21+
use crate::sql::common::TIMESTAMP_FIELD;
22+
2123
use super::config::{Format, JsonFormat};
2224
use super::json_encoder::CustomEncoderFactory;
2325

@@ -32,7 +34,7 @@ impl DataSerializer {
3234
.fields()
3335
.iter()
3436
.enumerate()
35-
.filter(|(_, f)| !f.name().starts_with('_'))
37+
.filter(|(_, f)| f.name() != TIMESTAMP_FIELD)
3638
.map(|(i, _)| i)
3739
.collect();
3840

src/runtime/streaming/operators/source/kafka/mod.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ pub trait BatchDeserializer: Send + 'static {
5959
pub struct BufferedDeserializer {
6060
inner: DataDeserializer,
6161
buffer: Vec<Vec<u8>>,
62+
/// Parallel to `buffer`: Kafka message timestamp (ms) per row for filling `_timestamp`.
63+
kafka_timestamps_ms: Vec<u64>,
6264
batch_size: usize,
6365
}
6466

@@ -67,6 +69,7 @@ impl BufferedDeserializer {
6769
Self {
6870
inner: DataDeserializer::new(format, schema, bad_data_policy),
6971
buffer: Vec::with_capacity(batch_size),
72+
kafka_timestamps_ms: Vec::with_capacity(batch_size),
7073
batch_size,
7174
}
7275
}
@@ -76,10 +79,11 @@ impl BatchDeserializer for BufferedDeserializer {
7679
fn deserialize_slice(
7780
&mut self,
7881
payload: &[u8],
79-
_timestamp: u64,
82+
timestamp: u64,
8083
_metadata: Option<HashMap<&str, FieldValueType<'_>>>,
8184
) -> Result<()> {
8285
self.buffer.push(payload.to_vec());
86+
self.kafka_timestamps_ms.push(timestamp);
8387
Ok(())
8488
}
8589

@@ -93,8 +97,11 @@ impl BatchDeserializer for BufferedDeserializer {
9397
}
9498

9599
let refs: Vec<&[u8]> = self.buffer.iter().map(|v| v.as_slice()).collect();
96-
let batch = self.inner.deserialize_batch(&refs)?;
100+
let batch = self
101+
.inner
102+
.deserialize_batch_with_kafka_timestamps(&refs, &self.kafka_timestamps_ms)?;
97103
self.buffer.clear();
104+
self.kafka_timestamps_ms.clear();
98105
Ok(Some(batch))
99106
}
100107

@@ -277,7 +284,11 @@ impl SourceOperator for KafkaSourceOperator {
277284
Ok(Ok(msg)) => {
278285
let partition = msg.partition();
279286
let offset = msg.offset();
280-
let timestamp = msg.timestamp().to_millis().unwrap_or(0);
287+
let timestamp = msg.timestamp().to_millis().ok_or_else(|| {
288+
anyhow!(
289+
"Failed to read timestamp from Kafka record: message has no timestamp"
290+
)
291+
})?;
281292

282293
self.current_offsets.insert(partition, offset);
283294

0 commit comments

Comments
 (0)