diff --git a/internal/cmd/flags.go b/internal/cmd/flags.go index 7ff6c3ec2..71e638087 100644 --- a/internal/cmd/flags.go +++ b/internal/cmd/flags.go @@ -367,7 +367,7 @@ var ( historyStatusFlag = commandLineFlag{ name: "status", - usage: "Filter by execution status (running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded)", + usage: "Filter by execution status; accepts a single value or comma-separated values (running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded)", } historyRunIDFlag = commandLineFlag{ diff --git a/internal/cmd/history.go b/internal/cmd/history.go index ce703a703..eae275587 100644 --- a/internal/cmd/history.go +++ b/internal/cmd/history.go @@ -34,7 +34,8 @@ Date/Time Filtering: Note: --last cannot be combined with --from or --to Status Filtering: - --status Filter by execution status (running, succeeded, failed, aborted, skipped, waiting, none) + --status Filter by execution status. Accepts a single status or comma-separated statuses + (running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded) Other Filters: --labels Filter by DAG labels (comma-separated, AND logic) @@ -55,6 +56,7 @@ Examples: dagu history --from 2026-01-01 # Runs since date dagu history --last 7d # Last 7 days dagu history --status failed # Only failed runs + dagu history --status running,queued # Running or queued runs dagu history --format json # JSON output dagu history --labels "prod,critical" # Filter by labels (AND logic) dagu history --limit 50 # Limit to 50 results @@ -260,12 +262,12 @@ func buildStatusOption(ctx *Context) (exec.ListDAGRunStatusesOption, error) { return nil, nil } - status, err := parseStatus(statusStr) + statuses, err := parseStatuses(statusStr) if err != nil { return nil, err } - return exec.WithStatuses([]core.Status{status}), nil + return exec.WithStatuses(statuses), nil } // buildRunIDOption constructs run ID filtering option. @@ -414,6 +416,27 @@ func parseStatus(s string) (core.Status, error) { return core.NotStarted, fmt.Errorf("invalid status '%s'. Valid values: running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded", s) } +// parseStatuses converts a comma-separated status string to core.Status values. +func parseStatuses(s string) ([]core.Status, error) { + parts := strings.Split(s, ",") + statuses := make([]core.Status, 0, len(parts)) + for _, part := range parts { + trimmed := strings.TrimSpace(part) + if trimmed == "" { + continue + } + status, err := parseStatus(trimmed) + if err != nil { + return nil, err + } + statuses = append(statuses, status) + } + if len(statuses) == 0 { + return nil, fmt.Errorf("invalid status '%s'. Valid values: running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded", s) + } + return statuses, nil +} + // parseLabels splits comma-separated labels and trims whitespace. func parseLabels(s string) []string { if s == "" { diff --git a/internal/cmd/history_test.go b/internal/cmd/history_test.go index 7f1a1f7f1..0a32cbafa 100644 --- a/internal/cmd/history_test.go +++ b/internal/cmd/history_test.go @@ -308,6 +308,59 @@ func TestParseStatus(t *testing.T) { } } +func TestParseStatuses(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input string + expected []core.Status + wantErr bool + }{ + { + name: "comma-separated statuses", + input: "running,queued", + expected: []core.Status{core.Running, core.Queued}, + }, + { + name: "trims spaces", + input: " running, succeeded ", + expected: []core.Status{core.Running, core.Succeeded}, + }, + { + name: "ignores empty entries", + input: "running,,queued,", + expected: []core.Status{core.Running, core.Queued}, + }, + { + name: "rejects empty list", + input: ",,", + wantErr: true, + }, + { + name: "rejects mixed invalid status", + input: "running,unknown", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, err := parseStatuses(tt.input) + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid status") + return + } + + require.NoError(t, err) + assert.Equal(t, tt.expected, got) + }) + } +} + func TestParseLabels(t *testing.T) { t.Parallel() diff --git a/internal/cmd/remote_client.go b/internal/cmd/remote_client.go index 5b3f56795..0fbcdfe6f 100644 --- a/internal/cmd/remote_client.go +++ b/internal/cmd/remote_client.go @@ -46,12 +46,12 @@ func (e *remoteError) NotFound() bool { } type remoteHistoryQuery struct { - Name string - From *int64 - To *int64 - Status *int - RunID string - Labels []string + Name string + From *int64 + To *int64 + Statuses []int + RunID string + Labels []string } func newRemoteClient(ctx *clicontext.Context) (*remoteClient, error) { @@ -159,21 +159,23 @@ func (c *remoteClient) enqueueDAG(ctx context.Context, fileName string, body api } func (c *remoteClient) listDAGRuns(ctx context.Context, query remoteHistoryQuery) ([]api.DAGRunSummary, error) { - params := map[string]string{} + params := url.Values{} if query.From != nil { - params["fromDate"] = fmt.Sprintf("%d", *query.From) + params.Set("fromDate", fmt.Sprintf("%d", *query.From)) } if query.To != nil { - params["toDate"] = fmt.Sprintf("%d", *query.To) + params.Set("toDate", fmt.Sprintf("%d", *query.To)) } - if query.Status != nil { - params["status"] = fmt.Sprintf("%d", *query.Status) + if len(query.Statuses) > 0 { + for _, status := range query.Statuses { + params.Add("status", fmt.Sprintf("%d", status)) + } } if query.RunID != "" { - params["dagRunId"] = query.RunID + params.Set("dagRunId", query.RunID) } if len(query.Labels) > 0 { - params["labels"] = strings.Join(query.Labels, ",") + params.Set("labels", strings.Join(query.Labels, ",")) } var out struct { @@ -183,7 +185,7 @@ func (c *remoteClient) listDAGRuns(ctx context.Context, query remoteHistoryQuery if query.Name != "" { path = "/dag-runs/" + url.PathEscape(query.Name) } - if err := c.do(ctx, http.MethodGet, path, nil, &out, params); err != nil { + if err := c.doWithQueryValues(ctx, http.MethodGet, path, nil, &out, params); err != nil { return nil, err } return out.DagRuns, nil @@ -241,15 +243,19 @@ func dagRunPath(name, dagRunID string) string { } func (c *remoteClient) do(ctx context.Context, method, path string, body any, out any, query map[string]string) error { + values := url.Values{} + for key, value := range query { + if value != "" { + values.Set(key, value) + } + } + return c.doWithQueryValues(ctx, method, path, body, out, values) +} + +func (c *remoteClient) doWithQueryValues(ctx context.Context, method, path string, body any, out any, query url.Values) error { fullURL := c.baseURL + path if len(query) > 0 { - values := url.Values{} - for key, value := range query { - if value != "" { - values.Set(key, value) - } - } - if encoded := values.Encode(); encoded != "" { + if encoded := query.Encode(); encoded != "" { fullURL += "?" + encoded } } diff --git a/internal/cmd/remote_commands.go b/internal/cmd/remote_commands.go index c2a4ee2a2..53bcc807e 100644 --- a/internal/cmd/remote_commands.go +++ b/internal/cmd/remote_commands.go @@ -483,11 +483,11 @@ func buildRemoteHistoryQuery(ctx *Context, args []string) (remoteHistoryQuery, i } statusValue, _ := ctx.StringParam("status") if statusValue != "" { - s, err := remoteStatusValue(statusValue) + statuses, err := remoteStatusValues(statusValue) if err != nil { return query, 0, err } - query.Status = &s + query.Statuses = statuses } runID, _ := ctx.StringParam("run-id") query.RunID = runID @@ -511,24 +511,35 @@ func buildRemoteHistoryQuery(ctx *Context, args []string) (remoteHistoryQuery, i } func remoteStatusValue(s string) (int, error) { - switch s { - case "running": - return int(core.Running), nil - case "succeeded": - return int(core.Succeeded), nil - case "failed": - return int(core.Failed), nil - case "aborted": - return int(core.Aborted), nil - case "queued": - return int(core.Queued), nil - case "waiting": - return int(core.Waiting), nil - case "none": + if strings.EqualFold(strings.TrimSpace(s), "none") { return 0, fmt.Errorf("status %q is not supported in remote history", s) - default: - return 0, fmt.Errorf("invalid status %q", s) } + + status, err := parseStatus(s) + if err != nil { + return 0, err + } + return int(status), nil +} + +func remoteStatusValues(s string) ([]int, error) { + parts := strings.Split(s, ",") + statuses := make([]int, 0, len(parts)) + for _, part := range parts { + trimmed := strings.TrimSpace(part) + if trimmed == "" { + continue + } + status, err := remoteStatusValue(trimmed) + if err != nil { + return nil, err + } + statuses = append(statuses, status) + } + if len(statuses) == 0 { + return nil, fmt.Errorf("invalid status %q", s) + } + return statuses, nil } func waitForRemoteStop(ctx *Context, name, dagRunID string) error { diff --git a/internal/cmd/remote_commands_test.go b/internal/cmd/remote_commands_test.go index c36bc2c1c..aa8d19764 100644 --- a/internal/cmd/remote_commands_test.go +++ b/internal/cmd/remote_commands_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "net/http" + "net/http/httptest" "testing" "time" @@ -88,6 +89,44 @@ func TestBuildRemoteHistoryQueryRejectsMalformedLimit(t *testing.T) { assert.Contains(t, err.Error(), "greater than 0") } +func TestBuildRemoteHistoryQueryParsesMultipleStatuses(t *testing.T) { + t.Parallel() + + command := &cobra.Command{Use: "history"} + initFlags(command, historyFlags...) + require.NoError(t, command.Flags().Set("status", "running,queued")) + + ctx := &Context{Command: command} + query, limit, err := buildRemoteHistoryQuery(ctx, nil) + require.NoError(t, err) + + assert.Equal(t, 100, limit) + assert.Equal(t, []int{int(core.Running), int(core.Queued)}, query.Statuses) +} + +func TestRemoteClientListDAGRunsUsesRepeatedStatusParams(t *testing.T) { + t.Parallel() + + statusValues := make(chan []string, 1) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + statusValues <- append([]string(nil), r.URL.Query()["status"]...) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"dagRuns":[]}`)) + })) + defer server.Close() + + client := &remoteClient{ + baseURL: server.URL, + client: server.Client(), + } + + _, err := client.listDAGRuns(context.Background(), remoteHistoryQuery{ + Statuses: []int{int(core.Running), int(core.Queued)}, + }) + require.NoError(t, err) + assert.Equal(t, []string{"1", "5"}, <-statusValues) +} + func TestWaitForRemoteStopHonorsContextCancellation(t *testing.T) { t.Parallel() diff --git a/internal/runtime/runner_test.go b/internal/runtime/runner_test.go index d0500f1af..3baf9b4fe 100644 --- a/internal/runtime/runner_test.go +++ b/internal/runtime/runner_test.go @@ -1747,34 +1747,96 @@ func TestRunner_DryRunWithHandlers(t *testing.T) { } func TestRunner_ConcurrentExecution(t *testing.T) { - steps := func() []core.Step { + sequentialGuardScript := func(name, lockDir string) string { + if windowsShellTest() { + return fmt.Sprintf(` + $lockDir = %s + if (-not (New-Item -ItemType Directory -Path $lockDir -ErrorAction SilentlyContinue)) { + Write-Error "sequential step %s overlapped another active step" + exit 1 + } + try { + %s + } finally { + Remove-Item -LiteralPath $lockDir -Force + } + `, test.PowerShellQuote(shellTestPath(lockDir)), name, test.Sleep(platformTestDuration(300*time.Millisecond, 600*time.Millisecond))) + } + + return fmt.Sprintf(` + lock_dir=%s + if ! mkdir "$lock_dir"; then + echo "sequential step %s overlapped another active step" >&2 + exit 1 + fi + trap 'rmdir "$lock_dir"' EXIT + %s + `, test.PosixQuote(lockDir), name, test.Sleep(300*time.Millisecond)) + } + + concurrentBarrierScript := func(name, readyDir string, readyCount int, timeout time.Duration) string { + if windowsShellTest() { + return fmt.Sprintf(` + $readyDir = %s + New-Item -ItemType Directory -Path $readyDir -Force | Out-Null + New-Item -ItemType File -Path (Join-Path $readyDir %s) -Force | Out-Null + $deadline = (Get-Date).AddSeconds(%d) + while (@(Get-ChildItem -LiteralPath $readyDir -File).Count -lt %d) { + if ((Get-Date) -ge $deadline) { + Write-Error "concurrent step %s did not observe all active steps" + exit 1 + } + Start-Sleep -Milliseconds 50 + } + `, test.PowerShellQuote(shellTestPath(readyDir)), test.PowerShellQuote(name), int(timeout/time.Second), readyCount, name) + } + + return fmt.Sprintf(` + ready_dir=%s + mkdir -p "$ready_dir" + : > "$ready_dir/%s" + deadline=$(( $(date +%%s) + %d )) + while true; do + ready_count=$(find "$ready_dir" -type f | wc -l | tr -d '[:space:]') + if [ "$ready_count" -ge %d ]; then + break + fi + if [ "$(date +%%s)" -ge "$deadline" ]; then + echo "concurrent step %s did not observe all active steps" >&2 + exit 1 + fi + sleep 0.05 + done + `, test.PosixQuote(readyDir), name, int(timeout/time.Second), readyCount, name) + } + + steps := func(script func(string) string) []core.Step { return []core.Step{ - newStep("1", withScript("sleep 0.3")), - newStep("2", withScript("sleep 0.3")), - newStep("3", withScript("sleep 0.3")), + newStep("1", withScript(script("1"))), + newStep("2", withScript(script("2"))), + newStep("3", withScript(script("3"))), } } + lockDir := filepath.Join(t.TempDir(), "active-step") sequential := setupRunner(t, withMaxActiveRuns(1)) - planSequential := sequential.newPlan(t, steps()...) - startSequential := time.Now() + planSequential := sequential.newPlan(t, steps(func(name string) string { + return sequentialGuardScript(name, lockDir) + })...) resultSequential := planSequential.assertRun(t, core.Succeeded) - elapsedSequential := time.Since(startSequential) resultSequential.assertNodeStatus(t, "1", core.NodeSucceeded) resultSequential.assertNodeStatus(t, "2", core.NodeSucceeded) resultSequential.assertNodeStatus(t, "3", core.NodeSucceeded) + readyDir := filepath.Join(t.TempDir(), "ready") concurrent := setupRunner(t, withMaxActiveRuns(3)) - planConcurrent := concurrent.newPlan(t, steps()...) - startConcurrent := time.Now() + planConcurrent := concurrent.newPlan(t, steps(func(name string) string { + return concurrentBarrierScript(name, readyDir, 3, platformTestDuration(10*time.Second, 30*time.Second)) + })...) resultConcurrent := planConcurrent.assertRun(t, core.Succeeded) - elapsedConcurrent := time.Since(startConcurrent) resultConcurrent.assertNodeStatus(t, "1", core.NodeSucceeded) resultConcurrent.assertNodeStatus(t, "2", core.NodeSucceeded) resultConcurrent.assertNodeStatus(t, "3", core.NodeSucceeded) - - assert.Greater(t, elapsedSequential, elapsedConcurrent) - assert.Greater(t, elapsedSequential-elapsedConcurrent, 200*time.Millisecond) } func TestRunner_ErrorHandling(t *testing.T) {