Skip to content

Commit a883589

Browse files
fix: find tables from DFParser, schema merge when required (#1380)
find table list from DFParser create logical plan for the query if fails, create from storage, merge schema then create logical plan again
1 parent b99913f commit a883589

File tree

11 files changed

+149
-153
lines changed

11 files changed

+149
-153
lines changed

src/alerts/alerts_utils.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@ use datafusion::{
2929
logical_expr::{BinaryExpr, Literal, Operator},
3030
prelude::{col, lit, DataFrame, Expr},
3131
};
32-
use tracing::trace;
32+
use tokio::task::JoinSet;
33+
use tracing::{trace, warn};
3334

3435
use crate::{
35-
alerts::LogicalOperator, parseable::PARSEABLE, query::QUERY_SESSION, utils::time::TimeRange,
36+
alerts::LogicalOperator,
37+
handlers::http::query::update_schema_when_distributed,
38+
parseable::PARSEABLE,
39+
query::{resolve_stream_names, QUERY_SESSION},
40+
utils::time::TimeRange,
3641
};
3742

3843
use super::{
@@ -71,11 +76,37 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert
7176

7277
let session_state = QUERY_SESSION.state();
7378
let select_query = alert.get_base_query();
74-
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
75-
7679
let time_range = TimeRange::parse_human_time(start_time, end_time)
7780
.map_err(|err| AlertError::CustomError(err.to_string()))?;
7881

82+
let streams = resolve_stream_names(&select_query)?;
83+
let raw_logical_plan = match session_state.create_logical_plan(&select_query).await {
84+
Ok(plan) => plan,
85+
Err(_) => {
86+
let mut join_set = JoinSet::new();
87+
for stream_name in streams {
88+
let stream_name = stream_name.clone();
89+
join_set.spawn(async move {
90+
let result = PARSEABLE
91+
.create_stream_and_schema_from_storage(&stream_name)
92+
.await;
93+
94+
if let Err(e) = &result {
95+
warn!("Failed to create stream '{}': {}", stream_name, e);
96+
}
97+
98+
(stream_name, result)
99+
});
100+
}
101+
102+
while let Some(result) = join_set.join_next().await {
103+
if let Err(join_error) = result {
104+
warn!("Task join error: {}", join_error);
105+
}
106+
}
107+
session_state.create_logical_plan(&select_query).await?
108+
}
109+
};
79110
Ok(crate::query::Query {
80111
raw_logical_plan,
81112
time_range,
@@ -87,11 +118,18 @@ async fn execute_base_query(
87118
query: &crate::query::Query,
88119
original_query: &str,
89120
) -> Result<DataFrame, AlertError> {
90-
let stream_name = query.first_table_name().ok_or_else(|| {
121+
let streams = resolve_stream_names(original_query)?;
122+
let stream_name = streams.first().ok_or_else(|| {
91123
AlertError::CustomError(format!("Table name not found in query- {original_query}"))
92124
})?;
93-
94-
let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
125+
update_schema_when_distributed(&streams)
126+
.await
127+
.map_err(|err| {
128+
AlertError::CustomError(format!(
129+
"Failed to update schema for distributed streams: {err}"
130+
))
131+
})?;
132+
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
95133
query
96134
.get_dataframe(time_partition.as_ref())
97135
.await

src/alerts/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use actix_web::http::header::ContentType;
2020
use async_trait::async_trait;
2121
use chrono::Utc;
22+
use datafusion::sql::sqlparser::parser::ParserError;
2223
use derive_more::derive::FromStr;
2324
use derive_more::FromStrError;
2425
use http::StatusCode;
@@ -860,6 +861,8 @@ pub enum AlertError {
860861
InvalidTargetModification(String),
861862
#[error("Can't delete a Target which is being used")]
862863
TargetInUse,
864+
#[error("{0}")]
865+
ParserError(#[from] ParserError),
863866
}
864867

865868
impl actix_web::ResponseError for AlertError {
@@ -880,6 +883,7 @@ impl actix_web::ResponseError for AlertError {
880883
Self::InvalidTargetID(_) => StatusCode::BAD_REQUEST,
881884
Self::InvalidTargetModification(_) => StatusCode::BAD_REQUEST,
882885
Self::TargetInUse => StatusCode::CONFLICT,
886+
Self::ParserError(_) => StatusCode::BAD_REQUEST,
883887
}
884888
}
885889

src/correlation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ impl Correlations {
8787
.iter()
8888
.map(|t| t.table_name.clone())
8989
.collect_vec();
90-
if user_auth_for_datasets(&permissions, tables).is_ok() {
90+
if user_auth_for_datasets(&permissions, tables).await.is_ok() {
9191
user_correlations.push(correlation.clone());
9292
}
9393
}
@@ -281,7 +281,7 @@ impl CorrelationConfig {
281281
.map(|t| t.table_name.clone())
282282
.collect_vec();
283283

284-
user_auth_for_datasets(&permissions, tables)?;
284+
user_auth_for_datasets(&permissions, tables).await?;
285285

286286
// to validate table config, we need to check whether the mentioned fields
287287
// are present in the table or not

src/event/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), Stagi
123123

124124
let map = &mut stream_metadata
125125
.get_mut(stream_name)
126-
.expect("map has entry for this stream name")
126+
.ok_or_else(|| StagingError::NotFound(stream_name.to_string()))?
127127
.metadata
128128
.write()
129129
.expect(LOCK_EXPECT)

src/handlers/airplane.rs

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ use arrow_array::RecordBatch;
2020
use arrow_flight::flight_service_server::FlightServiceServer;
2121
use arrow_flight::PollInfo;
2222
use arrow_schema::ArrowError;
23-
24-
use datafusion::common::tree_node::TreeNode;
2523
use serde_json::json;
2624
use std::net::SocketAddr;
2725
use std::time::Instant;
@@ -35,11 +33,11 @@ use tonic_web::GrpcWebLayer;
3533

3634
use crate::handlers::http::cluster::get_node_info;
3735
use crate::handlers::http::modal::{NodeMetadata, NodeType};
38-
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
36+
use crate::handlers::http::query::into_query;
3937
use crate::handlers::livetail::cross_origin_config;
4038
use crate::metrics::QUERY_EXECUTE_TIME;
4139
use crate::parseable::PARSEABLE;
42-
use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
40+
use crate::query::{execute, resolve_stream_names, QUERY_SESSION};
4341
use crate::utils::arrow::flight::{
4442
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
4543
send_to_ingester,
@@ -131,40 +129,26 @@ impl FlightService for AirServiceImpl {
131129

132130
let ticket =
133131
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;
134-
132+
let streams = resolve_stream_names(&ticket.query).map_err(|e| {
133+
error!("Failed to extract table names from SQL: {}", e);
134+
Status::invalid_argument("Invalid SQL query syntax")
135+
})?;
135136
info!("query requested to airplane: {:?}", ticket);
136137

137138
// get the query session_state
138139
let session_state = QUERY_SESSION.state();
139140

140-
// get the logical plan and extract the table name
141-
let raw_logical_plan = session_state
142-
.create_logical_plan(&ticket.query)
143-
.await
144-
.map_err(|err| {
145-
error!("Datafusion Error: Failed to create logical plan: {}", err);
146-
Status::internal("Failed to create logical plan")
147-
})?;
148-
149141
let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
150142
.map_err(|e| Status::internal(e.to_string()))?;
151143
// create a visitor to extract the table name
152-
let mut visitor = TableScanVisitor::default();
153-
let _ = raw_logical_plan.visit(&mut visitor);
154-
155-
let streams = visitor.into_inner();
156144

157145
let stream_name = streams
158146
.first()
159147
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
160148
.to_owned();
161149

162-
update_schema_when_distributed(&streams)
163-
.await
164-
.map_err(|err| Status::internal(err.to_string()))?;
165-
166150
// map payload to query
167-
let query = into_query(&ticket, &session_state, time_range)
151+
let query = into_query(&ticket, &session_state, time_range, &streams)
168152
.await
169153
.map_err(|_| Status::internal("Failed to parse query"))?;
170154

@@ -214,9 +198,11 @@ impl FlightService for AirServiceImpl {
214198

215199
let permissions = Users.get_permissions(&key);
216200

217-
user_auth_for_datasets(&permissions, &streams).map_err(|_| {
218-
Status::permission_denied("User Does not have permission to access this")
219-
})?;
201+
user_auth_for_datasets(&permissions, &streams)
202+
.await
203+
.map_err(|_| {
204+
Status::permission_denied("User Does not have permission to access this")
205+
})?;
220206
let time = Instant::now();
221207

222208
let (records, _) = execute(query, &stream_name, false)

src/handlers/http/correlation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub async fn get(
5454
.map(|t| t.table_name.clone())
5555
.collect_vec();
5656

57-
user_auth_for_datasets(&permissions, tables)?;
57+
user_auth_for_datasets(&permissions, tables).await?;
5858

5959
Ok(web::Json(correlation))
6060
}

0 commit comments

Comments
 (0)