Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl EventFormat for Event {
origin_size,
is_first_event,
parsed_timestamp,
time_partition: None,
time_partitioned: time_partition.is_some(),
custom_partition_values,
stream_type,
})
Expand Down
12 changes: 7 additions & 5 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
storage::StreamType,
LOCK_EXPECT,
};
use chrono::NaiveDateTime;
use chrono::{NaiveDateTime, Utc};
use std::collections::HashMap;

pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
Expand All @@ -47,7 +47,7 @@ pub struct Event {
pub origin_size: u64,
pub is_first_event: bool,
pub parsed_timestamp: NaiveDateTime,
pub time_partition: Option<String>,
pub time_partitioned: bool,
pub custom_partition_values: HashMap<String, String>,
pub stream_type: StreamType,
}
Expand All @@ -56,12 +56,14 @@ pub struct Event {
impl Event {
pub fn process(self) -> Result<(), EventError> {
let mut key = get_schema_key(&self.rb.schema().fields);
if self.time_partition.is_some() {
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
key.push_str(&parsed_timestamp_to_min);
if self.time_partitioned {
// For time partitioned streams, concatenate timestamp to filename, ensuring we don't write to a finished arrows file
let curr_timestamp = Utc::now().format("%Y%m%dT%H%M").to_string();
key.push_str(&curr_timestamp);
}

if !self.custom_partition_values.is_empty() {
// For custom partitioned streams, concatenate values to filename, ensuring we write to different arrows files
for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
key.push_str(&format!("&{k}={v}"));
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ pub async fn push_logs_unchecked(
origin_format: "json",
origin_size: 0,
parsed_timestamp: Utc::now().naive_utc(),
time_partition: None,
time_partitioned: false,
is_first_event: true, // NOTE: Maybe should be false
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
stream_type: StreamType::UserDefined,
Expand Down
20 changes: 13 additions & 7 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,17 +557,23 @@ impl Parseable {
.await;
}

if !time_partition.is_empty() || !time_partition_limit.is_empty() {
return Err(StreamError::Custom {
msg: "Creating stream with time partition is not supported anymore".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let time_partition_in_days = if !time_partition_limit.is_empty() {
Some(validate_time_partition_limit(&time_partition_limit)?)
} else {
None
};

if let Some(custom_partition) = &custom_partition {
validate_custom_partition(custom_partition)?;
}

if !time_partition.is_empty() && custom_partition.is_some() {
return Err(StreamError::Custom {
msg: "Cannot set both time partition and custom partition".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

let schema = validate_static_schema(
body,
stream_name,
Expand All @@ -579,7 +585,7 @@ impl Parseable {
self.create_stream(
stream_name.to_string(),
&time_partition,
None,
time_partition_in_days,
custom_partition.as_ref(),
static_schema_flag,
schema,
Expand Down
Loading