Skip to content

Commit d647ea1

Browse files
committed
update
1 parent 94879da commit d647ea1

8 files changed

Lines changed: 1370 additions & 61 deletions

File tree

src/coordinator/coordinator.rs

Lines changed: 373 additions & 5 deletions
Large diffs are not rendered by default.

src/coordinator/plan/logical_plan_visitor.rs

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ const CONNECTOR: &str = "connector";
4949
const PARTITION_BY: &str = "partition_by";
5050
const IDLE_MICROS: &str = "idle_time";
5151

52-
/// Convert `WITH` option list to a key-value map (e.g. connector settings).
5352
fn with_options_to_map(options: &[SqlOption]) -> std::collections::HashMap<String, String> {
5453
options
5554
.iter()
@@ -83,8 +82,6 @@ impl LogicalPlanVisitor {
8382
_ => panic!("LogicalPlanVisitor should return Plan"),
8483
}
8584
}
86-
/// Builds the logical plan for 'CREATE STREAMING TABLE'.
87-
/// This orchestrates the transformation from a SQL Query to a stateful Sink.
8885
fn build_create_streaming_table_plan(
8986
&self,
9087
stmt: &StreamingTableStatement,
@@ -102,8 +99,6 @@ impl LogicalPlanVisitor {
10299
let table_name = name.to_string();
103100
debug!("Compiling Streaming Table Sink for: {}", table_name);
104101

105-
// 1. Connector Options Extraction
106-
// Extract 'connector' (Kafka, Postgres, etc.) and other physical properties.
107102
let mut opts = ConnectorOptions::new(with_options, &None)?;
108103
let connector = opts.pull_opt_str(CONNECTOR)?.ok_or_else(|| {
109104
plan_datafusion_err!(
@@ -113,14 +108,10 @@ impl LogicalPlanVisitor {
113108
)
114109
})?;
115110

116-
// 2. Query Optimization & Streaming Rewrite
117-
// Convert the standard SQL query into a streaming-aware logical plan.
118111
let base_plan =
119112
produce_optimized_plan(&Statement::Query(query.clone()), &self.schema_provider)?;
120113
let mut plan = rewrite_plan(base_plan, &self.schema_provider)?;
121114

122-
// 3. Outgoing Data Serialization
123-
// If the query produces internal types (like JSON Union), inject a serialization layer.
124115
if plan
125116
.schema()
126117
.fields()
@@ -130,53 +121,44 @@ impl LogicalPlanVisitor {
130121
plan = serialize_outgoing_json(&self.schema_provider, Arc::new(plan));
131122
}
132123

133-
// 4. Sink Metadata & Partitioning Logic
134-
// Determine how data should be partitioned before hitting the external system.
135124
let partition_exprs = self.resolve_partition_expressions(&mut opts)?;
136125

137-
// Map DataFusion fields to Arroyo FieldSpecs for the connector.
138126
let fields: Vec<FieldSpec> = plan
139127
.schema()
140128
.fields()
141129
.iter()
142130
.map(|f| FieldSpec::Struct((**f).clone()))
143131
.collect();
144132

145-
// 5. Connector Table Construction
146-
// This object acts as the 'Identity Card' for the Sink in the physical cluster.
147133
let connector_table = ConnectorTable {
148134
id: None,
149135
connector,
150136
name: table_name.clone(),
151137
connection_type: ConnectionType::Sink,
152138
fields,
153-
config: "".to_string(), // Filled by the coordinator later
139+
config: "".to_string(),
154140
description: comment.clone().unwrap_or_default(),
155141
event_time_field: None,
156142
watermark_field: None,
157143
idle_time: opts.pull_opt_duration(IDLE_MICROS)?,
158-
primary_keys: Arc::new(vec![]), // PKs are inferred or explicitly set here
144+
primary_keys: Arc::new(vec![]),
159145
inferred_fields: None,
160146
partition_exprs: Arc::new(partition_exprs),
161147
lookup_cache_ttl:None,
162148
lookup_cache_max_bytes:None,
163149
};
164150

165-
// 6. Sink Extension & Final Rewrites
166-
// Wrap the plan in a SinkExtension and ensure Key/Partition alignment.
167151
let sink_extension = SinkExtension::new(
168152
TableReference::bare(table_name.clone()),
169153
Table::ConnectorTable(connector_table.clone()),
170154
plan.schema().clone(),
171155
Arc::new(plan),
172156
)?;
173157

174-
// Ensure the data distribution matches the Sink's requirements (e.g., Shuffle by Partition Key)
175158
let plan_with_keys = maybe_add_key_extension_to_sink(LogicalPlan::Extension(Extension {
176159
node: Arc::new(sink_extension),
177160
}))?;
178161

179-
// Global pass to wire inputs and handle shared sub-plans
180162
let final_extensions = rewrite_sinks(vec![plan_with_keys])?;
181163
let final_plan = final_extensions.into_iter().next().unwrap();
182164

@@ -328,3 +310,57 @@ impl StatementVisitor for LogicalPlanVisitor {
328310
}
329311
}
330312
}
313+
314+
#[cfg(test)]
315+
mod create_streaming_table_tests {
316+
use std::sync::Arc;
317+
318+
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
319+
use datafusion::sql::sqlparser::ast::Statement as DFStatement;
320+
use datafusion::sql::sqlparser::dialect::FunctionStreamDialect;
321+
use datafusion::sql::sqlparser::parser::Parser;
322+
323+
use crate::sql::common::TIMESTAMP_FIELD;
324+
use crate::sql::rewrite_plan;
325+
use crate::sql::schema::optimizer::produce_optimized_plan;
326+
use crate::sql::schema::StreamSchemaProvider;
327+
328+
fn schema_provider_with_src() -> StreamSchemaProvider {
329+
let mut provider = StreamSchemaProvider::new();
330+
let schema = Arc::new(Schema::new(vec![
331+
Field::new("id", DataType::Int64, false),
332+
Field::new(
333+
TIMESTAMP_FIELD,
334+
DataType::Timestamp(TimeUnit::Nanosecond, None),
335+
false,
336+
),
337+
]));
338+
provider.add_source_table(
339+
"src".to_string(),
340+
schema,
341+
Some(TIMESTAMP_FIELD.to_string()),
342+
None,
343+
);
344+
provider
345+
}
346+
347+
#[test]
348+
fn create_streaming_table_query_plans_and_rewrites() {
349+
let sql =
350+
"CREATE STREAMING TABLE my_sink WITH ('connector' = 'kafka') AS SELECT * FROM src";
351+
let dialect = FunctionStreamDialect {};
352+
let ast = Parser::parse_sql(&dialect, sql).expect("parse CREATE STREAMING TABLE");
353+
let DFStatement::CreateStreamingTable { query, .. } = &ast[0] else {
354+
panic!("expected CreateStreamingTable, got {:?}", ast[0]);
355+
};
356+
let provider = schema_provider_with_src();
357+
let base = produce_optimized_plan(&DFStatement::Query(query.clone()), &provider)
358+
.expect("produce optimized logical plan for sink query");
359+
let rewritten = rewrite_plan(base, &provider).expect("streaming rewrite_plan");
360+
let dot = format!("{}", rewritten.display_graphviz());
361+
assert!(
362+
dot.contains("src") || dot.contains("Src"),
363+
"rewritten plan should reference source; got subgraph:\n{dot}"
364+
);
365+
}
366+
}

src/coordinator/statement/streaming_table.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,10 @@ use datafusion::sql::sqlparser::ast::Statement as DFStatement;
1414

1515
use super::{Statement, StatementVisitor, StatementVisitorContext, StatementVisitorResult};
1616

17-
/// Represents an INSERT INTO or standalone SELECT/query that creates a streaming table/pipeline.
17+
/// Wrapper for **`CREATE STREAMING TABLE ... WITH (...) AS SELECT ...`** (parsed AST).
1818
///
19-
/// In the streaming SQL context, both INSERT INTO (writing to a sink)
20-
/// and standalone SELECT (anonymous computation) are treated as
21-
/// data-producing operations that create/feed into the streaming pipeline.
19+
/// The coordinator `parse_sql` frontend does **not** support `INSERT`; streaming sinks are
20+
/// defined only via **`CREATE STREAMING TABLE`** (and regular tables via **`CREATE TABLE`**).
2221
#[derive(Debug)]
2322
pub struct StreamingTableStatement {
2423
pub statement: DFStatement,

src/sql/extensions/remote_table.rs

Lines changed: 65 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,67 @@ pub(crate) struct RemoteTableExtension {
2828

2929
multifield_partial_ord!(RemoteTableExtension, input, name, materialize);
3030

31+
impl RemoteTableExtension {
32+
fn plan_node_inlined(
33+
planner: &Planner,
34+
index: usize,
35+
this: &RemoteTableExtension,
36+
) -> Result<NodeWithIncomingEdges> {
37+
let physical_plan = planner.sync_plan(&this.input)?;
38+
let physical_plan_node = PhysicalPlanNode::try_from_physical_plan(
39+
physical_plan,
40+
&FsPhysicalExtensionCodec::default(),
41+
)?;
42+
let config = ValuePlanOperator {
43+
name: format!("value_calculation({})", this.name),
44+
physical_plan: physical_plan_node.encode_to_vec(),
45+
};
46+
let node = LogicalNode::single(
47+
index as u32,
48+
format!("value_{index}"),
49+
OperatorName::ArrowValue,
50+
config.encode_to_vec(),
51+
this.name.to_string(),
52+
1,
53+
);
54+
Ok(NodeWithIncomingEdges {
55+
node,
56+
edges: vec![],
57+
})
58+
}
59+
60+
fn plan_node_with_edges(
61+
planner: &Planner,
62+
index: usize,
63+
this: &RemoteTableExtension,
64+
input_schemas: Vec<FsSchemaRef>,
65+
) -> Result<NodeWithIncomingEdges> {
66+
let physical_plan = planner.sync_plan(&this.input)?;
67+
let physical_plan_node = PhysicalPlanNode::try_from_physical_plan(
68+
physical_plan,
69+
&FsPhysicalExtensionCodec::default(),
70+
)?;
71+
let config = ValuePlanOperator {
72+
name: format!("value_calculation({})", this.name),
73+
physical_plan: physical_plan_node.encode_to_vec(),
74+
};
75+
let node = LogicalNode::single(
76+
index as u32,
77+
format!("value_{index}"),
78+
OperatorName::ArrowValue,
79+
config.encode_to_vec(),
80+
this.name.to_string(),
81+
1,
82+
);
83+
84+
let edges = input_schemas
85+
.into_iter()
86+
.map(|schema| LogicalEdge::project_all(LogicalEdgeType::Forward, (*schema).clone()))
87+
.collect();
88+
Ok(NodeWithIncomingEdges { node, edges })
89+
}
90+
}
91+
3192
impl StreamExtension for RemoteTableExtension {
3293
fn node_name(&self) -> Option<NamedNode> {
3394
if self.materialize {
@@ -44,10 +105,11 @@ impl StreamExtension for RemoteTableExtension {
44105
input_schemas: Vec<FsSchemaRef>,
45106
) -> Result<NodeWithIncomingEdges> {
46107
match input_schemas.len() {
47-
0 => return plan_err!("RemoteTableExtension should have exactly one input"),
108+
0 => {
109+
return Self::plan_node_inlined(planner, index, self);
110+
}
48111
1 => {}
49112
_multiple_inputs => {
50-
// check they are all the same
51113
let first = input_schemas[0].clone();
52114
for schema in input_schemas.iter().skip(1) {
53115
if *schema != first {
@@ -58,29 +120,7 @@ impl StreamExtension for RemoteTableExtension {
58120
}
59121
}
60122
}
61-
let physical_plan = planner.sync_plan(&self.input)?;
62-
let physical_plan_node = PhysicalPlanNode::try_from_physical_plan(
63-
physical_plan,
64-
&FsPhysicalExtensionCodec::default(),
65-
)?;
66-
let config = ValuePlanOperator {
67-
name: format!("value_calculation({})", self.name),
68-
physical_plan: physical_plan_node.encode_to_vec(),
69-
};
70-
let node = LogicalNode::single(
71-
index as u32,
72-
format!("value_{index}"),
73-
OperatorName::ArrowValue,
74-
config.encode_to_vec(),
75-
self.name.to_string(),
76-
1,
77-
);
78-
79-
let edges = input_schemas
80-
.into_iter()
81-
.map(|schema| LogicalEdge::project_all(LogicalEdgeType::Forward, (*schema).clone()))
82-
.collect();
83-
Ok(NodeWithIncomingEdges { node, edges })
123+
Self::plan_node_with_edges(planner, index, self, input_schemas)
84124
}
85125

86126
fn output_schema(&self) -> FsSchema {

0 commit comments

Comments
 (0)