Skip to content

Commit eefd583

Browse files
committed
fix: Fix pushdown conjuncts instead of generating join columns
1 parent 60c415f commit eefd583

File tree

9 files changed

+159
-249
lines changed

9 files changed

+159
-249
lines changed

axiom/optimizer/DerivedTable.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -918,6 +918,19 @@ void DerivedTable::distributeConjuncts() {
918918
tables[0]->as<DerivedTable>()->setOp.value() ==
919919
logical_plan::SetOperation::kUnionAll));
920920

921+
PlanObjectSet noPushdownTables;
922+
for (const auto* join : joins) {
923+
if (join->leftOptional()) {
924+
// No pushdown to the left side of a RIGHT or FULL join.
925+
noPushdownTables.add(join->leftTable());
926+
}
927+
if (join->rightOptional()) {
928+
// No pushdown to the right side of a LEFT or FULL join.
929+
noPushdownTables.add(join->rightTable());
930+
}
931+
}
932+
VELOX_DCHECK(tables.size() > 1 || noPushdownTables.empty());
933+
921934
for (auto i = 0; i < conjuncts.size(); ++i) {
922935
// No pushdown of non-deterministic except if only pushdown target is a
923936
// union all.
@@ -938,6 +951,10 @@ void DerivedTable::distributeConjuncts() {
938951
continue; // ValuesTable does not have filter pushdown.
939952
}
940953

954+
if (noPushdownTables.contains(tables[0])) {
955+
continue; // No pushdown if depends on an optional side of a join.
956+
}
957+
941958
if (tables[0]->is(PlanType::kUnnestTableNode)) {
942959
// UnnestTable does not implement filter pushdown yet.
943960
// TODO: We can push down predicate to left side of unnest if

axiom/optimizer/Optimization.cpp

Lines changed: 35 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,53 +1098,6 @@ void Optimization::joinByIndex(
10981098
}
10991099
}
11001100

1101-
struct ProjectionBuilder {
1102-
ColumnVector columns;
1103-
ExprVector exprs;
1104-
1105-
// Project 'expr' as 'column'.
1106-
void add(ColumnCP column, ExprCP expr) {
1107-
columns.emplace_back(column);
1108-
exprs.emplace_back(expr);
1109-
}
1110-
1111-
RelationOp* build(RelationOp* input) {
1112-
return make<Project>(
1113-
input, exprs, columns, isRedundantProject(input, exprs, columns));
1114-
}
1115-
1116-
ColumnVector inputColumns() const {
1117-
ColumnVector columns;
1118-
columns.reserve(exprs.size());
1119-
for (const auto* expr : exprs) {
1120-
VELOX_DCHECK(expr->isColumn());
1121-
columns.emplace_back(expr->as<Column>());
1122-
}
1123-
1124-
return columns;
1125-
}
1126-
1127-
PlanObjectSet outputColumns() const {
1128-
PlanObjectSet columnSet;
1129-
columnSet.unionObjects(columns);
1130-
return columnSet;
1131-
}
1132-
};
1133-
1134-
folly::F14FastMap<PlanObjectCP, ExprCP> makeJoinColumnMapping(
1135-
JoinEdgeP joinEdge) {
1136-
folly::F14FastMap<PlanObjectCP, ExprCP> mapping;
1137-
for (auto i = 0; i < joinEdge->leftColumns().size(); ++i) {
1138-
mapping.emplace(joinEdge->leftColumns()[i], joinEdge->leftExprs()[i]);
1139-
}
1140-
1141-
for (auto i = 0; i < joinEdge->rightColumns().size(); ++i) {
1142-
mapping.emplace(joinEdge->rightColumns()[i], joinEdge->rightExprs()[i]);
1143-
}
1144-
1145-
return mapping;
1146-
}
1147-
11481101
namespace {
11491102

11501103
// Check if 'mark' column produced by a SemiProject join is used only to filter
@@ -1292,38 +1245,23 @@ void Optimization::joinByHash(
12921245
PlanObjectSet probeColumns;
12931246
probeColumns.unionObjects(plan->columns());
12941247

1248+
ColumnVector columns;
1249+
PlanObjectSet columnSet;
12951250
ColumnCP mark = nullptr;
12961251

1297-
auto* joinEdge = candidate.join;
1298-
1299-
PlanObjectSet joinColumns;
1300-
joinColumns.unionObjects(joinEdge->leftColumns());
1301-
joinColumns.unionObjects(joinEdge->rightColumns());
1302-
1303-
// Mapping from join output column to probe or build side input.
1304-
auto joinColumnMapping = makeJoinColumnMapping(joinEdge);
1305-
1306-
ProjectionBuilder projectionBuilder;
1307-
bool needsProjection = false;
1308-
13091252
state.downstreamColumns().forEach<Column>([&](auto column) {
13101253
if (column == build.markColumn) {
13111254
mark = column;
13121255
return;
13131256
}
13141257

1315-
if (joinColumns.contains(column)) {
1316-
projectionBuilder.add(column, joinColumnMapping.at(column));
1317-
needsProjection = true;
1318-
return;
1319-
}
1320-
13211258
if ((probeOnly || !buildColumns.contains(column)) &&
13221259
!probeColumns.contains(column)) {
13231260
return;
13241261
}
13251262

1326-
projectionBuilder.add(column, column);
1263+
columnSet.add(column);
1264+
columns.push_back(column);
13271265
});
13281266

13291267
if (mark) {
@@ -1335,10 +1273,11 @@ void Optimization::joinByHash(
13351273

13361274
// If there is an existence flag, it is the rightmost result column.
13371275
if (mark) {
1338-
projectionBuilder.add(mark, mark);
1276+
columnSet.add(mark);
1277+
columns.push_back(mark);
13391278
}
13401279

1341-
state.columns = projectionBuilder.outputColumns();
1280+
state.columns = columnSet;
13421281

13431282
const auto fanout = fanoutJoinTypeLimit(
13441283
joinType,
@@ -1359,15 +1298,11 @@ void Optimization::joinByHash(
13591298
std::move(buildKeys),
13601299
candidate.join->filter(),
13611300
fanout,
1362-
projectionBuilder.inputColumns());
1301+
std::move(columns));
13631302

13641303
state.addCost(*join);
13651304
state.cost.cost += buildState.cost.cost;
13661305

1367-
if (needsProjection) {
1368-
join = projectionBuilder.build(join);
1369-
}
1370-
13711306
state.addNextJoin(&candidate, join, toTry);
13721307
}
13731308

@@ -1447,38 +1382,23 @@ void Optimization::joinByHashRight(
14471382
rightJoinType == velox::core::JoinType::kRightSemiFilter ||
14481383
rightJoinType == velox::core::JoinType::kRightSemiProject;
14491384

1385+
ColumnVector columns;
1386+
PlanObjectSet columnSet;
14501387
ColumnCP mark = nullptr;
14511388

1452-
auto* joinEdge = candidate.join;
1453-
1454-
PlanObjectSet joinColumns;
1455-
joinColumns.unionObjects(joinEdge->leftColumns());
1456-
joinColumns.unionObjects(joinEdge->rightColumns());
1457-
1458-
// Mapping from join output column to probe or build side input.
1459-
auto joinColumnMapping = makeJoinColumnMapping(joinEdge);
1460-
1461-
ProjectionBuilder projectionBuilder;
1462-
bool needsProjection = false;
1463-
14641389
state.downstreamColumns().forEach<Column>([&](auto column) {
14651390
if (column == probe.markColumn) {
14661391
mark = column;
14671392
return;
14681393
}
14691394

1470-
if (joinColumns.contains(column)) {
1471-
projectionBuilder.add(column, joinColumnMapping.at(column));
1472-
needsProjection = true;
1473-
return;
1474-
}
1475-
14761395
if (!buildColumns.contains(column) &&
14771396
(buildOnly || !probeColumns.contains(column))) {
14781397
return;
14791398
}
14801399

1481-
projectionBuilder.add(column, column);
1400+
columnSet.add(column);
1401+
columns.push_back(column);
14821402
});
14831403

14841404
if (mark) {
@@ -1489,7 +1409,8 @@ void Optimization::joinByHashRight(
14891409
tryOptimizeSemiProject(rightJoinType, mark, state, negation_);
14901410

14911411
if (mark) {
1492-
projectionBuilder.add(mark, mark);
1412+
columnSet.add(mark);
1413+
columns.push_back(mark);
14931414
}
14941415

14951416
const auto fanout = fanoutJoinTypeLimit(
@@ -1500,7 +1421,7 @@ void Optimization::joinByHashRight(
15001421

15011422
const auto buildCost = state.cost;
15021423

1503-
state.columns = projectionBuilder.outputColumns();
1424+
state.columns = columnSet;
15041425
state.cost = probeState.cost;
15051426
state.cost.cost += buildCost.cost;
15061427

@@ -1517,13 +1438,9 @@ void Optimization::joinByHashRight(
15171438
std::move(buildKeys),
15181439
candidate.join->filter(),
15191440
fanout,
1520-
projectionBuilder.inputColumns());
1441+
std::move(columns));
15211442
state.addCost(*join);
15221443

1523-
if (needsProjection) {
1524-
join = projectionBuilder.build(join);
1525-
}
1526-
15271444
state.addNextJoin(&candidate, join, toTry);
15281445
}
15291446

@@ -1721,6 +1638,19 @@ bool Optimization::placeConjuncts(
17211638
state.dt->singleRowDts.forEach<DerivedTable>(
17221639
[&](auto dt) { columnsAndSingles.unionObjects(dt->columns); });
17231640

1641+
PlanObjectSet noPushdownTables;
1642+
if (!joinsPlaced) {
1643+
for (const auto* join : state.dt->joins) {
1644+
if (join->leftOptional()) {
1645+
// No pushdown to the left side of a RIGHT or FULL join.
1646+
noPushdownTables.add(join->leftTable());
1647+
}
1648+
if (join->rightOptional()) {
1649+
// No pushdown to the right side of a LEFT or FULL join.
1650+
noPushdownTables.add(join->rightTable());
1651+
}
1652+
}
1653+
}
17241654
ExprVector filters;
17251655
for (auto& conjunct : state.dt->conjuncts) {
17261656
if (!joinsPlaced && conjunct->containsNonDeterministic()) {
@@ -1729,6 +1659,12 @@ bool Optimization::placeConjuncts(
17291659
if (state.placed.contains(conjunct)) {
17301660
continue;
17311661
}
1662+
if (!joinsPlaced) {
1663+
const auto* singleTable = conjunct->singleTable();
1664+
if (singleTable && noPushdownTables.contains(singleTable)) {
1665+
continue;
1666+
}
1667+
}
17321668
if (conjunct->columns().isSubset(state.columns)) {
17331669
state.columns.add(conjunct);
17341670
filters.push_back(conjunct);

axiom/optimizer/Plan.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,11 +256,6 @@ PlanObjectSet PlanState::computeDownstreamColumns(bool includeFilters) const {
256256
if (addFilter && !join->filter().empty()) {
257257
addExprs(join->filter());
258258
}
259-
260-
if (addFilter) {
261-
addExprs(join->leftExprs());
262-
addExprs(join->rightExprs());
263-
}
264259
}
265260

266261
// Filters.

axiom/optimizer/QueryGraph.h

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -511,20 +511,6 @@ class JoinEdge {
511511
/// Marker column produced by 'exists' or 'not exists' join.
512512
/// If set, the 'joinType' must be kSemi.
513513
ColumnCP markColumn{nullptr};
514-
515-
/// Columns produced by the 'left' side of a RIGHT or FULL OUTER join.
516-
/// Requires 'leftOptional' to be true.
517-
ColumnVector leftColumns;
518-
519-
/// Input expressions corresponding 1:1 to 'leftColumns'.
520-
ExprVector leftExprs;
521-
522-
/// Columns produced by the 'right' side of a LEFT or FULL OUTER join.
523-
/// Requires 'rightOptional' to be true.
524-
ColumnVector rightColumns;
525-
526-
/// Input expressions corresponding 1:1 to 'rightColumns'.
527-
ExprVector rightExprs;
528514
};
529515

530516
/// @param leftTable The left table of the join. May be nullptr if 'leftKeys'
@@ -536,28 +522,14 @@ class JoinEdge {
536522
rightTable_(rightTable),
537523
filter_(std::move(spec.filter)),
538524
joinType_(spec.joinType),
539-
markColumn_(spec.markColumn),
540-
leftColumns_{spec.leftColumns},
541-
leftExprs_{spec.leftExprs},
542-
rightColumns_{spec.rightColumns},
543-
rightExprs_{spec.rightExprs} {
525+
markColumn_(spec.markColumn) {
544526
// Only left join can have null left table.
545527
VELOX_DCHECK(leftTable_ || isLeftOuter());
546528
VELOX_DCHECK_NOT_NULL(rightTable_);
547529
// filter_ is only for non-inner joins.
548530
VELOX_DCHECK(filter_.empty() || !isInner());
549531
// Mark column only for semi joins.
550532
VELOX_DCHECK(!markColumn_ || isSemi());
551-
552-
if (!leftColumns_.empty()) {
553-
VELOX_DCHECK(leftOptional());
554-
VELOX_DCHECK_EQ(leftColumns_.size(), leftExprs_.size());
555-
}
556-
557-
if (!rightColumns_.empty()) {
558-
VELOX_DCHECK(rightOptional());
559-
VELOX_DCHECK_EQ(rightColumns_.size(), rightExprs_.size());
560-
}
561533
}
562534

563535
static JoinEdge* makeInner(PlanObjectCP leftTable, PlanObjectCP rightTable) {
@@ -642,22 +614,6 @@ class JoinEdge {
642614
return markColumn_;
643615
}
644616

645-
const ColumnVector& leftColumns() const {
646-
return leftColumns_;
647-
}
648-
649-
const ExprVector& leftExprs() const {
650-
return leftExprs_;
651-
}
652-
653-
const ColumnVector& rightColumns() const {
654-
return rightColumns_;
655-
}
656-
657-
const ExprVector& rightExprs() const {
658-
return rightExprs_;
659-
}
660-
661617
void addEquality(ExprCP left, ExprCP right, bool update = false);
662618

663619
/// True if inner join.
@@ -796,11 +752,6 @@ class JoinEdge {
796752

797753
// Flag to set if right side has a match.
798754
ColumnCP const markColumn_;
799-
800-
const ColumnVector leftColumns_;
801-
const ExprVector leftExprs_;
802-
const ColumnVector rightColumns_;
803-
const ExprVector rightExprs_;
804755
};
805756

806757
using JoinEdgeP = JoinEdge*;

0 commit comments

Comments
 (0)