Skip to content

Commit 366bc00

Browse files
authored
Refactor and simplify missing index & unavailability handling (#131252)
1 parent 516990c commit 366bc00

File tree

3 files changed

+34
-55
lines changed

3 files changed

+34
-55
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.compute.operator.DriverCompletionInfo;
1919
import org.elasticsearch.core.Nullable;
2020
import org.elasticsearch.index.IndexNotFoundException;
21-
import org.elasticsearch.index.query.QueryBuilder;
2221
import org.elasticsearch.indices.IndicesExpressionGrouper;
2322
import org.elasticsearch.license.XPackLicenseState;
2423
import org.elasticsearch.transport.ConnectTransportException;
@@ -37,11 +36,11 @@
3736
import java.util.ArrayList;
3837
import java.util.Collections;
3938
import java.util.HashMap;
40-
import java.util.HashSet;
4139
import java.util.List;
4240
import java.util.Map;
4341
import java.util.Objects;
4442
import java.util.Set;
43+
import java.util.stream.Collectors;
4544

4645
public class EsqlCCSUtils {
4746

@@ -177,7 +176,11 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu
177176
}
178177
}
179178

180-
static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInfo, Map<String, FieldCapabilitiesFailure> unavailable) {
179+
static void updateExecutionInfoWithUnavailableClusters(
180+
EsqlExecutionInfo execInfo,
181+
Map<String, List<FieldCapabilitiesFailure>> failures
182+
) {
183+
Map<String, FieldCapabilitiesFailure> unavailable = determineUnavailableRemoteClusters(failures);
181184
for (Map.Entry<String, FieldCapabilitiesFailure> entry : unavailable.entrySet()) {
182185
String clusterAlias = entry.getKey();
183186
boolean skipUnavailable = execInfo.getCluster(clusterAlias).isSkipUnavailable();
@@ -196,14 +199,16 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf
196199
static void updateExecutionInfoWithClustersWithNoMatchingIndices(
197200
EsqlExecutionInfo executionInfo,
198201
IndexResolution indexResolution,
199-
Set<String> unavailableClusters,
200-
QueryBuilder filter
202+
boolean usedFilter
201203
) {
202-
final Set<String> clustersWithNoMatchingIndices = new HashSet<>(executionInfo.clusterAliases());
204+
// Get the clusters which are still running, and we will check whether they have any matching indices.
205+
// NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
206+
final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
207+
.map(Cluster::getClusterAlias)
208+
.collect(Collectors.toSet());
203209
for (String indexName : indexResolution.resolvedIndices()) {
204210
clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
205211
}
206-
clustersWithNoMatchingIndices.removeAll(unavailableClusters);
207212
/*
208213
* Rules enforced at planning time around non-matching indices
209214
* 1. fail query if no matching indices on any cluster (VerificationException) - that is handled elsewhere
@@ -216,24 +221,20 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
216221
* Mark it as SKIPPED with 0 shards searched and took=0.
217222
*/
218223
for (String c : clustersWithNoMatchingIndices) {
219-
if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) {
220-
// if cluster was already in a terminal state, we don't need to check it again
221-
continue;
222-
}
223224
final String indexExpression = executionInfo.getCluster(c).getIndexExpression();
224225
if (concreteIndexRequested(executionInfo.getCluster(c).getIndexExpression())) {
225226
String error = Strings.format(
226227
"Unknown index [%s]",
227228
(c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression)
228229
);
229-
if (executionInfo.isSkipUnavailable(c) == false || filter != null) {
230+
if (executionInfo.isSkipUnavailable(c) == false || usedFilter) {
230231
if (fatalErrorMessage == null) {
231232
fatalErrorMessage = error;
232233
} else {
233234
fatalErrorMessage += "; " + error;
234235
}
235236
}
236-
if (filter == null) {
237+
if (usedFilter == false) {
237238
// We check for the filter since it may be the reason why the index is missing; in that case we don't want to mark the cluster with a final state yet
238239
markClusterWithFinalStateAndNoShards(
239240
executionInfo,
@@ -269,8 +270,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
269270

270271
// Filter-less version, mainly for testing where we don't need filter support
271272
static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
272-
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures()).keySet();
273-
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, unavailableClusters, null);
273+
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, false);
274274
}
275275

276276
// visible for testing

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -457,14 +457,8 @@ public void analyzedPlan(
457457
try {
458458
// the order here is tricky - if the cluster has been filtered and later became unavailable,
459459
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
460-
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures());
461-
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unavailableClusters);
462-
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(
463-
executionInfo,
464-
result.indices,
465-
unavailableClusters.keySet(),
466-
null
467-
);
460+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
461+
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, false);
468462
plan = analyzeAction.apply(result);
469463
} catch (Exception e) {
470464
l.onFailure(e);
@@ -528,10 +522,7 @@ private PreAnalysisResult receiveLookupIndexResolution(
528522
EsqlExecutionInfo executionInfo,
529523
IndexResolution lookupIndexResolution
530524
) {
531-
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(
532-
executionInfo,
533-
EsqlCCSUtils.determineUnavailableRemoteClusters(lookupIndexResolution.failures())
534-
);
525+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, lookupIndexResolution.failures());
535526
if (lookupIndexResolution.isValid() == false) {
536527
// If the index resolution is invalid, don't bother with the rest of the analysis
537528
return result.addLookupIndexResolution(index, lookupIndexResolution);
@@ -749,10 +740,7 @@ private boolean allCCSClustersSkipped(
749740
ActionListener<LogicalPlan> logicalPlanListener
750741
) {
751742
IndexResolution indexResolution = result.indices;
752-
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(
753-
executionInfo,
754-
EsqlCCSUtils.determineUnavailableRemoteClusters(indexResolution.failures())
755-
);
743+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
756744
if (executionInfo.isCrossClusterSearch()
757745
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
758746
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
@@ -782,13 +770,7 @@ private static void analyzeAndMaybeRetry(
782770
if (result.indices.isValid() || requestFilter != null) {
783771
// We won't run this check with no filter and no valid indices since this may lead to a false-positive missing index report
784772
// when the resolution result is not valid for a different reason.
785-
var unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(result.indices.failures()).keySet();
786-
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(
787-
executionInfo,
788-
result.indices,
789-
unavailableClusters,
790-
requestFilter
791-
);
773+
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null);
792774
}
793775
plan = analyzeAction.apply(result);
794776
} catch (Exception e) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() {
152152
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true));
153153

154154
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
155-
var unvailableClusters = Map.of(REMOTE1_ALIAS, failure, REMOTE2_ALIAS, failure);
155+
var unvailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure), REMOTE2_ALIAS, List.of(failure));
156156
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters);
157157

158158
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS)));
@@ -184,7 +184,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() {
184184
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
185185
RemoteTransportException e = expectThrows(
186186
RemoteTransportException.class,
187-
() -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, failure))
187+
() -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, List.of(failure)))
188188
);
189189
assertThat(e.status().getStatus(), equalTo(500));
190190
assertThat(
@@ -337,8 +337,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
337337
);
338338
// remote1 is unavailable
339339
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
340-
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
341-
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
340+
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
341+
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);
342342

343343
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
344344

@@ -348,9 +348,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
348348

349349
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
350350
assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
351-
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed
352-
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
353-
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
351+
// skipped since remote1 is in the failures Map (passed to IndexResolution.valid)
352+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
354353

355354
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
356355
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));
@@ -380,18 +379,17 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
380379
);
381380

382381
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
383-
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
384-
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
382+
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
383+
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);
385384
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
386385

387386
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS);
388387
assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
389388
assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING);
390389

391390
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
392-
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed
393-
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
394-
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
391+
// skipped since remote1 is in the failures Map
392+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
395393

396394
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
397395
assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
@@ -429,8 +427,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
429427

430428
// remote1 is unavailable
431429
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
432-
var unavailableClusters = Map.of(REMOTE1_ALIAS, List.of(failure));
433-
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
430+
var failures = Map.of(REMOTE1_ALIAS, List.of(failure));
431+
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), failures);
434432

435433
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
436434

@@ -440,9 +438,8 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
440438

441439
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS);
442440
assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
443-
// since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed
444-
// by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters)
445-
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
441+
// skipped since remote1 is in the failures Map
442+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
446443

447444
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS);
448445
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*"));

0 commit comments

Comments
 (0)