Merged (changes from 3 commits)
4 changes: 2 additions & 2 deletions src/connectors/kafka/processor.rs
@@ -24,13 +24,13 @@ use std::collections::HashMap;
use std::sync::Arc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error};

use crate::{
connectors::common::processor::Processor,
event::{
Event as ParseableEvent, USER_AGENT_KEY,
format::{EventFormat, LogSourceEntry, json},
},
handlers::TelemetryType,
parseable::PARSEABLE,
storage::StreamType,
};
@@ -50,13 +50,13 @@ impl ParseableSinkProcessor {
.map(|r| r.topic.as_str())
.unwrap_or_default();
let log_source_entry = LogSourceEntry::default();

PARSEABLE
.create_stream_if_not_exists(
stream_name,
StreamType::UserDefined,
None,
vec![log_source_entry],
TelemetryType::default(),
)
.await?;

14 changes: 13 additions & 1 deletion src/handlers/http/ingest.rs
@@ -29,7 +29,9 @@ use crate::event::error::EventError;
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
use crate::handlers::{
EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
};
use crate::metadata::SchemaVersion;
use crate::option::Mode;
use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
@@ -68,6 +70,12 @@ pub async fn ingest(
.and_then(|h| h.to_str().ok())
.map_or(LogSource::default(), LogSource::from);

let telemetry_type = req
.headers()
.get(TELEMETRY_TYPE_KEY)
.and_then(|h| h.to_str().ok())
.map_or(TelemetryType::default(), TelemetryType::from);

let extract_log = req
.headers()
.get(EXTRACT_LOG_KEY)
@@ -102,6 +110,7 @@
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
telemetry_type,
)
.await?;
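For callers, the new `x-p-telemetry-type` header is optional at ingest: when it is absent, the `map_or` above yields `TelemetryType::default()`, i.e. `logs`. A hedged client-side sketch (assumes a reqwest client with the `json` feature against a local server; the endpoint path, stream-name header, and credentials are illustrative, not taken from this diff):

// Sketch: tagging a stream as metrics at ingest time. reqwest is assumed;
// the URL, stream name, and credentials are placeholders.
async fn ingest_metrics_sample() -> Result<(), reqwest::Error> {
    reqwest::Client::new()
        .post("http://localhost:8000/api/v1/ingest")
        .header("x-p-stream", "cpu_stats")       // stream name header (illustrative)
        .header("x-p-telemetry-type", "metrics") // new header introduced by this PR
        .basic_auth("admin", Some("admin"))      // placeholder credentials
        .json(&serde_json::json!({ "usage_percent": 42.0 }))
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}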

@@ -186,6 +195,7 @@ pub async fn handle_otel_logs_ingestion(
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
TelemetryType::Logs,
)
.await?;

@@ -252,6 +262,7 @@ pub async fn handle_otel_metrics_ingestion(
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
TelemetryType::Metrics,
)
.await?;

@@ -318,6 +329,7 @@ pub async fn handle_otel_traces_ingestion(
StreamType::UserDefined,
None,
vec![log_source_entry.clone()],
TelemetryType::Traces,
)
.await?;

2 changes: 1 addition & 1 deletion src/handlers/http/logstream.rs
@@ -188,7 +188,6 @@ pub async fn put_stream(
body: Bytes,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

PARSEABLE
.create_update_stream(req.headers(), &body, &stream_name)
.await?;
@@ -377,6 +376,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
custom_partition: stream_meta.custom_partition.clone(),
static_schema_flag: stream_meta.static_schema_flag,
log_source: stream_meta.log_source.clone(),
telemetry_type: stream_meta.telemetry_type,
};

Ok((web::Json(stream_info), StatusCode::OK))
8 changes: 7 additions & 1 deletion src/handlers/http/modal/utils/logstream_utils.rs
@@ -22,7 +22,8 @@ use crate::{
event::format::LogSource,
handlers::{
CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType,
UPDATE_STREAM_KEY,
},
storage::StreamType,
};
@@ -36,6 +37,7 @@ pub struct PutStreamHeaders {
pub update_stream_flag: bool,
pub stream_type: StreamType,
pub log_source: LogSource,
pub telemetry_type: TelemetryType,
}

impl From<&HeaderMap> for PutStreamHeaders {
@@ -65,6 +67,10 @@
log_source: headers
.get(LOG_SOURCE_KEY)
.map_or(LogSource::default(), |v| v.to_str().unwrap().into()),
telemetry_type: headers
.get(TELEMETRY_TYPE_KEY)
.and_then(|v| v.to_str().ok())
.map_or(TelemetryType::Logs, TelemetryType::from),
}
}
}
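The same header also applies at explicit stream creation, flowing through `PutStreamHeaders` above; a missing header defaults to `Logs`. A sketch assuming Parseable's PUT logstream endpoint (path and credentials are placeholders, not taken from this diff):

// Sketch: creating a metrics stream up front rather than on first ingest.
async fn create_metrics_stream() -> Result<(), reqwest::Error> {
    reqwest::Client::new()
        .put("http://localhost:8000/api/v1/logstream/cpu_stats")
        .header("x-p-telemetry-type", "metrics") // parsed by PutStreamHeaders above
        .basic_auth("admin", Some("admin"))      // placeholder credentials
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}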
38 changes: 38 additions & 0 deletions src/handlers/mod.rs
@@ -16,6 +16,10 @@
*
*/

use std::fmt::Display;

use serde::{Deserialize, Serialize};

pub mod airplane;
pub mod http;
pub mod livetail;
@@ -30,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
const AUTHORIZATION_KEY: &str = "authorization";
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
const USER_COOKIE_NAME: &str = "username";
@@ -39,3 +44,36 @@ const LOG_SOURCE_KINESIS: &str = "kinesis";

// AWS Kinesis constants
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";

#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TelemetryType {
#[default]
Logs,
Metrics,
Traces,
Events,
}

impl From<&str> for TelemetryType {
fn from(s: &str) -> Self {
match s.to_lowercase().as_str() {
"logs" => TelemetryType::Logs,
"metrics" => TelemetryType::Metrics,
"traces" => TelemetryType::Traces,
"events" => TelemetryType::Events,
_ => TelemetryType::Logs,
}
}
}

impl Display for TelemetryType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
TelemetryType::Logs => "logs",
TelemetryType::Metrics => "metrics",
TelemetryType::Traces => "traces",
TelemetryType::Events => "events",
})
}
}
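Note on the conversions above: parsing is total and case-insensitive, unrecognized values fall back to `Logs`, and `Display` writes the canonical lowercase names, so `From` and `Display` round-trip. A minimal test-style sketch of that behavior (not part of the diff; assumes `TelemetryType` is in scope):

#[test]
fn telemetry_type_header_values() {
    // Case-insensitive parsing, via the to_lowercase() call above.
    assert_eq!(TelemetryType::from("METRICS"), TelemetryType::Metrics);
    // Unrecognized values fall back to Logs instead of erroring.
    assert_eq!(TelemetryType::from("spans"), TelemetryType::Logs);
    // Display emits the canonical lowercase name, so it round-trips with From.
    let shown = TelemetryType::Traces.to_string();
    assert_eq!(TelemetryType::from(shown.as_str()), TelemetryType::Traces);
    // A missing header resolves to the default, which is Logs.
    assert_eq!(TelemetryType::default(), TelemetryType::Logs);
}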
4 changes: 4 additions & 0 deletions src/metadata.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;

use crate::catalog::snapshot::ManifestItem;
use crate::event::format::LogSourceEntry;
use crate::handlers::TelemetryType;
use crate::metrics::{
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
@@ -88,6 +89,7 @@ pub struct LogStreamMetadata {
pub hot_tier_enabled: bool,
pub stream_type: StreamType,
pub log_source: Vec<LogSourceEntry>,
pub telemetry_type: TelemetryType,
}

impl LogStreamMetadata {
@@ -102,6 +104,7 @@
stream_type: StreamType,
schema_version: SchemaVersion,
log_source: Vec<LogSourceEntry>,
telemetry_type: TelemetryType,
) -> Self {
LogStreamMetadata {
created_at: if created_at.is_empty() {
@@ -125,6 +128,7 @@
stream_type,
schema_version,
log_source,
telemetry_type,
..Default::default()
}
}
16 changes: 13 additions & 3 deletions src/migration/mod.rs
@@ -276,6 +276,7 @@ async fn migrate_stream_metadata(
stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);

storage
.put_object(&path, to_bytes(&stream_metadata_value))
@@ -290,6 +291,7 @@
stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);

storage
.put_object(&path, to_bytes(&stream_metadata_value))
@@ -304,6 +306,7 @@
stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);

storage
.put_object(&path, to_bytes(&stream_metadata_value))
@@ -312,24 +315,29 @@
Some("v4") => {
stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);

storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
}
Some("v5") => {
stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
}
_ => {
stream_metadata_value =
stream_metadata_migration::rename_log_source_v6(stream_metadata_value);
Some("v6") => {
stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
storage
.put_object(&path, to_bytes(&stream_metadata_value))
.await?;
}
_ => {
// If the version is not recognized, we assume it's already in the latest format
return Ok(stream_metadata_value);
}
}

Ok(stream_metadata_value)
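`v6_v7` itself is not shown in this diff. By analogy with the earlier migration steps, it presumably backfills the new field on stored metadata and bumps the version tag. A hedged sketch of that shape (the real function lives in `stream_metadata_migration` and may well differ):

// Sketch only — not the actual v6_v7 from stream_metadata_migration.
// Assumes stream metadata is a JSON object, as in the earlier steps.
pub fn v6_v7(mut value: serde_json::Value) -> serde_json::Value {
    if let Some(obj) = value.as_object_mut() {
        obj.insert("version".to_string(), serde_json::Value::from("v7"));
        // Backfill the new field; "logs" mirrors TelemetryType::default().
        obj.entry("telemetry_type")
            .or_insert_with(|| serde_json::Value::from("logs"));
    }
    value
}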
@@ -354,6 +362,7 @@ async fn setup_logstream_metadata(
hot_tier_enabled,
stream_type,
log_source,
telemetry_type,
..
} = serde_json::from_value(stream_metadata_value).unwrap_or_default();

@@ -387,6 +396,7 @@
hot_tier_enabled,
stream_type,
log_source,
telemetry_type,
};

Ok(metadata)