Skip to content

[Backport 2.x] Add timeout handling for S3 blob container async operations #18595

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.opensearch.common.blobstore.support.PlainBlobMetadata;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
Expand All @@ -100,6 +101,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -114,6 +117,7 @@
class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamBlobContainer {

private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);
private static final long DEFAULT_OPERATION_TIMEOUT = TimeUnit.SECONDS.toSeconds(30);

private final S3BlobStore blobStore;
private final String keyPath;
Expand Down Expand Up @@ -389,7 +393,7 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce

private <T> T getFutureValue(PlainActionFuture<T> future) throws IOException {
try {
return future.get();
return future.get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
Expand All @@ -398,6 +402,9 @@ private <T> T getFutureValue(PlainActionFuture<T> future) throws IOException {
throw (IOException) e.getCause();
}
throw new RuntimeException(e.getCause());
} catch (TimeoutException e) {
FutureUtils.cancel(future);
throw new IOException("Delete operation timed out after 30 seconds", e);
}
}

Expand Down Expand Up @@ -782,6 +789,7 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
logger.debug("Starting async deletion for path [{}]", keyPath);
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

Expand All @@ -805,6 +813,7 @@ public void deleteAsync(ActionListener<DeleteResult> completionListener) {
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
logger.debug("Subscribed to list objects publisher for path [{}]", keyPath);
subscription.request(1);
}

Expand All @@ -816,6 +825,8 @@ public void onNext(ListObjectsV2Response response) {
objectsToDelete.add(s3Object.key());
});

logger.debug("Found {} objects to delete in current batch for path [{}]", response.contents().size(), keyPath);

int bulkDeleteSize = blobStore.getBulkDeletesSize();
if (objectsToDelete.size() >= bulkDeleteSize) {
int fullBatchesCount = objectsToDelete.size() / bulkDeleteSize;
Expand All @@ -824,6 +835,7 @@ public void onNext(ListObjectsV2Response response) {
List<String> batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete));
objectsToDelete.subList(0, itemsToDelete).clear();

logger.debug("Executing bulk delete of {} objects for path [{}]", batchToDelete.size(), keyPath);
deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
Expand All @@ -838,12 +850,19 @@ public void onNext(ListObjectsV2Response response) {

@Override
public void onError(Throwable t) {
logger.error(() -> new ParameterizedMessage("Failed to list objects for deletion in path [{}]", keyPath), t);
listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t));
}

@Override
public void onComplete() {
logger.debug(
"Completed listing objects for path [{}], remaining objects to delete: {}",
keyPath,
objectsToDelete.size()
);
if (!objectsToDelete.isEmpty()) {
logger.debug("Executing final bulk delete of {} objects for path [{}]", objectsToDelete.size(), keyPath);
deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
Expand All @@ -854,8 +873,13 @@ public void onComplete() {
}
deletionChain.whenComplete((v, throwable) -> {
if (throwable != null) {
logger.error(
() -> new ParameterizedMessage("Failed to complete deletion chain for path [{}]", keyPath),
throwable
);
listingFuture.completeExceptionally(throwable);
} else {
logger.debug("Successfully completed deletion chain for path [{}]", keyPath);
listingFuture.complete(null);
}
});
Expand All @@ -864,16 +888,24 @@ public void onComplete() {

listingFuture.whenComplete((v, throwable) -> {
if (throwable != null) {
logger.error(() -> new ParameterizedMessage("Failed to complete async deletion for path [{}]", keyPath), throwable);
completionListener.onFailure(
throwable instanceof Exception
? (Exception) throwable
: new IOException("Unexpected error during async deletion", throwable)
);
} else {
logger.debug(
"Successfully completed async deletion for path [{}]. Deleted {} blobs totaling {} bytes",
keyPath,
deletedBlobs.get(),
deletedBytes.get()
);
completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get()));
}
});
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to initiate async deletion for path [{}]", keyPath), e);
completionListener.onFailure(new IOException("Failed to initiate async deletion", e));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public static CompletableFuture<Void> executeDeleteChain(
CompletableFuture<Void> currentChain,
Runnable afterDeleteAction
) {
logger.debug("Starting delete chain execution for {} objects", objectsToDelete.size());
List<List<String>> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize());
logger.debug("Created {} delete batches", batches.size());
CompletableFuture<Void> newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, blobStore, batches));
if (afterDeleteAction != null) {
logger.debug("Adding post-delete action to the chain");
newChain = newChain.thenRun(afterDeleteAction);
}
return newChain;
Expand All @@ -45,42 +48,58 @@ public static CompletableFuture<Void> executeDeleteChain(
static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteSize) {
List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < keys.size(); i += bulkDeleteSize) {
batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize)));
int batchSize = Math.min(keys.size() - i, bulkDeleteSize);
batches.add(keys.subList(i, i + batchSize));
logger.debug("Created delete batch of size {} starting at index {}", batchSize, i);
}
return batches;
}

