From 2c43745dbf9fa231ad0ccea8cd63f0fe28c52faa Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 5 Mar 2025 23:40:13 +0530
Subject: [PATCH 1/3] refactor: associate `ObjectStore::commit_schema`

---
 src/handlers/http/query.rs    |  7 +++++--
 src/storage/object_storage.rs | 20 ++++++++++----------
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 7d9c33a45..ffc8ec8a6 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -45,7 +45,6 @@ use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
 use crate::response::QueryResponse;
-use crate::storage::object_storage::commit_schema_to_storage;
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
@@ -173,7 +172,11 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
     for table in tables {
         if let Ok(new_schema) = fetch_schema(table).await {
             // commit schema merges the schema internally and updates the schema in storage.
-            commit_schema_to_storage(table, new_schema.clone()).await?;
+            PARSEABLE
+                .storage
+                .get_object_store()
+                .commit_schema(table, new_schema.clone())
+                .await?;
 
             commit_schema(table, Arc::new(new_schema))?;
         }
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index dea669975..64e31a11e 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -869,7 +869,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         for path in stream.schema_files() {
             let file = File::open(&path)?;
             let schema: Schema = serde_json::from_reader(file)?;
-            commit_schema_to_storage(&stream_name, schema).await?;
+            self.commit_schema(&stream_name, schema).await?;
             if let Err(e) = remove_file(path) {
                 warn!("Failed to remove staged file: {e}");
             }
@@ -878,16 +878,16 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
         Ok(())
     }
-}
 
-pub async fn commit_schema_to_storage(
-    stream_name: &str,
-    schema: Schema,
-) -> Result<(), ObjectStorageError> {
-    let storage = PARSEABLE.storage().get_object_store();
-    let stream_schema = storage.get_schema(stream_name).await?;
-    let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
-    storage.put_schema(stream_name, &new_schema).await
+    async fn commit_schema(
+        &self,
+        stream_name: &str,
+        schema: Schema,
+    ) -> Result<(), ObjectStorageError> {
+        let stream_schema = self.get_schema(stream_name).await?;
+        let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
+        self.put_schema(stream_name, &new_schema).await
+    }
 }
 
 #[inline(always)]
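Note: the heart of this first patch is moving the free function `commit_schema_to_storage` into the `ObjectStorage` trait as a provided method, so every backend implementation inherits it and callers reach it via `PARSEABLE.storage.get_object_store()`. A minimal sketch of that provided-method pattern, using hypothetical `Backend`, `MemStore`, and `MergeError` types rather than Parseable's own:

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct MergeError;

trait Backend {
    fn get(&self, key: &str) -> Result<String, MergeError>;
    fn put(&mut self, key: &str, value: String) -> Result<(), MergeError>;

    // Provided method: every implementor inherits it, and it can only
    // reach the backend through `self`, the same shape as the new
    // `ObjectStorage::commit_schema` above.
    fn merge_put(&mut self, key: &str, extra: &str) -> Result<(), MergeError> {
        let current = self.get(key)?;
        self.put(key, format!("{current},{extra}"))
    }
}

struct MemStore(HashMap<String, String>);

impl Backend for MemStore {
    fn get(&self, key: &str) -> Result<String, MergeError> {
        self.0.get(key).cloned().ok_or(MergeError)
    }

    fn put(&mut self, key: &str, value: String) -> Result<(), MergeError> {
        self.0.insert(key.to_owned(), value);
        Ok(())
    }
}
```

This also removes the function's hidden dependency on the global `PARSEABLE.storage()`: the method works against whichever store `self` happens to be, which is what lets the staged-schema loop above call `self.commit_schema(...)` directly.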
From 426397f904c7849bdf2016cc4f0fe49107df24c9 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Mon, 24 Mar 2025 12:54:25 +0530
Subject: [PATCH 2/3] refactor: return instead of panic

---
 src/storage/mod.rs            | 3 +++
 src/storage/object_storage.rs | 3 ++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b6bc9bb25..aca514e5f 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -262,6 +262,9 @@ pub enum ObjectStorageError {
 
     #[error("JoinError: {0}")]
     JoinError(#[from] JoinError),
+
+    #[error("Arrow Error: {0}")]
+    Arrow(#[from] arrow_schema::ArrowError),
 }
 
 pub fn to_object_store_path(path: &RelativePath) -> Path {
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 64e31a11e..eaaa07f40 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -879,13 +879,14 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(())
     }
 
+    /// Stores updated schema in storage
     async fn commit_schema(
         &self,
         stream_name: &str,
         schema: Schema,
     ) -> Result<(), ObjectStorageError> {
         let stream_schema = self.get_schema(stream_name).await?;
-        let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
+        let new_schema = Schema::try_merge(vec![schema, stream_schema])?;
         self.put_schema(stream_name, &new_schema).await
     }
 }
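Note: `Schema::try_merge` fails when the schemas disagree (for example, the same field appearing with two different types), and the old `.unwrap()` would bring the process down in that case. The new `Arrow(#[from] arrow_schema::ArrowError)` variant is what lets the `?` operator convert the merge failure into an `ObjectStorageError` and hand it back to the caller. A dependency-light sketch of the same mechanics, with `StoreError` and `parse_merge` as hypothetical stand-ins:

```rust
use std::num::ParseIntError;

#[derive(Debug, thiserror::Error)]
enum StoreError {
    // `#[from]` derives `impl From<ParseIntError> for StoreError`,
    // which is exactly the conversion the `?` operator needs.
    #[error("Parse Error: {0}")]
    Parse(#[from] ParseIntError),
}

fn parse_merge(input: &str) -> Result<i64, StoreError> {
    // Before: `input.parse::<i64>().unwrap()` panics on bad input.
    // After: `?` converts the ParseIntError and returns it to the caller.
    let n: i64 = input.parse()?;
    Ok(n)
}
```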
From 4e26ba50d99ec9c1eeb3d93cf56efd5eb1a6bf6b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Mon, 24 Mar 2025 13:06:53 +0530
Subject: [PATCH 3/3] refactor: associate `Stream::commit_schema`

---
 src/event/mod.rs           | 50 +++++++++++++-------------------
 src/handlers/http/query.rs |  6 +++---
 src/parseable/streams.rs   | 14 ++++++++++-
 3 files changed, 33 insertions(+), 37 deletions(-)

diff --git a/src/event/mod.rs b/src/event/mod.rs
index b641643cb..3da339d55 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -17,22 +17,17 @@
  *
  */
 
-pub mod format;
+use std::{collections::HashMap, sync::Arc};
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
-use itertools::Itertools;
-use std::sync::Arc;
-
-use self::error::EventError;
-use crate::{
-    metadata::update_stats,
-    parseable::{StagingError, PARSEABLE},
-    storage::StreamType,
-    LOCK_EXPECT,
-};
+use arrow_schema::Field;
 use chrono::NaiveDateTime;
-use std::collections::HashMap;
+use error::EventError;
+use itertools::Itertools;
+
+use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType};
+
+pub mod format;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const USER_AGENT_KEY: &str = "p_user_agent";
@@ -67,11 +62,12 @@ impl Event {
             }
         }
 
+        let stream = PARSEABLE.get_or_create_stream(&self.stream_name);
         if self.is_first_event {
-            commit_schema(&self.stream_name, self.rb.schema())?;
+            stream.commit_schema(self.rb.schema())?;
         }
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,
@@ -117,26 +113,12 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     format!("{hash:x}")
 }
 
-pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), StagingError> {
-    let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
-
-    let map = &mut stream_metadata
-        .get_mut(stream_name)
-        .expect("map has entry for this stream name")
-        .metadata
-        .write()
-        .expect(LOCK_EXPECT)
-        .schema;
-    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
-    let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
-    map.clear();
-    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
-    Ok(())
-}
-
 pub mod error {
 
-    use crate::{parseable::StagingError, storage::ObjectStorageError};
+    use crate::{
+        parseable::{StagingError, StreamNotFound},
+        storage::ObjectStorageError,
+    };
 
     #[derive(Debug, thiserror::Error)]
     pub enum EventError {
@@ -144,5 +126,7 @@ pub mod error {
         Staging(#[from] StagingError),
         #[error("ObjectStorage Error: {0}")]
         ObjectStorage(#[from] ObjectStorageError),
+        #[error("{0}")]
+        NotFound(#[from] StreamNotFound),
     }
 }
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index ffc8ec8a6..94dd5334c 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -35,8 +35,6 @@ use tracing::error;
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
-
-use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
 use crate::parseable::{StreamNotFound, PARSEABLE};
@@ -178,7 +176,9 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
                 .commit_schema(table, new_schema.clone())
                 .await?;
 
-            commit_schema(table, Arc::new(new_schema))?;
+            PARSEABLE
+                .get_stream(table)?
+                .commit_schema(Arc::new(new_schema))?;
         }
     }
 }
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index c67c60043..ba1ccab25 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -28,7 +28,7 @@ use std::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::{Field, Fields, Schema, SchemaRef};
 use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
@@ -591,6 +591,18 @@ impl Stream {
         self.metadata.read().expect(LOCK_EXPECT).schema_version
     }
 
+    /// Stores updated schema in-memory
+    pub fn commit_schema(&self, schema: SchemaRef) -> Result<(), StagingError> {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        let current_schema = Schema::new(metadata.schema.values().cloned().collect::<Fields>());
+        let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
+        metadata.schema.clear();
+        metadata
+            .schema
+            .extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
+        Ok(())
+    }
+
     pub fn get_schema(&self) -> Arc<Schema> {
         let metadata = self.metadata.read().expect(LOCK_EXPECT);
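Note: the series ends with the in-memory half of the split. `Stream::commit_schema` takes one write lock on the stream's metadata, merges the incoming schema into the stored field map, and propagates any merge conflict instead of panicking. A std-only sketch of that merge-under-one-lock pattern; `Field`, `Stream`, and the string-typed error are simplified stand-ins, with the conflict check playing the role of `Schema::try_merge`:

```rust
use std::{collections::HashMap, sync::RwLock};

#[derive(Clone)]
struct Field {
    name: String,
    ty: String,
}

struct Stream {
    // Field name -> field, like the `schema` map in the stream's metadata.
    schema: RwLock<HashMap<String, Field>>,
}

impl Stream {
    fn commit_schema(&self, incoming: Vec<Field>) -> Result<(), String> {
        // One write lock for the whole read-merge-write sequence, so no
        // other writer can interleave between the read and the update.
        let mut map = self.schema.write().expect("lock poisoned");
        for field in incoming {
            // A field that already exists with a different type is a merge
            // conflict; report it to the caller rather than panicking.
            if map.get(&field.name).is_some_and(|f| f.ty != field.ty) {
                return Err(format!("type conflict on field `{}`", field.name));
            }
            map.insert(field.name.clone(), field);
        }
        Ok(())
    }
}
```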