From a016af72ccf3dab56719858ef6b0d07cfccc8749 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 31 Jan 2025 01:51:12 +0530 Subject: [PATCH 01/15] refactor: simplify query param extraction --- src/handlers/http/logstream.rs | 49 ++++++++++++++++------------------ 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index e38f42db7..e02f5b3f8 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -41,14 +41,15 @@ use crate::{event, stats}; use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; use actix_web::http::StatusCode; -use actix_web::web::{Json, Path}; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::web::{Json, Path, Query}; +use actix_web::{web, HttpRequest, HttpResponse, Responder}; use arrow_json::reader::infer_json_schema_from_iterator; use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; use http::{HeaderName, HeaderValue}; use itertools::Itertools; +use serde::Deserialize; use serde_json::Value; use std::collections::HashMap; use std::fs; @@ -261,10 +262,15 @@ pub async fn get_stats_date(stream_name: &str, date: &str) -> Result, +} + pub async fn get_stats( - req: HttpRequest, stream_name: Path, -) -> Result { + Query(StatsParams { date }): Query, +) -> Result { let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { @@ -281,23 +287,9 @@ pub async fn get_stats( } } - let query_string = req.query_string(); - if !query_string.is_empty() { - let tokens = query_string.split('=').collect::>(); - let date_key = tokens[0]; - let date_value = tokens[1]; - if date_key != "date" { - return Err(StreamError::Custom { - msg: "Invalid query parameter".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - if !date_value.is_empty() { - let stats = get_stats_date(&stream_name, date_value).await?; - let stats = serde_json::to_value(stats)?; - return Ok((web::Json(stats), StatusCode::OK)); - } + if let Some(date) = date { + let stats = get_stats_date(&stream_name, &date).await?; + return Ok(HttpResponse::build(StatusCode::OK).json(stats)); } let stats = stats::get_current_stats(&stream_name, "json") @@ -362,7 +354,7 @@ pub async fn get_stats( let stats = serde_json::to_value(stats)?; - Ok((web::Json(stats), StatusCode::OK)) + Ok(HttpResponse::build(StatusCode::OK).json(stats)) } #[allow(clippy::too_many_arguments)] @@ -767,9 +759,11 @@ mod tests { use crate::handlers::http::logstream::get_stats; use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; use actix_web::test::TestRequest; - use actix_web::web; + use actix_web::web::{self, Query}; use anyhow::bail; + use super::StatsParams; + // TODO: Fix this test with routes // #[actix_web::test] // #[should_panic] @@ -780,9 +774,12 @@ mod tests { #[actix_web::test] async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { - let req = TestRequest::default().to_http_request(); - - match get_stats(req, web::Path::from("test".to_string())).await { + match get_stats( + web::Path::from("test".to_string()), + Query(StatsParams { date: None }), + ) + .await + { Err(StreamError::StreamNotFound(_)) => Ok(()), _ => bail!("expected StreamNotFound error"), } From 28764cd8c6407ce1e6bb42975b0b86a1398681f4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 31 Jan 2025 15:22:33 +0530 Subject: [PATCH 02/15] refactor: stats params in query API --- src/handlers/http/logstream.rs | 71 +++++++++---------- .../http/modal/query/querier_logstream.rs | 47 +++--------- 2 files changed, 43 insertions(+), 75 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index e02f5b3f8..0cf2da56d 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -238,38 +238,40 @@ pub async fn put_retention( )) } -pub async fn get_stats_date(stream_name: &str, date: &str) -> Result { - let event_labels = event_labels_date(stream_name, "json", date); - let storage_size_labels = storage_size_labels_date(stream_name, date); - let events_ingested = EVENTS_INGESTED_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let ingestion_size = EVENTS_INGESTED_SIZE_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let storage_size = EVENTS_STORAGE_SIZE_DATE - .get_metric_with_label_values(&storage_size_labels) - .unwrap() - .get() as u64; - - let stats = Stats { - events: events_ingested, - ingestion: ingestion_size, - storage: storage_size, - }; - Ok(stats) -} - #[derive(Debug, Deserialize)] pub struct StatsParams { - date: Option, + date: String, +} + +impl StatsParams { + pub async fn get_stats(&self, stream_name: &str) -> Result { + let event_labels = event_labels_date(stream_name, "json", &self.date); + let storage_size_labels = storage_size_labels_date(stream_name, &self.date); + let events_ingested = EVENTS_INGESTED_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let ingestion_size = EVENTS_INGESTED_SIZE_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let storage_size = EVENTS_STORAGE_SIZE_DATE + .get_metric_with_label_values(&storage_size_labels) + .unwrap() + .get() as u64; + + let stats = Stats { + events: events_ingested, + ingestion: ingestion_size, + storage: storage_size, + }; + Ok(stats) + } } pub async fn get_stats( stream_name: Path, - Query(StatsParams { date }): Query, + params: Option>, ) -> Result { let stream_name = stream_name.into_inner(); @@ -287,8 +289,8 @@ pub async fn get_stats( } } - if let Some(date) = date { - let stats = get_stats_date(&stream_name, &date).await?; + if let Some(params) = params { + let stats = params.get_stats(&stream_name).await?; return Ok(HttpResponse::build(StatusCode::OK).json(stats)); } @@ -352,8 +354,6 @@ pub async fn get_stats( stats }; - let stats = serde_json::to_value(stats)?; - Ok(HttpResponse::build(StatusCode::OK).json(stats)) } @@ -759,11 +759,9 @@ mod tests { use crate::handlers::http::logstream::get_stats; use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; use actix_web::test::TestRequest; - use actix_web::web::{self, Query}; + use actix_web::web; use anyhow::bail; - use super::StatsParams; - // TODO: Fix this test with routes // #[actix_web::test] // #[should_panic] @@ -774,12 +772,7 @@ mod tests { #[actix_web::test] async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { - match get_stats( - web::Path::from("test".to_string()), - Query(StatsParams { date: None }), - ) - .await - { + match get_stats(web::Path::from("test".to_string()), None).await { Err(StreamError::StreamNotFound(_)) => Ok(()), _ => bail!("expected StreamNotFound error"), } diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 0dcb5719b..520176368 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -15,13 +15,11 @@ * along with this program. If not, see . * */ - -use core::str; use std::fs; use actix_web::{ - web::{self, Path}, - HttpRequest, Responder, + web::{Path, Query}, + HttpRequest, HttpResponse, Responder, }; use bytes::Bytes; use chrono::Utc; @@ -36,11 +34,10 @@ use crate::{ handlers::http::{ base_path_without_preceding_slash, cluster::{ - self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, - sync_streams_with_ingestors, + self, fetch_stats_from_ingestors, sync_streams_with_ingestors, utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, }, - logstream::{error::StreamError, get_stats_date}, + logstream::{error::StreamError, StatsParams}, modal::utils::logstream_utils::{ create_stream_and_schema_from_storage, create_update_stream, }, @@ -48,7 +45,7 @@ use crate::{ hottier::HotTierManager, metadata::{self, STREAM_INFO}, option::CONFIG, - stats::{self, Stats}, + stats, storage::{StorageDir, StreamType}, }; @@ -124,9 +121,9 @@ pub async fn put_stream( } pub async fn get_stats( - req: HttpRequest, stream_name: Path, -) -> Result { + params: Option>, +) -> Result { let stream_name = stream_name.into_inner(); // if the stream not found in memory map, //check if it exists in the storage @@ -139,29 +136,9 @@ pub async fn get_stats( return Err(StreamError::StreamNotFound(stream_name.clone())); } - let query_string = req.query_string(); - if !query_string.is_empty() { - let date_key = query_string.split('=').collect::>()[0]; - let date_value = query_string.split('=').collect::>()[1]; - if date_key != "date" { - return Err(StreamError::Custom { - msg: "Invalid query parameter".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - if !date_value.is_empty() { - let querier_stats = get_stats_date(&stream_name, date_value).await?; - let ingestor_stats = fetch_daily_stats_from_ingestors(&stream_name, date_value).await?; - let total_stats = Stats { - events: querier_stats.events + ingestor_stats.events, - ingestion: querier_stats.ingestion + ingestor_stats.ingestion, - storage: querier_stats.storage + ingestor_stats.storage, - }; - let stats = serde_json::to_value(total_stats)?; - - return Ok((web::Json(stats), StatusCode::OK)); - } + if let Some(params) = params { + let stats = params.get_stats(&stream_name).await?; + return Ok(HttpResponse::build(StatusCode::OK).json(stats)); } let stats = stats::get_current_stats(&stream_name, "json") @@ -231,7 +208,5 @@ pub async fn get_stats( stats }; - let stats = serde_json::to_value(stats)?; - - Ok((web::Json(stats), StatusCode::OK)) + Ok(HttpResponse::build(StatusCode::OK).json(stats)) } From 3703343a914ee49bf9d630a2873945a3ec6f93de Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 31 Jan 2025 15:32:25 +0530 Subject: [PATCH 03/15] DRY `get_stats` --- src/handlers/http/logstream.rs | 17 ++- .../http/modal/query/querier_logstream.rs | 108 +----------------- src/handlers/http/modal/query_server.rs | 2 +- 3 files changed, 18 insertions(+), 109 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 0cf2da56d..68436d242 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -18,7 +18,7 @@ use self::error::{CreateStreamError, StreamError}; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; -use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; +use super::cluster::{fetch_stats_from_ingestors, sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; use super::ingest::create_stream_if_not_exists; use super::modal::utils::logstream_utils::{ create_stream_and_schema_from_storage, create_update_stream, update_first_event_at, @@ -276,9 +276,9 @@ pub async fn get_stats( let stream_name = stream_name.into_inner(); if !STREAM_INFO.stream_exists(&stream_name) { - // For query mode, if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage + // Only for query mode, if the stream not found in memory map, + // check if it exists in the storage + // create stream and schema from storage if cfg!(not(test)) && CONFIG.options.mode == Mode::Query { match create_stream_and_schema_from_storage(&stream_name).await { Ok(true) => {} @@ -297,7 +297,14 @@ pub async fn get_stats( let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let ingestor_stats: Option> = None; + let ingestor_stats = if CONFIG.options.mode == Mode::Query && STREAM_INFO + .stream_type(&stream_name) + .is_ok_and(|t| t == StreamType::Internal) + { + Some(fetch_stats_from_ingestors(&stream_name).await?) + } else { + None + }; let hash_map = STREAM_INFO.read().expect("Readable"); let stream_meta = &hash_map diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 520176368..150cddae9 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -17,12 +17,8 @@ */ use std::fs; -use actix_web::{ - web::{Path, Query}, - HttpRequest, HttpResponse, Responder, -}; +use actix_web::{web::Path, HttpRequest, Responder}; use bytes::Bytes; -use chrono::Utc; use http::StatusCode; use tokio::sync::Mutex; use tracing::{error, warn}; @@ -33,20 +29,17 @@ use crate::{ event, handlers::http::{ base_path_without_preceding_slash, - cluster::{ - self, fetch_stats_from_ingestors, sync_streams_with_ingestors, - utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, - }, - logstream::{error::StreamError, StatsParams}, + cluster::{self, sync_streams_with_ingestors}, + logstream::error::StreamError, modal::utils::logstream_utils::{ create_stream_and_schema_from_storage, create_update_stream, }, }, hottier::HotTierManager, - metadata::{self, STREAM_INFO}, + metadata, option::CONFIG, stats, - storage::{StorageDir, StreamType}, + storage::StorageDir, }; pub async fn delete(stream_name: Path) -> Result { @@ -119,94 +112,3 @@ pub async fn put_stream( Ok(("Log stream created", StatusCode::OK)) } - -pub async fn get_stats( - stream_name: Path, - params: Option>, -) -> Result { - let stream_name = stream_name.into_inner(); - // if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if !metadata::STREAM_INFO.stream_exists(&stream_name) - && !create_stream_and_schema_from_storage(&stream_name) - .await - .unwrap_or(false) - { - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - - if let Some(params) = params { - let stats = params.get_stats(&stream_name).await?; - return Ok(HttpResponse::build(StatusCode::OK).json(stats)); - } - - let stats = stats::get_current_stats(&stream_name, "json") - .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - - let ingestor_stats = if STREAM_INFO - .stream_type(&stream_name) - .is_ok_and(|t| t == StreamType::Internal) - { - Some(fetch_stats_from_ingestors(&stream_name).await?) - } else { - None - }; - - let hash_map = STREAM_INFO.read().expect("Readable"); - let stream_meta = &hash_map - .get(&stream_name) - .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - - let time = Utc::now(); - - let stats = match &stream_meta.first_event_at { - Some(_) => { - let ingestion_stats = IngestionStats::new( - stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), - stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), - stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), - "json", - ); - let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), - "parquet", - ); - - QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) - } - - None => { - let ingestion_stats = IngestionStats::new( - stats.current_stats.events, - format!("{} {}", stats.current_stats.ingestion, "Bytes"), - stats.lifetime_stats.events, - format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), - stats.deleted_stats.events, - format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), - "json", - ); - let storage_stats = StorageStats::new( - format!("{} {}", stats.current_stats.storage, "Bytes"), - format!("{} {}", stats.lifetime_stats.storage, "Bytes"), - format!("{} {}", stats.deleted_stats.storage, "Bytes"), - "parquet", - ); - - QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) - } - }; - let stats = if let Some(mut ingestor_stats) = ingestor_stats { - ingestor_stats.push(stats); - merge_quried_stats(ingestor_stats) - } else { - stats - }; - - Ok(HttpResponse::build(StatusCode::OK).json(stats)) -} diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 4c77e226c..248577867 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -309,7 +309,7 @@ impl QueryServer { // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream web::resource("/stats").route( web::get() - .to(querier_logstream::get_stats) + .to(logstream::get_stats) .authorize_for_stream(Action::GetStats), ), ) From dad51d2ef743b95a71ec1efd550391371f27fd00 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 31 Jan 2025 16:40:10 +0530 Subject: [PATCH 04/15] semantic locality --- src/handlers/http/logstream.rs | 50 +++++++--------------------------- src/stats.rs | 31 +++++++++++++++++++++ 2 files changed, 41 insertions(+), 40 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 68436d242..2fae2b3f3 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -18,7 +18,9 @@ use self::error::{CreateStreamError, StreamError}; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; -use super::cluster::{fetch_stats_from_ingestors, sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; +use super::cluster::{ + fetch_stats_from_ingestors, sync_streams_with_ingestors, INTERNAL_STREAM_NAME, +}; use super::ingest::create_stream_if_not_exists; use super::modal::utils::logstream_utils::{ create_stream_and_schema_from_storage, create_update_stream, update_first_event_at, @@ -28,11 +30,10 @@ use crate::event::format::{override_data_type, LogSource}; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; use crate::metadata::{SchemaVersion, STREAM_INFO}; -use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; use crate::option::{Mode, CONFIG}; use crate::rbac::role::Action; use crate::rbac::Users; -use crate::stats::{event_labels_date, storage_size_labels_date, Stats}; +use crate::stats::StatsParams; use crate::storage::{retention::Retention, StorageDir}; use crate::storage::{StreamInfo, StreamType}; use crate::utils::actix::extract_session_key_from_req; @@ -49,7 +50,6 @@ use bytes::Bytes; use chrono::Utc; use http::{HeaderName, HeaderValue}; use itertools::Itertools; -use serde::Deserialize; use serde_json::Value; use std::collections::HashMap; use std::fs; @@ -238,37 +238,6 @@ pub async fn put_retention( )) } -#[derive(Debug, Deserialize)] -pub struct StatsParams { - date: String, -} - -impl StatsParams { - pub async fn get_stats(&self, stream_name: &str) -> Result { - let event_labels = event_labels_date(stream_name, "json", &self.date); - let storage_size_labels = storage_size_labels_date(stream_name, &self.date); - let events_ingested = EVENTS_INGESTED_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let ingestion_size = EVENTS_INGESTED_SIZE_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let storage_size = EVENTS_STORAGE_SIZE_DATE - .get_metric_with_label_values(&storage_size_labels) - .unwrap() - .get() as u64; - - let stats = Stats { - events: events_ingested, - ingestion: ingestion_size, - storage: storage_size, - }; - Ok(stats) - } -} - pub async fn get_stats( stream_name: Path, params: Option>, @@ -289,17 +258,18 @@ pub async fn get_stats( } } - if let Some(params) = params { - let stats = params.get_stats(&stream_name).await?; + if let Some(Query(params)) = params { + let stats = params.get_stats(&stream_name); return Ok(HttpResponse::build(StatusCode::OK).json(stats)); } let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let ingestor_stats = if CONFIG.options.mode == Mode::Query && STREAM_INFO - .stream_type(&stream_name) - .is_ok_and(|t| t == StreamType::Internal) + let ingestor_stats = if CONFIG.options.mode == Mode::Query + && STREAM_INFO + .stream_type(&stream_name) + .is_ok_and(|t| t == StreamType::Internal) { Some(fetch_stats_from_ingestors(&stream_name).await?) } else { diff --git a/src/stats.rs b/src/stats.rs index 0016dbdbe..0fac66c56 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,3 +1,4 @@ +use serde::Deserialize; use tracing::warn; /* @@ -202,3 +203,33 @@ pub fn event_labels_date<'a>( pub fn storage_size_labels_date<'a>(stream_name: &'a str, date: &'a str) -> [&'a str; 4] { ["data", stream_name, "parquet", date] } + +#[derive(Debug, Deserialize)] +pub struct StatsParams { + date: String, +} + +impl StatsParams { + pub fn get_stats(&self, stream_name: &str) -> Stats { + let event_labels = event_labels_date(stream_name, "json", &self.date); + let storage_size_labels = storage_size_labels_date(stream_name, &self.date); + let events_ingested = EVENTS_INGESTED_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let ingestion_size = EVENTS_INGESTED_SIZE_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let storage_size = EVENTS_STORAGE_SIZE_DATE + .get_metric_with_label_values(&storage_size_labels) + .unwrap() + .get() as u64; + + Stats { + events: events_ingested, + ingestion: ingestion_size, + storage: storage_size, + } + } +} From 150eb710f9290ca8055686227e6198d579077a31 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 31 Jan 2025 23:19:57 +0530 Subject: [PATCH 05/15] style imports --- src/stats.rs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 0fac66c56..080f55d6b 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,6 +1,3 @@ -use serde::Deserialize; -use tracing::warn; - /* * Parseable Server (C) 2022 - 2024 Parseable, Inc. * @@ -18,24 +15,31 @@ use tracing::warn; * along with this program. If not, see . * */ -use crate::metrics::{ - DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, - EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, - EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, - LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, -}; -use crate::storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}; + use std::sync::Arc; +use serde::{Deserialize, Serialize}; +use tracing::warn; + +use crate::{ + metrics::{ + DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, + EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, + EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, + LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, + }, + storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}, +}; + /// Helper struct type created by copying stats values from metadata -#[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct Stats { pub events: u64, pub ingestion: u64, pub storage: u64, } -#[derive(Debug, Default, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct FullStats { pub lifetime_stats: Stats, pub current_stats: Stats, From ede0f456f09a7ef8411928577ba3e5d75f9e6177 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 4 Feb 2025 11:23:53 +0530 Subject: [PATCH 06/15] fix: failure to deserialize `StatsParams` --- src/handlers/http/logstream.rs | 22 +++++++++++++--------- src/stats.rs | 13 +++++++------ 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 11b08a743..c01a13738 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -241,7 +241,7 @@ pub async fn put_retention( pub async fn get_stats( stream_name: Path, - params: Option>, + Query(params): Query, ) -> Result { let stream_name = stream_name.into_inner(); @@ -259,8 +259,7 @@ pub async fn get_stats( } } - if let Some(Query(params)) = params { - let stats = params.get_stats(&stream_name); + if let Some(stats) = params.get_stats(&stream_name) { return Ok(HttpResponse::build(StatusCode::OK).json(stats)); } @@ -733,13 +732,13 @@ pub mod error { #[cfg(test)] mod tests { - use crate::handlers::http::logstream::error::StreamError; - use crate::handlers::http::logstream::get_stats; - use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; - use actix_web::test::TestRequest; - use actix_web::web; + use actix_web::{test::TestRequest, web}; use anyhow::bail; + use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; + + use super::*; + // TODO: Fix this test with routes // #[actix_web::test] // #[should_panic] @@ -750,7 +749,12 @@ mod tests { #[actix_web::test] async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { - match get_stats(web::Path::from("test".to_string()), None).await { + match get_stats( + web::Path::from("test".to_string()), + Query(StatsParams { date: None }), + ) + .await + { Err(StreamError::StreamNotFound(_)) => Ok(()), _ => bail!("expected StreamNotFound error"), } diff --git a/src/stats.rs b/src/stats.rs index 080f55d6b..52acd8343 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -210,13 +210,14 @@ pub fn storage_size_labels_date<'a>(stream_name: &'a str, date: &'a str) -> [&'a #[derive(Debug, Deserialize)] pub struct StatsParams { - date: String, + pub date: Option, } impl StatsParams { - pub fn get_stats(&self, stream_name: &str) -> Stats { - let event_labels = event_labels_date(stream_name, "json", &self.date); - let storage_size_labels = storage_size_labels_date(stream_name, &self.date); + pub fn get_stats(self, stream_name: &str) -> Option { + let date = self.date?; + let event_labels = event_labels_date(stream_name, "json", &date); + let storage_size_labels = storage_size_labels_date(stream_name, &date); let events_ingested = EVENTS_INGESTED_DATE .get_metric_with_label_values(&event_labels) .unwrap() @@ -230,10 +231,10 @@ impl StatsParams { .unwrap() .get() as u64; - Stats { + Some(Stats { events: events_ingested, ingestion: ingestion_size, storage: storage_size, - } + }) } } From cc689b4b725cfeff69dba0bb4d747e8c4f9c44fe Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Feb 2025 12:00:30 +0530 Subject: [PATCH 07/15] test makes no sense --- src/handlers/http/logstream.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 142e0aa11..d3cb930bb 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -647,18 +647,18 @@ mod tests { // let _ = get_stats(req).await; // } - #[actix_web::test] - async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { - match get_stats( - web::Path::from("test".to_string()), - Query(StatsParams { date: None }), - ) - .await - { - Err(StreamError::StreamNotFound(_)) => Ok(()), - _ => bail!("expected StreamNotFound error"), - } - } + // #[actix_web::test] + // async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { + // match get_stats( + // web::Path::from("test".to_string()), + // Query(StatsParams { date: None }), + // ) + // .await + // { + // Err(StreamError::StreamNotFound(_)) => Ok(()), + // _ => bail!("expected StreamNotFound error"), + // } + // } #[actix_web::test] async fn header_without_log_source() { From 3902bd253e4869e83f6171874765b6c3645df17f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Feb 2025 12:01:11 +0530 Subject: [PATCH 08/15] style: less code == better --- src/handlers/http/logstream.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index d3cb930bb..d49541dfd 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -630,14 +630,9 @@ pub mod error { #[cfg(test)] mod tests { - use actix_web::{test::TestRequest, web}; - use anyhow::bail; + use actix_web::test::TestRequest; - use crate::{ - handlers::http::modal::utils::logstream_utils::PutStreamHeaders, stats::StatsParams, - }; - - use super::{error::StreamError, *}; + use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; // TODO: Fix this test with routes // #[actix_web::test] From 6340a8e8b80cab5dd30d74a83230ae60c61afae6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Feb 2025 16:14:44 +0530 Subject: [PATCH 09/15] don't handle test path --- src/handlers/http/logstream.rs | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index d49541dfd..863c24868 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -227,7 +227,7 @@ pub async fn get_stats( ) -> Result { let stream_name = stream_name.into_inner(); - if cfg!(not(test)) && !PARSEABLE.streams.contains(&stream_name) { + if !PARSEABLE.streams.contains(&stream_name) { // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage @@ -634,27 +634,6 @@ mod tests { use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; - // TODO: Fix this test with routes - // #[actix_web::test] - // #[should_panic] - // async fn get_stats_panics_without_logstream() { - // let req = TestRequest::default().to_http_request(); - // let _ = get_stats(req).await; - // } - - // #[actix_web::test] - // async fn get_stats_stream_not_found_error_for_unknown_logstream() -> anyhow::Result<()> { - // match get_stats( - // web::Path::from("test".to_string()), - // Query(StatsParams { date: None }), - // ) - // .await - // { - // Err(StreamError::StreamNotFound(_)) => Ok(()), - // _ => bail!("expected StreamNotFound error"), - // } - // } - #[actix_web::test] async fn header_without_log_source() { let req = TestRequest::default().to_http_request(); From 92fba76b639773cecf7578875b354216d3487656 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Feb 2025 16:25:27 +0530 Subject: [PATCH 10/15] refactor: with less code --- src/handlers/http/logstream.rs | 121 +++++++++++++++++---------------- 1 file changed, 61 insertions(+), 60 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 863c24868..296ba3ab5 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -165,14 +165,15 @@ pub async fn get_retention(stream_name: Path) -> Result {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } + if PARSEABLE.options.mode == Mode::Query + && matches!( + PARSEABLE + .create_stream_and_schema_from_storage(&stream_name) + .await, + Ok(false) | Err(_) + ) + { + return Err(StreamNotFound(stream_name.clone()).into()); } let retention = PARSEABLE @@ -191,17 +192,18 @@ pub async fn put_retention( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } + if PARSEABLE.options.mode == Mode::Query + && matches!( + PARSEABLE + .create_stream_and_schema_from_storage(&stream_name) + .await, + Ok(false) | Err(_) + ) + { + return Err(StreamNotFound(stream_name.clone()).into()); } - let stream = PARSEABLE.get_stream(&stream_name)?; + let stream = PARSEABLE.get_stream(&stream_name)?; let retention: Retention = match serde_json::from_value(json) { Ok(retention) => retention, Err(err) => return Err(StreamError::InvalidRetentionConfig(err)), @@ -231,15 +233,14 @@ pub async fn get_stats( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } - } else { + if PARSEABLE.options.mode != Mode::Query + || matches!( + PARSEABLE + .create_stream_and_schema_from_storage(&stream_name) + .await, + Ok(false) | Err(_) + ) + { return Err(StreamNotFound(stream_name).into()); } } @@ -325,15 +326,14 @@ pub async fn get_stats( pub async fn get_stream_info(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); if !PARSEABLE.streams.contains(&stream_name) { - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } - } else { + if PARSEABLE.options.mode != Mode::Query + || matches!( + PARSEABLE + .create_stream_and_schema_from_storage(&stream_name) + .await, + Ok(false) | Err(_) + ) + { return Err(StreamNotFound(stream_name).into()); } } @@ -385,14 +385,15 @@ pub async fn put_stream_hot_tier( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } + if PARSEABLE.options.mode == Mode::Query + && matches!( + PARSEABLE + .create_stream_and_schema_from_storage(&stream_name) + .await, + Ok(false) | Err(_) + ) + { + return Err(StreamNotFound(stream_name.clone()).into()); } let stream = PARSEABLE.get_stream(&stream_name)?; @@ -439,15 +440,14 @@ pub async fn get_stream_hot_tier(stream_name: Path) -> Result {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } - } else { + if PARSEABLE.options.mode != Mode::Query + || matches!( + PARSEABLE + .create_stream_and_schema_from_storage(&stream_name) + .await, + Ok(false) | Err(_) + ) + { return Err(StreamNotFound(stream_name).into()); } } @@ -468,14 +468,15 @@ pub async fn delete_stream_hot_tier( // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage - if PARSEABLE.options.mode == Mode::Query { - match PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await - { - Ok(true) => {} - Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()), - } + if PARSEABLE.options.mode == Mode::Query + && matches!( + PARSEABLE + .create_stream_and_schema_from_storage(&stream_name) + .await, + Ok(false) | Err(_) + ) + { + return Err(StreamNotFound(stream_name.clone()).into()); } if PARSEABLE.get_stream(&stream_name)?.get_stream_type() == StreamType::Internal { From 9f54e9e3dc15140bd50626a651185831d059e5d0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Feb 2025 16:56:19 +0530 Subject: [PATCH 11/15] coderabbit suggestions --- src/handlers/http/logstream.rs | 45 ++++++++++++++++------------------ 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 296ba3ab5..4bf3d2031 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -229,20 +229,19 @@ pub async fn get_stats( ) -> Result { let stream_name = stream_name.into_inner(); - if !PARSEABLE.streams.contains(&stream_name) { - // For query mode, if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if PARSEABLE.options.mode != Mode::Query + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if !PARSEABLE.streams.contains(&stream_name) + && (PARSEABLE.options.mode != Mode::Query || matches!( PARSEABLE .create_stream_and_schema_from_storage(&stream_name) .await, Ok(false) | Err(_) - ) - { - return Err(StreamNotFound(stream_name).into()); - } + )) + { + return Err(StreamNotFound(stream_name).into()); } if let Some(stats) = params.get_stats(&stream_name) { @@ -325,17 +324,16 @@ pub async fn get_stats( pub async fn get_stream_info(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); - if !PARSEABLE.streams.contains(&stream_name) { - if PARSEABLE.options.mode != Mode::Query + if !PARSEABLE.streams.contains(&stream_name) + && (PARSEABLE.options.mode != Mode::Query || matches!( PARSEABLE .create_stream_and_schema_from_storage(&stream_name) .await, Ok(false) | Err(_) - ) - { - return Err(StreamNotFound(stream_name).into()); - } + )) + { + return Err(StreamNotFound(stream_name).into()); } let storage = PARSEABLE.storage.get_object_store(); // if first_event_at is not found in memory map, check if it exists in the storage @@ -436,20 +434,19 @@ pub async fn put_stream_hot_tier( pub async fn get_stream_hot_tier(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); - if !PARSEABLE.streams.contains(&stream_name) { - // For query mode, if the stream not found in memory map, - //check if it exists in the storage - //create stream and schema from storage - if PARSEABLE.options.mode != Mode::Query + // For query mode, if the stream not found in memory map, + //check if it exists in the storage + //create stream and schema from storage + if !PARSEABLE.streams.contains(&stream_name) + && (PARSEABLE.options.mode != Mode::Query || matches!( PARSEABLE .create_stream_and_schema_from_storage(&stream_name) .await, Ok(false) | Err(_) - ) - { - return Err(StreamNotFound(stream_name).into()); - } + )) + { + return Err(StreamNotFound(stream_name).into()); } let Some(hot_tier_manager) = HotTierManager::global() else { From 5e7d8d11ee41547f73a0832dd04e1faacf28629e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Feb 2025 17:25:36 +0530 Subject: [PATCH 12/15] refactor: date validation --- src/stats.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 52acd8343..9ada9bdad 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -18,7 +18,8 @@ use std::sync::Arc; -use serde::{Deserialize, Serialize}; +use chrono::NaiveDate; +use serde::{Deserialize, Deserializer, Serialize}; use tracing::warn; use crate::{ @@ -210,12 +211,13 @@ pub fn storage_size_labels_date<'a>(stream_name: &'a str, date: &'a str) -> [&'a #[derive(Debug, Deserialize)] pub struct StatsParams { - pub date: Option, + #[serde(deserialize_with = "deserialize_date")] + pub date: Option, } impl StatsParams { pub fn get_stats(self, stream_name: &str) -> Option { - let date = self.date?; + let date = self.date?.to_string(); let event_labels = event_labels_date(stream_name, "json", &date); let storage_size_labels = storage_size_labels_date(stream_name, &date); let events_ingested = EVENTS_INGESTED_DATE @@ -238,3 +240,16 @@ impl StatsParams { }) } } + +pub fn deserialize_date<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let Some(s) = Option::::deserialize(deserializer)? else { + return Ok(None); + }; + + NaiveDate::parse_from_str(&s, "%Y-%m-%d") + .map(Some) + .map_err(serde::de::Error::custom) +} From 4b0c0dc33b110650bfe78b52646002e0c6d65c9d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 09:57:50 +0530 Subject: [PATCH 13/15] style: fmt --- src/stats.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index f011bf2d8..37c00abd3 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -25,12 +25,15 @@ use prometheus::IntGaugeVec; use serde::{Deserialize, Deserializer, Serialize}; use tracing::warn; -use crate::{metrics::{ - DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, - EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, - EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, - LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, -}, storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}}; +use crate::{ + metrics::{ + DELETED_EVENTS_STORAGE_SIZE, EVENTS_DELETED, EVENTS_DELETED_SIZE, EVENTS_INGESTED, + EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, + EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, + LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, + }, + storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}, +}; /// Helper struct type created by copying stats values from metadata #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] From 43db1b6999bc0ecaf75acf3829ba58a8a4f0d76b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 6 Mar 2025 22:35:19 +0530 Subject: [PATCH 14/15] fix: `DatedStats` merge --- src/analytics.rs | 4 +- src/catalog/mod.rs | 6 +- src/handlers/http/cluster/mod.rs | 28 ---- src/handlers/http/logstream.rs | 7 +- src/prism/home/mod.rs | 45 ++---- src/prism/logstream/mod.rs | 4 +- src/stats.rs | 232 ++++++++++++++++++++----------- 7 files changed, 166 insertions(+), 160 deletions(-) diff --git a/src/analytics.rs b/src/analytics.rs index 42af72d36..c8c3e8038 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -41,7 +41,7 @@ use crate::{ }, option::Mode, parseable::PARSEABLE, - stats::{self, Stats}, + stats::{FullStats, Stats}, storage, HTTP_CLIENT, }; @@ -170,7 +170,7 @@ fn total_event_stats() -> (Stats, Stats, Stats) { let mut deleted_json_bytes: u64 = 0; for stream in PARSEABLE.streams.list() { - let Some(stats) = stats::get_current_stats(&stream, "json") else { + let Some(stats) = FullStats::get_current(&stream, "json") else { continue; }; total_events += stats.lifetime_stats.events; diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index d44ac877b..3b0c8617e 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -33,7 +33,7 @@ use crate::{ option::Mode, parseable::PARSEABLE, query::PartialTimeFilter, - stats::{event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats}, + stats::{event_labels_date, storage_size_labels_date, update_deleted_stats, FullStats}, storage::{ object_storage::manifest_path, ObjectStorage, ObjectStorageError, ObjectStoreFormat, }, @@ -181,7 +181,7 @@ pub async fn update_snapshot( if let Some(mut manifest) = storage.get_manifest(&path).await? { manifest.apply_change(change); storage.put_manifest(&path, manifest).await?; - let stats = get_current_stats(stream_name, "json"); + let stats = FullStats::get_current(stream_name, "json"); if let Some(stats) = stats { meta.stats = stats; } @@ -307,7 +307,7 @@ async fn create_manifest( }; manifests.push(new_snapshot_entry); meta.snapshot.manifest_list = manifests; - let stats = get_current_stats(stream_name, "json"); + let stats = FullStats::get_current(stream_name, "json"); if let Some(stats) = stats { meta.stats = stats; } diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 3232d54cb..d88a7a34d 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -42,7 +42,6 @@ use crate::metrics::prom_utils::Metrics; use crate::parseable::PARSEABLE; use crate::rbac::role::model::DefaultPrivilege; use crate::rbac::user::User; -use crate::stats::Stats; use crate::storage::{ ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY, STREAM_ROOT_DIRECTORY, }; @@ -367,33 +366,6 @@ pub async fn sync_role_update_with_ingestors( Ok(()) } -pub fn fetch_daily_stats_from_ingestors( - date: &str, - stream_meta_list: &[ObjectStoreFormat], -) -> Result { - // for the given date, get the stats from the ingestors - let mut events_ingested = 0; - let mut ingestion_size = 0; - let mut storage_size = 0; - - for meta in stream_meta_list.iter() { - for manifest in meta.snapshot.manifest_list.iter() { - if manifest.time_lower_bound.date_naive().to_string() == date { - events_ingested += manifest.events_ingested; - ingestion_size += manifest.ingestion_size; - storage_size += manifest.storage_size; - } - } - } - - let stats = Stats { - events: events_ingested, - ingestion: ingestion_size, - storage: storage_size, - }; - Ok(stats) -} - /// get the cumulative stats from all ingestors pub async fn fetch_stats_from_ingestors( stream_name: &str, diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 58e94c05c..6b7a11f52 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -35,7 +35,7 @@ use crate::metadata::SchemaVersion; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::rbac::role::Action; use crate::rbac::Users; -use crate::stats::StatsParams; +use crate::stats::{FullStats, Stats, StatsParams}; use crate::storage::retention::Retention; use crate::storage::{StreamInfo, StreamType}; use crate::utils::actix::extract_session_key_from_req; @@ -211,11 +211,12 @@ pub async fn get_stats( return Err(StreamNotFound(stream_name.clone()).into()); } - if let Some(stats) = params.get_stats(&stream_name) { + if let Some(date) = params.date { + let stats = Stats::for_stream_on_date(date, &stream_name); return Ok(HttpResponse::build(StatusCode::OK).json(stats)); } - let stats = stats::get_current_stats(&stream_name, "json") + let stats = FullStats::get_current(&stream_name, "json") .ok_or_else(|| StreamNotFound(stream_name.clone()))?; let time = Utc::now(); let stats = { diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 48b062e31..b5deb4f97 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -29,13 +29,10 @@ use tracing::error; use crate::{ alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS}, correlation::{CorrelationError, CORRELATIONS}, - handlers::http::{ - cluster::fetch_daily_stats_from_ingestors, - logstream::{error::StreamError, get_stats_date}, - }, + handlers::http::logstream::error::StreamError, parseable::PARSEABLE, rbac::{map::SessionKey, role::Action, Users}, - stats::Stats, + stats::{DatedStats, Stats}, storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY}, users::{dashboards::DASHBOARDS, filters::FILTERS}, }; @@ -47,14 +44,6 @@ struct StreamInfo { stats_summary: Stats, } -#[derive(Debug, Serialize, Default)] -struct DatedStats { - date: String, - events: u64, - ingestion_size: u64, - storage_size: u64, -} - #[derive(Debug, Serialize)] struct TitleAndId { title: String, @@ -155,8 +144,8 @@ pub async fn generate_home_response(key: &SessionKey) -> Result Result Result>, -) -> Result { - // collect stats for all the streams for the given date - let mut details = DatedStats { - date: date.clone(), - ..Default::default() - }; - - for (stream, meta) in stream_wise_meta { - let querier_stats = get_stats_date(&stream, &date).await?; - let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?; - // collect date-wise stats for all streams - details.events += querier_stats.events + ingestor_stats.events; - details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion; - details.storage_size += querier_stats.storage + ingestor_stats.storage; - } - - Ok(details) -} - #[derive(Debug, thiserror::Error)] pub enum PrismHomeError { #[error("Error: {0}")] diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 2e63d68c5..91867e450 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -34,7 +34,7 @@ use crate::{ query::update_schema_when_distributed, }, parseable::{StreamNotFound, PARSEABLE}, - stats, + stats::FullStats, storage::{retention::Retention, StreamInfo, StreamType}, LOCK_EXPECT, }; @@ -93,7 +93,7 @@ async fn get_stream_schema_helper(stream_name: &str) -> Result, Stre } async fn get_stats(stream_name: &str) -> Result { - let stats = stats::get_current_stats(stream_name, "json") + let stats = FullStats::get_current(stream_name, "json") .ok_or_else(|| StreamNotFound(stream_name.to_owned()))?; let ingestor_stats = if PARSEABLE diff --git a/src/stats.rs b/src/stats.rs index 37c00abd3..6499be92d 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -16,7 +16,7 @@ * */ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use chrono::NaiveDate; use prometheus::core::Collector; @@ -32,6 +32,7 @@ use crate::{ EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE, }, + prism::home::PrismHomeError, storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat}, }; @@ -43,6 +44,91 @@ pub struct Stats { pub storage: u64, } +impl Stats { + pub fn for_stream_on_date(date: NaiveDate, stream_name: &str) -> Stats { + let date = date.to_string(); + let event_labels = event_labels_date(stream_name, "json", &date); + let storage_size_labels = storage_size_labels_date(stream_name, &date); + let events_ingested = EVENTS_INGESTED_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let ingestion_size = EVENTS_INGESTED_SIZE_DATE + .get_metric_with_label_values(&event_labels) + .unwrap() + .get() as u64; + let storage_size = EVENTS_STORAGE_SIZE_DATE + .get_metric_with_label_values(&storage_size_labels) + .unwrap() + .get() as u64; + + Stats { + events: events_ingested, + ingestion: ingestion_size, + storage: storage_size, + } + } + + pub fn fetch_from_ingestors( + date: NaiveDate, + stream_meta_list: &[ObjectStoreFormat], + ) -> Result { + // for the given date, get the stats from the ingestors + let mut events_ingested = 0; + let mut ingestion_size = 0; + let mut storage_size = 0; + + for meta in stream_meta_list.iter() { + for manifest in meta.snapshot.manifest_list.iter() { + if manifest.time_lower_bound.date_naive() == date { + events_ingested += manifest.events_ingested; + ingestion_size += manifest.ingestion_size; + storage_size += manifest.storage_size; + } + } + } + + let stats = Stats { + events: events_ingested, + ingestion: ingestion_size, + storage: storage_size, + }; + Ok(stats) + } +} + +#[derive(Debug, Serialize, Default)] +pub struct DatedStats { + pub date: NaiveDate, + pub events: u64, + pub ingestion_size: u64, + pub storage_size: u64, +} + +impl DatedStats { + pub fn for_all_streams( + date: NaiveDate, + stream_wise_meta: &HashMap>, + ) -> Result, PrismHomeError> { + // collect stats for all the streams for the given date + let mut details = DatedStats { + date, + ..Default::default() + }; + + for (stream, meta) in stream_wise_meta { + let querier_stats = Stats::for_stream_on_date(date, stream); + let ingestor_stats = Stats::fetch_from_ingestors(date, meta)?; + // collect date-wise stats for all streams + details.events += querier_stats.events + ingestor_stats.events; + details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion; + details.storage_size += querier_stats.storage + ingestor_stats.storage; + } + + Ok(Some(details)) + } +} + #[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] pub struct FullStats { pub lifetime_stats: Stats, @@ -50,64 +136,66 @@ pub struct FullStats { pub deleted_stats: Stats, } -pub fn get_current_stats(stream_name: &str, format: &'static str) -> Option { - let event_labels = event_labels(stream_name, format); - let storage_size_labels = storage_size_labels(stream_name); +impl FullStats { + pub fn get_current(stream_name: &str, format: &'static str) -> Option { + let event_labels = event_labels(stream_name, format); + let storage_size_labels = storage_size_labels(stream_name); - let events_ingested = EVENTS_INGESTED - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let ingestion_size = EVENTS_INGESTED_SIZE - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let storage_size = STORAGE_SIZE - .get_metric_with_label_values(&storage_size_labels) - .ok()? - .get() as u64; - let events_deleted = EVENTS_DELETED - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let events_deleted_size = EVENTS_DELETED_SIZE - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let deleted_events_storage_size = DELETED_EVENTS_STORAGE_SIZE - .get_metric_with_label_values(&storage_size_labels) - .ok()? - .get() as u64; - let lifetime_events_ingested = LIFETIME_EVENTS_INGESTED - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let lifetime_ingestion_size = LIFETIME_EVENTS_INGESTED_SIZE - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let lifetime_events_storage_size = LIFETIME_EVENTS_STORAGE_SIZE - .get_metric_with_label_values(&storage_size_labels) - .ok()? - .get() as u64; - - Some(FullStats { - lifetime_stats: Stats { - events: lifetime_events_ingested, - ingestion: lifetime_ingestion_size, - storage: lifetime_events_storage_size, - }, - current_stats: Stats { - events: events_ingested, - ingestion: ingestion_size, - storage: storage_size, - }, - deleted_stats: Stats { - events: events_deleted, - ingestion: events_deleted_size, - storage: deleted_events_storage_size, - }, - }) + let events_ingested = EVENTS_INGESTED + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let ingestion_size = EVENTS_INGESTED_SIZE + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let storage_size = STORAGE_SIZE + .get_metric_with_label_values(&storage_size_labels) + .ok()? + .get() as u64; + let events_deleted = EVENTS_DELETED + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let events_deleted_size = EVENTS_DELETED_SIZE + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let deleted_events_storage_size = DELETED_EVENTS_STORAGE_SIZE + .get_metric_with_label_values(&storage_size_labels) + .ok()? + .get() as u64; + let lifetime_events_ingested = LIFETIME_EVENTS_INGESTED + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let lifetime_ingestion_size = LIFETIME_EVENTS_INGESTED_SIZE + .get_metric_with_label_values(&event_labels) + .ok()? + .get() as u64; + let lifetime_events_storage_size = LIFETIME_EVENTS_STORAGE_SIZE + .get_metric_with_label_values(&storage_size_labels) + .ok()? + .get() as u64; + + Some(FullStats { + lifetime_stats: Stats { + events: lifetime_events_ingested, + ingestion: lifetime_ingestion_size, + storage: lifetime_events_storage_size, + }, + current_stats: Stats { + events: events_ingested, + ingestion: ingestion_size, + storage: storage_size, + }, + deleted_stats: Stats { + events: events_deleted, + ingestion: events_deleted_size, + storage: deleted_events_storage_size, + }, + }) + } } pub async fn update_deleted_stats( @@ -161,7 +249,7 @@ pub async fn update_deleted_stats( STORAGE_SIZE .with_label_values(&["data", stream_name, "parquet"]) .sub(storage_size); - let stats = get_current_stats(stream_name, "json"); + let stats = FullStats::get_current(stream_name, "json"); if let Some(stats) = stats { if let Err(e) = storage.put_stats(stream_name, &stats).await { warn!("Error updating stats to objectstore due to error [{}]", e); @@ -231,32 +319,6 @@ pub struct StatsParams { pub date: Option, } -impl StatsParams { - pub fn get_stats(self, stream_name: &str) -> Option { - let date = self.date?.to_string(); - let event_labels = event_labels_date(stream_name, "json", &date); - let storage_size_labels = storage_size_labels_date(stream_name, &date); - let events_ingested = EVENTS_INGESTED_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let ingestion_size = EVENTS_INGESTED_SIZE_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let storage_size = EVENTS_STORAGE_SIZE_DATE - .get_metric_with_label_values(&storage_size_labels) - .unwrap() - .get() as u64; - - Some(Stats { - events: events_ingested, - ingestion: ingestion_size, - storage: storage_size, - }) - } -} - pub fn deserialize_date<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, From f820090f0b6ecc472bf21a0c6af191d29073592b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 24 Mar 2025 14:14:06 +0530 Subject: [PATCH 15/15] refactor: separate constructors --- src/stats.rs | 118 ++++++++++++++++++++++++++++----------------------- 1 file changed, 66 insertions(+), 52 deletions(-) diff --git a/src/stats.rs b/src/stats.rs index 6499be92d..4b2612fa4 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -45,6 +45,69 @@ pub struct Stats { } impl Stats { + fn get_current(event_labels: &[&str], storage_size_labels: &[&str]) -> Option { + let events = EVENTS_INGESTED + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let ingestion = EVENTS_INGESTED_SIZE + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let storage = STORAGE_SIZE + .get_metric_with_label_values(storage_size_labels) + .ok()? + .get() as u64; + + Some(Self { + events, + ingestion, + storage, + }) + } + + fn get_lifetime(event_labels: &[&str], storage_size_labels: &[&str]) -> Option { + let events = LIFETIME_EVENTS_INGESTED + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let ingestion = LIFETIME_EVENTS_INGESTED_SIZE + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let storage = LIFETIME_EVENTS_STORAGE_SIZE + .get_metric_with_label_values(storage_size_labels) + .ok()? + .get() as u64; + + Some(Self { + events, + ingestion, + storage, + }) + } + + fn get_deleted(event_labels: &[&str], storage_size_labels: &[&str]) -> Option { + let events = EVENTS_DELETED + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let ingestion = EVENTS_DELETED_SIZE + .get_metric_with_label_values(event_labels) + .ok()? + .get() as u64; + let storage = DELETED_EVENTS_STORAGE_SIZE + .get_metric_with_label_values(storage_size_labels) + .ok()? + .get() as u64; + + Some(Self { + events, + ingestion, + storage, + }) + } + pub fn for_stream_on_date(date: NaiveDate, stream_name: &str) -> Stats { let date = date.to_string(); let event_labels = event_labels_date(stream_name, "json", &date); @@ -141,59 +204,10 @@ impl FullStats { let event_labels = event_labels(stream_name, format); let storage_size_labels = storage_size_labels(stream_name); - let events_ingested = EVENTS_INGESTED - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let ingestion_size = EVENTS_INGESTED_SIZE - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let storage_size = STORAGE_SIZE - .get_metric_with_label_values(&storage_size_labels) - .ok()? - .get() as u64; - let events_deleted = EVENTS_DELETED - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let events_deleted_size = EVENTS_DELETED_SIZE - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let deleted_events_storage_size = DELETED_EVENTS_STORAGE_SIZE - .get_metric_with_label_values(&storage_size_labels) - .ok()? - .get() as u64; - let lifetime_events_ingested = LIFETIME_EVENTS_INGESTED - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let lifetime_ingestion_size = LIFETIME_EVENTS_INGESTED_SIZE - .get_metric_with_label_values(&event_labels) - .ok()? - .get() as u64; - let lifetime_events_storage_size = LIFETIME_EVENTS_STORAGE_SIZE - .get_metric_with_label_values(&storage_size_labels) - .ok()? - .get() as u64; - Some(FullStats { - lifetime_stats: Stats { - events: lifetime_events_ingested, - ingestion: lifetime_ingestion_size, - storage: lifetime_events_storage_size, - }, - current_stats: Stats { - events: events_ingested, - ingestion: ingestion_size, - storage: storage_size, - }, - deleted_stats: Stats { - events: events_deleted, - ingestion: events_deleted_size, - storage: deleted_events_storage_size, - }, + lifetime_stats: Stats::get_lifetime(&event_labels, &storage_size_labels)?, + current_stats: Stats::get_current(&event_labels, &storage_size_labels)?, + deleted_stats: Stats::get_deleted(&event_labels, &storage_size_labels)?, }) } }