diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java index 2fc1d4c5f1..748194d886 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java @@ -172,6 +172,11 @@ public Map purgeRealms(Iterable realms) { return Map.copyOf(results); } + @Override + public Map getMetaStoreManagerMap() { + return metaStoreManagerMap; + } + @Override public synchronized PolarisMetaStoreManager getOrCreateMetaStoreManager( RealmContext realmContext) { diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index 2a32fb6f94..018a104bf3 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -1241,6 +1241,16 @@ private void revokeGrantRecord( PolarisTaskConstants.TASK_TYPE, String.valueOf(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER.typeCode())); properties.put("data", PolarisObjectMapperUtil.serialize(callCtx, refreshEntityToDrop)); + // Update LAST_ATTEMPT_START_TIME to prevent multiple executors from picking the same task + // simultaneously; protected by TASK_TIMEOUT_MILLIS + properties.put( + PolarisTaskConstants.LAST_ATTEMPT_START_TIME, + String.valueOf(callCtx.getClock().millis())); + properties.put( + PolarisTaskConstants.ATTEMPT_COUNT, + String.valueOf( + Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) + + 1)); taskEntity.setProperties(PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); if (cleanupProperties != null) { taskEntity.setInternalProperties( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java index 2c97ebd638..7e53f704da 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/LocalPolarisMetaStoreManagerFactory.java @@ -193,6 +193,11 @@ public synchronized EntityCache getOrCreateEntityCache(RealmContext realmContext return entityCacheMap.get(realmContext.getRealmIdentifier()); } + @Override + public Map getMetaStoreManagerMap() { + return metaStoreManagerMap; + } + /** * This method bootstraps service for a given realm: i.e. creates all the needed entities in the * metastore and creates a root service principal. After that we rotate the root principal diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/MetaStoreManagerFactory.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/MetaStoreManagerFactory.java index cb2523891f..0e951a0183 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/MetaStoreManagerFactory.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/MetaStoreManagerFactory.java @@ -43,4 +43,8 @@ Map bootstrapRealms( /** Purge all metadata for the realms provided */ Map purgeRealms(Iterable realms); + + default Map getMetaStoreManagerMap() { + throw new UnsupportedOperationException("getMetaStoreManagerMap not supported"); + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 62f526a6db..a60359185f 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -1429,6 +1429,16 @@ private void bootstrapPolarisService( PolarisTaskConstants.TASK_TYPE, String.valueOf(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER.typeCode())); properties.put("data", PolarisObjectMapperUtil.serialize(callCtx, refreshEntityToDrop)); + // Update LAST_ATTEMPT_START_TIME to prevent multiple executors from picking the same task + // simultaneously; protected by TASK_TIMEOUT_MILLIS + properties.put( + PolarisTaskConstants.LAST_ATTEMPT_START_TIME, + String.valueOf(callCtx.getClock().millis())); + properties.put( + PolarisTaskConstants.ATTEMPT_COUNT, + String.valueOf( + Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) + + 1)); taskEntity.setProperties(PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); if (cleanupProperties != null) { taskEntity.setInternalProperties( diff --git a/quarkus/service/build.gradle.kts b/quarkus/service/build.gradle.kts index 0c34075bfb..f7ea4584e3 100644 --- a/quarkus/service/build.gradle.kts +++ b/quarkus/service/build.gradle.kts @@ -53,7 +53,6 @@ dependencies { implementation("io.quarkus:quarkus-security") implementation("io.quarkus:quarkus-smallrye-context-propagation") implementation("io.quarkus:quarkus-smallrye-fault-tolerance") - implementation(libs.jakarta.enterprise.cdi.api) implementation(libs.jakarta.inject.api) implementation(libs.jakarta.validation.api) @@ -71,6 +70,7 @@ dependencies { implementation(libs.auth0.jwt) implementation(libs.bouncycastle.bcprov) + implementation("io.quarkus:quarkus-scheduler") compileOnly(libs.jakarta.annotation.api) compileOnly(libs.spotbugs.annotations) diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java index 3e16edb5a6..217f847798 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java @@ -23,7 +23,9 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.quarkus.runtime.Startup; +import io.quarkus.scheduler.Scheduled; import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.util.concurrent.ExecutorService; @@ -60,6 +62,18 @@ public void init() { super.init(); } + @PostConstruct + @Override + public void postConstruct() { + super.postConstruct(); + } + + @Scheduled(every = "PT10M") + @Override + public void scheduled() { + super.scheduled(); + } + @Override protected void handleTask(long taskEntityId, CallContext callContext, int attempt) { Span span = diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java index 39cd619bd4..f314deb714 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java @@ -260,10 +260,12 @@ public void deleteFile(String location) { .setName(UUID.randomUUID().toString()) .build(); addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); assertThat(handler.handleTask(task, callCtx)).isTrue(); assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(manifestFile.path()); } } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index 5e39028c92..d3d3cdf236 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -24,7 +24,9 @@ import jakarta.annotation.Nonnull; import jakarta.inject.Inject; import java.io.IOException; -import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -68,6 +70,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.slf4j.LoggerFactory; +import org.threeten.extra.MutableClock; @QuarkusTest class TableCleanupTaskHandlerTest { @@ -78,6 +81,7 @@ class TableCleanupTaskHandlerTest { private CallContext callContext; private final RealmContext realmContext = () -> "realmName"; + private final MutableClock timeSource = MutableClock.of(Instant.now(), ZoneOffset.UTC); private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) { return new TaskFileIOSupplier( @@ -103,7 +107,7 @@ void setup() { metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), diagServices, configurationStore, - Clock.systemDefaultZone()); + timeSource); callContext = CallContext.of(realmContext, polarisCallContext); } @@ -152,6 +156,7 @@ public void testTableCleanup() throws IOException { handler.handleTask(task, callContext); + timeSource.add(Duration.ofMinutes(10)); assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) @@ -232,6 +237,7 @@ public void close() { assertThat(results).containsExactly(true, true); // both tasks successfully executed, but only one should queue subtasks + timeSource.add(Duration.ofMinutes(10)); assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) @@ -293,6 +299,7 @@ public void close() { assertThat(results).containsExactly(true, true); // both tasks successfully executed, but only one should queue subtasks + timeSource.add(Duration.ofMinutes(10)); assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) @@ -413,6 +420,7 @@ public void testTableCleanupMultipleSnapshots() throws IOException { handler.handleTask(task, callContext); + timeSource.add(Duration.ofMinutes(10)); List entities = metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) @@ -587,6 +595,7 @@ public void testTableCleanupMultipleMetadata() throws IOException { handler.handleTask(task, callContext); + timeSource.add(Duration.ofMinutes(10)); List entities = metaStoreManagerFactory .getOrCreateMetaStoreManager(callContext.getRealmContext()) diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskRecoveryManagerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskRecoveryManagerTest.java new file mode 100644 index 0000000000..24c0b600f1 --- /dev/null +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskRecoveryManagerTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.quarkus.task; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.quarkus.test.junit.QuarkusTest; +import jakarta.annotation.Nonnull; +import jakarta.inject.Inject; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.inmemory.InMemoryFileIO; +import org.apache.iceberg.io.FileIO; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.AsyncTaskType; +import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisTaskConstants; +import org.apache.polaris.core.entity.TaskEntity; +import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.dao.entity.EntitiesResult; +import org.apache.polaris.core.persistence.pagination.PageToken; +import org.apache.polaris.core.storage.PolarisStorageActions; +import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.catalog.io.FileIOFactory; +import org.apache.polaris.service.task.TableCleanupTaskHandler; +import org.apache.polaris.service.task.TaskExecutorImpl; +import org.apache.polaris.service.task.TaskFileIOSupplier; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.threeten.extra.MutableClock; + +@QuarkusTest +public class TaskRecoveryManagerTest { + @Inject private MetaStoreManagerFactory metaStoreManagerFactory; + protected final MutableClock timeSource = MutableClock.of(Instant.now(), ZoneOffset.UTC); + private final RealmContext realmContext = () -> "realmName"; + + private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) { + return new TaskFileIOSupplier( + new FileIOFactory() { + @Override + public FileIO loadFileIO( + @Nonnull CallContext callContext, + @Nonnull String ioImplClassName, + @Nonnull Map properties, + @Nonnull TableIdentifier identifier, + @Nonnull Set tableLocations, + @Nonnull Set storageActions, + @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) { + return fileIO; + } + }); + } + + private void addTaskLocation(TaskEntity task) { + Map internalPropertiesAsMap = new HashMap<>(task.getInternalPropertiesAsMap()); + internalPropertiesAsMap.put(PolarisTaskConstants.STORAGE_LOCATION, "file:///tmp/"); + ((PolarisBaseEntity) task).setInternalPropertiesAsMap(internalPropertiesAsMap); + } + + @Test + void testTaskRecovery() throws IOException { + // Step 1: Initialize mock table metadata, snapshot, and statistics file to simulate a realistic + // Iceberg table + PolarisCallContext polarisCallContext = + new PolarisCallContext( + metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), + new PolarisDefaultDiagServiceImpl(), + new PolarisConfigurationStore() {}, + timeSource); + try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { + CallContext.setCurrentContext(callCtx); + Map retryCounter = new HashMap<>(); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } + }; + TestServices testServices = TestServices.builder().realmContext(realmContext).build(); + TaskExecutorImpl taskExecutor = + new TaskExecutorImpl( + Runnable::run, + metaStoreManagerFactory, + buildTaskFileIOSupplier(fileIO), + testServices.polarisEventListener()) { + @Override + public void addTaskHandlerContext(long taskEntityId, CallContext callContext) { + int attempts = + retryCounter + .computeIfAbsent(String.valueOf(taskEntityId), k -> new AtomicInteger(0)) + .incrementAndGet(); + if (attempts == 1) { + // no-op for first attempt to mock failure + } else { + super.addTaskHandlerContext(taskEntityId, callContext); + } + } + }; + taskExecutor.init(); + + TableCleanupTaskHandler tableCleanupTaskHandler = + new TableCleanupTaskHandler( + taskExecutor, + metaStoreManagerFactory, + buildTaskFileIOSupplier(new InMemoryFileIO())) {}; + + TableIdentifier tableIdentifier = + TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + + // Step 2: Execute the initial cleanup task, where two child cleanup tasks are generated and + // executed the first time + TaskEntity task = + new TaskEntity.Builder() + .setName("cleanup_" + tableIdentifier) + .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER) + .withData( + new IcebergTableLikeEntity.Builder(tableIdentifier, metadataFile) + .setName("table1") + .setCatalogId(1) + .setCreateTimestamp(100) + .build()) + .build(); + addTaskLocation(task); + Assertions.assertThatPredicate(tableCleanupTaskHandler::canHandleTask).accepts(task); + tableCleanupTaskHandler.handleTask(task, callCtx); + + // Step 3: Verify that the generated child tasks were registered, ATTEMPT_COUNT = 2 + timeSource.add(Duration.ofMinutes(10)); + EntitiesResult entitiesResult = + metaStoreManagerFactory + .getOrCreateMetaStoreManager(realmContext) + .loadTasks(callCtx.getPolarisCallContext(), "test", PageToken.fromLimit(2)); + assertThat(entitiesResult.getEntities()).hasSize(2); + entitiesResult + .getEntities() + .forEach( + entity -> { + TaskEntity taskEntity = TaskEntity.of(entity); + assertThat(taskEntity.getPropertiesAsMap().get(PolarisTaskConstants.ATTEMPT_COUNT)) + .isEqualTo("2"); + }); + + // Step 4: Test task recovery + // Before timeout: expect no tasks eligible for recovery + entitiesResult = + metaStoreManagerFactory + .getOrCreateMetaStoreManager(realmContext) + .loadTasks(callCtx.getPolarisCallContext(), "test", PageToken.fromLimit(2)); + assertThat(entitiesResult.getEntities()).hasSize(0); + // Advance time and trigger recovery: expect ATTEMPT_COUNT = 3 + timeSource.add(Duration.ofMinutes(10)); + taskExecutor.recoverPendingTasks(timeSource); + + // Step 5: all task should success ATTEMPT_COUNT = 4 + timeSource.add(Duration.ofMinutes(10)); + entitiesResult = + metaStoreManagerFactory + .getOrCreateMetaStoreManager(realmContext) + .loadTasks(callCtx.getPolarisCallContext(), "test", PageToken.fromLimit(2)); + assertThat(entitiesResult.getEntities()).hasSize(0); + } + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java index aa0b0d9f1e..a22e51ae93 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/FileCleanupTaskHandler.java @@ -86,7 +86,7 @@ public CompletableFuture tryDelete( .addKeyValue("file", file) .addKeyValue("baseFile", baseFile != null ? baseFile : "") .addKeyValue("tableId", tableId) - .log("table file cleanup task scheduled, but data file doesn't exist"); + .log("File does not exist in FileIO, skipping deletion"); } }, executorService) @@ -97,7 +97,8 @@ public CompletableFuture tryDelete( .addKeyValue("file", file) .addKeyValue("tableIdentifier", tableId) .addKeyValue("baseFile", baseFile != null ? baseFile : "") - .log("Exception caught deleting data file", newEx); + .addKeyValue("error", newEx.getMessage()) + .log("Exception caught deleting data file"); return tryDelete(tableId, fileIO, baseFile, file, newEx, attempt + 1); }, CompletableFuture.delayedExecutor( diff --git a/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java index 3173dc25e7..5451a02233 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java @@ -82,18 +82,20 @@ private boolean cleanUpManifestFile( return true; } - ManifestReader dataFiles = ManifestFiles.read(manifestFile, fileIO); - List> dataFileDeletes = - StreamSupport.stream( - Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE), - false) - .map(file -> tryDelete(tableId, fileIO, manifestFile.path(), file.location(), null, 1)) - .toList(); - LOGGER.debug( - "Scheduled {} data files to be deleted from manifest {}", - dataFileDeletes.size(), - manifestFile.path()); try { + ManifestReader dataFiles = ManifestFiles.read(manifestFile, fileIO); + List> dataFileDeletes = + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE), + false) + .map( + file -> tryDelete(tableId, fileIO, manifestFile.path(), file.location(), null, 1)) + .toList(); + LOGGER.debug( + "Scheduled {} data files to be deleted from manifest {}", + dataFileDeletes.size(), + manifestFile.path()); + // wait for all data files to be deleted, then wait for the manifest itself to be deleted CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new)) .thenCompose( diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java index ff791bf188..9bc03ee831 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java @@ -37,6 +37,7 @@ import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntityType; +import org.apache.polaris.core.entity.PolarisTaskConstants; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; @@ -156,6 +157,10 @@ private Stream getManifestTaskStream( IcebergTableLikeEntity tableEntity, PolarisMetaStoreManager metaStoreManager, PolarisCallContext polarisCallContext) { + String executorId = + cleanupTask + .getPropertiesAsMap() + .getOrDefault(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, ""); // read the manifest list for each snapshot. dedupe the manifest files and schedule a // cleanupTask // for each manifest file and its data files to be deleted @@ -193,6 +198,9 @@ private Stream getManifestTaskStream( .withData( new ManifestFileCleanupTaskHandler.ManifestCleanupTask( tableEntity.getTableIdentifier(), TaskUtils.encodeManifestFile(mf))) + .withLastAttemptExecutorId(executorId) + .withAttemptCount(1) + .withLastAttemptStartedTimestamp(polarisCallContext.getClock().millis()) .setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId()) // copy the internal properties, which will have storage info .setInternalProperties(cleanupTask.getInternalPropertiesAsMap()) @@ -211,6 +219,10 @@ private Stream getMetadataTaskStream( polarisCallContext .getConfigurationStore() .getConfiguration(polarisCallContext, BATCH_SIZE_CONFIG_KEY, 10); + String executorId = + cleanupTask + .getPropertiesAsMap() + .getOrDefault(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, ""); return getMetadataFileBatches(tableMetadata, batchSize).stream() .map( metadataBatch -> { @@ -235,6 +247,9 @@ private Stream getMetadataTaskStream( .withData( new BatchFileCleanupTaskHandler.BatchFileCleanupTask( tableEntity.getTableIdentifier(), metadataBatch)) + .withLastAttemptExecutorId(executorId) + .withAttemptCount(1) + .withLastAttemptStartedTimestamp(polarisCallContext.getClock().millis()) .setInternalProperties(cleanupTask.getInternalPropertiesAsMap()) .build(); }); diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java index 1409310318..b19eda41c4 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java @@ -18,15 +18,19 @@ */ package org.apache.polaris.service.task; +import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; +import java.time.Clock; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.config.PolarisConfigurationStore; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntityType; @@ -46,7 +50,9 @@ public class TaskExecutorImpl implements TaskExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorImpl.class); private static final long TASK_RETRY_DELAY = 1000; + private static final String EXECUTOR_ID_PREFIX = "TaskExecutor-"; + private final String executorId; private final Executor executor; private final MetaStoreManagerFactory metaStoreManagerFactory; private final TaskFileIOSupplier fileIOSupplier; @@ -58,6 +64,7 @@ public TaskExecutorImpl( MetaStoreManagerFactory metaStoreManagerFactory, TaskFileIOSupplier fileIOSupplier, PolarisEventListener polarisEventListener) { + this.executorId = EXECUTOR_ID_PREFIX + UUID.randomUUID(); this.executor = executor; this.metaStoreManagerFactory = metaStoreManagerFactory; this.fileIOSupplier = fileIOSupplier; @@ -74,6 +81,16 @@ public void init() { fileIOSupplier, Executors.newVirtualThreadPerTaskExecutor())); } + public void postConstruct() { + LOGGER.info("Try to recover pending tasks from TaskExecutorImpl postConstruct"); + recoverPendingTasks(); + } + + public void scheduled() { + LOGGER.info("Try to recover pending tasks from TaskExecutorImpl scheduled"); + recoverPendingTasks(); + } + /** * Add a {@link TaskHandler}. {@link TaskEntity}s will be tested against the {@link * TaskHandler#canHandleTask(TaskEntity)} method and will be handled by the first handler that @@ -103,6 +120,8 @@ public void addTaskHandlerContext(long taskEntityId, CallContext callContext) { private @Nonnull CompletableFuture tryHandleTask( long taskEntityId, CallContext callContext, Throwable e, int attempt) { if (attempt > 3) { + // When fail to handle a task, we will leave the task entity in the metastore and handle it + // later return CompletableFuture.failedFuture(e); } return CompletableFuture.runAsync( @@ -162,9 +181,21 @@ protected void handleTask(long taskEntityId, CallContext ctx, int attempt) { .addKeyValue("taskEntityName", taskEntity.getName()) .log("Unable to execute async task"); } + } catch (Exception e) { + LOGGER.error("Error while handling task entity id {}, error: {}", taskEntityId, e); } finally { polarisEventListener.onAfterTaskAttempted( new AfterTaskAttemptedEvent(taskEntityId, ctx, attempt, success)); } } + + public void recoverPendingTasks() { + TaskRecoveryManager.recoverPendingTasks(metaStoreManagerFactory, this.executorId, this); + } + + @VisibleForTesting + public void recoverPendingTasks(@Nonnull Clock clock) { + TaskRecoveryManager.recoverPendingTasks( + metaStoreManagerFactory, this.executorId, this, new PolarisConfigurationStore() {}, clock); + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TaskRecoveryManager.java b/service/common/src/main/java/org/apache/polaris/service/task/TaskRecoveryManager.java new file mode 100644 index 0000000000..9470cb9774 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/task/TaskRecoveryManager.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import jakarta.annotation.Nonnull; +import java.time.Clock; +import java.time.ZoneId; +import java.util.Map; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.TaskEntity; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.core.persistence.dao.entity.BaseResult; +import org.apache.polaris.core.persistence.dao.entity.EntitiesResult; +import org.apache.polaris.core.persistence.pagination.PageToken; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskRecoveryManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskRecoveryManager.class); + + public static void recoverPendingTasks( + MetaStoreManagerFactory metaStoreManagerFactory, + String executorId, + TaskExecutor taskExecutor) { + recoverPendingTasks( + metaStoreManagerFactory, + executorId, + taskExecutor, + new PolarisConfigurationStore() {}, + Clock.system(ZoneId.systemDefault())); + } + + public static void recoverPendingTasks( + @Nonnull MetaStoreManagerFactory metaStoreManagerFactory, + @Nonnull String executorId, + @Nonnull TaskExecutor taskExecutor, + @Nonnull PolarisConfigurationStore configurationStore, + @Nonnull Clock clock) { + for (Map.Entry entry : + metaStoreManagerFactory.getMetaStoreManagerMap().entrySet()) { + RealmContext realmContext = entry::getKey; + PolarisMetaStoreManager metaStoreManager = entry.getValue(); + BasePersistence metastore = + metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(); + // Construct a PolarisCallContext since the original one has lost + PolarisCallContext polarisCallContext = + new PolarisCallContext( + metastore, new PolarisDefaultDiagServiceImpl(), configurationStore, clock); + EntitiesResult entitiesResult = + metaStoreManager.loadTasks(polarisCallContext, executorId, PageToken.fromLimit(20)); + if (entitiesResult.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { + entitiesResult.getEntities().stream() + .map(TaskEntity::of) + .forEach( + entity -> { + // Construct a CallContext since the original one has lost + CallContext callContext = CallContext.of(realmContext, polarisCallContext); + taskExecutor.addTaskHandlerContext(entity.getId(), callContext); + }); + } else { + LOGGER.error("Failed to recover pending tasks for {}", executorId); + } + } + } +}