Skip to content

Commit 218080e

Browse files
author
Devdutt Shenoi
committed
fix: concat at once
1 parent 51d166e commit 218080e

File tree

3 files changed

+26
-17
lines changed

3 files changed

+26
-17
lines changed

src/event/format/json.rs

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
#![allow(deprecated)]
2121

2222
use anyhow::anyhow;
23-
use arrow::compute::concat_batches;
2423
use arrow_array::RecordBatch;
2524
use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
2625
use arrow_schema::{DataType, Field, Fields, Schema};
@@ -281,14 +280,13 @@ impl EventFormat for Event {
281280
}
282281

283282
match partitions.get_mut(&key) {
284-
Some(PartitionEvent { rb, .. }) => {
285-
*rb = concat_batches(&schema, [rb, &batch])?;
286-
}
283+
Some(PartitionEvent { rbs, .. }) => rbs.push(batch),
287284
_ => {
288285
partitions.insert(
289286
key,
290287
PartitionEvent {
291-
rb: batch,
288+
rbs: vec![batch],
289+
schema,
292290
parsed_timestamp,
293291
custom_partition_values,
294292
},

src/event/mod.rs

Lines changed: 18 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -19,20 +19,26 @@
1919

2020
pub mod format;
2121

22+
use arrow::compute::concat_batches;
2223
use arrow_array::RecordBatch;
23-
use arrow_schema::Field;
24+
use arrow_schema::{Field, Schema};
2425
use itertools::Itertools;
2526
use std::sync::Arc;
2627

2728
use self::error::EventError;
28-
use crate::{metadata::update_stats, parseable::Stream, storage::StreamType};
29+
use crate::{
30+
metadata::update_stats,
31+
parseable::{StagingError, Stream},
32+
storage::StreamType,
33+
};
2934
use chrono::NaiveDateTime;
3035
use std::collections::HashMap;
3136

3237
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
3338

3439
pub struct PartitionEvent {
35-
pub rb: RecordBatch,
40+
pub rbs: Vec<RecordBatch>,
41+
pub schema: Arc<Schema>,
3642
pub parsed_timestamp: NaiveDateTime,
3743
pub custom_partition_values: HashMap<String, String>,
3844
}
@@ -50,14 +56,15 @@ pub struct Event {
5056
impl Event {
5157
pub fn process(self, stream: &Stream) -> Result<(), EventError> {
5258
for (key, partition) in self.partitions {
59+
let rb =
60+
concat_batches(&partition.schema, &partition.rbs).map_err(StagingError::Arrow)?;
5361
if self.is_first_event {
54-
let schema = partition.rb.schema().as_ref().clone();
55-
stream.commit_schema(schema)?;
62+
stream.commit_schema(partition.schema.as_ref().clone())?;
5663
}
5764

5865
stream.push(
5966
&key,
60-
&partition.rb,
67+
&rb,
6168
partition.parsed_timestamp,
6269
&partition.custom_partition_values,
6370
self.stream_type,
@@ -67,20 +74,22 @@ impl Event {
6774
&stream.stream_name,
6875
self.origin_format,
6976
self.origin_size,
70-
partition.rb.num_rows(),
77+
rb.num_rows(),
7178
partition.parsed_timestamp.date(),
7279
);
7380

74-
crate::livetail::LIVETAIL.process(&stream.stream_name, &partition.rb);
81+
crate::livetail::LIVETAIL.process(&stream.stream_name, &rb);
7582
}
7683
Ok(())
7784
}
7885

7986
pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
8087
for (key, partition) in &self.partitions {
88+
let rb =
89+
concat_batches(&partition.schema, &partition.rbs).map_err(StagingError::Arrow)?;
8190
stream.push(
8291
key,
83-
&partition.rb,
92+
&rb,
8493
partition.parsed_timestamp,
8594
&partition.custom_partition_values,
8695
self.stream_type,

src/handlers/http/ingest.rs

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -236,18 +236,20 @@ pub async fn post_event(
236236
}
237237

238238
pub async fn push_logs_unchecked(
239-
rb: RecordBatch,
239+
batch: RecordBatch,
240240
stream: &Stream,
241241
) -> Result<event::Event, PostError> {
242+
let schema = batch.schema();
242243
let unchecked_event = event::Event {
243244
origin_format: "json",
244245
origin_size: 0,
245246
time_partition: None,
246247
is_first_event: true, // NOTE: Maybe should be false
247248
partitions: [(
248-
get_schema_key(&rb.schema().fields),
249+
get_schema_key(&schema.fields),
249250
PartitionEvent {
250-
rb,
251+
rbs: vec![batch],
252+
schema,
251253
parsed_timestamp: Utc::now().naive_utc(),
252254
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
253255
},

0 commit comments

Comments
 (0)