Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ var (

historyStatusFlag = commandLineFlag{
name: "status",
usage: "Filter by execution status (running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded)",
usage: "Filter by execution status; accepts a single value or comma-separated values (running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded)",
}

historyRunIDFlag = commandLineFlag{
Expand Down
29 changes: 26 additions & 3 deletions internal/cmd/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ Date/Time Filtering:
Note: --last cannot be combined with --from or --to

Status Filtering:
--status Filter by execution status (running, succeeded, failed, aborted, skipped, waiting, none)
--status Filter by execution status. Accepts a single status or comma-separated statuses
(running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded)

Other Filters:
--labels Filter by DAG labels (comma-separated, AND logic)
Expand All @@ -55,6 +56,7 @@ Examples:
dagu history --from 2026-01-01 # Runs since date
dagu history --last 7d # Last 7 days
dagu history --status failed # Only failed runs
dagu history --status running,queued # Running or queued runs
dagu history --format json # JSON output
dagu history --labels "prod,critical" # Filter by labels (AND logic)
dagu history --limit 50 # Limit to 50 results
Expand Down Expand Up @@ -260,12 +262,12 @@ func buildStatusOption(ctx *Context) (exec.ListDAGRunStatusesOption, error) {
return nil, nil
}

status, err := parseStatus(statusStr)
statuses, err := parseStatuses(statusStr)
if err != nil {
return nil, err
}

return exec.WithStatuses([]core.Status{status}), nil
return exec.WithStatuses(statuses), nil
}

// buildRunIDOption constructs run ID filtering option.
Expand Down Expand Up @@ -414,6 +416,27 @@ func parseStatus(s string) (core.Status, error) {
return core.NotStarted, fmt.Errorf("invalid status '%s'. Valid values: running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded", s)
}

// parseStatuses converts a comma-separated status string to core.Status values.
func parseStatuses(s string) ([]core.Status, error) {
parts := strings.Split(s, ",")
statuses := make([]core.Status, 0, len(parts))
for _, part := range parts {
trimmed := strings.TrimSpace(part)
if trimmed == "" {
continue
}
status, err := parseStatus(trimmed)
if err != nil {
return nil, err
}
statuses = append(statuses, status)
}
if len(statuses) == 0 {
return nil, fmt.Errorf("invalid status '%s'. Valid values: running, succeeded, failed, aborted, queued, waiting, rejected, not_started, partially_succeeded", s)
}
return statuses, nil
}

// parseLabels splits comma-separated labels and trims whitespace.
func parseLabels(s string) []string {
if s == "" {
Expand Down
53 changes: 53 additions & 0 deletions internal/cmd/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,59 @@ func TestParseStatus(t *testing.T) {
}
}

func TestParseStatuses(t *testing.T) {
t.Parallel()

tests := []struct {
name string
input string
expected []core.Status
wantErr bool
}{
{
name: "comma-separated statuses",
input: "running,queued",
expected: []core.Status{core.Running, core.Queued},
},
{
name: "trims spaces",
input: " running, succeeded ",
expected: []core.Status{core.Running, core.Succeeded},
},
{
name: "ignores empty entries",
input: "running,,queued,",
expected: []core.Status{core.Running, core.Queued},
},
{
name: "rejects empty list",
input: ",,",
wantErr: true,
},
{
name: "rejects mixed invalid status",
input: "running,unknown",
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got, err := parseStatuses(tt.input)
if tt.wantErr {
require.Error(t, err)
assert.Contains(t, err.Error(), "invalid status")
return
}

require.NoError(t, err)
assert.Equal(t, tt.expected, got)
})
}
}

func TestParseLabels(t *testing.T) {
t.Parallel()

Expand Down
48 changes: 27 additions & 21 deletions internal/cmd/remote_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (e *remoteError) NotFound() bool {
}

type remoteHistoryQuery struct {
Name string
From *int64
To *int64
Status *int
RunID string
Labels []string
Name string
From *int64
To *int64
Statuses []int
RunID string
Labels []string
}

func newRemoteClient(ctx *clicontext.Context) (*remoteClient, error) {
Expand Down Expand Up @@ -159,21 +159,23 @@ func (c *remoteClient) enqueueDAG(ctx context.Context, fileName string, body api
}

func (c *remoteClient) listDAGRuns(ctx context.Context, query remoteHistoryQuery) ([]api.DAGRunSummary, error) {
params := map[string]string{}
params := url.Values{}
if query.From != nil {
params["fromDate"] = fmt.Sprintf("%d", *query.From)
params.Set("fromDate", fmt.Sprintf("%d", *query.From))
}
if query.To != nil {
params["toDate"] = fmt.Sprintf("%d", *query.To)
params.Set("toDate", fmt.Sprintf("%d", *query.To))
}
if query.Status != nil {
params["status"] = fmt.Sprintf("%d", *query.Status)
if len(query.Statuses) > 0 {
for _, status := range query.Statuses {
params.Add("status", fmt.Sprintf("%d", status))
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if query.RunID != "" {
params["dagRunId"] = query.RunID
params.Set("dagRunId", query.RunID)
}
if len(query.Labels) > 0 {
params["labels"] = strings.Join(query.Labels, ",")
params.Set("labels", strings.Join(query.Labels, ","))
}

var out struct {
Expand All @@ -183,7 +185,7 @@ func (c *remoteClient) listDAGRuns(ctx context.Context, query remoteHistoryQuery
if query.Name != "" {
path = "/dag-runs/" + url.PathEscape(query.Name)
}
if err := c.do(ctx, http.MethodGet, path, nil, &out, params); err != nil {
if err := c.doWithQueryValues(ctx, http.MethodGet, path, nil, &out, params); err != nil {
return nil, err
}
return out.DagRuns, nil
Expand Down Expand Up @@ -241,15 +243,19 @@ func dagRunPath(name, dagRunID string) string {
}

func (c *remoteClient) do(ctx context.Context, method, path string, body any, out any, query map[string]string) error {
values := url.Values{}
for key, value := range query {
if value != "" {
values.Set(key, value)
}
}
return c.doWithQueryValues(ctx, method, path, body, out, values)
}

func (c *remoteClient) doWithQueryValues(ctx context.Context, method, path string, body any, out any, query url.Values) error {
fullURL := c.baseURL + path
if len(query) > 0 {
values := url.Values{}
for key, value := range query {
if value != "" {
values.Set(key, value)
}
}
if encoded := values.Encode(); encoded != "" {
if encoded := query.Encode(); encoded != "" {
fullURL += "?" + encoded
}
}
Expand Down
47 changes: 29 additions & 18 deletions internal/cmd/remote_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,11 +483,11 @@ func buildRemoteHistoryQuery(ctx *Context, args []string) (remoteHistoryQuery, i
}
statusValue, _ := ctx.StringParam("status")
if statusValue != "" {
s, err := remoteStatusValue(statusValue)
statuses, err := remoteStatusValues(statusValue)
if err != nil {
return query, 0, err
}
query.Status = &s
query.Statuses = statuses
}
runID, _ := ctx.StringParam("run-id")
query.RunID = runID
Expand All @@ -511,24 +511,35 @@ func buildRemoteHistoryQuery(ctx *Context, args []string) (remoteHistoryQuery, i
}

func remoteStatusValue(s string) (int, error) {
switch s {
case "running":
return int(core.Running), nil
case "succeeded":
return int(core.Succeeded), nil
case "failed":
return int(core.Failed), nil
case "aborted":
return int(core.Aborted), nil
case "queued":
return int(core.Queued), nil
case "waiting":
return int(core.Waiting), nil
case "none":
if strings.EqualFold(strings.TrimSpace(s), "none") {
return 0, fmt.Errorf("status %q is not supported in remote history", s)
default:
return 0, fmt.Errorf("invalid status %q", s)
}

status, err := parseStatus(s)
if err != nil {
return 0, err
}
return int(status), nil
}

func remoteStatusValues(s string) ([]int, error) {
parts := strings.Split(s, ",")
statuses := make([]int, 0, len(parts))
for _, part := range parts {
trimmed := strings.TrimSpace(part)
if trimmed == "" {
continue
}
status, err := remoteStatusValue(trimmed)
if err != nil {
return nil, err
}
statuses = append(statuses, status)
}
if len(statuses) == 0 {
return nil, fmt.Errorf("invalid status %q", s)
}
return statuses, nil
}

func waitForRemoteStop(ctx *Context, name, dagRunID string) error {
Expand Down
39 changes: 39 additions & 0 deletions internal/cmd/remote_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"net/http"
"net/http/httptest"
"testing"
"time"

Expand Down Expand Up @@ -88,6 +89,44 @@ func TestBuildRemoteHistoryQueryRejectsMalformedLimit(t *testing.T) {
assert.Contains(t, err.Error(), "greater than 0")
}

func TestBuildRemoteHistoryQueryParsesMultipleStatuses(t *testing.T) {
t.Parallel()

command := &cobra.Command{Use: "history"}
initFlags(command, historyFlags...)
require.NoError(t, command.Flags().Set("status", "running,queued"))

ctx := &Context{Command: command}
query, limit, err := buildRemoteHistoryQuery(ctx, nil)
require.NoError(t, err)

assert.Equal(t, 100, limit)
assert.Equal(t, []int{int(core.Running), int(core.Queued)}, query.Statuses)
}

func TestRemoteClientListDAGRunsUsesRepeatedStatusParams(t *testing.T) {
t.Parallel()

statusValues := make(chan []string, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
statusValues <- append([]string(nil), r.URL.Query()["status"]...)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"dagRuns":[]}`))
}))
defer server.Close()

client := &remoteClient{
baseURL: server.URL,
client: server.Client(),
}

_, err := client.listDAGRuns(context.Background(), remoteHistoryQuery{
Statuses: []int{int(core.Running), int(core.Queued)},
})
require.NoError(t, err)
assert.Equal(t, []string{"1", "5"}, <-statusValues)
}

func TestWaitForRemoteStopHonorsContextCancellation(t *testing.T) {
t.Parallel()

Expand Down
Loading