Skip to content

Support retrying non-finished async tasks on startup and periodically #2003

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

danielhumanmod
Copy link
Contributor

Fix #774

Context

Polaris uses async tasks to perform operations such as table and manifest file cleanup. These tasks are executed asynchronously in a separate thread within the same JVM, and retries are handled inline within the task execution. However, this mechanism does not guarantee eventual execution in the following cases:

  • The task fails repeatedly and hits the maximum retry limit.
  • The service crashes or shuts down before retrying.

Implementation

Persist failed tasks and introduce a retry mechanism triggered during Polaris startup and via periodic background checks, changes included:

  1. Metastore Layer:
    • Exposes a new API getMetaStoreManagerMap
    • Ensures LAST_ATTEMPT_START_TIME set for each task entity creation, which is important for time-out filtering when loadTasks() from metastore, so that prevent multiple executors from picking the same task
  2. TaskRecoveryManager: New class responsible for task recovery logic, including:
    • Constructing executionPolarisCallContext
    • Loading tasks from metastore
    • Triggering task execution
  3. QuarkusTaskExecutorImpl: Hook into application lifecycle to initiate task recovery.
  4. Task Retry Strategy: Failed tasks remain persisted in the metastore and are retried by the recovery manager.
  5. Tests: Adjusted existing tests and added new coverage for recovery behavior.

Recommended Review Order

  1. Metastore Layer related code
  2. TaskRecoveryManager
  3. QuarkusTaskExecutorImpl and TaskExecutorImpl
  4. Task cleanup handlers
  5. Tests

@@ -147,6 +151,8 @@ public void testTableCleanup() throws IOException {

handler.handleTask(task, callContext);

timeSource.add(Duration.ofMinutes(10));
Copy link
Contributor Author

@danielhumanmod danielhumanmod Jul 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adnanhemani continue the previous comment here:

Can you explain this further - I'm not sure why the tests need this 10m jump? Is it so that tasks are "recovered" by the Quarkus Scheduled method?

metaStoreManager.loadTasks fetches available tasks from the metastore — meaning tasks that are either not leased by any executor or whose lease has already timed out (after 5 minutes).

In this test, the new tasks are created and not executed by its parent task, so to ensure these tasks are fetched, we need to simulate a time gap longer than 5 minutes.

@@ -243,6 +244,11 @@ public synchronized EntityCache getOrCreateEntityCache(RealmContext realmContext
return entityCacheMap.get(realmContext.getRealmIdentifier());
}

@Override
public Iterator<Map.Entry<String, PolarisMetaStoreManager>> getMetaStoreManagerMap() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it to Iter<Map.Entry> as https://github.com/apache/polaris/pull/1585/files#r2121942046 suggests

@@ -188,6 +193,9 @@ private Stream<TaskEntity> getManifestTaskStream(
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableEntity.getTableIdentifier(), TaskUtils.encodeManifestFile(mf)))
.withLastAttemptExecutorId(executorId)
.withAttemptCount(1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adnanhemani continue previous discussion here:

How can we assume this?

This new task (ManifestFileCleanupTask) is created by the current task (TableCleanupTask) and will be executed immediately at the end of it.
Since it’s its first execution, we set the attemptCount to 1 here.

configurationStore,
clock);
EntitiesResult entitiesResult =
metaStoreManager.loadTasks(polarisCallContext, executorId, PageToken.readEverything());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adnanhemani continue previous comment here:

I'm not sure I'm understanding the logic here: we are asking for 20 tasks here - but what if there are more than 20 tasks that need recovery?

Good catch, we should update to read all pending tasks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Task handling is incomplete
1 participant