From ee4350d886c5d2fb8fbe2218c20178f682ae9d48 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 14 Jul 2025 17:34:41 -0600 Subject: [PATCH] Refactor and simplify missing index & unavailability handling --- .../xpack/esql/session/EsqlCCSUtils.java | 30 +++++++++--------- .../xpack/esql/session/EsqlSession.java | 28 +++-------------- .../xpack/esql/session/EsqlCCSUtilsTests.java | 31 +++++++++---------- 3 files changed, 34 insertions(+), 55 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 4bed369650bcc..69d7d5999db63 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -18,7 +18,6 @@ import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.ConnectTransportException; @@ -37,11 +36,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; public class EsqlCCSUtils { @@ -177,7 +176,11 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu } } - static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map unavailable) { + static void updateExecutionInfoWithUnavailableClusters( + EsqlExecutionInfo execInfo, + Map> failures + ) { + Map unavailable = determineUnavailableRemoteClusters(failures); for (Map.Entry entry : 
unavailable.entrySet()) { String clusterAlias = entry.getKey(); boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable(); @@ -196,14 +199,16 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf static void updateExecutionInfoWithClustersWithNoMatchingIndices( EsqlExecutionInfo executionInfo, IndexResolution indexResolution, - Set unavailableClusters, - QueryBuilder filter + boolean usedFilter ) { - final Set clustersWithNoMatchingIndices = new HashSet<>(executionInfo.clusterAliases()); + // Get the clusters which are still running, and we will check whether they have any matching indices. + // NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters. + final Set clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING) + .map(Cluster::getClusterAlias) + .collect(Collectors.toSet()); for (String indexName : indexResolution.resolvedIndices()) { clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName)); } - clustersWithNoMatchingIndices.removeAll(unavailableClusters); /* * Rules enforced at planning time around non-matching indices * 1. fail query if no matching indices on any cluster (VerificationException) - that is handled elsewhere @@ -216,24 +221,20 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( * Mark it as SKIPPED with 0 shards searched and took=0. */ for (String c : clustersWithNoMatchingIndices) { - if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) { - // if cluster was already in a terminal state, we don't need to check it again - continue; - } final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); if (concreteIndexRequested(executionInfo.getCluster(c).getIndexExpression())) { String error = Strings.format( "Unknown index [%s]", (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? 
indexExpression : c + ":" + indexExpression) ); - if (executionInfo.isSkipUnavailable(c) == false || filter != null) { + if (executionInfo.isSkipUnavailable(c) == false || usedFilter) { if (fatalErrorMessage == null) { fatalErrorMessage = error; } else { fatalErrorMessage += "; " + error; } } - if (filter == null) { + if (usedFilter == false) { // We check for filter since the filter may be the reason why the index is missing, and then we don't want to mark yet markClusterWithFinalStateAndNoShards( executionInfo, @@ -269,8 +270,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( // Filter-less version, mainly for testing where we don't need filter support static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { - var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()).keySet(); - updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, unavailableClusters, null); + updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, false); } // visible for testing diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index e8a13848c7576..b1b8e8af3680b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -457,14 +457,8 @@ public void analyzedPlan( try { // the order here is tricky - if the cluster has been filtered and later became unavailable, // do we want to declare it successful or skipped? For now, unavailability takes precedence. 
- var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()); - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unavailableClusters); - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( - executionInfo, - result.indices, - unavailableClusters.keySet(), - null - ); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false); plan = analyzeAction.apply(result); } catch (Exception e) { l.onFailure(e); @@ -528,10 +522,7 @@ private PreAnalysisResult receiveLookupIndexResolution( EsqlExecutionInfo executionInfo, IndexResolution lookupIndexResolution ) { - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters( - executionInfo, - EsqlCCSUtils.determineUnavailableRemoteClusters(lookupIndexResolution.failures()) - ); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, lookupIndexResolution.failures()); if (lookupIndexResolution.isValid() == false) { // If the index resolution is invalid, don't bother with the rest of the analysis return result.addLookupIndexResolution(index, lookupIndexResolution); @@ -749,10 +740,7 @@ private boolean allCCSClustersSkipped( ActionListener logicalPlanListener ) { IndexResolution indexResolution = result.indices; - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters( - executionInfo, - EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()) - ); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception @@ -782,13 +770,7 @@ private static void analyzeAndMaybeRetry( if 
(result.indices.isValid() || requestFilter != null) { // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report // when the resolution result is not valid for a different reason. - var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()).keySet(); - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( - executionInfo, - result.indices, - unavailableClusters, - requestFilter - ); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null); } plan = analyzeAction.apply(result); } catch (Exception e) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index a3d719cf91276..e2338a12f6179 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -152,7 +152,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true)); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unvailableClusters = Map.of(REMOTE1_ALIAS, failure, REMOTE2_ALIAS, failure); + var unvailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure), REMOTE2_ALIAS, List.of(failure)); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS))); @@ -184,7 +184,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { var failure = new FieldCapabilitiesFailure(new String[] { 
"logs-a" }, new NoSeedNodeLeftException("unable to connect")); RemoteTransportException e = expectThrows( RemoteTransportException.class, - () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, failure)) + () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, List.of(failure))) ); assertThat(e.status().getStatus(), equalTo(500)); assertThat( @@ -337,8 +337,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure)); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + var failures = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -348,9 +348,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed - // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + // remote1 is skipped since it is in the failures Map (passed to IndexResolution.valid) + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), 
equalTo("mylogs1*,mylogs2*,logs*")); @@ -380,8 +379,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure)); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + var failures = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); @@ -389,9 +388,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); - // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed - // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + // skipped since remote1 is in the failures Map + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); @@ -429,8 +427,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unavailableClusters = Map.of(REMOTE1_ALIAS, 
List.of(failure)); - IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); + var failures = Map.of(REMOTE1_ALIAS, List.of(failure)); + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); @@ -440,9 +438,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed - // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) - assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + // skipped since remote1 is in the failures Map + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));