Skip to content

Add checks that optimizers do not modify the layout #130757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130409.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130409
summary: Add Dependency Checker for `LocalLogicalPlanOptimizer`
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,16 @@ public String nodeString() {
}

protected abstract String label();

/**
 * Compares two attribute lists position by position using {@link Attribute#semanticEquals}.
 *
 * @param left  the first list of attributes
 * @param right the second list of attributes
 * @return {@code true} if both lists have the same size and every pair of attributes at the same
 *         index is semantically equal; {@code false} otherwise
 */
public static boolean semanticEquals(List<Attribute> left, List<Attribute> right) {
    final int count = left.size();
    // Lists of different length can never match pairwise.
    if (count != right.size()) {
        return false;
    }
    int idx = 0;
    while (idx < count) {
        Attribute leftAttr = left.get(idx);
        Attribute rightAttr = right.get(idx);
        // Bail out on the first pair that differs.
        if (leftAttr.semanticEquals(rightAttr) == false) {
            return false;
        }
        idx++;
    }
    return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsFilteredAggWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStringCasingWithInsensitiveRegexMatch;
Expand Down Expand Up @@ -35,6 +37,8 @@
*/
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {

private final LogicalVerifier verifier = LogicalVerifier.INSTANCE;

private static final List<Batch<LogicalPlan>> RULES = arrayAsArrayList(
new Batch<>(
"Local rewrite",
Expand Down Expand Up @@ -81,6 +85,12 @@ private static Batch<LogicalPlan> localOperators() {
}

/**
 * Runs the rule batches over {@code plan} and verifies the optimized result before returning it.
 *
 * @param plan the logical plan to optimize
 * @return the optimized plan, if the verifier reported no failures
 * @throws VerificationException if the verifier reports failures for the optimized plan
 */
public LogicalPlan localOptimize(LogicalPlan plan) {
    LogicalPlan optimized = execute(plan);
    // The original plan is passed along with the optimized one so the verifier can compare both;
    // the boolean flag asks the verifier to skip its checks for plans with a remote enrich.
    Failures failures = verifier.verify(optimized, true, plan);
    if (failures.hasFailures()) {
        throw new VerificationException(failures);
    }
    return optimized;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public LocalPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) {
}

public PhysicalPlan localOptimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan);
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan);
PhysicalPlan verify(PhysicalPlan planAfter, PhysicalPlan planBefore) {
Failures failures = verifier.verify(planAfter, true, planBefore);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return planAfter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public LogicalPlanOptimizer(LogicalOptimizerContext optimizerContext) {
public LogicalPlan optimize(LogicalPlan verified) {
var optimized = execute(verified);

Failures failures = verifier.verify(optimized);
Failures failures = verifier.verify(optimized, false, verified);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,32 @@
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;

import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.core.expression.Attribute.semanticEquals;

public final class LogicalVerifier {

public static final LogicalVerifier INSTANCE = new LogicalVerifier();

private LogicalVerifier() {}

/** Verifies the optimized logical plan. */
public Failures verify(LogicalPlan plan) {
public Failures verify(LogicalPlan planAfter, boolean skipRemoteEnrichVerification, LogicalPlan planBefore) {
Failures failures = new Failures();
Failures dependencyFailures = new Failures();

plan.forEachUp(p -> {
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = planAfter.collectFirstChildren(Enrich.class::isInstance);
if (enriches.isEmpty() == false && ((Enrich) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
}
}

planAfter.forEachUp(p -> {
PlanConsistencyChecker.checkPlan(p, dependencyFailures);

if (failures.hasFailures() == false) {
Expand All @@ -38,6 +50,12 @@ public Failures verify(LogicalPlan plan) {
}
});

if (semanticEquals(planBefore.output(), planAfter.output()) == false) {
failures.add(
fail(planAfter, "Layout has changed from [{}] to [{}]. ", planBefore.output().toString(), planAfter.output().toString())
);
}

if (dependencyFailures.hasFailures()) {
throw new IllegalStateException(dependencyFailures.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public PhysicalPlanOptimizer(PhysicalOptimizerContext context) {
}

public PhysicalPlan optimize(PhysicalPlan plan) {
return verify(execute(plan));
return verify(execute(plan), plan);
}

PhysicalPlan verify(PhysicalPlan plan) {
Failures failures = verifier.verify(plan);
PhysicalPlan verify(PhysicalPlan planAfter, PhysicalPlan planBefore) {
Failures failures = verifier.verify(planAfter, false, planBefore);
if (failures.hasFailures()) {
throw new VerificationException(failures);
}
return plan;
return planAfter;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ public final class PhysicalVerifier {
private PhysicalVerifier() {}

/** Verifies the physical plan. */
public Failures verify(PhysicalPlan plan) {
public Failures verify(PhysicalPlan planAfter, boolean skipRemoteEnrichVerification, PhysicalPlan planBefore) {
Failures failures = new Failures();
Failures depFailures = new Failures();

// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
if (skipRemoteEnrichVerification) {
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = planAfter.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
}
}

plan.forEachDown(p -> {
planAfter.forEachDown(p -> {
if (p instanceof FieldExtractExec fieldExtractExec) {
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
if (sourceAttribute == null) {
Expand Down Expand Up @@ -65,6 +67,12 @@ public Failures verify(PhysicalPlan plan) {
}
});

if (planBefore.output().equals(planAfter.output()) == false) {
failures.add(
fail(planAfter, "Layout has changed from [{}] to [{}]. ", planBefore.output().toString(), planAfter.output().toString())
);
}

if (depFailures.hasFailures()) {
throw new IllegalStateException(depFailures.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.core.type.InvalidMappedField;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case;
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.StartsWith;
Expand All @@ -49,6 +52,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
Expand All @@ -64,6 +68,7 @@
import java.util.Map;
import java.util.Set;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.ONE;
Expand All @@ -84,6 +89,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -774,6 +780,32 @@ public void testGroupingByMissingFields() {
as(eval.child(), EsRelation.class);
}

/**
 * Checks that {@code LocalLogicalPlanOptimizer#localOptimize} rejects a plan that references an
 * attribute which the assertions below expect to be reported as a missing reference.
 */
public void testPlanSanityCheck() throws Exception {
var plan = localPlan("""
from test
| stats a = min(salary) by emp_no
""");

// Expected shape of the local plan: Limit on top of Aggregate, whose first aggregate is min(salary).
var limit = as(plan, Limit.class);
var aggregate = as(limit.child(), Aggregate.class);
var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
var salary = as(min.field(), NamedExpression.class);
assertThat(salary.name(), is("salary"));
// emulate a rule that adds an invalid field
// (an OrderBy placed above the Limit that sorts on the salary attribute taken from inside the aggregate)
var invalidPlan = new OrderBy(
limit.source(),
limit,
asList(new Order(limit.source(), salary, Order.OrderDirection.ASC, Order.NullsPosition.FIRST))
);

var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), TEST_SEARCH_STATS);
LocalLogicalPlanOptimizer localLogicalPlanOptimizer = new LocalLogicalPlanOptimizer(localContext);

// The failure is expected to surface as an IllegalStateException naming the offending node and attribute.
IllegalStateException e = expectThrows(IllegalStateException.class, () -> localLogicalPlanOptimizer.localOptimize(invalidPlan));
assertThat(e.getMessage(), containsString("Plan [OrderBy[[Order[salary"));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [salary"));
}

private IsNotNull isNotNull(Expression field) {
return new IsNotNull(EMPTY, field);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
Expand All @@ -52,9 +53,11 @@
import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextFunction;
import org.elasticsearch.xpack.esql.expression.function.fulltext.Kql;
import org.elasticsearch.xpack.esql.expression.function.fulltext.Match;
Expand Down Expand Up @@ -131,8 +134,12 @@
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.indexWithDateDateNanosUnionType;
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATE_NANOS;
import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
import static org.elasticsearch.xpack.esql.core.util.TestUtils.getFieldAttribute;
import static org.elasticsearch.xpack.esql.plan.physical.AbstractPhysicalPlanSerializationTests.randomEstimatedRowSize;
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -2056,6 +2063,37 @@ public void testToDateNanosPushDown() {
assertThat(expected.toString(), is(esQuery.query().toString()));
}

/**
 * Checks that {@code LocalPhysicalPlanOptimizer#localOptimize} fails with an
 * {@link IllegalStateException} when the plan sorts on an attribute named "missing attr".
 */
public void testVerifierOnMissingReferences() throws Exception {

PhysicalPlan plan = plannerOptimizer.plan("""
from test
| stats a = min(salary) by emp_no
""");

// Expected shape of the plan: LimitExec on top of AggregateExec, whose first aggregate is min(salary).
var limit = as(plan, LimitExec.class);
var aggregate = as(limit.child(), AggregateExec.class);
var min = as(Alias.unwrap(aggregate.aggregates().get(0)), Min.class);
var salary = as(min.field(), NamedExpression.class);
assertThat(salary.name(), is("salary"));
// emulate a rule that adds a missing attribute
FieldAttribute missingAttr = getFieldAttribute("missing attr");
List<Order> orders = List.of(new Order(plan.source(), missingAttr, Order.OrderDirection.ASC, Order.NullsPosition.FIRST));
// NOTE(review): the Literal value passed here is the LimitExec node itself, typed as INTEGER;
// presumably an integer limit value was intended — confirm.
TopNExec topNExec = new TopNExec(plan.source(), plan, orders, new Literal(Source.EMPTY, limit, INTEGER), randomEstimatedRowSize());

// We want to verify that the localOptimize detects the missing attribute.
// However, it also throws an error in one of the rules before we get to the verifier.
// So we use an implementation of LocalPhysicalPlanOptimizer that does not have any rules.
LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext(config, FoldContext.small(), SearchStats.EMPTY);
LocalPhysicalPlanOptimizer optimizerWithNoopExecute = new LocalPhysicalPlanOptimizer(context) {
@Override
protected List<Batch<PhysicalPlan>> batches() {
// No rule batches: the plan is meant to reach the verifier untouched.
return List.of();
}
};
Exception e = expectThrows(IllegalStateException.class, () -> optimizerWithNoopExecute.localOptimize(topNExec));
assertThat(e.getMessage(), containsString(" optimized incorrectly due to missing references [missing attr"));
}

private boolean isMultiTypeEsField(Expression e) {
return e instanceof FieldAttribute fa && fa.field() instanceof MultiTypeEsField;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5554,7 +5554,7 @@ public void testPushShadowingGeneratingPlanPastProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5605,7 +5605,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProject() {
List<Attribute> initialGeneratedExprs = ((GeneratingPlan) initialPlan).generatedAttributes();
LogicalPlan optimizedPlan = testCase.rule.apply(initialPlan);

Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down Expand Up @@ -5661,7 +5661,7 @@ public void testPushShadowingGeneratingPlanPastRenamingProjectWithResolution() {

// This ensures that our generating plan doesn't use invalid references, resp. that any rename from the Project has
// been propagated into the generating plan.
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan);
Failures inconsistencies = LogicalVerifier.INSTANCE.verify(optimizedPlan, false, initialPlan);
assertFalse(inconsistencies.hasFailures());

Project project = as(optimizedPlan, Project.class);
Expand Down
Loading
Loading