Skip to content

Refactor and simplify missing index & unavailability handling #131252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.transport.ConnectTransportException;
Expand All @@ -37,11 +36,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class EsqlCCSUtils {

Expand Down Expand Up @@ -177,7 +176,11 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu
}
}

static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
static void updateExecutionInfoWithUnavailableClusters(
EsqlExecutionInfo execInfo,
Map<String, List<FieldCapabilitiesFailure>> failures
) {
Map<String, FieldCapabilitiesFailure> unavailable = determineUnavailableRemoteClusters(failures);
for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
String clusterAlias = entry.getKey();
boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable();
Expand All @@ -196,14 +199,16 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf
static void updateExecutionInfoWithClustersWithNoMatchingIndices(
EsqlExecutionInfo executionInfo,
IndexResolution indexResolution,
Set<String> unavailableClusters,
QueryBuilder filter
boolean usedFilter
) {
final Set<String> clustersWithNoMatchingIndices = new HashSet<>(executionInfo.clusterAliases());
// Get the clusters which are still running, and we will check whether they have any matching indices.
// NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
.map(Cluster::getClusterAlias)
.collect(Collectors.toSet());
for (String indexName : indexResolution.resolvedIndices()) {
clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
}
clustersWithNoMatchingIndices.removeAll(unavailableClusters);
/*
* Rules enforced at planning time around non-matching indices
* 1. fail query if no matching indices on any cluster (VerificationException) - that is handled elsewhere
Expand All @@ -216,24 +221,20 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
* Mark it as SKIPPED with 0 shards searched and took=0.
*/
for (String c : clustersWithNoMatchingIndices) {
if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) {
// if cluster was already in a terminal state, we don't need to check it again
continue;
}
final String indexExpression = executionInfo.getCluster(c).getIndexExpression();
if (concreteIndexRequested(executionInfo.getCluster(c).getIndexExpression())) {
String error = Strings.format(
"Unknown index [%s]",
(c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression)
);
if (executionInfo.isSkipUnavailable(c) == false || filter != null) {
if (executionInfo.isSkipUnavailable(c) == false || usedFilter) {
if (fatalErrorMessage == null) {
fatalErrorMessage = error;
} else {
fatalErrorMessage += "; " + error;
}
}
if (filter == null) {
if (usedFilter == false) {
// We check for a filter because the filter itself may be the reason the index is missing; in that case we don't want to mark the cluster with a final state yet
markClusterWithFinalStateAndNoShards(
executionInfo,
Expand Down Expand Up @@ -269,8 +270,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(

// Filter-less version, mainly for testing where we don't need filter support
static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()).keySet();
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, unavailableClusters, null);
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, false);
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,8 @@ public void analyzedPlan(
try {
// the order here is tricky - if the cluster has been filtered and later became unavailable,
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures());
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unavailableClusters);
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(
executionInfo,
result.indices,
unavailableClusters.keySet(),
null
);
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false);
plan = analyzeAction.apply(result);
} catch (Exception e) {
l.onFailure(e);
Expand Down Expand Up @@ -528,10 +522,7 @@ private PreAnalysisResult receiveLookupIndexResolution(
EsqlExecutionInfo executionInfo,
IndexResolution lookupIndexResolution
) {
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(
executionInfo,
EsqlCCSUtils.determineUnavailableRemoteClusters(lookupIndexResolution.failures())
);
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, lookupIndexResolution.failures());
if (lookupIndexResolution.isValid() == false) {
// If the index resolution is invalid, don't bother with the rest of the analysis
return result.addLookupIndexResolution(index, lookupIndexResolution);
Expand Down Expand Up @@ -749,10 +740,7 @@ private boolean allCCSClustersSkipped(
ActionListener<LogicalPlan> logicalPlanListener
) {
IndexResolution indexResolution = result.indices;
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(
executionInfo,
EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures())
);
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
Expand Down Expand Up @@ -782,13 +770,7 @@ private static void analyzeAndMaybeRetry(
if (result.indices.isValid() || requestFilter != null) {
// We don't run this check when there is no filter and the indices are not valid, since that could produce a false-positive
// missing-index report when the resolution result is invalid for a different reason.
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()).keySet();
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(
executionInfo,
result.indices,
unavailableClusters,
requestFilter
);
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null);
}
plan = analyzeAction.apply(result);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() {
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true));

var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
var unvailableClusters = Map.of(REMOTE1_ALIAS, failure, REMOTE2_ALIAS, failure);
var unvailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure), REMOTE2_ALIAS, List.of(failure));
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters);

assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS)));
Expand Down Expand Up @@ -184,7 +184,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() {
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
RemoteTransportException e = expectThrows(
RemoteTransportException.class,
() -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, failure))
() -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, List.of(failure)))
);
assertThat(e.status().getStatus(), equalTo(500));
assertThat(
Expand Down Expand Up @@ -337,8 +337,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
);
// remote1 is unavailable
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

Expand All @@ -348,9 +348,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {

EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), its status will not be changed
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
// since remote1 is in the failures Map (passed to IndexResolution.valid), it is expected to be SKIPPED
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));
Expand Down Expand Up @@ -380,18 +379,17 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
);

var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);

EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), its status will not be changed
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
// skipped since remote1 is in the failures Map
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
Expand Down Expand Up @@ -429,8 +427,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {

// remote1 is unavailable
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

Expand All @@ -440,9 +438,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {

EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), its status will not be changed
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
// skipped since remote1 is in the failures Map
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));

EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));
Expand Down