diff --git a/src/hottier.rs b/src/hottier.rs
index b01a344d3..6321823c6 100644
--- a/src/hottier.rs
+++ b/src/hottier.rs
@@ -27,7 +27,7 @@ use crate::{
     catalog::manifest::{File, Manifest},
     handlers::http::cluster::INTERNAL_STREAM_NAME,
     parseable::PARSEABLE,
-    storage::{ObjectStorage, ObjectStorageError},
+    storage::{ObjectStorage, ObjectStorageError, field_stats::DATASET_STATS_STREAM_NAME},
     utils::{extract_datetime, human_size::bytes_to_human_size},
     validator::error::HotTierValidationError,
 };
@@ -252,6 +252,11 @@ impl HotTierManager {
 
     ///sync the hot tier files from S3 to the hot tier directory for all streams
     async fn sync_hot_tier(&self) -> Result<(), HotTierError> {
+        // Best effort: try to give the pstats stream a hot tier first; a failure is only traced
+        if let Err(e) = self.create_pstats_hot_tier().await {
+            tracing::trace!("Skipping pstats hot tier creation because of error: {e}");
+        }
+
         let mut sync_hot_tier_tasks = FuturesUnordered::new();
         for stream in PARSEABLE.streams.list() {
             if self.check_stream_hot_tier_exists(&stream) {
@@ -708,6 +713,43 @@ impl HotTierManager {
         Ok(())
     }
 
+    /// Sets up the hot tier for the pstats internal stream when it is missing.
+    ///
+    /// Returns early without touching anything if the hot tier already exists
+    /// or if the stream cannot be found or loaded. Otherwise a hot tier of
+    /// `MIN_STREAM_HOT_TIER_SIZE_BYTES` is stored with nothing used yet.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any failure reported by `put_hot_tier`.
+    async fn create_pstats_hot_tier(&self) -> Result<(), HotTierError> {
+        // Nothing to do once the hot tier has been created.
+        if self.check_stream_hot_tier_exists(DATASET_STATS_STREAM_NAME) {
+            return Ok(());
+        }
+
+        // Attempt to load the pstats stream; without it there is nothing to tier.
+        let stream_found = PARSEABLE
+            .check_or_load_stream(DATASET_STATS_STREAM_NAME)
+            .await;
+        if !stream_found {
+            return Ok(());
+        }
+
+        // A fresh hot tier starts at the minimum size, all of it available.
+        let mut pstats_hot_tier = StreamHotTier {
+            version: Some(CURRENT_HOT_TIER_VERSION.to_string()),
+            size: MIN_STREAM_HOT_TIER_SIZE_BYTES,
+            used_size: 0,
+            available_size: MIN_STREAM_HOT_TIER_SIZE_BYTES,
+            oldest_date_time_entry: None,
+        };
+        self.put_hot_tier(DATASET_STATS_STREAM_NAME, &mut pstats_hot_tier)
+            .await?;
+
+        Ok(())
+    }
+
     /// Get the disk usage for the hot tier storage path. If we have a three disk paritions
     /// mounted as follows:
     /// 1. /