2323import org .apache .flink .table .planner .plan .nodes .physical .stream .StreamPhysicalChangelogNormalize ;
2424import org .apache .flink .table .planner .plan .optimize .program .FlinkOptimizeProgram ;
2525import org .apache .flink .table .planner .plan .optimize .program .StreamOptimizeContext ;
26- import org .apache .flink .table .planner .plan .utils .FlinkRexUtil ;
2726
27+ import org .apache .calcite .plan .RelOptUtil ;
2828import org .apache .calcite .rel .RelNode ;
2929import org .apache .calcite .rel .core .TableScan ;
30+ import org .apache .calcite .rex .RexBuilder ;
31+ import org .apache .calcite .rex .RexCall ;
32+ import org .apache .calcite .rex .RexLocalRef ;
3033import org .apache .calcite .rex .RexNode ;
34+ import org .apache .calcite .rex .RexUtil ;
35+ import org .apache .calcite .sql .SqlKind ;
36+ import org .apache .calcite .sql .fun .SqlStdOperatorTable ;
3137
3238import java .util .ArrayList ;
33- import java .util .Comparator ;
3439import java .util .HashMap ;
35- import java .util .HashSet ;
3640import java .util .List ;
3741import java .util .Map ;
38- import java .util .Set ;
42+ import java .util .Objects ;
43+ import java .util .stream .Collectors ;
3944
4045/**
4146 * A {@link FlinkOptimizeProgram} that marks ChangelogNormalize nodes using the same source and
@@ -149,27 +154,47 @@ public RelNode optimize(RelNode root, StreamOptimizeContext context) {
149154 continue ;
150155 }
151156
152- final Set <RexNode > common = calculateCommonCondition (changelogNormalizeContexts );
157+ final List <RexNode > commons =
158+ calculateCommonCondition (rexBuilder , changelogNormalizeContexts );
153159 for (ChangelogNormalizeContext ctx : changelogNormalizeContexts ) {
154160 ctx .getChangelogNormalize ().markSourceReuse ();
155- if (!common .isEmpty ()) {
156- ctx .getChangelogNormalize ().setCommonFilter (common .toArray (new RexNode [0 ]));
161+ if (!commons .isEmpty ()) {
162+ ctx .getChangelogNormalize ().setCommonFilter (commons .toArray (new RexNode [0 ]));
157163 }
158164 }
159165 }
160166 return root ;
161167 }
162168
163- private Set <RexNode > calculateCommonCondition (
164- List <ChangelogNormalizeContext > changelogNormalizeContexts ) {
165- changelogNormalizeContexts .sort (Comparator .comparingInt (o -> o .getConditions ().size ()));
166- final Set <RexNode > common =
167- new HashSet <>(changelogNormalizeContexts .get (0 ).getConditions ());
169+ private List <RexNode > calculateCommonCondition (
170+ RexBuilder rexBuilder , List <ChangelogNormalizeContext > changelogNormalizeContexts ) {
171+ if (changelogNormalizeContexts .stream ()
172+ .map (ChangelogNormalizeContext ::getConditions )
173+ .anyMatch (Objects ::isNull )) {
174+ return List .of ();
175+ }
176+
177+ final RexNode or =
178+ rexBuilder .makeCall (
179+ SqlStdOperatorTable .OR ,
180+ changelogNormalizeContexts .stream ()
181+ .map (ChangelogNormalizeContext ::getConditions )
182+ .collect (Collectors .toList ()));
183+ final RexCall factors = (RexCall ) RexUtil .pullFactors (rexBuilder , or );
168184
169- for (int i = 1 ; i < changelogNormalizeContexts .size () && !common .isEmpty (); i ++) {
170- common .retainAll (changelogNormalizeContexts .get (i ).getConditions ());
185+ final List <RexNode > commonCondition = new ArrayList <>();
186+ // Since we are interested in factors only then look for AND
187+ if (factors .getKind () == SqlKind .AND ) {
188+ for (RexNode node : factors .getOperands ()) {
189+ // If there is OR on top level then it is not a common factor anymore
190+ if (node .getKind () == SqlKind .OR ) {
191+ break ;
192+ }
193+ commonCondition .addAll (RelOptUtil .conjunctions (RexUtil .toCnf (rexBuilder , node )));
194+ }
195+ return commonCondition ;
171196 }
172- return common ;
197+ return List . of () ;
173198 }
174199
175200 private void gatherTableScanToChangelogNormalizeMap (
@@ -182,12 +207,24 @@ private void gatherTableScanToChangelogNormalizeMap(
182207 (StreamPhysicalChangelogNormalize ) input ;
183208 if (curRelNode instanceof StreamPhysicalCalc ) {
184209 StreamPhysicalCalc calc = (StreamPhysicalCalc ) curRelNode ;
185- final List <RexNode > conditions =
186- FlinkRexUtil .extractConjunctiveConditions (
187- rexBuilder , calc .getProgram ());
210+ RexLocalRef localRef = calc .getProgram ().getCondition ();
211+ final RexNode condition ;
212+ if (localRef == null ) {
213+ condition = null ;
214+ } else {
215+ // Expanded Sarg allows to extract partial common filter out of it
216+ RexNode rexNodeWithExpandedSearch =
217+ RexUtil .expandSearch (
218+ rexBuilder ,
219+ calc .getProgram (),
220+ calc .getProgram ().expandLocalRef (localRef ));
221+ // First pull factors from conditions per Changelog Normalize node
222+ // then find the common for all of them
223+ condition = RexUtil .pullFactors (rexBuilder , rexNodeWithExpandedSearch );
224+ }
188225 gatherTableScanToChangelogNormalizeMap (
189226 input ,
190- ChangelogNormalizeContext .of (changelogNormalize , conditions ),
227+ ChangelogNormalizeContext .of (changelogNormalize , condition ),
191228 map );
192229 }
193230 } else {
@@ -211,24 +248,24 @@ private void gatherTableScanToChangelogNormalizeMap(
211248
212249 private static class ChangelogNormalizeContext {
213250 private final StreamPhysicalChangelogNormalize changelogNormalize ;
214- private final List < RexNode > conditions ;
251+ private final RexNode conditions ;
215252
216- public ChangelogNormalizeContext (
217- StreamPhysicalChangelogNormalize changelogNormalize , List < RexNode > conditions ) {
253+ private ChangelogNormalizeContext (
254+ StreamPhysicalChangelogNormalize changelogNormalize , RexNode conditions ) {
218255 this .changelogNormalize = changelogNormalize ;
219256 this .conditions = conditions ;
220257 }
221258
222259 public static ChangelogNormalizeContext of (
223- StreamPhysicalChangelogNormalize changelogNormalize , List < RexNode > conditions ) {
260+ StreamPhysicalChangelogNormalize changelogNormalize , RexNode conditions ) {
224261 return new ChangelogNormalizeContext (changelogNormalize , conditions );
225262 }
226263
227264 public StreamPhysicalChangelogNormalize getChangelogNormalize () {
228265 return changelogNormalize ;
229266 }
230267
231- public List < RexNode > getConditions () {
268+ public RexNode getConditions () {
232269 return conditions ;
233270 }
234271 }
0 commit comments