1313use std:: sync:: Arc ;
1414
1515use datafusion:: common:: { Result , plan_datafusion_err, plan_err} ;
16+ use datafusion:: execution:: SessionStateBuilder ;
1617use datafusion:: sql:: sqlparser:: ast:: { SqlOption , Statement as DFStatement } ;
1718use datafusion_common:: TableReference ;
19+ use datafusion_execution:: config:: SessionConfig ;
1820use datafusion_expr:: { Expr , Extension , LogicalPlan , col} ;
1921use sqlparser:: ast:: Statement ;
2022use tracing:: debug;
@@ -30,21 +32,24 @@ use crate::coordinator::statement::{
3032 StreamingTableStatement ,
3133} ;
3234use crate :: coordinator:: tool:: ConnectorOptions ;
33- use crate :: sql:: catalog:: Table ;
34- use crate :: sql:: catalog:: connector:: ConnectionType ;
35- use crate :: sql:: catalog:: connector_table:: ConnectorTable ;
36- use crate :: sql:: catalog:: field_spec:: FieldSpec ;
37- use crate :: sql:: catalog:: optimizer:: produce_optimized_plan;
35+ use crate :: sql:: logical_node:: logical:: { LogicalProgram , ProgramConfig } ;
36+ use crate :: sql:: logical_planner:: optimizers:: ChainingOptimizer ;
37+ use crate :: sql:: schema:: Table ;
38+ use crate :: sql:: schema:: connector:: ConnectionType ;
39+ use crate :: sql:: schema:: connector_table:: ConnectorTable ;
40+ use crate :: sql:: schema:: field_spec:: FieldSpec ;
41+ use crate :: sql:: schema:: optimizer:: produce_optimized_plan;
3842use crate :: sql:: functions:: { is_json_union, serialize_outgoing_json} ;
39- use crate :: sql:: planner:: extension:: sink:: SinkExtension ;
40- use crate :: sql:: planner:: { StreamSchemaProvider , maybe_add_key_extension_to_sink, rewrite_sinks} ;
43+ use crate :: sql:: extensions:: sink:: SinkExtension ;
44+ use crate :: sql:: logical_planner:: planner;
45+ use crate :: sql:: analysis:: { StreamSchemaProvider , maybe_add_key_extension_to_sink, rewrite_sinks} ;
4146use crate :: sql:: rewrite_plan;
4247
4348const CONNECTOR : & str = "connector" ;
4449const PARTITION_BY : & str = "partition_by" ;
4550const IDLE_MICROS : & str = "idle_time" ;
4651
47- /// 将 WITH 选项列表转为 key-value map,便于读取 connector 等配置。
52+ /// Convert ` WITH` option list to a key-value map (e.g. connector settings).
4853fn with_options_to_map ( options : & [ SqlOption ] ) -> std:: collections:: HashMap < String , String > {
4954 options
5055 . iter ( )
@@ -153,6 +158,8 @@ impl LogicalPlanVisitor {
153158 primary_keys : Arc :: new ( vec ! [ ] ) , // PKs are inferred or explicitly set here
154159 inferred_fields : None ,
155160 partition_exprs : Arc :: new ( partition_exprs) ,
161+ lookup_cache_ttl : None ,
162+ lookup_cache_max_bytes : None ,
156163 } ;
157164
158165 // 6. Sink Extension & Final Rewrites
@@ -173,6 +180,37 @@ impl LogicalPlanVisitor {
173180 let final_extensions = rewrite_sinks ( vec ! [ plan_with_keys] ) ?;
174181 let final_plan = final_extensions. into_iter ( ) . next ( ) . unwrap ( ) ;
175182
183+
184+
185+ let mut config = SessionConfig :: new ( ) ;
186+ config
187+ . options_mut ( )
188+ . optimizer
189+ . enable_round_robin_repartition = false ;
190+ config. options_mut ( ) . optimizer . repartition_aggregations = false ;
191+ config. options_mut ( ) . optimizer . repartition_windows = false ;
192+ config. options_mut ( ) . optimizer . repartition_sorts = false ;
193+ config. options_mut ( ) . optimizer . repartition_joins = false ;
194+ config. options_mut ( ) . execution . target_partitions = 1 ;
195+
196+ let session_state = SessionStateBuilder :: new ( )
197+ . with_config ( config)
198+ . with_default_features ( )
199+ . with_physical_optimizer_rules ( vec ! [ ] )
200+ . build ( ) ;
201+
202+ let mut plan_to_graph_visitor =
203+ planner:: PlanToGraphVisitor :: new ( & self . schema_provider , & session_state) ;
204+
205+ plan_to_graph_visitor. add_plan ( final_plan. clone ( ) ) ?;
206+
207+ let graph = plan_to_graph_visitor. into_graph ( ) ;
208+
209+ let mut program = LogicalProgram :: new ( graph, ProgramConfig :: default ( ) ) ;
210+
211+ program. optimize ( & ChainingOptimizer { } ) ;
212+
213+
176214 Ok ( Box :: new ( StreamingTable {
177215 name : table_name,
178216 comment : comment. clone ( ) ,
0 commit comments