diff --git a/cmd/pilot/main.go b/cmd/pilot/main.go index ab797ce7..1afffc37 100644 --- a/cmd/pilot/main.go +++ b/cmd/pilot/main.go @@ -359,7 +359,7 @@ Examples: // Create dispatcher if store available if gwStore != nil { gwDispatcher = executor.NewDispatcher(gwStore, gwRunner, nil) - if dispErr := gwDispatcher.Start(); dispErr != nil { + if dispErr := gwDispatcher.Start(context.Background()); dispErr != nil { logging.WithComponent("start").Warn("Failed to start dispatcher for gateway polling", slog.Any("error", dispErr)) gwDispatcher = nil } @@ -1793,7 +1793,7 @@ func runPollingMode(cfg *config.Config, projectPath string, replace, dashboardMo var dispatcher *executor.Dispatcher if store != nil { dispatcher = executor.NewDispatcher(store, runner, nil) - if err := dispatcher.Start(); err != nil { + if err := dispatcher.Start(ctx); err != nil { logging.WithComponent("start").Warn("Failed to start dispatcher", slog.Any("error", err)) dispatcher = nil } else { diff --git a/internal/executor/dispatcher.go b/internal/executor/dispatcher.go index 6fd56bb2..4b6e366a 100644 --- a/internal/executor/dispatcher.go +++ b/internal/executor/dispatcher.go @@ -16,15 +16,42 @@ import ( // DispatcherConfig configures the task dispatcher behavior. type DispatcherConfig struct { - // StaleTaskDuration is how long a "running" task can be stale before reset. - // Used on startup to detect crashed workers. + // StaleTaskDuration is a backwards-compat alias for StaleRunningThreshold. + // Deprecated: use StaleRunningThreshold instead. StaleTaskDuration time.Duration + + // StaleRunningThreshold is how long a "running" task can remain before + // it is considered orphaned and marked failed. Default: 30 minutes. + StaleRunningThreshold time.Duration + + // StaleQueuedThreshold is how long a "queued" task can remain without + // being picked up before it is considered stuck and marked failed. + // Default: 5 minutes. 
+ StaleQueuedThreshold time.Duration + + // StaleRecoveryInterval is how often the periodic stale-recovery loop + // runs. Default: 5 minutes. + StaleRecoveryInterval time.Duration } // DefaultDispatcherConfig returns default dispatcher settings. func DefaultDispatcherConfig() *DispatcherConfig { return &DispatcherConfig{ - StaleTaskDuration: 30 * time.Minute, + StaleRunningThreshold: 30 * time.Minute, + StaleQueuedThreshold: 5 * time.Minute, + StaleRecoveryInterval: 5 * time.Minute, + } +} + +// resolveDefaults applies the StaleTaskDuration backwards-compat alias and +// fills a zero StaleRecoveryInterval; zero thresholds are kept as-is. +func (c *DispatcherConfig) resolveDefaults() { + // Backwards compat: if only the deprecated field is set, use it. + if c.StaleRunningThreshold == 0 && c.StaleTaskDuration > 0 { + c.StaleRunningThreshold = c.StaleTaskDuration + } + if c.StaleRecoveryInterval == 0 { + c.StaleRecoveryInterval = 5 * time.Minute } } @@ -51,6 +78,7 @@ func NewDispatcher(store *memory.Store, runner *Runner, config *DispatcherConfig if config == nil { config = DefaultDispatcherConfig() } + config.resolveDefaults() ctx, cancel := context.WithCancel(context.Background()) @@ -72,18 +100,46 @@ func (d *Dispatcher) SetDecomposer(decomposer *TaskDecomposer) { d.decomposer = decomposer } -// Start initializes the dispatcher and recovers from any stale tasks. -func (d *Dispatcher) Start() error { +// Start initializes the dispatcher, recovers stale tasks, and launches the +// periodic stale-recovery loop. The provided context controls the loop lifetime. +func (d *Dispatcher) Start(ctx context.Context) error { d.log.Info("Starting dispatcher") - // Recover stale running tasks (from crashed workers) - if err := d.recoverStaleTasks(); err != nil { - d.log.Warn("Failed to recover stale tasks", slog.Any("error", err)) - } + // Initial recovery pass on startup. + d.recoverStaleTasks() + + // Launch periodic recovery loop.
+ d.wg.Add(1) + go d.runStaleRecoveryLoop(ctx) return nil } +// runStaleRecoveryLoop ticks every StaleRecoveryInterval and calls +// recoverStaleTasks. It stops when ctx is cancelled or the dispatcher stops. +func (d *Dispatcher) runStaleRecoveryLoop(ctx context.Context) { + defer d.wg.Done() + + interval := d.config.StaleRecoveryInterval + ticker := time.NewTicker(interval) + defer ticker.Stop() + + d.log.Info("Stale recovery loop started", slog.Duration("interval", interval)) + + for { + select { + case <-ctx.Done(): + d.log.Debug("Stale recovery loop stopped (context cancelled)") + return + case <-d.ctx.Done(): + d.log.Debug("Stale recovery loop stopped (dispatcher stopped)") + return + case <-ticker.C: + d.recoverStaleTasks() + } + } +} + // Stop gracefully stops all workers and the dispatcher. func (d *Dispatcher) Stop() { d.log.Info("Stopping dispatcher") @@ -101,31 +157,49 @@ func (d *Dispatcher) Stop() { d.log.Info("Dispatcher stopped") } -// recoverStaleTasks resets tasks that were left in "running" state -// from a previous crashed session. -func (d *Dispatcher) recoverStaleTasks() error { - stale, err := d.store.GetStaleRunningExecutions(d.config.StaleTaskDuration) +// recoverStaleTasks marks orphaned running and queued tasks as failed. +// Re-queuing without a worker just recreates the orphan, so we fail them. +func (d *Dispatcher) recoverStaleTasks() int { + var resetCount int + + // Recover stale running tasks (crashed workers). 
+ staleRunning, err := d.store.GetStaleRunningExecutions(d.config.StaleRunningThreshold) if err != nil { - return err + d.log.Warn("Failed to fetch stale running executions", slog.Any("error", err)) } - - for _, exec := range stale { - d.log.Warn("Recovering stale task", + for _, exec := range staleRunning { + d.log.Warn("Marking stale running task as failed", slog.String("execution_id", exec.ID), slog.String("task_id", exec.TaskID), slog.Time("created_at", exec.CreatedAt), ) - // Reset to queued so it will be picked up again - if err := d.store.UpdateExecutionStatus(exec.ID, "queued", "recovered from stale running state"); err != nil { - d.log.Error("Failed to reset stale task", slog.String("id", exec.ID), slog.Any("error", err)) + if err := d.store.UpdateExecutionStatus(exec.ID, "failed", "stale running task recovered (orphaned worker)"); err != nil { + d.log.Error("Failed to mark stale running task", slog.String("id", exec.ID), slog.Any("error", err)) + } else { + resetCount++ } } - if len(stale) > 0 { - d.log.Info("Recovered stale tasks", slog.Int("count", len(stale))) + // Recover stale queued tasks (stuck in queue with no worker). 
+ staleQueued, err := d.store.GetStaleQueuedExecutions(d.config.StaleQueuedThreshold) + if err != nil { + d.log.Warn("Failed to fetch stale queued executions", slog.Any("error", err)) + } + for _, exec := range staleQueued { + d.log.Warn("Marking stale queued task as failed", + slog.String("execution_id", exec.ID), + slog.String("task_id", exec.TaskID), + slog.Time("created_at", exec.CreatedAt), + ) + if err := d.store.UpdateExecutionStatus(exec.ID, "failed", "stale queued task recovered (no worker picked up)"); err != nil { + d.log.Error("Failed to mark stale queued task", slog.String("id", exec.ID), slog.Any("error", err)) + } else { + resetCount++ + } } - return nil + d.log.Info("Stale recovery complete", slog.Int("count", resetCount)) + return resetCount } // QueueTask adds a task to the execution queue and returns the execution ID. diff --git a/internal/executor/dispatcher_test.go b/internal/executor/dispatcher_test.go index cf605a52..4c082e29 100644 --- a/internal/executor/dispatcher_test.go +++ b/internal/executor/dispatcher_test.go @@ -40,7 +40,7 @@ func TestDispatcher_QueueTask(t *testing.T) { runner := NewRunner() dispatcher := NewDispatcher(store, runner, nil) - if err := dispatcher.Start(); err != nil { + if err := dispatcher.Start(context.Background()); err != nil { t.Fatalf("failed to start dispatcher: %v", err) } defer dispatcher.Stop() @@ -105,7 +105,7 @@ func TestDispatcher_DuplicateTask(t *testing.T) { runner := NewRunner() dispatcher := NewDispatcher(store, runner, nil) - if err := dispatcher.Start(); err != nil { + if err := dispatcher.Start(context.Background()); err != nil { t.Fatalf("failed to start dispatcher: %v", err) } defer dispatcher.Stop() @@ -140,7 +140,7 @@ func TestDispatcher_GetWorkerStatus(t *testing.T) { runner := NewRunner() dispatcher := NewDispatcher(store, runner, nil) - if err := dispatcher.Start(); err != nil { + if err := dispatcher.Start(context.Background()); err != nil { t.Fatalf("failed to start
dispatcher: %v", err) } defer dispatcher.Stop() @@ -187,7 +187,7 @@ func TestDispatcher_MultipleProjects(t *testing.T) { runner := NewRunner() dispatcher := NewDispatcher(store, runner, nil) - if err := dispatcher.Start(); err != nil { + if err := dispatcher.Start(context.Background()); err != nil { t.Fatalf("failed to start dispatcher: %v", err) } defer dispatcher.Stop() @@ -439,19 +439,20 @@ func TestDispatcher_RecoverStaleTasks(t *testing.T) { runner := NewRunner() dispatcher := NewDispatcher(store, runner, config) - if err := dispatcher.Start(); err != nil { + if err := dispatcher.Start(context.Background()); err != nil { t.Fatalf("failed to start dispatcher: %v", err) } defer dispatcher.Stop() - // Check that the task was reset to queued + // Check that the task was marked failed (not re-queued — re-queuing without + // a worker just recreates the orphan). updated, err := store.GetExecution("exec-recover") if err != nil { t.Fatalf("failed to get execution: %v", err) } - if updated.Status != "queued" { - t.Errorf("expected recovered task to have status 'queued', got '%s'", updated.Status) + if updated.Status != "failed" { + t.Errorf("expected recovered task to have status 'failed', got '%s'", updated.Status) } } @@ -486,7 +487,7 @@ func TestDispatcher_ExecutionStatusPath(t *testing.T) { runner := NewRunner() dispatcher := NewDispatcher(store, runner, nil) - if err := dispatcher.Start(); err != nil { + if err := dispatcher.Start(context.Background()); err != nil { t.Fatalf("failed to start dispatcher: %v", err) } defer dispatcher.Stop() @@ -517,3 +518,192 @@ func TestDispatcher_ExecutionStatusPath(t *testing.T) { t.Errorf("unexpected execution status: %s", exec.Status) } } + +func TestRecoverStaleTasks_QueuedAndRunning(t *testing.T) { + store, cleanup := setupTestStore(t) + defer cleanup() + + // Insert a stale running task and a stale queued task. 
+ executions := []*memory.Execution{ + {ID: "exec-stale-run", TaskID: "TASK-RUN", ProjectPath: "/project", Status: "running"}, + {ID: "exec-stale-q", TaskID: "TASK-Q", ProjectPath: "/project", Status: "queued"}, + {ID: "exec-ok", TaskID: "TASK-OK", ProjectPath: "/project", Status: "completed"}, + } + for _, exec := range executions { + if err := store.SaveExecution(exec); err != nil { + t.Fatalf("failed to save execution: %v", err) + } + } + + // Use 0 thresholds so everything is stale immediately. + config := &DispatcherConfig{ + StaleRunningThreshold: 0, + StaleQueuedThreshold: 0, + StaleRecoveryInterval: time.Hour, // won't tick in this test + } + runner := NewRunner() + dispatcher := NewDispatcher(store, runner, config) + + if err := dispatcher.Start(context.Background()); err != nil { + t.Fatalf("failed to start dispatcher: %v", err) + } + defer dispatcher.Stop() + + // Both stale tasks should be failed. + for _, id := range []string{"exec-stale-run", "exec-stale-q"} { + exec, err := store.GetExecution(id) + if err != nil { + t.Fatalf("failed to get execution %s: %v", id, err) + } + if exec.Status != "failed" { + t.Errorf("expected %s to be 'failed', got '%s'", id, exec.Status) + } + } + + // Completed task should be untouched. + exec, err := store.GetExecution("exec-ok") + if err != nil { + t.Fatalf("failed to get execution: %v", err) + } + if exec.Status != "completed" { + t.Errorf("expected completed task to remain 'completed', got '%s'", exec.Status) + } +} + +func TestRecoverStaleTasks_RespectsThresholds(t *testing.T) { + store, cleanup := setupTestStore(t) + defer cleanup() + + // Insert running and queued tasks that were just created. 
+ executions := []*memory.Execution{ + {ID: "exec-fresh-run", TaskID: "TASK-FR", ProjectPath: "/project", Status: "running"}, + {ID: "exec-fresh-q", TaskID: "TASK-FQ", ProjectPath: "/project", Status: "queued"}, + } + for _, exec := range executions { + if err := store.SaveExecution(exec); err != nil { + t.Fatalf("failed to save execution: %v", err) + } + } + + // Use very long thresholds so nothing is stale. + config := &DispatcherConfig{ + StaleRunningThreshold: 24 * time.Hour, + StaleQueuedThreshold: 24 * time.Hour, + StaleRecoveryInterval: time.Hour, + } + runner := NewRunner() + dispatcher := NewDispatcher(store, runner, config) + + if err := dispatcher.Start(context.Background()); err != nil { + t.Fatalf("failed to start dispatcher: %v", err) + } + defer dispatcher.Stop() + + // Nothing should have been marked failed. + for _, tc := range []struct { + id string + expect string + }{ + {"exec-fresh-run", "running"}, + {"exec-fresh-q", "queued"}, + } { + exec, err := store.GetExecution(tc.id) + if err != nil { + t.Fatalf("failed to get execution %s: %v", tc.id, err) + } + if exec.Status != tc.expect { + t.Errorf("expected %s to remain '%s', got '%s'", tc.id, tc.expect, exec.Status) + } + } +} + +func TestRunStaleRecoveryLoop_Periodic(t *testing.T) { + store, cleanup := setupTestStore(t) + defer cleanup() + + // Use a very short interval so the loop ticks quickly. + config := &DispatcherConfig{ + StaleRunningThreshold: 0, + StaleQueuedThreshold: 0, + StaleRecoveryInterval: 50 * time.Millisecond, + } + runner := NewRunner() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dispatcher := NewDispatcher(store, runner, config) + if err := dispatcher.Start(ctx); err != nil { + t.Fatalf("failed to start dispatcher: %v", err) + } + defer dispatcher.Stop() + + // Insert a stale task AFTER Start() (so the initial pass doesn't see it). 
+ time.Sleep(20 * time.Millisecond) + exec := &memory.Execution{ + ID: "exec-periodic", + TaskID: "TASK-PERIODIC", + ProjectPath: "/project", + Status: "running", + } + if err := store.SaveExecution(exec); err != nil { + t.Fatalf("failed to save execution: %v", err) + } + + // Wait for the loop to tick and recover it. + time.Sleep(200 * time.Millisecond) + + updated, err := store.GetExecution("exec-periodic") + if err != nil { + t.Fatalf("failed to get execution: %v", err) + } + if updated.Status != "failed" { + t.Errorf("expected periodic recovery to mark task 'failed', got '%s'", updated.Status) + } +} + +func TestQueueTask_AfterRecovery(t *testing.T) { + store, cleanup := setupTestStore(t) + defer cleanup() + + // Insert a stale task for the same task ID we'll try to queue. + exec := &memory.Execution{ + ID: "exec-old", + TaskID: "TASK-REQUEUE", + ProjectPath: "/project", + Status: "running", + } + if err := store.SaveExecution(exec); err != nil { + t.Fatalf("failed to save execution: %v", err) + } + + // Start dispatcher with 0 threshold so it recovers immediately. + config := &DispatcherConfig{ + StaleRunningThreshold: 0, + StaleQueuedThreshold: 0, + StaleRecoveryInterval: time.Hour, + } + runner := NewRunner() + dispatcher := NewDispatcher(store, runner, config) + + if err := dispatcher.Start(context.Background()); err != nil { + t.Fatalf("failed to start dispatcher: %v", err) + } + defer dispatcher.Stop() + + // The old task should now be failed, so re-queuing the same task ID should succeed. + task := &Task{ + ID: "TASK-REQUEUE", + Title: "Re-queued after recovery", + Description: "Should succeed since old execution is failed", + ProjectPath: "/project", + } + + execID, err := dispatcher.QueueTask(context.Background(), task) + if err != nil { + t.Fatalf("expected re-queue to succeed after recovery, got error: %v", err) + } + if execID == "" { + t.Error("expected non-empty execution ID") + } +}