
Commit 32bfc04

feat: add telemetry type to a dataset (#1392)
1 parent fc29387 commit 32bfc04

13 files changed: 219 additions, 84 deletions

src/connectors/kafka/processor.rs

Lines changed: 10 additions & 10 deletions
@@ -16,24 +16,24 @@
  *
  */

-use async_trait::async_trait;
-use futures_util::StreamExt;
-use rdkafka::consumer::{CommitMode, Consumer};
-use serde_json::Value;
-use std::collections::HashMap;
-use std::sync::Arc;
-use tokio_stream::wrappers::ReceiverStream;
-use tracing::{debug, error};
-
 use crate::{
     connectors::common::processor::Processor,
     event::{
         Event as ParseableEvent, USER_AGENT_KEY,
         format::{EventFormat, LogSourceEntry, json},
     },
+    handlers::TelemetryType,
     parseable::PARSEABLE,
     storage::StreamType,
 };
+use async_trait::async_trait;
+use futures_util::StreamExt;
+use rdkafka::consumer::{CommitMode, Consumer};
+use serde_json::Value;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio_stream::wrappers::ReceiverStream;
+use tracing::{debug, error};

 use super::{ConsumerRecord, StreamConsumer, TopicPartition, config::BufferConfig};

@@ -50,13 +50,13 @@ impl ParseableSinkProcessor {
             .map(|r| r.topic.as_str())
             .unwrap_or_default();
         let log_source_entry = LogSourceEntry::default();
-
         PARSEABLE
             .create_stream_if_not_exists(
                 stream_name,
                 StreamType::UserDefined,
                 None,
                 vec![log_source_entry],
+                TelemetryType::default(),
             )
             .await?;
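The Kafka sink now creates streams with `TelemetryType::default()`, so topics ingested through this path are tagged as log datasets. A minimal sketch of what that default resolves to, with the enum re-declared locally purely for illustration (the real definition lives in `src/handlers/mod.rs`, further down in this commit):

```rust
// Local mirror of TelemetryType, trimmed to what this example needs.
// The #[default] attribute is what makes Logs the fallback variant.
#[derive(Debug, Default, PartialEq)]
#[allow(dead_code)]
enum TelemetryType {
    #[default]
    Logs,
    Metrics,
    Traces,
    Events,
}

fn main() {
    // create_stream_if_not_exists(.., TelemetryType::default()) is therefore
    // equivalent to passing TelemetryType::Logs explicitly.
    assert_eq!(TelemetryType::default(), TelemetryType::Logs);
}
```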

src/handlers/http/ingest.rs

Lines changed: 13 additions & 1 deletion
@@ -29,7 +29,9 @@ use crate::event::error::EventError;
 use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
 use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
 use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
-use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
+use crate::handlers::{
+    EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
+};
 use crate::metadata::SchemaVersion;
 use crate::option::Mode;
 use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;

@@ -68,6 +70,12 @@ pub async fn ingest(
         .and_then(|h| h.to_str().ok())
         .map_or(LogSource::default(), LogSource::from);

+    let telemetry_type = req
+        .headers()
+        .get(TELEMETRY_TYPE_KEY)
+        .and_then(|h| h.to_str().ok())
+        .map_or(TelemetryType::default(), TelemetryType::from);
+
     let extract_log = req
         .headers()
         .get(EXTRACT_LOG_KEY)

@@ -102,6 +110,7 @@ pub async fn ingest(
             StreamType::UserDefined,
             None,
             vec![log_source_entry.clone()],
+            telemetry_type,
         )
         .await?;

@@ -186,6 +195,7 @@ pub async fn handle_otel_logs_ingestion(
             StreamType::UserDefined,
             None,
             vec![log_source_entry.clone()],
+            TelemetryType::Logs,
         )
         .await?;

@@ -252,6 +262,7 @@ pub async fn handle_otel_metrics_ingestion(
             StreamType::UserDefined,
             None,
             vec![log_source_entry.clone()],
+            TelemetryType::Metrics,
         )
         .await?;

@@ -318,6 +329,7 @@ pub async fn handle_otel_traces_ingestion(
             StreamType::UserDefined,
             None,
             vec![log_source_entry.clone()],
+            TelemetryType::Traces,
         )
         .await?;
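On the generic `ingest` path the telemetry type is taken from the `x-p-telemetry-type` request header (`TELEMETRY_TYPE_KEY`), while the OTEL handlers pin it to `Logs`, `Metrics`, or `Traces`. A hedged client-side sketch of setting that header; the endpoint path, credentials, and the `x-p-stream` header name are assumptions about a typical Parseable deployment, not something this diff establishes (assumed Cargo deps: `reqwest` with the `json` feature, `serde_json`, `tokio`):

```rust
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = Client::new()
        .post("http://localhost:8000/api/v1/ingest") // assumed endpoint
        .basic_auth("admin", Some("admin"))          // assumed credentials
        .header("x-p-stream", "device-metrics")      // assumed stream-name header
        .header("x-p-telemetry-type", "metrics")     // header introduced by this commit
        .json(&serde_json::json!({ "device": "sensor-1", "cpu": 0.42 }))
        .send()
        .await?;
    println!("ingest status: {}", resp.status());
    Ok(())
}
```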

src/handlers/http/logstream.rs

Lines changed: 1 addition & 1 deletion
@@ -188,7 +188,6 @@ pub async fn put_stream(
     body: Bytes,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
-
     PARSEABLE
         .create_update_stream(req.headers(), &body, &stream_name)
         .await?;

@@ -377,6 +376,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
         custom_partition: stream_meta.custom_partition.clone(),
         static_schema_flag: stream_meta.static_schema_flag,
         log_source: stream_meta.log_source.clone(),
+        telemetry_type: stream_meta.telemetry_type,
     };

     Ok((web::Json(stream_info), StatusCode::OK))
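`get_stream_info` now surfaces the stored telemetry type. Because the enum derives `Serialize` with `#[serde(rename_all = "lowercase")]`, the field appears in the JSON response as a plain lowercase string. A small sketch with a stand-in response struct (the real stream-info type is not shown in this diff; assumed deps: `serde` with `derive`, `serde_json`):

```rust
use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
#[allow(dead_code)]
enum TelemetryType {
    Logs,
    Metrics,
    Traces,
    Events,
}

// Stand-in for the response struct; only the new field is modelled here.
#[derive(Serialize)]
struct StreamInfoSketch {
    stream_name: String,
    telemetry_type: TelemetryType,
}

fn main() {
    let info = StreamInfoSketch {
        stream_name: "device-metrics".to_string(),
        telemetry_type: TelemetryType::Metrics,
    };
    // Prints: {"stream_name":"device-metrics","telemetry_type":"metrics"}
    println!("{}", serde_json::to_string(&info).unwrap());
}
```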

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 7 additions & 1 deletion
@@ -22,7 +22,8 @@ use crate::{
     event::format::LogSource,
     handlers::{
         CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
-        TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
+        TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType,
+        UPDATE_STREAM_KEY,
     },
     storage::StreamType,
 };

@@ -36,6 +37,7 @@ pub struct PutStreamHeaders {
     pub update_stream_flag: bool,
     pub stream_type: StreamType,
     pub log_source: LogSource,
+    pub telemetry_type: TelemetryType,
 }

 impl From<&HeaderMap> for PutStreamHeaders {

@@ -65,6 +67,10 @@ impl From<&HeaderMap> for PutStreamHeaders {
             log_source: headers
                 .get(LOG_SOURCE_KEY)
                 .map_or(LogSource::default(), |v| v.to_str().unwrap().into()),
+            telemetry_type: headers
+                .get(TELEMETRY_TYPE_KEY)
+                .and_then(|v| v.to_str().ok())
+                .map_or(TelemetryType::Logs, TelemetryType::from),
         }
     }
 }
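`PutStreamHeaders` falls back to `TelemetryType::Logs` whenever the `x-p-telemetry-type` header is absent or not valid UTF-8. A sketch of that fallback behaviour, using a plain `HashMap` as a stand-in for the actix `HeaderMap` so the example stays self-contained:

```rust
use std::collections::HashMap;

#[derive(Debug, Default, PartialEq, Clone, Copy)]
#[allow(dead_code)]
enum TelemetryType {
    #[default]
    Logs,
    Metrics,
    Traces,
    Events,
}

impl From<&str> for TelemetryType {
    fn from(s: &str) -> Self {
        match s.to_lowercase().as_str() {
            "metrics" => TelemetryType::Metrics,
            "traces" => TelemetryType::Traces,
            "events" => TelemetryType::Events,
            _ => TelemetryType::Logs, // "logs" and anything unrecognized
        }
    }
}

// Mirrors the map_or(..) pattern from the diff, with HashMap standing in for HeaderMap.
fn telemetry_from_headers(headers: &HashMap<&str, &str>) -> TelemetryType {
    headers
        .get("x-p-telemetry-type")
        .map_or(TelemetryType::Logs, |v| TelemetryType::from(*v))
}

fn main() {
    let with_header = HashMap::from([("x-p-telemetry-type", "Traces")]);
    let without_header: HashMap<&str, &str> = HashMap::new();
    assert_eq!(telemetry_from_headers(&with_header), TelemetryType::Traces);
    assert_eq!(telemetry_from_headers(&without_header), TelemetryType::Logs);
}
```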

src/handlers/mod.rs

Lines changed: 38 additions & 0 deletions
@@ -16,6 +16,10 @@
  *
  */

+use std::fmt::Display;
+
+use serde::{Deserialize, Serialize};
+
 pub mod airplane;
 pub mod http;
 pub mod livetail;

@@ -30,6 +34,7 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
 const AUTHORIZATION_KEY: &str = "authorization";
 const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
 pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
+pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
 const COOKIE_AGE_DAYS: usize = 7;
 const SESSION_COOKIE_NAME: &str = "session";
 const USER_COOKIE_NAME: &str = "username";

@@ -39,3 +44,36 @@ const LOG_SOURCE_KINESIS: &str = "kinesis";

 // AWS Kinesis constants
 const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
+#[serde(rename_all = "lowercase")]
+pub enum TelemetryType {
+    #[default]
+    Logs,
+    Metrics,
+    Traces,
+    Events,
+}
+
+impl From<&str> for TelemetryType {
+    fn from(s: &str) -> Self {
+        match s.to_lowercase().as_str() {
+            "logs" => TelemetryType::Logs,
+            "metrics" => TelemetryType::Metrics,
+            "traces" => TelemetryType::Traces,
+            "events" => TelemetryType::Events,
+            _ => TelemetryType::Logs,
+        }
+    }
+}
+
+impl Display for TelemetryType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(match self {
+            TelemetryType::Logs => "logs",
+            TelemetryType::Metrics => "metrics",
+            TelemetryType::Traces => "traces",
+            TelemetryType::Events => "events",
+        })
+    }
+}
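Parsing via `From<&str>` is case-insensitive and collapses anything unrecognized to `Logs`, while `Display` is the exact inverse for the four known values. A possible unit test for that round-trip, not part of the commit, assuming it sits next to the enum in `src/handlers/mod.rs`:

```rust
#[cfg(test)]
mod telemetry_type_tests {
    use super::TelemetryType;

    #[test]
    fn parse_is_case_insensitive_and_defaults_to_logs() {
        assert_eq!(TelemetryType::from("Metrics"), TelemetryType::Metrics);
        assert_eq!(TelemetryType::from("TRACES"), TelemetryType::Traces);
        // Unknown values silently fall back to Logs rather than erroring.
        assert_eq!(TelemetryType::from("garbage"), TelemetryType::Logs);
    }

    #[test]
    fn display_round_trips_through_from() {
        for t in [
            TelemetryType::Logs,
            TelemetryType::Metrics,
            TelemetryType::Traces,
            TelemetryType::Events,
        ] {
            assert_eq!(TelemetryType::from(t.to_string().as_str()), t);
        }
    }
}
```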

src/metadata.rs

Lines changed: 4 additions & 0 deletions
@@ -25,6 +25,7 @@ use std::sync::Arc;

 use crate::catalog::snapshot::ManifestItem;
 use crate::event::format::LogSourceEntry;
+use crate::handlers::TelemetryType;
 use crate::metrics::{
     EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
     EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,

@@ -88,6 +89,7 @@ pub struct LogStreamMetadata {
     pub hot_tier_enabled: bool,
     pub stream_type: StreamType,
     pub log_source: Vec<LogSourceEntry>,
+    pub telemetry_type: TelemetryType,
 }

 impl LogStreamMetadata {

@@ -102,6 +104,7 @@
         stream_type: StreamType,
         schema_version: SchemaVersion,
         log_source: Vec<LogSourceEntry>,
+        telemetry_type: TelemetryType,
     ) -> Self {
         LogStreamMetadata {
             created_at: if created_at.is_empty() {

@@ -125,6 +128,7 @@
             stream_type,
             schema_version,
             log_source,
+            telemetry_type,
             ..Default::default()
         }
     }
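`LogStreamMetadata::new` lists `telemetry_type` explicitly and lets `..Default::default()` fill every field it does not name. A tiny, hypothetical illustration of that struct-update idiom (not the real metadata struct):

```rust
#[derive(Debug, Default, PartialEq)]
#[allow(dead_code)]
enum TelemetryType {
    #[default]
    Logs,
    Metrics,
}

#[derive(Debug, Default)]
#[allow(dead_code)]
struct MetadataSketch {
    stream_type: String,
    telemetry_type: TelemetryType,
    hot_tier_enabled: bool, // not listed below, so it comes from Default (false)
}

fn main() {
    let meta = MetadataSketch {
        stream_type: "UserDefined".to_string(),
        telemetry_type: TelemetryType::Metrics,
        ..Default::default()
    };
    assert!(!meta.hot_tier_enabled);
    assert_eq!(meta.telemetry_type, TelemetryType::Metrics);
}
```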

src/migration/mod.rs

Lines changed: 13 additions & 3 deletions
@@ -276,6 +276,7 @@ async fn migrate_stream_metadata(
             stream_metadata_value = stream_metadata_migration::v1_v4(stream_metadata_value);
             stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
+            stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);

             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))

@@ -290,6 +291,7 @@
             stream_metadata_value = stream_metadata_migration::v2_v4(stream_metadata_value);
             stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
+            stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);

             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))

@@ -304,6 +306,7 @@
             stream_metadata_value = stream_metadata_migration::v3_v4(stream_metadata_value);
             stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
+            stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);

             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))

@@ -312,24 +315,29 @@
         Some("v4") => {
             stream_metadata_value = stream_metadata_migration::v4_v5(stream_metadata_value, stream);
             stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
+            stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);

             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
         }
         Some("v5") => {
             stream_metadata_value = stream_metadata_migration::v5_v6(stream_metadata_value);
+            stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
         }
-        _ => {
-            stream_metadata_value =
-                stream_metadata_migration::rename_log_source_v6(stream_metadata_value);
+        Some("v6") => {
+            stream_metadata_value = stream_metadata_migration::v6_v7(stream_metadata_value);
             storage
                 .put_object(&path, to_bytes(&stream_metadata_value))
                 .await?;
         }
+        _ => {
+            // If the version is not recognized, we assume it's already in the latest format
+            return Ok(stream_metadata_value);
+        }
     }

     Ok(stream_metadata_value)

@@ -354,6 +362,7 @@ async fn setup_logstream_metadata(
         hot_tier_enabled,
         stream_type,
         log_source,
+        telemetry_type,
         ..
     } = serde_json::from_value(stream_metadata_value).unwrap_or_default();

@@ -387,6 +396,7 @@
         hot_tier_enabled,
         stream_type,
         log_source,
+        telemetry_type,
     };

     Ok(metadata)
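Every recognized metadata version now ends with a `stream_metadata_migration::v6_v7` step, whose body is not included in this diff. Purely as a guess at its shape, assuming it mirrors the earlier steps by bumping the version marker and backfilling the new field with its default, it might look roughly like this (the `version` key name and the `"v7"` value are assumptions):

```rust
// Speculative sketch only: the real v6_v7 lives in stream_metadata_migration
// and is not shown in this commit's hunks.
use serde_json::{Value, json};

pub fn v6_v7(mut stream_metadata: Value) -> Value {
    if let Some(obj) = stream_metadata.as_object_mut() {
        // Assumed: advance the on-disk format marker.
        obj.insert("version".to_string(), json!("v7"));
        // Assumed: older metadata has no telemetry_type, so default it to "logs".
        obj.entry("telemetry_type").or_insert(json!("logs"));
    }
    stream_metadata
}

fn main() {
    let migrated = v6_v7(json!({ "version": "v6", "stream_type": "UserDefined" }));
    assert_eq!(migrated["telemetry_type"], json!("logs"));
}
```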
