Skip to content

Commit 67b65a9

Browse files
committed
update
1 parent 29b19d9 commit 67b65a9

23 files changed

Lines changed: 1024 additions & 728 deletions

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/coordinator/analyze/analyzer.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
use super::Analysis;
1414
use crate::coordinator::execution_context::ExecutionContext;
1515
use crate::coordinator::statement::{
16-
CreateFunction, CreatePythonFunction, CreateTable, DropFunction, InsertStatement,
17-
ShowFunctions, StartFunction, Statement, StatementVisitor, StatementVisitorContext,
18-
StatementVisitorResult, StopFunction,
16+
CreateFunction, CreatePythonFunction, CreateTable, DropFunction, ShowFunctions, StartFunction,
17+
Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult, StopFunction,
18+
StreamingTableStatement,
1919
};
2020
use std::fmt;
2121

@@ -125,11 +125,13 @@ impl StatementVisitor for Analyzer<'_> {
125125
StatementVisitorResult::Analyze(Box::new(CreateTable::new(stmt.statement.clone())))
126126
}
127127

128-
fn visit_insert_statement(
128+
fn visit_streaming_table_statement(
129129
&self,
130-
stmt: &InsertStatement,
130+
stmt: &StreamingTableStatement,
131131
_context: &StatementVisitorContext,
132132
) -> StatementVisitorResult {
133-
StatementVisitorResult::Analyze(Box::new(InsertStatement::new(stmt.statement.clone())))
133+
StatementVisitorResult::Analyze(Box::new(StreamingTableStatement::new(
134+
stmt.statement.clone(),
135+
)))
134136
}
135137
}

src/coordinator/execution/executor.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
use crate::coordinator::dataset::{ExecuteResult, ShowFunctionsResult, empty_record_batch};
1414
use crate::coordinator::plan::{
1515
CreateFunctionPlan, CreatePythonFunctionPlan, CreateTablePlan, DropFunctionPlan,
16-
InsertStatementPlan, PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult,
17-
ShowFunctionsPlan, StartFunctionPlan, StopFunctionPlan,
16+
LookupTablePlan, PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult,
17+
ShowFunctionsPlan, StartFunctionPlan, StopFunctionPlan, StreamingTable,
18+
StreamingTableConnectorPlan,
1819
};
1920
use crate::coordinator::statement::{ConfigSource, FunctionSource};
2021
use crate::runtime::taskexecutor::TaskManager;
@@ -215,17 +216,36 @@ impl PlanVisitor for Executor {
215216
PlanVisitorResult::Execute(result)
216217
}
217218

218-
fn visit_insert_statement_plan(
219+
fn visit_streaming_table(
219220
&self,
220-
plan: &InsertStatementPlan,
221+
_plan: &StreamingTable,
221222
_context: &PlanVisitorContext,
222223
) -> PlanVisitorResult {
223-
// TODO: start streaming pipeline for INSERT / anonymous query
224-
let result = Err(ExecuteError::Internal(format!(
225-
"INSERT statement execution not yet implemented. Program graph has {} node(s), {} connection(s)",
226-
plan.program.graph.node_count(),
227-
plan.connection_ids.len(),
228-
)));
224+
let result = Err(ExecuteError::Internal(
225+
"StreamingTable execution not yet implemented".to_string(),
226+
));
227+
PlanVisitorResult::Execute(result)
228+
}
229+
230+
fn visit_lookup_table(
231+
&self,
232+
_plan: &LookupTablePlan,
233+
_context: &PlanVisitorContext,
234+
) -> PlanVisitorResult {
235+
let result = Err(ExecuteError::Internal(
236+
"LookupTable execution not yet implemented".to_string(),
237+
));
238+
PlanVisitorResult::Execute(result)
239+
}
240+
241+
fn visit_streaming_connector_table(
242+
&self,
243+
_plan: &StreamingTableConnectorPlan,
244+
_context: &PlanVisitorContext,
245+
) -> PlanVisitorResult {
246+
let result = Err(ExecuteError::Internal(
247+
"StreamingTableConnector execution not yet implemented".to_string(),
248+
));
229249
PlanVisitorResult::Execute(result)
230250
}
231251
}

src/coordinator/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ mod execution;
1818
mod execution_context;
1919
mod plan;
2020
mod statement;
21+
mod tool;
2122

2223
pub use coordinator::Coordinator;
2324
pub use dataset::{DataSet, ShowFunctionsResult};
2425
pub use statement::{
25-
CreateFunction, CreatePythonFunction, CreateTable, DropFunction, InsertStatement, PythonModule,
26-
ShowFunctions, StartFunction, Statement, StopFunction,
26+
CreateFunction, CreatePythonFunction, CreateTable, DropFunction, PythonModule, ShowFunctions,
27+
StartFunction, Statement, StopFunction, StreamingTableStatement,
2728
};

0 commit comments

Comments
 (0)