diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 4216f1076..e41b11117 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -206,7 +206,7 @@ async fn migration_stream( ) -> anyhow::Result> { let mut arrow_schema: Schema = Schema::empty(); - let schema = fetch_or_create_schema(stream, storage).await?; + let schema = storage.create_schema_from_storage(stream).await?; let stream_metadata = fetch_or_create_stream_metadata(stream, storage).await?; let mut stream_meta_found = true; @@ -234,29 +234,6 @@ async fn migration_stream( Ok(Some(metadata)) } -async fn fetch_or_create_schema( - stream: &str, - storage: &dyn ObjectStorage, -) -> anyhow::Result { - let schema_path = schema_path(stream); - if let Ok(schema) = storage.get_object(&schema_path).await { - Ok(schema) - } else { - let querier_schema = storage - .create_schema_from_querier(stream) - .await - .unwrap_or_default(); - if !querier_schema.is_empty() { - Ok(querier_schema) - } else { - Ok(storage - .create_schema_from_ingestor(stream) - .await - .unwrap_or_default()) - } - } -} - async fn fetch_or_create_stream_metadata( stream: &str, storage: &dyn ObjectStorage, diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 87496cfc1..e1f09ee23 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -281,7 +281,7 @@ impl Parseable { let (stream_metadata_bytes, schema_bytes) = try_join!( storage.create_stream_from_ingestor(stream_name), - storage.create_schema_from_ingestor(stream_name) + storage.create_schema_from_storage(stream_name) )?; let stream_metadata = if stream_metadata_bytes.is_empty() { diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 0289995c3..ee385b7c1 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -49,6 +49,7 @@ use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; use crate::event::format::LogSourceEntry; +use crate::handlers::http::fetch_schema; use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::CORRELATION_DIR; @@ -652,44 +653,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(Bytes::new()) } - ///create schema from querier schema from storage - async fn create_schema_from_querier( + ///create schema from storage + async fn create_schema_from_storage( &self, stream_name: &str, ) -> Result { - let path = - RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); - if let Ok(querier_schema_bytes) = self.get_object(&path).await { - self.put_object(&schema_path(stream_name), querier_schema_bytes.clone()) - .await?; - return Ok(querier_schema_bytes); - } - Ok(Bytes::new()) - } - - ///create schema from ingestor schema from storage - async fn create_schema_from_ingestor( - &self, - stream_name: &str, - ) -> Result { - let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); - if let Some(schema_obs) = self - .get_objects( - Some(&path), - Box::new(|file_name| { - file_name.starts_with(".ingestor") && file_name.ends_with("schema") - }), - ) - .await - .into_iter() - .next() - { - let schema_ob = &schema_obs[0]; - self.put_object(&schema_path(stream_name), schema_ob.clone()) - .await?; - return Ok(schema_ob.clone()); - } - Ok(Bytes::new()) + let schema = fetch_schema(stream_name).await?; + // convert to bytes + let schema = serde_json::to_vec(&schema)?; + let schema_bytes = Bytes::from(schema); + self.put_object(&schema_path(stream_name), schema_bytes.clone()) + .await?; + Ok(schema_bytes) } async fn get_stream_meta_from_storage(