Skip to content

Commit 8eb4934

Browse files
chore: simplify schema creation from storage
remove functions that creates schema from ingestors and queriers separately reused function `fetch_schema` that fetches all schema files and merges the schemas into one this ensures the schema is always the latest
1 parent 5d34b02 commit 8eb4934

File tree

5 files changed

+25
-63
lines changed

5 files changed

+25
-63
lines changed

src/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ pub struct Options {
451451
help = "Object store sync threshold in seconds"
452452
)]
453453
pub object_store_sync_threshold: u64,
454-
// the oidc scope
454+
// the oidc scope
455455
#[arg(
456456
long = "oidc-scope",
457457
name = "oidc-scope",

src/handlers/http/oidc.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,13 @@ pub async fn login(
7777
let session_key = extract_session_key_from_req(&req).ok();
7878
let (session_key, oidc_client) = match (session_key, oidc_client) {
7979
(None, None) => return Ok(redirect_no_oauth_setup(query.redirect.clone())),
80-
(None, Some(client)) => return Ok(redirect_to_oidc(query, client, PARSEABLE.options.scope.to_string().as_str())),
80+
(None, Some(client)) => {
81+
return Ok(redirect_to_oidc(
82+
query,
83+
client,
84+
PARSEABLE.options.scope.to_string().as_str(),
85+
))
86+
}
8187
(Some(session_key), client) => (session_key, client),
8288
};
8389
// try authorize
@@ -113,7 +119,11 @@ pub async fn login(
113119
} else {
114120
Users.remove_session(&key);
115121
if let Some(oidc_client) = oidc_client {
116-
redirect_to_oidc(query, oidc_client, PARSEABLE.options.scope.to_string().as_str())
122+
redirect_to_oidc(
123+
query,
124+
oidc_client,
125+
PARSEABLE.options.scope.to_string().as_str(),
126+
)
117127
} else {
118128
redirect_to_client(query.redirect.as_str(), None)
119129
}

src/migration/mod.rs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ async fn migration_stream(
184184
) -> anyhow::Result<Option<LogStreamMetadata>> {
185185
let mut arrow_schema: Schema = Schema::empty();
186186

187-
let schema = fetch_or_create_schema(stream, storage).await?;
187+
let schema = storage.create_schema_from_storage(stream).await?;
188188
let stream_metadata = fetch_or_create_stream_metadata(stream, storage).await?;
189189

190190
let mut stream_meta_found = true;
@@ -212,29 +212,6 @@ async fn migration_stream(
212212
Ok(Some(metadata))
213213
}
214214

215-
async fn fetch_or_create_schema(
216-
stream: &str,
217-
storage: &dyn ObjectStorage,
218-
) -> anyhow::Result<Bytes> {
219-
let schema_path = schema_path(stream);
220-
if let Ok(schema) = storage.get_object(&schema_path).await {
221-
Ok(schema)
222-
} else {
223-
let querier_schema = storage
224-
.create_schema_from_querier(stream)
225-
.await
226-
.unwrap_or_default();
227-
if !querier_schema.is_empty() {
228-
Ok(querier_schema)
229-
} else {
230-
Ok(storage
231-
.create_schema_from_ingestor(stream)
232-
.await
233-
.unwrap_or_default())
234-
}
235-
}
236-
}
237-
238215
async fn fetch_or_create_stream_metadata(
239216
stream: &str,
240217
storage: &dyn ObjectStorage,

src/parseable/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ impl Parseable {
273273

274274
let (stream_metadata_bytes, schema_bytes) = try_join!(
275275
storage.create_stream_from_ingestor(stream_name),
276-
storage.create_schema_from_ingestor(stream_name)
276+
storage.create_schema_from_storage(stream_name)
277277
)?;
278278

279279
let stream_metadata = if stream_metadata_bytes.is_empty() {

src/storage/object_storage.rs

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot};
6565
use crate::correlation::{CorrelationConfig, CorrelationError};
6666
use crate::event::format::LogSource;
6767
use crate::event::format::LogSourceEntry;
68+
use crate::handlers::http::fetch_schema;
6869
use crate::handlers::http::ingest::PostError;
6970
use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT;
7071
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
@@ -671,44 +672,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
671672
Ok(Bytes::new())
672673
}
673674

674-
///create schema from querier schema from storage
675-
async fn create_schema_from_querier(
675+
///create schema from storage
676+
async fn create_schema_from_storage(
676677
&self,
677678
stream_name: &str,
678679
) -> Result<Bytes, ObjectStorageError> {
679-
let path =
680-
RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]);
681-
if let Ok(querier_schema_bytes) = self.get_object(&path).await {
682-
self.put_object(&schema_path(stream_name), querier_schema_bytes.clone())
683-
.await?;
684-
return Ok(querier_schema_bytes);
685-
}
686-
Ok(Bytes::new())
687-
}
688-
689-
///create schema from ingestor schema from storage
690-
async fn create_schema_from_ingestor(
691-
&self,
692-
stream_name: &str,
693-
) -> Result<Bytes, ObjectStorageError> {
694-
let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
695-
if let Some(schema_obs) = self
696-
.get_objects(
697-
Some(&path),
698-
Box::new(|file_name| {
699-
file_name.starts_with(".ingestor") && file_name.ends_with("schema")
700-
}),
701-
)
702-
.await
703-
.into_iter()
704-
.next()
705-
{
706-
let schema_ob = &schema_obs[0];
707-
self.put_object(&schema_path(stream_name), schema_ob.clone())
708-
.await?;
709-
return Ok(schema_ob.clone());
710-
}
711-
Ok(Bytes::new())
680+
let schema = fetch_schema(stream_name).await?;
681+
// convert to bytes
682+
let schema = serde_json::to_vec(&schema)?;
683+
let schema_bytes = Bytes::from(schema);
684+
self.put_object(&schema_path(stream_name), schema_bytes.clone())
685+
.await?;
686+
Ok(schema_bytes)
712687
}
713688

714689
async fn get_stream_meta_from_storage(

0 commit comments

Comments
 (0)