Skip to content

[9.1] Add TEST MergeWithLowDiskSpaceIT testRelocationWhileForceMerging (#131767) #131778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,23 @@
package org.elasticsearch.index.engine;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -33,16 +40,20 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MergeWithLowDiskSpaceIT extends DiskUsageIntegTestCase {
private final TimeValue ACCEPTABLE_RELOCATION_TIME = new TimeValue(5, TimeUnit.MINUTES);
protected static long MERGE_DISK_HIGH_WATERMARK_BYTES;

@BeforeClass
Expand Down Expand Up @@ -266,6 +277,118 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception {
assertAcked(indicesAdmin().prepareDelete(indexName).get());
}

public void testRelocationWhileForceMerging() throws Exception {
final String node1 = internalCluster().startNode();
ensureStableCluster(1);
setTotalSpace(node1, Long.MAX_VALUE);
String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()
);
// get current disk space usage (for all indices on the node)
IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get();
long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes();
// restrict the total disk space such that the next merge does not have sufficient disk space
long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L);
setTotalSpace(node1, insufficientTotalDiskSpace);
// node stats' FS stats should report that there is insufficient disk space available
assertBusy(() -> {
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get();
assertThat(nodesStatsResponse.getNodes().size(), equalTo(1));
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace));
assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES));
});
int indexingRounds = randomIntBetween(5, 10);
while (indexingRounds-- > 0) {
indexRandom(
true,
true,
true,
false,
IntStream.range(1, randomIntBetween(5, 10))
.mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50)))
.toList()
);
}
// the max segments argument makes it a blocking call
ActionFuture<BroadcastResponse> forceMergeBeforeRelocationFuture = indicesAdmin().prepareForceMerge(indexName)
.setMaxNumSegments(1)
.execute();
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node1)
.getThreadPoolMergeExecutorService();
TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node1);
assertBusy(() -> {
// merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1));
assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
// telemetry says that there are indeed some segments enqueued to be merged
testTelemetryPlugin.collect();
assertThat(
testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(),
greaterThan(0L)
);
// but still no merges are currently running
assertThat(
testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(),
equalTo(0L)
);
// indices stats also says that no merge is currently running (blocked merges are NOT considered as "running")
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get();
long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent();
assertThat(currentMergeCount, equalTo(0L));
});
// the force merge call is still blocked
assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
assertFalse(forceMergeBeforeRelocationFuture.isDone());
// merge executor still confirms merging is blocked due to insufficient disk space
assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace());
IndicesSegmentResponse indicesSegmentResponseBeforeRelocation = indicesAdmin().prepareSegments(indexName).get();
// the index should have more than 1 segments at this stage
assertThat(
indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
iterableWithSize(greaterThan(1))
);
// start another node
final String node2 = internalCluster().startNode();
ensureStableCluster(2);
setTotalSpace(node2, Long.MAX_VALUE);
// relocate the shard from node1 to node2
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, node1, node2, Metadata.DEFAULT_PROJECT_ID));
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
// the force merge call is now unblocked
assertBusy(() -> {
assertTrue(forceMergeBeforeRelocationFuture.isDone());
assertFalse(forceMergeBeforeRelocationFuture.isCancelled());
});
// there is some merging going on in the {@code PostRecoveryMerger} after recovery, but that's not guaranteeing us a single segment,
// so let's trigger a force merge to 1 segment again (this one should succeed promptly)
indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).get();
IndicesSegmentResponse indicesSegmentResponseAfterRelocation = indicesAdmin().prepareSegments(indexName).get();
// assert there's only one segment now
assertThat(
indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getSegments(),
iterableWithSize(1)
);
// also assert that the shard was indeed moved to a different node
assertThat(
indicesSegmentResponseAfterRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
.currentNodeId(),
not(
equalTo(
indicesSegmentResponseBeforeRelocation.getIndices().get(indexName).iterator().next().shards()[0].getShardRouting()
.currentNodeId()
)
)
);
}

public void setTotalSpace(String dataNodeName, long totalSpace) {
getTestFileStore(dataNodeName).setTotalSpace(totalSpace);
refreshClusterInfo();
Expand Down