Skip to content

Commit 13e1341

Browse files
committed
update
1 parent d647ea1 commit 13e1341

70 files changed

Lines changed: 4966 additions & 2910 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/coordinator/execution/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ impl PlanVisitor for Executor {
225225
) -> PlanVisitorResult {
226226
let result = (|| -> Result<ExecuteResult, ExecuteError> {
227227
let catalog_table =
228-
CatalogTable::ConnectorTable(plan.connector_table.clone());
228+
CatalogTable::ConnectorTable(plan.source_table.clone());
229229
let mut schema_provider = StreamSchemaProvider::new();
230230
schema_provider.insert_catalog_table(catalog_table.clone());
231231

src/coordinator/plan/logical_plan_visitor.rs

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,19 @@ use crate::coordinator::statement::{
3333
};
3434
use crate::coordinator::tool::ConnectorOptions;
3535
use crate::sql::logical_node::logical::{LogicalProgram, ProgramConfig};
36-
use crate::sql::logical_planner::optimizers::ChainingOptimizer;
36+
use crate::sql::logical_planner::optimizers::{ChainingOptimizer, produce_optimized_plan};
3737
use crate::sql::schema::Table;
38-
use crate::sql::schema::connector::ConnectionType;
39-
use crate::sql::schema::connector_table::ConnectorTable;
40-
use crate::sql::schema::field_spec::FieldSpec;
41-
use crate::sql::schema::optimizer::produce_optimized_plan;
38+
use crate::sql::schema::ConnectionType;
39+
use crate::sql::schema::source_table::SourceTable;
40+
use crate::sql::schema::ColumnDescriptor;
4241
use crate::sql::functions::{is_json_union, serialize_outgoing_json};
43-
use crate::sql::extensions::sink::SinkExtension;
42+
use crate::sql::extensions::sink::StreamEgressNode;
4443
use crate::sql::logical_planner::planner;
4544
use crate::sql::analysis::{StreamSchemaProvider, maybe_add_key_extension_to_sink, rewrite_sinks};
4645
use crate::sql::rewrite_plan;
4746

4847
const CONNECTOR: &str = "connector";
4948
const PARTITION_BY: &str = "partition_by";
50-
const IDLE_MICROS: &str = "idle_time";
5149

5250
fn with_options_to_map(options: &[SqlOption]) -> std::collections::HashMap<String, String> {
5351
options
@@ -108,6 +106,8 @@ impl LogicalPlanVisitor {
108106
)
109107
})?;
110108

109+
let partition_exprs = self.resolve_partition_expressions(&mut opts)?;
110+
111111
let base_plan =
112112
produce_optimized_plan(&Statement::Query(query.clone()), &self.schema_provider)?;
113113
let mut plan = rewrite_plan(base_plan, &self.schema_provider)?;
@@ -121,38 +121,33 @@ impl LogicalPlanVisitor {
121121
plan = serialize_outgoing_json(&self.schema_provider, Arc::new(plan));
122122
}
123123

124-
let partition_exprs = self.resolve_partition_expressions(&mut opts)?;
125-
126-
let fields: Vec<FieldSpec> = plan
124+
let fields: Vec<ColumnDescriptor> = plan
127125
.schema()
128126
.fields()
129127
.iter()
130-
.map(|f| FieldSpec::Struct((**f).clone()))
128+
.map(|f| ColumnDescriptor::from((**f).clone()))
131129
.collect();
132130

133-
let connector_table = ConnectorTable {
134-
id: None,
135-
connector,
136-
name: table_name.clone(),
137-
connection_type: ConnectionType::Sink,
131+
let mut source_table = SourceTable::from_options(
132+
&table_name,
133+
&connector,
134+
false,
138135
fields,
139-
config: "".to_string(),
140-
description: comment.clone().unwrap_or_default(),
141-
event_time_field: None,
142-
watermark_field: None,
143-
idle_time: opts.pull_opt_duration(IDLE_MICROS)?,
144-
primary_keys: Arc::new(vec![]),
145-
inferred_fields: None,
146-
partition_exprs: Arc::new(partition_exprs),
147-
lookup_cache_ttl:None,
148-
lookup_cache_max_bytes:None,
149-
};
136+
vec![],
137+
None,
138+
&mut opts,
139+
None,
140+
&self.schema_provider,
141+
Some(ConnectionType::Sink),
142+
comment.clone().unwrap_or_default(),
143+
)?;
144+
source_table.partition_exprs = Arc::new(partition_exprs);
150145

151-
let sink_extension = SinkExtension::new(
146+
let sink_extension = StreamEgressNode::try_new(
152147
TableReference::bare(table_name.clone()),
153-
Table::ConnectorTable(connector_table.clone()),
148+
Table::ConnectorTable(source_table.clone()),
154149
plan.schema().clone(),
155-
Arc::new(plan),
150+
plan,
156151
)?;
157152

158153
let plan_with_keys = maybe_add_key_extension_to_sink(LogicalPlan::Extension(Extension {
@@ -196,7 +191,7 @@ impl LogicalPlanVisitor {
196191
Ok(Box::new(StreamingTable {
197192
name: table_name,
198193
comment: comment.clone(),
199-
connector_table,
194+
source_table,
200195
logical_plan: final_plan,
201196
}))
202197
}
@@ -322,7 +317,7 @@ mod create_streaming_table_tests {
322317

323318
use crate::sql::common::TIMESTAMP_FIELD;
324319
use crate::sql::rewrite_plan;
325-
use crate::sql::schema::optimizer::produce_optimized_plan;
320+
use crate::sql::logical_planner::optimizers::produce_optimized_plan;
326321
use crate::sql::schema::StreamSchemaProvider;
327322

328323
fn schema_provider_with_src() -> StreamSchemaProvider {

src/coordinator/plan/lookup_table_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
use crate::sql::schema::connector_table::ConnectorTable;
13+
use crate::sql::schema::source_table::SourceTable;
1414

1515
use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
1616

1717
/// Plan node that exposes a lookup table config as a logical plan input.
1818
#[derive(Debug)]
1919
pub struct LookupTablePlan {
20-
pub table: ConnectorTable,
20+
pub table: SourceTable,
2121
}
2222

2323
impl PlanNode for LookupTablePlan {

src/coordinator/plan/streaming_table_connector_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
use crate::sql::schema::connector_table::ConnectorTable;
13+
use crate::sql::schema::source_table::SourceTable;
1414

1515
use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
1616

1717
/// Plan node that exposes a connector table config as a logical plan input.
1818
#[derive(Debug)]
1919
pub struct StreamingTableConnectorPlan {
20-
pub table: ConnectorTable,
20+
pub table: SourceTable,
2121
}
2222

2323
impl PlanNode for StreamingTableConnectorPlan {

src/coordinator/plan/streaming_table_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@
1111
// limitations under the License.
1212

1313
use super::{PlanNode, PlanVisitor, PlanVisitorContext, PlanVisitorResult};
14-
use crate::sql::schema::connector_table::ConnectorTable;
14+
use crate::sql::schema::source_table::SourceTable;
1515
use datafusion::logical_expr::LogicalPlan;
1616

1717
/// Plan node representing a fully resolved streaming table (DDL).
1818
#[derive(Debug)]
1919
pub struct StreamingTable {
2020
pub name: String,
2121
pub comment: Option<String>,
22-
pub connector_table: ConnectorTable,
22+
pub source_table: SourceTable,
2323
pub logical_plan: LogicalPlan,
2424
}
2525

src/coordinator/tool/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1 @@
1-
mod connector_options;
2-
3-
pub use connector_options::{ConnectorOptions, FromOpts};
1+
pub use crate::sql::common::ConnectorOptions;

src/sql/analysis/aggregate_rewriter.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use datafusion::common::tree_node::{Transformed, TreeNodeRewriter};
22
use datafusion::common::{DFSchema, DataFusionError, Result, not_impl_err, plan_err};
33
use datafusion::functions_aggregate::expr_fn::max;
4-
use datafusion::logical_expr::{self, Aggregate, Expr, Extension, LogicalPlan, Projection};
4+
use datafusion::logical_expr::{Aggregate, Expr, Extension, LogicalPlan, Projection};
55
use datafusion::prelude::col;
66
use std::sync::Arc;
77

88
use crate::sql::schema::StreamSchemaProvider;
9-
use crate::sql::extensions::aggregate::AggregateExtension;
10-
use crate::sql::extensions::key_calculation::{KeyCalculationExtension, KeysOrExprs};
9+
use crate::sql::extensions::aggregate::StreamWindowAggregateNode;
10+
use crate::sql::extensions::key_calculation::{KeyExtractionNode, KeyExtractionStrategy};
1111
use crate::sql::analysis::streaming_window_analzer::StreamingWindowAnalzer;
1212
use crate::sql::types::{
1313
DFField, TIMESTAMP_FIELD, WindowBehavior, WindowType, fields_with_qualifiers, find_window,
@@ -83,7 +83,7 @@ impl TreeNodeRewriter for AggregateRewriter<'_> {
8383
let keyed_input =
8484
self.build_keyed_input(agg.input.clone(), &agg.group_expr, &key_fields)?;
8585

86-
// 5. Build the final AggregateExtension for the physical planner.
86+
// 5. Build the final StreamWindowAggregateNode for the physical planner.
8787
let mut internal_fields = fields_with_qualifiers(&agg.schema);
8888
if let WindowBehavior::FromOperator { window_index, .. } = &behavior {
8989
internal_fields.remove(*window_index);
@@ -100,11 +100,11 @@ impl TreeNodeRewriter for AggregateRewriter<'_> {
100100
internal_schema,
101101
)?;
102102

103-
let extension = AggregateExtension::new(
103+
let extension = StreamWindowAggregateNode::try_new(
104104
behavior,
105105
LogicalPlan::Aggregate(rewritten_agg),
106106
(0..key_count).collect(),
107-
);
107+
)?;
108108

109109
Ok(Transformed::yes(LogicalPlan::Extension(Extension {
110110
node: Arc::new(extension),
@@ -118,7 +118,7 @@ impl<'a> AggregateRewriter<'a> {
118118
}
119119

120120
/// [Internal] Builds the physical Key Calculation layer required for distributed Shuffling.
121-
/// This wraps the input in a Projection and a KeyCalculationExtension.
121+
/// This wraps the input in a Projection and a KeyExtractionNode.
122122
fn build_keyed_input(
123123
&self,
124124
input: Arc<LogicalPlan>,
@@ -151,9 +151,9 @@ impl<'a> AggregateRewriter<'a> {
151151
LogicalPlan::Projection(Projection::try_new_with_schema(exprs, input, key_schema)?);
152152

153153
Ok(LogicalPlan::Extension(Extension {
154-
node: Arc::new(KeyCalculationExtension::new(
154+
node: Arc::new(KeyExtractionNode::new(
155155
projection,
156-
KeysOrExprs::Keys((0..key_count).collect()),
156+
KeyExtractionStrategy::ColumnIndices((0..key_count).collect()),
157157
)),
158158
}))
159159
}

src/sql/analysis/async_udf_rewriter.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use crate::sql::extensions::remote_table::RemoteTableExtension;
2-
use crate::sql::extensions::{ASYNC_RESULT_FIELD, AsyncUDFExtension};
1+
use crate::sql::extensions::remote_table::RemoteTableBoundaryNode;
2+
use crate::sql::extensions::{ASYNC_RESULT_FIELD, AsyncFunctionExecutionNode};
33
use crate::sql::schema::StreamSchemaProvider;
44
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
55
use datafusion::common::{Column, Result as DFResult, TableReference, plan_err};
@@ -92,28 +92,28 @@ impl TreeNodeRewriter for AsyncUdfRewriter<'_> {
9292

9393
let input = if matches!(*projection.input, LogicalPlan::Projection(..)) {
9494
Arc::new(LogicalPlan::Extension(Extension {
95-
node: Arc::new(RemoteTableExtension {
96-
input: (*projection.input).clone(),
97-
name: TableReference::bare("subquery_projection"),
98-
schema: projection.input.schema().clone(),
99-
materialize: false,
95+
node: Arc::new(RemoteTableBoundaryNode {
96+
upstream_plan: (*projection.input).clone(),
97+
table_identifier: TableReference::bare("subquery_projection"),
98+
resolved_schema: projection.input.schema().clone(),
99+
requires_materialization: false,
100100
}),
101101
}))
102102
} else {
103103
projection.input
104104
};
105105

106106
Ok(Transformed::yes(LogicalPlan::Extension(Extension {
107-
node: Arc::new(AsyncUDFExtension {
108-
input,
109-
name,
110-
udf,
111-
arg_exprs,
112-
final_exprs: projection.expr,
113-
ordered: opts.ordered,
114-
max_concurrency: opts.max_concurrency,
115-
timeout: opts.timeout,
116-
final_schema: projection.schema,
107+
node: Arc::new(AsyncFunctionExecutionNode {
108+
upstream_plan: input,
109+
operator_name: name,
110+
function_config: udf,
111+
invocation_args: arg_exprs,
112+
result_projections: projection.expr,
113+
preserve_ordering: opts.ordered,
114+
concurrency_limit: opts.max_concurrency,
115+
execution_timeout: opts.timeout,
116+
resolved_schema: projection.schema,
117117
}),
118118
})))
119119
}

src/sql/analysis/join_rewriter.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::sql::schema::StreamSchemaProvider;
2-
use crate::sql::extensions::join::JoinExtension;
3-
use crate::sql::extensions::key_calculation::KeyCalculationExtension;
2+
use crate::sql::extensions::join::StreamingJoinNode;
3+
use crate::sql::extensions::key_calculation::KeyExtractionNode;
44
use crate::sql::analysis::streaming_window_analzer::StreamingWindowAnalzer;
55
use crate::sql::types::{WindowType, fields_with_qualifiers, schema_from_df_fields_with_metadata};
66
use crate::sql::common::TIMESTAMP_FIELD;
@@ -62,7 +62,7 @@ impl<'a> JoinRewriter<'a> {
6262
}
6363
}
6464

65-
/// [Internal] Wraps a join input in a KeyCalculation layer to facilitate Shuffle/KeyBy distribution.
65+
/// [Internal] Wraps a join input in a key-extraction layer to facilitate shuffle / key-by distribution.
6666
fn build_keyed_side(
6767
&self,
6868
input: Arc<LogicalPlan>,
@@ -85,11 +85,11 @@ impl<'a> JoinRewriter<'a> {
8585
.collect();
8686

8787
let projection = Projection::try_new(projection_exprs, input)?;
88-
let key_ext = KeyCalculationExtension::new_named_and_trimmed(
88+
let key_ext = KeyExtractionNode::try_new_with_projection(
8989
LogicalPlan::Projection(projection),
9090
(0..key_count).collect(),
9191
side.to_string(),
92-
);
92+
)?;
9393

9494
Ok(LogicalPlan::Extension(Extension {
9595
node: Arc::new(key_ext),
@@ -209,13 +209,13 @@ impl TreeNodeRewriter for JoinRewriter<'_> {
209209
// 4. Resolve Output Watermark (Timestamp Projection)
210210
let plan_with_timestamp = self.apply_timestamp_resolution(rewritten_join)?;
211211

212-
// 5. Wrap in JoinExtension for Physical Planning
213-
let ttl = (!is_instant).then_some(self.schema_provider.planning_options.ttl);
214-
let extension = JoinExtension {
215-
rewritten_join: plan_with_timestamp,
212+
// 5. Wrap in StreamingJoinNode for physical planning
213+
let state_retention_ttl = (!is_instant).then_some(self.schema_provider.planning_options.ttl);
214+
let extension = StreamingJoinNode::new(
215+
plan_with_timestamp,
216216
is_instant,
217-
ttl,
218-
};
217+
state_retention_ttl,
218+
);
219219

220220
Ok(Transformed::yes(LogicalPlan::Extension(Extension {
221221
node: Arc::new(extension),

0 commit comments

Comments
 (0)