Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ mod tests {
json,
None,
None,
None,
&[],
SchemaVersion::V0,
&crate::event::format::LogSource::default()
)
Expand Down Expand Up @@ -709,7 +709,7 @@ mod tests {
json,
None,
None,
None,
&[],
SchemaVersion::V0,
&crate::event::format::LogSource::default(),
)
Expand Down Expand Up @@ -797,7 +797,7 @@ mod tests {
json,
None,
None,
None,
&[],
SchemaVersion::V1,
&crate::event::format::LogSource::default(),
)
Expand Down
16 changes: 10 additions & 6 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
time_partition_limit: stream_meta
.time_partition_limit
.map(|limit| limit.to_string()),
custom_partition: stream_meta.custom_partition.clone(),
custom_partition: stream_meta.custom_partitions.clone(),
static_schema_flag: stream_meta.static_schema_flag,
log_source: stream_meta.log_source.clone(),
};
Expand Down Expand Up @@ -485,6 +485,7 @@ pub mod error {
use http::StatusCode;

use crate::{
handlers::http::modal::utils::logstream_utils::HeaderParseError,
hottier::HotTierError,
parseable::StreamNotFound,
storage::ObjectStorageError,
Expand Down Expand Up @@ -554,6 +555,8 @@ pub mod error {
HotTierValidation(#[from] HotTierValidationError),
#[error("{0}")]
HotTierError(#[from] HotTierError),
#[error("Error when parsing headers: {0}")]
HeaderParsing(#[from] HeaderParseError),
}

impl actix_web::ResponseError for StreamError {
Expand Down Expand Up @@ -589,6 +592,7 @@ pub mod error {
StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
StreamError::HeaderParsing(_) => StatusCode::BAD_REQUEST,
}
}

Expand Down Expand Up @@ -626,7 +630,7 @@ mod tests {
#[actix_web::test]
async fn header_without_log_source() {
let req = TestRequest::default().to_http_request();
let PutStreamHeaders { log_source, .. } = req.headers().into();
let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
assert_eq!(log_source, crate::event::format::LogSource::Json);
}

Expand All @@ -635,19 +639,19 @@ mod tests {
let mut req = TestRequest::default()
.insert_header(("X-P-Log-Source", "pmeta"))
.to_http_request();
let PutStreamHeaders { log_source, .. } = req.headers().into();
let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
assert_eq!(log_source, crate::event::format::LogSource::Pmeta);

req = TestRequest::default()
.insert_header(("X-P-Log-Source", "otel-logs"))
.to_http_request();
let PutStreamHeaders { log_source, .. } = req.headers().into();
let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
assert_eq!(log_source, crate::event::format::LogSource::OtelLogs);

req = TestRequest::default()
.insert_header(("X-P-Log-Source", "kinesis"))
.to_http_request();
let PutStreamHeaders { log_source, .. } = req.headers().into();
let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
assert_eq!(log_source, crate::event::format::LogSource::Kinesis);
}

Expand All @@ -656,7 +660,7 @@ mod tests {
let req = TestRequest::default()
.insert_header(("X-P-Log-Source", "teststream"))
.to_http_request();
let PutStreamHeaders { log_source, .. } = req.headers().into();
let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap();
assert_eq!(log_source, crate::event::format::LogSource::Json);
}
}
19 changes: 6 additions & 13 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

use arrow_schema::Field;
use chrono::{DateTime, NaiveDateTime, Utc};
use itertools::Itertools;
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};

Expand Down Expand Up @@ -72,15 +71,15 @@ pub async fn push_logs(
.get_stream(stream_name)?
.get_time_partition_limit();
let static_schema_flag = stream.get_static_schema_flag();
let custom_partition = stream.get_custom_partition();
let custom_partitions = stream.get_custom_partitions();
let schema_version = stream.get_schema_version();

let data = if time_partition.is_some() || custom_partition.is_some() {
let data = if time_partition.is_some() || !custom_partitions.is_empty() {
convert_array_to_object(
json,
time_partition.as_ref(),
time_partition_limit,
custom_partition.as_ref(),
&custom_partitions,
schema_version,
log_source,
)?
Expand All @@ -89,7 +88,7 @@ pub async fn push_logs(
json,
None,
None,
None,
&[],
schema_version,
log_source,
)?)?]
Expand All @@ -101,13 +100,7 @@ pub async fn push_logs(
Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
_ => Utc::now().naive_utc(),
};
let custom_partition_values = match custom_partition.as_ref() {
Some(custom_partition) => {
let custom_partitions = custom_partition.split(',').collect_vec();
get_custom_partition_values(&value, &custom_partitions)
}
None => HashMap::new(),
};
let custom_partition_values = get_custom_partition_values(&value, &custom_partitions);
let schema = PARSEABLE
.streams
.read()
Expand Down Expand Up @@ -162,7 +155,7 @@ pub fn into_event_batch(

pub fn get_custom_partition_values(
json: &Value,
custom_partition_list: &[&str],
custom_partition_list: &[String],
) -> HashMap<String, String> {
let mut custom_partition_values: HashMap<String, String> = HashMap::new();
for custom_partition_field in custom_partition_list {
Expand Down
94 changes: 76 additions & 18 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*
*/

use std::num::NonZeroU32;

use actix_web::http::header::HeaderMap;

use crate::{
Expand All @@ -27,31 +29,57 @@ use crate::{
storage::StreamType,
};

/// Name of a field that appears within a data stream, e.g. a JSON key
/// present in ingested events.
pub type FieldName = String;

/// Name of the field used as a custom partition; up to 3 of these may be
/// configured per stream via the custom-partition header.
pub type CustomPartition = String;

/// Errors that can occur while parsing stream-configuration values out of
/// HTTP request headers.
#[derive(Debug, thiserror::Error)]
pub enum HeaderParseError {
    /// The custom-partition header listed more than 3 comma-separated keys.
    #[error("Maximum 3 custom partition keys are supported")]
    TooManyPartitions,
    /// The time-partition-limit value did not end with the `d` (days) suffix,
    /// the only duration unit currently supported.
    #[error("Missing 'd' suffix for duration value")]
    UnsupportedUnit,
    /// The numeric part of the duration was not a positive integer
    /// (it failed to parse as a `NonZeroU32`).
    #[error("Could not convert duration to an unsigned number")]
    ZeroOrNegative,
}

#[derive(Debug, Default)]
pub struct PutStreamHeaders {
pub time_partition: String,
pub time_partition_limit: String,
pub custom_partition: Option<String>,
pub time_partition: Option<FieldName>,
pub time_partition_limit: Option<NonZeroU32>,
pub custom_partitions: Vec<String>,
pub static_schema_flag: bool,
pub update_stream_flag: bool,
pub stream_type: StreamType,
pub log_source: LogSource,
}

impl From<&HeaderMap> for PutStreamHeaders {
fn from(headers: &HeaderMap) -> Self {
PutStreamHeaders {
time_partition: headers
.get(TIME_PARTITION_KEY)
.map_or("", |v| v.to_str().unwrap())
.to_string(),
time_partition_limit: headers
.get(TIME_PARTITION_LIMIT_KEY)
.map_or("", |v| v.to_str().unwrap())
.to_string(),
custom_partition: headers
.get(CUSTOM_PARTITION_KEY)
.map(|v| v.to_str().unwrap().to_string()),
impl TryFrom<&HeaderMap> for PutStreamHeaders {
type Error = HeaderParseError;

fn try_from(headers: &HeaderMap) -> Result<Self, Self::Error> {
let time_partition = headers
.get(TIME_PARTITION_KEY)
.map(|v| v.to_str().unwrap().to_owned());
let time_partition_limit = match headers
.get(TIME_PARTITION_LIMIT_KEY)
.map(|v| v.to_str().unwrap())
{
Some(limit) => Some(parse_time_partition_limit(limit)?),
None => None,
};
let custom_partition = headers
.get(CUSTOM_PARTITION_KEY)
.map(|v| v.to_str().unwrap())
.unwrap_or_default();
let custom_partitions = parse_custom_partition(custom_partition)?;

let headers = PutStreamHeaders {
time_partition,
time_partition_limit,
custom_partitions,
static_schema_flag: headers
.get(STATIC_SCHEMA_FLAG)
.is_some_and(|v| v.to_str().unwrap() == "true"),
Expand All @@ -65,6 +93,36 @@ impl From<&HeaderMap> for PutStreamHeaders {
log_source: headers
.get(LOG_SOURCE_KEY)
.map_or(LogSource::default(), |v| v.to_str().unwrap().into()),
}
};

Ok(headers)
}
}

/// Parses a comma-separated custom-partition header value into a list of
/// partition field names.
///
/// Empty segments are ignored. In particular, an entirely empty input — which
/// is what the caller passes (via `unwrap_or_default()`) when the header is
/// absent — yields an empty `Vec` rather than a single bogus empty-string
/// partition key, which would otherwise defeat `is_empty()` checks downstream.
///
/// # Errors
///
/// Returns [`HeaderParseError::TooManyPartitions`] when more than 3 partition
/// keys are supplied.
pub fn parse_custom_partition(
    custom_partition: &str,
) -> Result<Vec<CustomPartition>, HeaderParseError> {
    let custom_partition_list: Vec<CustomPartition> = custom_partition
        .split(',')
        // Skip empty segments so "" (absent header) and stray commas do not
        // produce empty-string partition keys.
        .filter(|field| !field.is_empty())
        .map(String::from)
        .collect();
    if custom_partition_list.len() > 3 {
        return Err(HeaderParseError::TooManyPartitions);
    }

    Ok(custom_partition_list)
}

/// Parses a time-partition-limit header value of the form `<days>d`
/// (e.g. `"30d"`) into a positive number of days.
///
/// # Errors
///
/// * [`HeaderParseError::UnsupportedUnit`] when the trailing `d` suffix is missing.
/// * [`HeaderParseError::ZeroOrNegative`] when the numeric part is not a
///   positive integer that fits in a `NonZeroU32`.
pub fn parse_time_partition_limit(
    time_partition_limit: &str,
) -> Result<NonZeroU32, HeaderParseError> {
    // Peel off the mandatory 'd' unit suffix; its absence is an error.
    let Some(days) = time_partition_limit.strip_suffix('d') else {
        return Err(HeaderParseError::UnsupportedUnit);
    };

    // NonZeroU32 parsing rejects zero, negatives, and non-numeric input alike.
    days.parse::<NonZeroU32>()
        .map_err(|_| HeaderParseError::ZeroOrNegative)
}
14 changes: 5 additions & 9 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub struct LogStreamMetadata {
pub first_event_at: Option<String>,
pub time_partition: Option<String>,
pub time_partition_limit: Option<NonZeroU32>,
pub custom_partition: Option<String>,
pub custom_partitions: Vec<String>,
pub static_schema_flag: bool,
pub hot_tier_enabled: bool,
pub stream_type: StreamType,
Expand All @@ -94,9 +94,9 @@ impl LogStreamMetadata {
#[allow(clippy::too_many_arguments)]
pub fn new(
created_at: String,
time_partition: String,
time_partition: Option<String>,
time_partition_limit: Option<NonZeroU32>,
custom_partition: Option<String>,
custom_partitions: Vec<String>,
static_schema_flag: bool,
static_schema: HashMap<String, Arc<Field>>,
stream_type: StreamType,
Expand All @@ -109,13 +109,9 @@ impl LogStreamMetadata {
} else {
created_at
},
time_partition: if time_partition.is_empty() {
None
} else {
Some(time_partition)
},
time_partition,
time_partition_limit,
custom_partition,
custom_partitions,
static_schema_flag,
schema: if static_schema.is_empty() {
HashMap::new()
Expand Down
4 changes: 2 additions & 2 deletions src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ async fn migration_stream(
stats,
time_partition,
time_partition_limit,
custom_partition,
custom_partitions,
static_schema_flag,
hot_tier_enabled,
stream_type,
Expand Down Expand Up @@ -307,7 +307,7 @@ async fn migration_stream(
first_event_at,
time_partition,
time_partition_limit: time_partition_limit.and_then(|limit| limit.parse().ok()),
custom_partition,
custom_partitions,
static_schema_flag,
hot_tier_enabled,
stream_type,
Expand Down
Loading
Loading