Commit bd3f0ae

Author: Devdutt Shenoi (committed)

refactor: associate ObjectStorage::fetch_schemas

1 parent 24ac1bb commit bd3f0ae

File tree: 4 files changed (+56, -61 lines)


src/event/mod.rs

Lines changed: 14 additions & 28 deletions
@@ -20,17 +20,12 @@
 pub mod format;
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::Field;
 use itertools::Itertools;
 use std::sync::Arc;
 
 use self::error::EventError;
-use crate::{
-    metadata::update_stats,
-    parseable::{StagingError, PARSEABLE},
-    storage::StreamType,
-    LOCK_EXPECT,
-};
+use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType};
 use chrono::NaiveDateTime;
 use std::collections::HashMap;
 
@@ -64,11 +59,13 @@ impl Event {
             }
         }
 
+        let stream = PARSEABLE.get_stream(&self.stream_name)?;
         if self.is_first_event {
-            commit_schema(&self.stream_name, self.rb.schema())?;
+            let schema = self.rb.schema().as_ref().clone();
+            stream.commit_schema(schema)?;
         }
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,
@@ -114,31 +111,20 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     format!("{hash:x}")
 }
 
-pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), StagingError> {
-    let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
-
-    let map = &mut stream_metadata
-        .get_mut(stream_name)
-        .expect("map has entry for this stream name")
-        .metadata
-        .write()
-        .expect(LOCK_EXPECT)
-        .schema;
-    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
-    let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
-    map.clear();
-    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
-    Ok(())
-}
-
 pub mod error {
-
-    use crate::{parseable::StagingError, storage::ObjectStorageError};
+    use crate::{
+        parseable::{StagingError, StreamNotFound},
+        storage::ObjectStorageError,
+    };
 
     #[derive(Debug, thiserror::Error)]
     pub enum EventError {
         #[error("Staging Failed: {0}")]
         Staging(#[from] StagingError),
+        #[error("{0}")]
+        Stream(#[from] StreamNotFound),
+        #[error("Arrow Error: {0}")]
+        Arrow(#[from] arrow_schema::ArrowError),
         #[error("ObjectStorage Error: {0}")]
         ObjectStorage(#[from] ObjectStorageError),
     }
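
To make the refactor concrete, here is a minimal sketch of the first-event path after this change, using only items visible in the diff above (PARSEABLE, Stream::commit_schema, the new EventError variants); the real logic lives inside Event::process, so treat this as illustrative rather than a drop-in:

// Illustrative sketch, written as if inside the parseable crate.
use arrow_array::RecordBatch;

use crate::{event::error::EventError, parseable::PARSEABLE};

fn commit_first_event(
    stream_name: &str,
    rb: &RecordBatch,
    is_first_event: bool,
) -> Result<(), EventError> {
    // A missing stream now surfaces as EventError::Stream(StreamNotFound) via `?`,
    // where the old free-function commit_schema would panic on its map lookup.
    let stream = PARSEABLE.get_stream(stream_name)?;
    if is_first_event {
        // Schema merging moved into Stream::commit_schema; its StagingError
        // converts into EventError::Staging.
        stream.commit_schema(rb.schema().as_ref().clone())?;
    }
    Ok(())
}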

src/parseable/mod.rs

Lines changed: 5 additions & 33 deletions
@@ -25,9 +25,7 @@ use bytes::Bytes;
 use chrono::Utc;
 use clap::{error::ErrorKind, Parser};
 use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
-use itertools::Itertools;
 use once_cell::sync::Lazy;
-use relative_path::RelativePathBuf;
 pub use staging::StagingError;
 use streams::StreamRef;
 pub use streams::{StreamNotFound, Streams};
@@ -37,7 +35,7 @@ use tracing::error;
 use crate::connectors::kafka::config::KafkaConfig;
 use crate::{
     cli::{Cli, Options, StorageOptions},
-    event::{commit_schema, error::EventError, format::LogSource},
+    event::{error::EventError, format::LogSource},
     handlers::{
         http::{
             cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME},
@@ -52,7 +50,7 @@
     static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
     storage::{
         object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
-        ObjectStoreFormat, Owner, Permisssion, StreamType, STREAM_ROOT_DIRECTORY,
+        ObjectStoreFormat, Owner, Permisssion, StreamType,
     },
     validator,
 };
@@ -768,47 +766,21 @@ impl Parseable {
         Some(first_event_at.to_string())
     }
 
-    /// Fetches the schema for the specified stream.
-    ///
-    /// # Arguments
-    ///
-    /// * `stream_name` - The name of the stream to fetch the schema for.
-    ///
-    /// # Returns
-    ///
-    /// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
-    pub async fn fetch_schema(&self, stream_name: &str) -> anyhow::Result<Schema> {
-        let path_prefix = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
-        let store = self.storage.get_object_store();
-        let res: Vec<Schema> = store
-            .get_objects(
-                Some(&path_prefix),
-                Box::new(|file_name: String| file_name.contains(".schema")),
-            )
-            .await?
-            .iter()
-            // we should be able to unwrap as we know the data is valid schema
-            .map(|byte_obj| serde_json::from_slice(byte_obj).expect("data is valid json"))
-            .collect_vec();
-
-        let new_schema = Schema::try_merge(res)?;
-        Ok(new_schema)
-    }
-
     pub async fn update_schema_when_distributed(
         &self,
         tables: &Vec<String>,
     ) -> Result<(), EventError> {
         if self.options.mode == Mode::Query {
             for table in tables {
-                if let Ok(new_schema) = self.fetch_schema(table).await {
+                if let Ok(schemas) = self.storage.get_object_store().fetch_schemas(table).await {
+                    let new_schema = Schema::try_merge(schemas)?;
                     // commit schema merges the schema internally and updates the schema in storage.
                     self.storage
                         .get_object_store()
                         .commit_schema(table, new_schema.clone())
                         .await?;
 
-                    commit_schema(table, Arc::new(new_schema))?;
+                    self.get_stream(table)?.commit_schema(new_schema)?;
                 }
             }
         }
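
For readability, the per-table body of the loop above can be summarized as the following sketch, written as if it were a method inside impl Parseable (names come from this diff; error conversions rely on the EventError variants added in src/event/mod.rs):

// Sketch only, not a verbatim copy of the crate's code.
async fn refresh_table_schema(&self, table: &str) -> Result<(), EventError> {
    let store = self.storage.get_object_store();
    // fetch_schemas now returns Result<Vec<Schema>, ObjectStorageError>; a table
    // whose schema objects cannot be fetched or parsed is skipped, not panicked on.
    if let Ok(schemas) = store.fetch_schemas(table).await {
        // Merging is the caller's job; a field conflict becomes EventError::Arrow.
        let new_schema = Schema::try_merge(schemas)?;
        // Persist the merged schema, then mirror it into the in-memory stream state.
        store.commit_schema(table, new_schema.clone()).await?;
        self.get_stream(table)?.commit_schema(new_schema)?;
    }
    Ok(())
}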

src/parseable/streams.rs

Lines changed: 11 additions & 0 deletions
@@ -547,6 +547,17 @@ impl Stream {
         Schema::try_merge(vec![schema, current_schema]).unwrap()
     }
 
+    pub fn commit_schema(&self, schema: Schema) -> Result<(), StagingError> {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        let current_schema = Schema::new(metadata.schema.values().cloned().collect::<Fields>());
+        let schema = Schema::try_merge(vec![current_schema, schema])?;
+        metadata.schema.clear();
+        metadata
+            .schema
+            .extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
+        Ok(())
+    }
+
     /// Stores the provided stream metadata in memory mapping
     pub async fn set_metadata(&self, updated_metadata: LogStreamMetadata) {
         *self.metadata.write().expect(LOCK_EXPECT) = updated_metadata;
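
The in-memory merge here relies on arrow_schema::Schema::try_merge, which unions fields by name and errors on incompatible types. A standalone illustration using only arrow-schema (the field names are made up for the example):

use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    let current = Schema::new(vec![Field::new("id", DataType::Int64, true)]);
    let incoming = Schema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("message", DataType::Utf8, true),
    ]);
    // Same-named fields are deduplicated; new fields are appended.
    let merged = Schema::try_merge(vec![current, incoming])?;
    assert_eq!(merged.fields().len(), 2);
    Ok(())
}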

src/storage/object_storage.rs

Lines changed: 26 additions & 0 deletions
@@ -799,6 +799,32 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
         self.put_schema(stream_name, &new_schema).await
     }
+
+    /// Fetches all schemas associated with a specified stream.
+    ///
+    /// # Arguments
+    ///
+    /// * `stream_name` - The name of the stream to fetch the schema for.
+    ///
+    /// # Returns
+    ///
+    /// An array of `arrow_schema::Schema` for the specified stream.
+    async fn fetch_schemas(&self, stream_name: &str) -> Result<Vec<Schema>, ObjectStorageError> {
+        let path_prefix = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
+        let mut schemas: Vec<Schema> = vec![];
+        for byte_obj in self
+            .get_objects(
+                Some(&path_prefix),
+                Box::new(|file_name: String| file_name.contains(".schema")),
+            )
+            .await?
+        {
+            let schema = serde_json::from_slice(&byte_obj)?;
+            schemas.push(schema);
+        }
+
+        Ok(schemas)
+    }
 }
 
 #[inline(always)]
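
Unlike the removed Parseable::fetch_schema, this trait method hands back each stored schema individually and leaves merging to the caller. A hedged usage sketch against any concrete ObjectStorage implementation ("my_stream" is a placeholder name; anyhow is already used by the crate):

use arrow_schema::Schema;

// Sketch only: collect a stream's schema objects via fetch_schemas and merge them.
async fn merged_stream_schema(store: &impl ObjectStorage) -> anyhow::Result<Schema> {
    // Each ".schema" object under the stream's root prefix is deserialized;
    // malformed JSON propagates as ObjectStorageError instead of panicking.
    let schemas = store.fetch_schemas("my_stream").await?;
    Ok(Schema::try_merge(schemas)?)
}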
