@@ -37,7 +37,7 @@ use crate::sql::catalog::field_spec::FieldSpec;
3737use crate :: sql:: catalog:: optimizer:: produce_optimized_plan;
3838use crate :: sql:: functions:: { is_json_union, serialize_outgoing_json} ;
3939use crate :: sql:: planner:: extension:: sink:: SinkExtension ;
40- use crate :: sql:: planner:: { StreamSchemaProvider , maybe_add_key_extension_to_sink} ;
40+ use crate :: sql:: planner:: { StreamSchemaProvider , maybe_add_key_extension_to_sink, rewrite_sinks } ;
4141use crate :: sql:: rewrite_plan;
4242
4343const CONNECTOR : & str = "connector" ;
@@ -78,98 +78,121 @@ impl LogicalPlanVisitor {
7878 _ => panic ! ( "LogicalPlanVisitor should return Plan" ) ,
7979 }
8080 }
81+ /// Builds the logical plan for 'CREATE STREAMING TABLE'.
82+ /// This orchestrates the transformation from a SQL Query to a stateful Sink.
8183 fn build_create_streaming_table_plan (
8284 & self ,
8385 stmt : & StreamingTableStatement ,
8486 ) -> Result < Box < dyn PlanNode > > {
85- let statement = & stmt. statement ;
86- match statement {
87- DFStatement :: CreateStreamingTable {
88- name,
89- with_options,
90- comment,
91- query,
92- } => {
93- let name_str = name. to_string ( ) ;
94-
95- let mut connector_opts = ConnectorOptions :: new ( with_options, & None ) ?;
96- let connector_type = connector_opts. pull_opt_str ( CONNECTOR ) ?. ok_or_else ( || {
97- plan_datafusion_err ! (
98- "Streaming Table '{}' must specify '{}' option" ,
99- name_str,
100- CONNECTOR
101- )
102- } ) ?;
103-
104- let synthetic_statement = Statement :: Query ( query. clone ( ) ) ;
105- let base_plan =
106- produce_optimized_plan ( & synthetic_statement, & self . schema_provider ) ?;
107-
108- let mut plan_rewrite = rewrite_plan ( base_plan, & self . schema_provider ) ?;
109-
110- if plan_rewrite
111- . schema ( )
112- . fields ( )
113- . iter ( )
114- . any ( |f| is_json_union ( f. data_type ( ) ) )
115- {
116- plan_rewrite =
117- serialize_outgoing_json ( & self . schema_provider , Arc :: new ( plan_rewrite) ) ;
118- }
119-
120- let fields: Vec < FieldSpec > = plan_rewrite
121- . schema ( )
122- . fields ( )
123- . iter ( )
124- . map ( |f| FieldSpec :: Struct ( ( * * f) . clone ( ) ) )
125- . collect ( ) ;
126-
127- let partition_exprs =
128- if let Some ( partition_cols) = connector_opts. pull_opt_str ( PARTITION_BY ) ? {
129- let cols: Vec < Expr > =
130- partition_cols. split ( ',' ) . map ( |c| col ( c. trim ( ) ) ) . collect ( ) ;
131- Some ( cols)
132- } else {
133- None
134- } ;
135-
136- let connector_table = ConnectorTable {
137- id : None ,
138- connector : connector_type,
139- name : name_str. clone ( ) ,
140- connection_type : ConnectionType :: Sink ,
141- fields,
142- config : "" . to_string ( ) ,
143- description : comment. clone ( ) . unwrap_or_default ( ) ,
144- event_time_field : None ,
145- watermark_field : None ,
146- idle_time : connector_opts. pull_opt_duration ( IDLE_MICROS ) ?,
147- primary_keys : Arc :: new ( vec ! [ ] ) ,
148- inferred_fields : None ,
149- partition_exprs : Arc :: new ( partition_exprs) ,
150- } ;
151-
152- let sink_extension = SinkExtension :: new (
153- TableReference :: bare ( name_str. clone ( ) ) ,
154- Table :: ConnectorTable ( connector_table. clone ( ) ) ,
155- plan_rewrite. schema ( ) . clone ( ) ,
156- Arc :: new ( plan_rewrite) ,
157- ) ?;
158-
159- let final_plan =
160- maybe_add_key_extension_to_sink ( LogicalPlan :: Extension ( Extension {
161- node : Arc :: new ( sink_extension) ,
162- } ) ) ?;
163-
164- Ok ( Box :: new ( StreamingTable {
165- name : name_str,
166- comment : comment. clone ( ) ,
167- connector_table,
168- logical_plan : final_plan,
169- } ) )
170- }
171- _ => plan_err ! ( "Only CREATE STREAMING TABLE supported" ) ,
87+ let DFStatement :: CreateStreamingTable {
88+ name,
89+ with_options,
90+ comment,
91+ query,
92+ } = & stmt. statement
93+ else {
94+ return plan_err ! ( "Only CREATE STREAMING TABLE is supported in this context" ) ;
95+ } ;
96+
97+ let table_name = name. to_string ( ) ;
98+ debug ! ( "Compiling Streaming Table Sink for: {}" , table_name) ;
99+
100+ // 1. Connector Options Extraction
101+ // Extract 'connector' (Kafka, Postgres, etc.) and other physical properties.
102+ let mut opts = ConnectorOptions :: new ( with_options, & None ) ?;
103+ let connector = opts. pull_opt_str ( CONNECTOR ) ?. ok_or_else ( || {
104+ plan_datafusion_err ! (
105+ "Streaming Table '{}' must specify the '{}' option" ,
106+ table_name,
107+ CONNECTOR
108+ )
109+ } ) ?;
110+
111+ // 2. Query Optimization & Streaming Rewrite
112+ // Convert the standard SQL query into a streaming-aware logical plan.
113+ let base_plan =
114+ produce_optimized_plan ( & Statement :: Query ( query. clone ( ) ) , & self . schema_provider ) ?;
115+ let mut plan = rewrite_plan ( base_plan, & self . schema_provider ) ?;
116+
117+ // 3. Outgoing Data Serialization
118+ // If the query produces internal types (like JSON Union), inject a serialization layer.
119+ if plan
120+ . schema ( )
121+ . fields ( )
122+ . iter ( )
123+ . any ( |f| is_json_union ( f. data_type ( ) ) )
124+ {
125+ plan = serialize_outgoing_json ( & self . schema_provider , Arc :: new ( plan) ) ;
172126 }
127+
128+ // 4. Sink Metadata & Partitioning Logic
129+ // Determine how data should be partitioned before hitting the external system.
130+ let partition_exprs = self . resolve_partition_expressions ( & mut opts) ?;
131+
132+ // Map DataFusion fields to Arroyo FieldSpecs for the connector.
133+ let fields: Vec < FieldSpec > = plan
134+ . schema ( )
135+ . fields ( )
136+ . iter ( )
137+ . map ( |f| FieldSpec :: Struct ( ( * * f) . clone ( ) ) )
138+ . collect ( ) ;
139+
140+ // 5. Connector Table Construction
141+ // This object acts as the 'Identity Card' for the Sink in the physical cluster.
142+ let connector_table = ConnectorTable {
143+ id : None ,
144+ connector,
145+ name : table_name. clone ( ) ,
146+ connection_type : ConnectionType :: Sink ,
147+ fields,
148+ config : "" . to_string ( ) , // Filled by the coordinator later
149+ description : comment. clone ( ) . unwrap_or_default ( ) ,
150+ event_time_field : None ,
151+ watermark_field : None ,
152+ idle_time : opts. pull_opt_duration ( IDLE_MICROS ) ?,
153+ primary_keys : Arc :: new ( vec ! [ ] ) , // PKs are inferred or explicitly set here
154+ inferred_fields : None ,
155+ partition_exprs : Arc :: new ( partition_exprs) ,
156+ } ;
157+
158+ // 6. Sink Extension & Final Rewrites
159+ // Wrap the plan in a SinkExtension and ensure Key/Partition alignment.
160+ let sink_extension = SinkExtension :: new (
161+ TableReference :: bare ( table_name. clone ( ) ) ,
162+ Table :: ConnectorTable ( connector_table. clone ( ) ) ,
163+ plan. schema ( ) . clone ( ) ,
164+ Arc :: new ( plan) ,
165+ ) ?;
166+
167+ // Ensure the data distribution matches the Sink's requirements (e.g., Shuffle by Partition Key)
168+ let plan_with_keys = maybe_add_key_extension_to_sink ( LogicalPlan :: Extension ( Extension {
169+ node : Arc :: new ( sink_extension) ,
170+ } ) ) ?;
171+
172+ // Global pass to wire inputs and handle shared sub-plans
173+ let final_extensions = rewrite_sinks ( vec ! [ plan_with_keys] ) ?;
174+ let final_plan = final_extensions. into_iter ( ) . next ( ) . unwrap ( ) ;
175+
176+ Ok ( Box :: new ( StreamingTable {
177+ name : table_name,
178+ comment : comment. clone ( ) ,
179+ connector_table,
180+ logical_plan : final_plan,
181+ } ) )
182+ }
183+
184+ fn resolve_partition_expressions (
185+ & self ,
186+ opts : & mut ConnectorOptions ,
187+ ) -> Result < Option < Vec < Expr > > > {
188+ opts. pull_opt_str ( PARTITION_BY ) ?
189+ . map ( |cols| {
190+ cols. split ( ',' )
191+ . map ( |c| col ( c. trim ( ) ) )
192+ . collect :: < Vec < Expr > > ( )
193+ } )
194+ . map ( Ok )
195+ . transpose ( )
173196 }
174197}
175198
0 commit comments