Skip to content

Commit 376dbda

Browse files
Merge branch 'main' into user-group
2 parents eb9de1d + 5d34b02 commit 376dbda

File tree

5 files changed

+69
-24
lines changed

5 files changed

+69
-24
lines changed

src/cli.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,16 @@ pub struct Options {
451451
help = "Object store sync threshold in seconds"
452452
)]
453453
pub object_store_sync_threshold: u64,
454+
// the oidc scope
455+
#[arg(
456+
long = "oidc-scope",
457+
name = "oidc-scope",
458+
env = "P_OIDC_SCOPE",
459+
default_value = "openid profile email",
460+
required = false,
461+
help = "OIDC scope to request (default: openid profile email)"
462+
)]
463+
pub scope: String,
454464
}
455465

456466
#[derive(Parser, Debug)]

src/handlers/http/oidc.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use ulid::Ulid;
3232
use url::Url;
3333

3434
use crate::{
35-
handlers::{COOKIE_AGE_DAYS, OIDC_SCOPE, SESSION_COOKIE_NAME, USER_COOKIE_NAME},
35+
handlers::{COOKIE_AGE_DAYS, SESSION_COOKIE_NAME, USER_COOKIE_NAME},
3636
oidc::{Claims, DiscoveredClient},
3737
parseable::PARSEABLE,
3838
rbac::{
@@ -77,7 +77,7 @@ pub async fn login(
7777
let session_key = extract_session_key_from_req(&req).ok();
7878
let (session_key, oidc_client) = match (session_key, oidc_client) {
7979
(None, None) => return Ok(redirect_no_oauth_setup(query.redirect.clone())),
80-
(None, Some(client)) => return Ok(redirect_to_oidc(query, client)),
80+
(None, Some(client)) => return Ok(redirect_to_oidc(query, client, PARSEABLE.options.scope.to_string().as_str())),
8181
(Some(session_key), client) => (session_key, client),
8282
};
8383
// try authorize
@@ -113,7 +113,7 @@ pub async fn login(
113113
} else {
114114
Users.remove_session(&key);
115115
if let Some(oidc_client) = oidc_client {
116-
redirect_to_oidc(query, oidc_client)
116+
redirect_to_oidc(query, oidc_client, PARSEABLE.options.scope.to_string().as_str())
117117
} else {
118118
redirect_to_client(query.redirect.as_str(), None)
119119
}
@@ -226,10 +226,11 @@ fn exchange_basic_for_cookie(user: &User, key: SessionKey) -> Cookie<'static> {
226226
fn redirect_to_oidc(
227227
query: web::Query<RedirectAfterLogin>,
228228
oidc_client: &DiscoveredClient,
229+
scope: &str,
229230
) -> HttpResponse {
230231
let redirect = query.into_inner().redirect.to_string();
231232
let auth_url = oidc_client.auth_url(&Options {
232-
scope: Some(OIDC_SCOPE.into()),
233+
scope: Some(scope.to_string()),
233234
state: Some(redirect),
234235
..Default::default()
235236
});

src/handlers/http/query.rs

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ use http::StatusCode;
3535
use itertools::Itertools;
3636
use serde::{Deserialize, Serialize};
3737
use serde_json::{json, Value};
38-
use std::collections::HashMap;
38+
use std::collections::{HashMap, HashSet};
3939
use std::pin::Pin;
4040
use std::sync::Arc;
4141
use std::time::Instant;
42-
use tracing::error;
42+
use tokio::task::JoinSet;
43+
use tracing::{error, warn};
4344

4445
use crate::event::commit_schema;
4546
use crate::metrics::QUERY_EXECUTE_TIME;
@@ -126,7 +127,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
126127
{
127128
Ok(raw_logical_plan) => raw_logical_plan,
128129
Err(_) => {
129-
create_streams_for_querier().await;
130+
create_streams_for_querier().await?;
130131
session_state
131132
.create_logical_plan(&query_request.query)
132133
.await?
@@ -433,17 +434,45 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
433434
/// Create streams for querier if they do not exist
434435
/// get list of streams from memory and storage
435436
/// create streams for memory from storage if they do not exist
436-
pub async fn create_streams_for_querier() {
437-
let querier_streams = PARSEABLE.streams.list();
437+
pub async fn create_streams_for_querier() -> Result<(), QueryError> {
438438
let store = PARSEABLE.storage.get_object_store();
439-
let storage_streams = store.list_streams().await.unwrap();
440-
for stream_name in storage_streams {
441-
if !querier_streams.contains(&stream_name) {
442-
let _ = PARSEABLE
439+
let querier_streams = PARSEABLE.streams.list();
440+
441+
let querier_streams_set: HashSet<_> = querier_streams.into_iter().collect();
442+
443+
let storage_streams = store.list_streams().await?;
444+
445+
let missing_streams: Vec<_> = storage_streams
446+
.into_iter()
447+
.filter(|stream_name| !querier_streams_set.contains(stream_name))
448+
.collect();
449+
450+
if missing_streams.is_empty() {
451+
return Ok(());
452+
}
453+
454+
let mut join_set = JoinSet::new();
455+
for stream_name in missing_streams {
456+
join_set.spawn(async move {
457+
let result = PARSEABLE
443458
.create_stream_and_schema_from_storage(&stream_name)
444459
.await;
460+
461+
if let Err(e) = &result {
462+
warn!("Failed to create stream '{}': {}", stream_name, e);
463+
}
464+
465+
(stream_name, result)
466+
});
467+
}
468+
469+
while let Some(result) = join_set.join_next().await {
470+
if let Err(join_error) = result {
471+
warn!("Task join error: {}", join_error);
445472
}
446473
}
474+
475+
Ok(())
447476
}
448477

449478
impl FromRequest for Query {

src/handlers/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
3030
const AUTHORIZATION_KEY: &str = "authorization";
3131
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
3232
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
33-
const OIDC_SCOPE: &str = "openid profile email";
3433
const COOKIE_AGE_DAYS: usize = 7;
3534
const SESSION_COOKIE_NAME: &str = "session";
3635
const USER_COOKIE_NAME: &str = "username";

src/parseable/mod.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use once_cell::sync::Lazy;
3535
pub use staging::StagingError;
3636
use streams::StreamRef;
3737
pub use streams::{Stream, StreamNotFound, Streams};
38+
use tokio::try_join;
3839
use tracing::error;
3940

4041
#[cfg(feature = "kafka")]
@@ -270,17 +271,22 @@ impl Parseable {
270271
return Ok(false);
271272
}
272273

273-
let mut stream_metadata = ObjectStoreFormat::default();
274-
let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?;
275-
if !stream_metadata_bytes.is_empty() {
276-
stream_metadata = serde_json::from_slice::<ObjectStoreFormat>(&stream_metadata_bytes)?;
277-
}
274+
let (stream_metadata_bytes, schema_bytes) = try_join!(
275+
storage.create_stream_from_ingestor(stream_name),
276+
storage.create_schema_from_ingestor(stream_name)
277+
)?;
278278

279-
let mut schema = Arc::new(Schema::empty());
280-
let schema_bytes = storage.create_schema_from_ingestor(stream_name).await?;
281-
if !schema_bytes.is_empty() {
282-
schema = serde_json::from_slice::<Arc<Schema>>(&schema_bytes)?;
283-
}
279+
let stream_metadata = if stream_metadata_bytes.is_empty() {
280+
ObjectStoreFormat::default()
281+
} else {
282+
serde_json::from_slice::<ObjectStoreFormat>(&stream_metadata_bytes)?
283+
};
284+
285+
let schema = if schema_bytes.is_empty() {
286+
Arc::new(Schema::empty())
287+
} else {
288+
serde_json::from_slice::<Arc<Schema>>(&schema_bytes)?
289+
};
284290

285291
let static_schema: HashMap<String, Arc<Field>> = schema
286292
.fields

0 commit comments

Comments
 (0)