From 9160690611b502f7de5ee9134086178a82fd8cd9 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Wed, 23 Jul 2025 21:18:15 +0300 Subject: [PATCH 1/2] Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131767) This adds a test that covers relocation for shards that are running a force merge. Relates https://github.com/elastic/elasticsearch/issues/93503 --- .../index/engine/MergeWithLowDiskSpaceIT.java | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index 7a76872c27a54..6e448caeeb705 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -10,16 +10,23 @@ package org.elasticsearch.index.engine; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.DiskUsageIntegTestCase; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; @@ -33,16 +40,20 @@ import java.util.Collection; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase { + private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES); protected static long MERGE_DISK_HIGH_WATERMARK_BYTES; @BeforeClass @@ -266,6 +277,115 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception { assertAcked(indicesAdmin().prepareDelete(indexName).get()); } + public void testRelocationWhileForceMerging() throws Exception { + final String node1 = internalCluster().startNode(); + ensureStableCluster(1); + setTotalSpace(node1, Long.MAX_VALUE); + String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + prepareCreate(indexName, indexSettings(1, 0)).get(); + // get current disk space usage (for all indices on the node) + IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get(); + long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes(); + // restrict the total disk space such that the next merge does not have sufficient disk space + long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L); + setTotalSpace(node1, insufficientTotalDiskSpace); + // node stats' FS stats should report that there is insufficient disk space available + assertBusy(() -> { + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get(); + assertThat(nodesStatsResponse.getNodes().size(), equalTo(1)); + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); + assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace)); + assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES)); + }); + int indexingRounds = randomIntBetween(5, 10); + while (indexingRounds-- > 0) { + indexRandom( + true, + true, + true, + false, + IntStream.range(1, randomIntBetween(5, 10)) + .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50))) + .toList() + ); + } + // the max segments argument makes it a blocking call + ActionFuture forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName) + .setMaxNumSegments(1) + .execute(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1) + .getThreadPoolMergeExecutorService(); + TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node1); + assertBusy(() -> { + // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued + assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1)); + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); + // telemetry says that there are indeed some segments enqueued to be merged + testTelemetryPlugin.collect(); + assertThat( + testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), + greaterThan(0L) + ); + // but still no merges are currently running + assertThat( + testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), + equalTo(0L) + ); + // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running") + IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get(); + long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); + assertThat(currentMergeCount, equalTo(0L)); + }); + // the force merge call is still blocked + assertFalse(forceMergeBeforeRelocationFuture.isCancelled()); + assertFalse(forceMergeBeforeRelocationFuture.isDone()); + // merge executor still confirms merging is blocked due to insufficient disk space + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); + IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get(); + // the index should have more than 1 segments at this stage + assertThat( + indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(), + iterableWithSize(greaterThan(1)) + ); + // start another node + final String node2 = internalCluster().startNode(); + ensureStableCluster(2); + setTotalSpace(node2, Long.MAX_VALUE); + // relocate the shard from node1 to node2 + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID)); + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .get(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + // the force merge call is now unblocked + assertBusy(() -> { + assertTrue(forceMergeBeforeRelocationFuture.isDone()); + assertFalse(forceMergeBeforeRelocationFuture.isCancelled()); + }); + // there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that's not guaranteeing us a single segment, + // so let's trigger a force merge to 1 segment again (this one should succeed promptly) + indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get(); + IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get(); + // assert there's only one segment now + assertThat( + indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(), + iterableWithSize(1) + ); + // also assert that the shard was indeed moved to a different node + assertThat( + indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting() + .currentNodeId(), + not( + equalTo( + indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting() + .currentNodeId() + ) + ) + ); + } + public void setTotalSpace(String dataNodeName, long totalSpace) { getTestFileStore(dataNodeName).setTotalSpace(totalSpace); refreshClusterInfo(); From 844367a143b24d3b28813f885dcb63d2f3558d47 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Mon, 28 Jul 2025 14:04:26 +0300 Subject: [PATCH 2/2] Fix MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131806) The index settings are randomized in the test, but this test suite doesn't work when indices have a custom data path. --- .../elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index 6e448caeeb705..ff1dde7ed5572 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -282,7 +282,10 @@ public void testRelocationWhileForceMerging() throws Exception { ensureStableCluster(1); setTotalSpace(node1, Long.MAX_VALUE); String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); - prepareCreate(indexName, indexSettings(1, 0)).get(); + createIndex( + indexName, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build() + ); // get current disk space usage (for all indices on the node) IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get(); long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();