static CompletableFuture<Void> executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<List<String>> batches) {
logger.debug("Starting execution of {} delete batches", batches.size());
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
for (int i = 0; i < batches.size(); i++) {
List<String> batch = batches.get(i);
logger.debug("Queueing batch {} of {} with {} objects", i + 1, batches.size(), batch.size());
allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch));
}

return allDeletesFuture;
return allDeletesFuture.whenComplete((v, throwable) -> {
if (throwable != null) {
logger.error("Failed to complete delete batches execution", throwable);
} else {
logger.debug("Completed execution of all delete batches");
}
});
}

static CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<String> batch) {
logger.debug("Executing delete batch of {} objects", batch.size());
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse);
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(response -> {
logger.debug("Received delete response for batch of {} objects", batch.size());
return processDeleteResponse(response);
});
}

static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
if (!deleteObjectsResponse.errors().isEmpty()) {
if (deleteObjectsResponse.errors().isEmpty()) {
logger.debug("Successfully processed delete response with no errors");
} else {
List<String> errorDetails = deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList());
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
() -> new ParameterizedMessage("Failed to delete {} objects: {}", deleteObjectsResponse.errors().size(), errorDetails)
);
}
return null;
}

static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
logger.debug("Creating bulk delete request for {} objects in bucket {}", blobs.size(), bucket);
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,72 @@ public void testDeleteBlobsIgnoringIfNotExistsWithExecutionException() throws Ex
assertEquals(simulatedError, e.getCause().getCause());
}

public void testDeleteTimeoutWithNeverCompletingAsyncDeletionFuture() throws Exception {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(1000);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

// Create a future that never completes
CompletableFuture<DeleteObjectsResponse> neverCompletingFuture = new CompletableFuture<>();
when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(neverCompletingFuture);

// Create a publisher that emits one item and completes
final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
final CountDownLatch publisherCompletedLatch = new CountDownLatch(1);
final AtomicBoolean hasEmittedItem = new AtomicBoolean(false);

doAnswer(invocation -> {
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (!hasEmittedItem.getAndSet(true)) {
subscriber.onNext(
ListObjectsV2Response.builder()
.contents(Collections.singletonList(S3Object.builder().key("test-key").size(100L).build()))
.build()
);
publisherCompletedLatch.countDown();
} else {
subscriber.onComplete();
}
}

@Override
public void cancel() {}
});
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());

when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

IOException ex = assertThrows(IOException.class, blobContainer::delete);
assertEquals("Delete operation timed out after 30 seconds", ex.getMessage());

// Wait for publisher to complete
assertTrue("Publisher should complete", publisherCompletedLatch.await(1, TimeUnit.SECONDS));

verify(s3AsyncClient, times(1)).listObjectsV2Paginator(any(ListObjectsV2Request.class));
verify(s3AsyncClient, times(1)).deleteObjects(any(DeleteObjectsRequest.class));
}

private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {

final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));
Expand Down
Loading