@@ -35,11 +35,12 @@ use http::StatusCode;
35
35
use itertools:: Itertools ;
36
36
use serde:: { Deserialize , Serialize } ;
37
37
use serde_json:: { json, Value } ;
38
- use std:: collections:: HashMap ;
38
+ use std:: collections:: { HashMap , HashSet } ;
39
39
use std:: pin:: Pin ;
40
40
use std:: sync:: Arc ;
41
41
use std:: time:: Instant ;
42
- use tracing:: error;
42
+ use tokio:: task:: JoinSet ;
43
+ use tracing:: { error, warn} ;
43
44
44
45
use crate :: event:: commit_schema;
45
46
use crate :: metrics:: QUERY_EXECUTE_TIME ;
@@ -126,7 +127,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
126
127
{
127
128
Ok ( raw_logical_plan) => raw_logical_plan,
128
129
Err ( _) => {
129
- create_streams_for_querier ( ) . await ;
130
+ create_streams_for_querier ( ) . await ? ;
130
131
session_state
131
132
. create_logical_plan ( & query_request. query )
132
133
. await ?
@@ -433,17 +434,45 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
433
434
/// Create streams for querier if they do not exist
434
435
/// get list of streams from memory and storage
435
436
/// create streams for memory from storage if they do not exist
436
- pub async fn create_streams_for_querier ( ) {
437
- let querier_streams = PARSEABLE . streams . list ( ) ;
437
+ pub async fn create_streams_for_querier ( ) -> Result < ( ) , QueryError > {
438
438
let store = PARSEABLE . storage . get_object_store ( ) ;
439
- let storage_streams = store. list_streams ( ) . await . unwrap ( ) ;
440
- for stream_name in storage_streams {
441
- if !querier_streams. contains ( & stream_name) {
442
- let _ = PARSEABLE
439
+ let querier_streams = PARSEABLE . streams . list ( ) ;
440
+
441
+ let querier_streams_set: HashSet < _ > = querier_streams. into_iter ( ) . collect ( ) ;
442
+
443
+ let storage_streams = store. list_streams ( ) . await ?;
444
+
445
+ let missing_streams: Vec < _ > = storage_streams
446
+ . into_iter ( )
447
+ . filter ( |stream_name| !querier_streams_set. contains ( stream_name) )
448
+ . collect ( ) ;
449
+
450
+ if missing_streams. is_empty ( ) {
451
+ return Ok ( ( ) ) ;
452
+ }
453
+
454
+ let mut join_set = JoinSet :: new ( ) ;
455
+ for stream_name in missing_streams {
456
+ join_set. spawn ( async move {
457
+ let result = PARSEABLE
443
458
. create_stream_and_schema_from_storage ( & stream_name)
444
459
. await ;
460
+
461
+ if let Err ( e) = & result {
462
+ warn ! ( "Failed to create stream '{}': {}" , stream_name, e) ;
463
+ }
464
+
465
+ ( stream_name, result)
466
+ } ) ;
467
+ }
468
+
469
+ while let Some ( result) = join_set. join_next ( ) . await {
470
+ if let Err ( join_error) = result {
471
+ warn ! ( "Task join error: {}" , join_error) ;
445
472
}
446
473
}
474
+
475
+ Ok ( ( ) )
447
476
}
448
477
449
478
impl FromRequest for Query {
0 commit comments