1010// See the License for the specific language governing permissions and
1111// limitations under the License.
1212
13- use std:: collections:: HashSet ;
1413use std:: sync:: Arc ;
1514use std:: time:: Duration ;
1615
@@ -25,7 +24,6 @@ use crate::sql::schema::source_table::SourceTable;
2524use crate :: sql:: schema:: ColumnDescriptor ;
2625use crate :: sql:: schema:: table:: Table ;
2726use crate :: sql:: schema:: StreamSchemaProvider ;
28- use crate :: sql:: common:: constants:: sql_field;
2927use crate :: sql:: common:: UPDATING_META_FIELD ;
3028use crate :: sql:: extensions:: debezium:: UnrollDebeziumPayloadNode ;
3129use crate :: sql:: extensions:: remote_table:: RemoteTableBoundaryNode ;
@@ -48,35 +46,6 @@ impl<'a> SourceRewriter<'a> {
4846}
4947
5048impl SourceRewriter < ' _ > {
51- /// Output column names after stream-catalog source projection (physical fields plus optional
52- /// `_timestamp` alias when event time is renamed).
53- fn stream_source_projected_column_names (
54- schema : & datafusion:: arrow:: datatypes:: Schema ,
55- event_time_field : Option < & str > ,
56- ) -> HashSet < String > {
57- let mut names: HashSet < String > =
58- schema. fields ( ) . iter ( ) . map ( |f| f. name ( ) . clone ( ) ) . collect ( ) ;
59- if let Some ( et) = event_time_field {
60- if et != TIMESTAMP_FIELD {
61- names. insert ( TIMESTAMP_FIELD . to_string ( ) ) ;
62- }
63- }
64- names
65- }
66-
67- /// Resolves watermark column for [`StreamTable::Source`]: drop computed `__watermark` and any
68- /// name not present in the projected schema (defaults to `_timestamp` − delay).
69- fn stream_source_effective_watermark_field < ' b > (
70- watermark_field : Option < & ' b str > ,
71- projected : & HashSet < String > ,
72- ) -> Option < & ' b str > {
73- let w = watermark_field?;
74- if w == sql_field:: COMPUTED_WATERMARK {
75- return None ;
76- }
77- projected. contains ( w) . then_some ( w)
78- }
79-
8049 fn projection_expr_for_column ( col : & ColumnDescriptor , qualifier : & TableReference ) -> Expr {
8150 if let Some ( logic) = col. computation_logic ( ) {
8251 logic
0 commit comments