diff --git a/flake.nix b/flake.nix index e1ce42a0..ce155453 100644 --- a/flake.nix +++ b/flake.nix @@ -19,7 +19,7 @@ src = ./.; - vendorHash = "sha256-1aduKyNYpTt0ZVw14BsZLQsqp8XTJ2fy4zA8HCWbZWs="; + vendorHash = "sha256-0YZ+NGX39j0tYtMAZt2nHacgbAFJ0Ol/EbSb0kq10+w="; subPackages = [ "cmd/roborev" ]; diff --git a/go.mod b/go.mod index d421c55b..224f8918 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,8 @@ require ( github.com/charmbracelet/x/term v0.2.2 // indirect github.com/clipperhouse/displaywidth v0.9.0 // indirect github.com/clipperhouse/stringish v0.1.1 // indirect - github.com/clipperhouse/uax29/v2 v2.5.0 // indirect + github.com/clipperhouse/uax29/v2 v2.7.0 // indirect + github.com/danielgtaylor/huma/v2 v2.37.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dlclark/regexp2 v1.11.5 // indirect github.com/dustin/go-humanize v1.0.1 // indirect @@ -61,7 +62,7 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect - github.com/spf13/pflag v1.0.9 // indirect + github.com/spf13/pflag v1.0.10 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect github.com/yuin/goldmark v1.7.13 // indirect github.com/yuin/goldmark-emoji v1.0.6 // indirect diff --git a/go.sum b/go.sum index f6432f6a..90fd4274 100644 --- a/go.sum +++ b/go.sum @@ -38,12 +38,16 @@ github.com/clipperhouse/stringish v0.1.1 h1:+NSqMOr3GR6k1FdRhhnXrLfztGzuG+VuFDfa github.com/clipperhouse/stringish v0.1.1/go.mod h1:v/WhFtE1q0ovMta2+m+UbpZ+2/HEXNWYXQgCt4hdOzA= github.com/clipperhouse/uax29/v2 v2.5.0 h1:x7T0T4eTHDONxFJsL94uKNKPHrclyFI0lm7+w94cO8U= github.com/clipperhouse/uax29/v2 v2.5.0/go.mod h1:Wn1g7MK6OoeDT0vL+Q0SQLDz/KpfsVRgg6W7ihQeh4g= +github.com/clipperhouse/uax29/v2 v2.7.0 h1:+gs4oBZ2gPfVrKPthwbMzWZDaAFPGYK72F0NJv2v7Vk= +github.com/clipperhouse/uax29/v2 v2.7.0/go.mod h1:EFJ2TJMRUaplDxHKj1qAEhCtQPW2tJSwu5BF98AuoVM= github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ= github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko= github.com/coreos/go-systemd/v22 v22.7.0 h1:LAEzFkke61DFROc7zNLX/WA2i5J8gYqe0rSj9KI28KA= github.com/coreos/go-systemd/v22 v22.7.0/go.mod h1:xNUYtjHu2EDXbsxz1i41wouACIwT7Ybq9o0BQhMwD0w= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/danielgtaylor/huma/v2 v2.37.3 h1:6Av0Vj45Vk5lDxRVfoO2iPlEdvCvwLc7pl5nbqGOkYM= +github.com/danielgtaylor/huma/v2 v2.37.3/go.mod h1:OeHHtCEAaNiuVbAVdYu4IQ0UOmnb4x3yMUOShNlZ53g= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -133,6 +137,8 @@ github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= +github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/internal/daemon/huma_handlers.go b/internal/daemon/huma_handlers.go new file mode 100644 index 00000000..a6d13f31 --- /dev/null +++ b/internal/daemon/huma_handlers.go @@ -0,0 +1,543 @@ +package daemon + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log" + "path/filepath" + "time" + + "github.com/danielgtaylor/huma/v2" + + "github.com/roborev-dev/roborev/internal/storage" + "github.com/roborev-dev/roborev/internal/version" +) + +// limitNotProvided is the sentinel default for the Limit +// query parameter. A distinct value is needed so that an +// explicit limit=-1 (which legacy clients use to mean +// "unlimited") is distinguishable from "parameter omitted". +const limitNotProvided = -999999 + +func (s *Server) humaListJobs( + ctx context.Context, input *ListJobsInput, +) (*ListJobsOutput, error) { + // Single job lookup by ID (>= 0 because ID=0 should + // return empty, not fall through to the list path). + if input.ID >= 0 { + job, err := s.db.GetJobByID(input.ID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + resp := &ListJobsOutput{} + resp.Body.Jobs = []storage.ReviewJob{} + return resp, nil + } + return nil, huma.Error500InternalServerError( + fmt.Sprintf("database error: %v", err), + ) + } + job.Patch = nil + resp := &ListJobsOutput{} + resp.Body.Jobs = []storage.ReviewJob{*job} + return resp, nil + } + + repo := input.Repo + if repo != "" { + repo = filepath.ToSlash(filepath.Clean(repo)) + } + repoPrefix := input.RepoPrefix + if repoPrefix != "" { + repoPrefix = filepath.ToSlash(filepath.Clean(repoPrefix)) + } + + const maxLimit = 10000 + limit := 50 + switch { + case input.Limit == limitNotProvided: + // Not provided — use default + case input.Limit < 0: + limit = 0 // any negative → unlimited (legacy behavior) + default: + limit = input.Limit + } + if limit > maxLimit { + limit = maxLimit + } + + offset := max(input.Offset, 0) + if limit == 0 { + offset = 0 + } + + fetchLimit := limit + if limit > 0 { + fetchLimit = limit + 1 + } + + var listOpts []storage.ListJobsOption + if input.GitRef != "" { + listOpts = append( + listOpts, storage.WithGitRef(input.GitRef), + ) + } + if input.Branch != "" { + if input.BranchIncludeEmpty == "true" { + listOpts = append( + listOpts, + storage.WithBranchOrEmpty(input.Branch), + ) + } else { + listOpts = append( + listOpts, storage.WithBranch(input.Branch), + ) + } + } + if input.Closed == "true" || input.Closed == "false" { + listOpts = append( + listOpts, + storage.WithClosed(input.Closed == "true"), + ) + } + if input.JobType != "" { + listOpts = append( + listOpts, storage.WithJobType(input.JobType), + ) + } + if input.ExcludeJobType != "" { + listOpts = append( + listOpts, + storage.WithExcludeJobType(input.ExcludeJobType), + ) + } + if repoPrefix != "" && repo == "" { + listOpts = append( + listOpts, storage.WithRepoPrefix(repoPrefix), + ) + } + if input.Before > 0 { + listOpts = append( + listOpts, storage.WithBeforeCursor(input.Before), + ) + } + + jobs, err := s.db.ListJobs( + input.Status, repo, fetchLimit, offset, listOpts..., + ) + if err != nil { + return nil, huma.Error500InternalServerError( + fmt.Sprintf("list jobs: %v", err), + ) + } + + hasMore := false + if limit > 0 && len(jobs) > limit { + hasMore = true + jobs = jobs[:limit] + } + + // Stats use same repo/branch filters but ignore closed + // and pagination. + var statsOpts []storage.ListJobsOption + if input.Branch != "" { + if input.BranchIncludeEmpty == "true" { + statsOpts = append( + statsOpts, + storage.WithBranchOrEmpty(input.Branch), + ) + } else { + statsOpts = append( + statsOpts, + storage.WithBranch(input.Branch), + ) + } + } + if repoPrefix != "" && repo == "" { + statsOpts = append( + statsOpts, storage.WithRepoPrefix(repoPrefix), + ) + } + stats, statsErr := s.db.CountJobStats(repo, statsOpts...) + if statsErr != nil { + log.Printf( + "Warning: failed to count job stats: %v", statsErr, + ) + } + + resp := &ListJobsOutput{} + resp.Body.Jobs = jobs + resp.Body.HasMore = hasMore + resp.Body.Stats = &stats + return resp, nil +} + +func (s *Server) humaGetReview( + ctx context.Context, input *GetReviewInput, +) (*GetReviewOutput, error) { + var review *storage.Review + var err error + + if input.JobID >= 0 { + review, err = s.db.GetReviewByJobID(input.JobID) + } else if input.SHA != "" { + review, err = s.db.GetReviewByCommitSHA(input.SHA) + } else { + return nil, huma.Error400BadRequest( + "job_id or sha parameter required", + ) + } + + if err != nil { + return nil, huma.Error404NotFound("review not found") + } + + return &GetReviewOutput{Body: review}, nil +} + +func (s *Server) humaListComments( + ctx context.Context, input *ListCommentsInput, +) (*ListCommentsOutput, error) { + var responses []storage.Response + var err error + + if input.JobID >= 0 { + responses, err = s.db.GetCommentsForJob(input.JobID) + if err != nil { + return nil, huma.Error500InternalServerError( + fmt.Sprintf("get responses: %v", err), + ) + } + } else if input.CommitID >= 0 { + responses, err = s.db.GetCommentsForCommit(input.CommitID) + if err != nil { + return nil, huma.Error500InternalServerError( + fmt.Sprintf("get responses: %v", err), + ) + } + } else if input.SHA != "" { + responses, err = s.db.GetCommentsForCommitSHA(input.SHA) + if err != nil { + return nil, huma.Error404NotFound("commit not found") + } + } else { + return nil, huma.Error400BadRequest( + "job_id, commit_id, or sha parameter required", + ) + } + + resp := &ListCommentsOutput{} + resp.Body.Responses = responses + return resp, nil +} + +func (s *Server) humaListRepos( + ctx context.Context, input *ListReposInput, +) (*ListReposOutput, error) { + prefix := input.Prefix + if prefix != "" { + prefix = filepath.ToSlash(filepath.Clean(prefix)) + } + + var repoOpts []storage.ListReposOption + if prefix != "" { + repoOpts = append( + repoOpts, storage.WithRepoPathPrefix(prefix), + ) + } + if input.Branch != "" { + repoOpts = append( + repoOpts, storage.WithRepoBranch(input.Branch), + ) + } + + repos, totalCount, err := s.db.ListReposWithReviewCounts( + repoOpts..., + ) + if err != nil { + return nil, huma.Error500InternalServerError( + fmt.Sprintf("list repos: %v", err), + ) + } + + resp := &ListReposOutput{} + resp.Body.Repos = repos + resp.Body.TotalCount = totalCount + return resp, nil +} + +func (s *Server) humaListBranches( + ctx context.Context, input *ListBranchesInput, +) (*ListBranchesOutput, error) { + // Filter out empty strings to treat ?repo= as no filter + var repoPaths []string + for _, p := range input.Repo { + if p != "" { + repoPaths = append( + repoPaths, + filepath.ToSlash(filepath.Clean(p)), + ) + } + } + + result, err := s.db.ListBranchesWithCounts(repoPaths) + if err != nil { + return nil, huma.Error500InternalServerError( + fmt.Sprintf("list branches: %v", err), + ) + } + + resp := &ListBranchesOutput{} + resp.Body.Branches = result.Branches + resp.Body.TotalCount = result.TotalCount + resp.Body.NullsRemaining = result.NullsRemaining + return resp, nil +} + +func (s *Server) humaGetStatus( + ctx context.Context, input *GetStatusInput, +) (*GetStatusOutput, error) { + queued, running, done, failed, canceled, + applied, rebased, err := s.db.GetJobCounts() + if err != nil { + return nil, huma.Error500InternalServerError( + fmt.Sprintf("get counts: %v", err), + ) + } + + configReloadedAt := "" + if t := s.configWatcher.LastReloadedAt(); !t.IsZero() { + configReloadedAt = t.Format(time.RFC3339Nano) + } + configReloadCounter := s.configWatcher.ReloadCounter() + + resp := &GetStatusOutput{} + resp.Body = storage.DaemonStatus{ + Version: version.Version, + QueuedJobs: queued, + RunningJobs: running, + CompletedJobs: done, + FailedJobs: failed, + CanceledJobs: canceled, + AppliedJobs: applied, + RebasedJobs: rebased, + ActiveWorkers: s.workerPool.ActiveWorkers(), + MaxWorkers: s.workerPool.MaxWorkers(), + MachineID: s.getMachineID(), + ConfigReloadedAt: configReloadedAt, + ConfigReloadCounter: configReloadCounter, + } + return resp, nil +} + +func (s *Server) humaGetSummary( + ctx context.Context, input *GetSummaryInput, +) (*GetSummaryOutput, error) { + since := time.Now().Add(-7 * 24 * time.Hour) + if input.Since != "" { + d, err := parseDuration(input.Since) + if err != nil { + return nil, huma.Error400BadRequest( + fmt.Sprintf("invalid since value: %s", input.Since), + ) + } + since = time.Now().Add(-d) + } + + opts := storage.SummaryOptions{ + RepoPath: input.Repo, + Branch: input.Branch, + Since: since, + AllRepos: input.All == "true", + } + + summary, err := s.db.GetSummary(opts) + if err != nil { + return nil, huma.Error500InternalServerError( + fmt.Sprintf("get summary: %v", err), + ) + } + + return &GetSummaryOutput{Body: summary}, nil +} + +func (s *Server) humaCancelJob( + ctx context.Context, input *CancelJobInput, +) (*CancelJobOutput, error) { + if input.Body.JobID == 0 { + return nil, huma.Error400BadRequest( + "job_id is required", + ) + } + + if err := s.db.CancelJob(input.Body.JobID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, huma.Error404NotFound( + "job not found or not cancellable", + ) + } + return nil, huma.Error500InternalServerError( + fmt.Sprintf("cancel job: %v", err), + ) + } + + s.workerPool.CancelJob(input.Body.JobID) + + resp := &CancelJobOutput{} + resp.Body.Success = true + return resp, nil +} + +func (s *Server) humaRerunJob( + ctx context.Context, input *RerunJobInput, +) (*RerunJobOutput, error) { + if input.Body.JobID == 0 { + return nil, huma.Error400BadRequest( + "job_id is required", + ) + } + + job, err := s.db.GetJobByID(input.Body.JobID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, huma.Error404NotFound( + "job not found or not rerunnable", + ) + } + return nil, huma.Error500InternalServerError( + fmt.Sprintf("load job: %v", err), + ) + } + + model, provider, err := resolveRerunModelProvider( + job, s.configWatcher.Config(), + ) + if err != nil { + return nil, huma.Error400BadRequest(err.Error()) + } + + err = s.db.ReenqueueJob( + input.Body.JobID, + storage.ReenqueueOpts{ + Model: model, + Provider: provider, + }, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, huma.Error404NotFound( + "job not found or not rerunnable", + ) + } + return nil, huma.Error500InternalServerError( + fmt.Sprintf("rerun job: %v", err), + ) + } + + resp := &RerunJobOutput{} + resp.Body.Success = true + return resp, nil +} + +func (s *Server) humaCloseReview( + ctx context.Context, input *CloseReviewInput, +) (*CloseReviewOutput, error) { + if input.Body.JobID == 0 { + return nil, huma.Error400BadRequest( + "job_id is required", + ) + } + + err := s.db.MarkReviewClosedByJobID( + input.Body.JobID, input.Body.Closed, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, huma.Error404NotFound( + "review not found for job", + ) + } + return nil, huma.Error500InternalServerError( + fmt.Sprintf("mark closed: %v", err), + ) + } + + eventType := "review.closed" + if !input.Body.Closed { + eventType = "review.reopened" + } + evt := Event{ + Type: eventType, + TS: time.Now(), + JobID: input.Body.JobID, + } + if job, err := s.db.GetJobByID(input.Body.JobID); err == nil { + evt.Repo = job.RepoPath + evt.RepoName = job.RepoName + evt.SHA = job.GitRef + evt.Agent = job.Agent + } + s.broadcaster.Broadcast(evt) + + resp := &CloseReviewOutput{} + resp.Body.Success = true + return resp, nil +} + +func (s *Server) humaAddComment( + ctx context.Context, input *AddCommentInput, +) (*AddCommentOutput, error) { + if input.Body.Commenter == "" || input.Body.Comment == "" { + return nil, huma.Error400BadRequest( + "commenter and comment are required", + ) + } + + if input.Body.JobID == 0 && input.Body.SHA == "" { + return nil, huma.Error400BadRequest( + "job_id or sha is required", + ) + } + + var resp *storage.Response + var err error + + if input.Body.JobID != 0 { + resp, err = s.db.AddCommentToJob( + input.Body.JobID, + input.Body.Commenter, + input.Body.Comment, + ) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, huma.Error404NotFound( + "job not found", + ) + } + return nil, huma.Error500InternalServerError( + fmt.Sprintf("add comment: %v", err), + ) + } + } else { + commit, commitErr := s.db.GetCommitBySHA(input.Body.SHA) + if commitErr != nil { + return nil, huma.Error404NotFound( + "commit not found", + ) + } + + resp, err = s.db.AddComment( + commit.ID, + input.Body.Commenter, + input.Body.Comment, + ) + if err != nil { + return nil, huma.Error500InternalServerError( + fmt.Sprintf("add comment: %v", err), + ) + } + } + + return &AddCommentOutput{Body: resp}, nil +} diff --git a/internal/daemon/huma_routes.go b/internal/daemon/huma_routes.go new file mode 100644 index 00000000..6a02a47c --- /dev/null +++ b/internal/daemon/huma_routes.go @@ -0,0 +1,104 @@ +package daemon + +import ( + "net/http" + + "github.com/danielgtaylor/huma/v2" + "github.com/danielgtaylor/huma/v2/adapters/humago" + + "github.com/roborev-dev/roborev/internal/version" +) + +// registerHumaAPI creates a Huma API on the given mux and registers +// all typed endpoints. The returned huma.API can be used to serve +// the generated OpenAPI spec. +func (s *Server) registerHumaAPI(mux *http.ServeMux) huma.API { + cfg := huma.DefaultConfig("roborev", version.Version) + cfg.DocsPath = "" + cfg.SchemasPath = "" + api := humago.New(mux, cfg) + + huma.Get(api, "/api/jobs", s.humaListJobs, + func(o *huma.Operation) { + o.OperationID = "list-jobs" + o.Summary = "List review jobs" + o.Tags = []string{"jobs"} + }) + + huma.Get(api, "/api/review", s.humaGetReview, + func(o *huma.Operation) { + o.OperationID = "get-review" + o.Summary = "Get a review by job ID or SHA" + o.Tags = []string{"reviews"} + }) + + // /api/job/output is registered as a plain HandleFunc + // (not Huma) because its stream=1 mode uses NDJSON + // streaming which doesn't fit Huma's typed response model. + + huma.Get(api, "/api/comments", s.humaListComments, + func(o *huma.Operation) { + o.OperationID = "list-comments" + o.Summary = "List comments for a job or commit" + o.Tags = []string{"comments"} + }) + + huma.Get(api, "/api/repos", s.humaListRepos, + func(o *huma.Operation) { + o.OperationID = "list-repos" + o.Summary = "List repos with job counts" + o.Tags = []string{"repos"} + }) + + huma.Get(api, "/api/branches", s.humaListBranches, + func(o *huma.Operation) { + o.OperationID = "list-branches" + o.Summary = "List branches with job counts" + o.Tags = []string{"repos"} + }) + + huma.Get(api, "/api/status", s.humaGetStatus, + func(o *huma.Operation) { + o.OperationID = "get-status" + o.Summary = "Get daemon status" + o.Tags = []string{"daemon"} + }) + + huma.Get(api, "/api/summary", s.humaGetSummary, + func(o *huma.Operation) { + o.OperationID = "get-summary" + o.Summary = "Get review summary statistics" + o.Tags = []string{"daemon"} + }) + + huma.Post(api, "/api/job/cancel", s.humaCancelJob, + func(o *huma.Operation) { + o.OperationID = "cancel-job" + o.Summary = "Cancel a queued or running job" + o.Tags = []string{"jobs"} + }) + + huma.Post(api, "/api/job/rerun", s.humaRerunJob, + func(o *huma.Operation) { + o.OperationID = "rerun-job" + o.Summary = "Re-enqueue a completed or failed job" + o.Tags = []string{"jobs"} + }) + + huma.Post(api, "/api/review/close", s.humaCloseReview, + func(o *huma.Operation) { + o.OperationID = "close-review" + o.Summary = "Close or reopen a review" + o.Tags = []string{"reviews"} + }) + + huma.Post(api, "/api/comment", s.humaAddComment, + func(o *huma.Operation) { + o.OperationID = "add-comment" + o.Summary = "Add a comment to a job or commit" + o.Tags = []string{"comments"} + o.DefaultStatus = 201 + }) + + return api +} diff --git a/internal/daemon/huma_routes_test.go b/internal/daemon/huma_routes_test.go new file mode 100644 index 00000000..112d85ab --- /dev/null +++ b/internal/daemon/huma_routes_test.go @@ -0,0 +1,425 @@ +package daemon + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/roborev-dev/roborev/internal/storage" + "github.com/roborev-dev/roborev/internal/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// serveHuma sends a request through the server's mux (which +// includes Huma-registered routes) and returns the recorder. +func serveHuma( + t *testing.T, srv *Server, method, path string, body []byte, +) *httptest.ResponseRecorder { + t.Helper() + var req *http.Request + if body != nil { + req = httptest.NewRequest( + method, path, bytes.NewReader(body), + ) + req.Header.Set("Content-Type", "application/json") + } else { + req = httptest.NewRequest(method, path, nil) + } + rr := httptest.NewRecorder() + srv.httpServer.Handler.ServeHTTP(rr, req) + return rr +} + +func TestHumaListJobs(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + testutil.CreateTestJobs(t, db, repo, 5, "test-agent") + + t.Run("returns all jobs", func(t *testing.T) { + rr := serveHuma( + t, srv, http.MethodGet, "/api/jobs", nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var body struct { + Jobs []storage.ReviewJob `json:"jobs"` + HasMore bool `json:"has_more"` + } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &body)) + assert.Len(t, body.Jobs, 5) + assert.False(t, body.HasMore) + }) + + t.Run("limit and has_more", func(t *testing.T) { + rr := serveHuma( + t, srv, http.MethodGet, "/api/jobs?limit=3", nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var body struct { + Jobs []storage.ReviewJob `json:"jobs"` + HasMore bool `json:"has_more"` + } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &body)) + assert.Len(t, body.Jobs, 3) + assert.True(t, body.HasMore) + }) +} + +func TestHumaListJobsCursorPagination(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + jobs := testutil.CreateTestJobs(t, db, repo, 5, "test-agent") + + // First page: 3 jobs (newest first by descending ID). + rr := serveHuma( + t, srv, http.MethodGet, "/api/jobs?limit=3", nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var page1 struct { + Jobs []storage.ReviewJob `json:"jobs"` + HasMore bool `json:"has_more"` + } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &page1)) + require.Len(t, page1.Jobs, 3) + assert.True(t, page1.HasMore) + + // Cursor = smallest ID in page 1. + cursor := page1.Jobs[len(page1.Jobs)-1].ID + + rr2 := serveHuma(t, srv, http.MethodGet, + fmt.Sprintf("/api/jobs?limit=10&before=%d", cursor), nil, + ) + require.Equal(t, http.StatusOK, rr2.Code) + + var page2 struct { + Jobs []storage.ReviewJob `json:"jobs"` + HasMore bool `json:"has_more"` + } + require.NoError(t, json.Unmarshal(rr2.Body.Bytes(), &page2)) + assert.False(t, page2.HasMore) + for _, j := range page2.Jobs { + assert.Less(t, j.ID, cursor, + "all page2 jobs should have ID < cursor") + } + + // Both pages together should cover all jobs. + allIDs := make(map[int64]bool) + for _, j := range page1.Jobs { + allIDs[j.ID] = true + } + for _, j := range page2.Jobs { + allIDs[j.ID] = true + } + assert.Len(t, allIDs, len(jobs), + "all jobs accounted for across both pages") +} + +func TestHumaGetStatus(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + testutil.CreateTestJobs(t, db, repo, 2, "test-agent") + + rr := serveHuma( + t, srv, http.MethodGet, "/api/status", nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var status storage.DaemonStatus + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &status)) + assert.NotEmpty(t, status.Version) + assert.Equal(t, 2, status.QueuedJobs) +} + +func TestHumaGetReview_NotFound(t *testing.T) { + srv, _, _ := newTestServer(t) + + rr := serveHuma( + t, srv, http.MethodGet, "/api/review?job_id=99999", nil, + ) + assert.Equal(t, http.StatusNotFound, rr.Code) +} + +func TestHumaGetReview_Found(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + job := testutil.CreateCompletedReview( + t, db, repo.ID, "abc123", "test-agent", "LGTM", + ) + + rr := serveHuma(t, srv, http.MethodGet, + fmt.Sprintf("/api/review?job_id=%d", job.ID), nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var review storage.Review + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &review)) + assert.Equal(t, job.ID, review.JobID) +} + +func TestHumaCancelJob(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + jobs := testutil.CreateTestJobs(t, db, repo, 1, "test-agent") + jobID := jobs[0].ID + + t.Run("cancel queued job succeeds", func(t *testing.T) { + body, _ := json.Marshal(CancelJobRequest{JobID: jobID}) + rr := serveHuma( + t, srv, http.MethodPost, "/api/job/cancel", body, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var resp struct{ Success bool } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.True(t, resp.Success) + + job, err := db.GetJobByID(jobID) + require.NoError(t, err) + assert.Equal(t, storage.JobStatusCanceled, job.Status) + }) + + t.Run("cancel already canceled returns 404", func(t *testing.T) { + body, _ := json.Marshal(CancelJobRequest{JobID: jobID}) + rr := serveHuma( + t, srv, http.MethodPost, "/api/job/cancel", body, + ) + assert.Equal(t, http.StatusNotFound, rr.Code) + }) + + t.Run("cancel nonexistent returns 404", func(t *testing.T) { + body, _ := json.Marshal(CancelJobRequest{JobID: 99999}) + rr := serveHuma( + t, srv, http.MethodPost, "/api/job/cancel", body, + ) + assert.Equal(t, http.StatusNotFound, rr.Code) + }) +} + +func TestHumaRerunJob(t *testing.T) { + srv, db, tmpDir := newTestServer(t) + + // Use tmpDir as repo path so resolveRerunModelProvider + // finds a real directory for validation. + repo, err := db.GetOrCreateRepo(tmpDir) + require.NoError(t, err) + + commit, err := db.GetOrCreateCommit( + repo.ID, "deadbeef", "A", "S", time.Now(), + ) + require.NoError(t, err) + job, err := db.EnqueueJob(storage.EnqueueOpts{ + RepoID: repo.ID, CommitID: commit.ID, + GitRef: "deadbeef", Agent: "test", + }) + require.NoError(t, err) + _, err = db.ClaimJob("w") + require.NoError(t, err) + _, err = db.FailJob(job.ID, "", "some error") + require.NoError(t, err) + + body, _ := json.Marshal(RerunJobRequest{JobID: job.ID}) + rr := serveHuma( + t, srv, http.MethodPost, "/api/job/rerun", body, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var resp struct{ Success bool } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.True(t, resp.Success) +} + +func TestHumaCloseReview(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + job := testutil.CreateCompletedReview( + t, db, repo.ID, "closeme", "test-agent", "done", + ) + + t.Run("close review", func(t *testing.T) { + body, _ := json.Marshal(CloseReviewRequest{ + JobID: job.ID, Closed: true, + }) + rr := serveHuma( + t, srv, http.MethodPost, "/api/review/close", body, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var resp struct{ Success bool } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.True(t, resp.Success) + }) + + t.Run("close nonexistent review returns 404", func(t *testing.T) { + body, _ := json.Marshal(CloseReviewRequest{ + JobID: 99999, Closed: true, + }) + rr := serveHuma( + t, srv, http.MethodPost, "/api/review/close", body, + ) + assert.Equal(t, http.StatusNotFound, rr.Code) + }) +} + +func TestHumaOpenAPISpec(t *testing.T) { + srv, _, _ := newTestServer(t) + + rr := serveHuma( + t, srv, http.MethodGet, "/openapi.json", nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var spec map[string]any + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &spec)) + + assert.Equal(t, "3.1.0", spec["openapi"]) + + paths, ok := spec["paths"].(map[string]any) + require.True(t, ok, "spec must have paths object") + + wantPaths := map[string]string{ + "/api/jobs": "get", + "/api/review": "get", + "/api/comments": "get", + "/api/repos": "get", + "/api/branches": "get", + "/api/status": "get", + "/api/summary": "get", + "/api/job/cancel": "post", + "/api/job/rerun": "post", + "/api/review/close": "post", + "/api/comment": "post", + // /api/job/output is a plain HandleFunc (supports + // NDJSON streaming) so it is not in the OpenAPI spec. + } + for p, method := range wantPaths { + pathObj, exists := paths[p] + assert.True(t, exists, + "expected path %s in OpenAPI spec", p) + if exists { + methods, ok := pathObj.(map[string]any) + require.True(t, ok, + "path %s should be an object", p) + _, hasMethod := methods[method] + assert.True(t, hasMethod, + "path %s should have method %s", p, method) + } + } +} + +func TestHumaListRepos(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + testutil.CreateTestJobs(t, db, repo, 2, "test-agent") + + rr := serveHuma( + t, srv, http.MethodGet, "/api/repos", nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var resp struct { + Repos []storage.RepoWithCount `json:"repos"` + TotalCount int `json:"total_count"` + } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.GreaterOrEqual(t, len(resp.Repos), 1) +} + +func TestHumaListBranches(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + + commit, err := db.GetOrCreateCommit( + repo.ID, "brsha", "A", "S", time.Now(), + ) + require.NoError(t, err) + _, err = db.EnqueueJob(storage.EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "brsha", + Branch: "feature-x", + Agent: "test-agent", + }) + require.NoError(t, err) + + rr := serveHuma( + t, srv, http.MethodGet, "/api/branches", nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var resp struct { + Branches []storage.BranchWithCount `json:"branches"` + TotalCount int `json:"total_count"` + NullsRemaining int `json:"nulls_remaining"` + } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.GreaterOrEqual(t, resp.TotalCount, 1) +} + +func TestHumaGetSummary(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + testutil.CreateCompletedReview( + t, db, repo.ID, "sumsha", "test-agent", "ok", + ) + + rr := serveHuma( + t, srv, http.MethodGet, "/api/summary", nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var summary storage.Summary + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &summary)) + assert.GreaterOrEqual(t, summary.Overview.Total, 1) +} + +func TestHumaListComments(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + job := testutil.CreateCompletedReview( + t, db, repo.ID, "comsha", "test-agent", "review text", + ) + + _, err := db.AddCommentToJob(job.ID, "alice", "nice work") + require.NoError(t, err) + + rr := serveHuma(t, srv, http.MethodGet, + fmt.Sprintf("/api/comments?job_id=%d", job.ID), nil, + ) + require.Equal(t, http.StatusOK, rr.Code) + + var resp struct { + Responses []storage.Response `json:"responses"` + } + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.Len(t, resp.Responses, 1) + assert.Equal(t, "alice", resp.Responses[0].Responder) +} + +func TestHumaAddComment(t *testing.T) { + srv, db, _ := newTestServer(t) + repo := testutil.CreateTestRepo(t, db) + job := testutil.CreateCompletedReview( + t, db, repo.ID, "addcsha", "test-agent", "review text", + ) + + body, _ := json.Marshal(AddCommentRequest{ + JobID: job.ID, + Commenter: "bob", + Comment: "looks good", + }) + rr := serveHuma( + t, srv, http.MethodPost, "/api/comment", body, + ) + require.Equal(t, http.StatusCreated, rr.Code) + + var resp storage.Response + require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &resp)) + assert.Equal(t, "bob", resp.Responder) +} diff --git a/internal/daemon/huma_types.go b/internal/daemon/huma_types.go new file mode 100644 index 00000000..559bb88a --- /dev/null +++ b/internal/daemon/huma_types.go @@ -0,0 +1,213 @@ +package daemon + +import ( + "github.com/roborev-dev/roborev/internal/storage" +) + +// -- GET /api/jobs -- + +// ListJobsInput holds query parameters for listing jobs. +// Huma does not support pointer types for query parameters, +// so we use sentinel defaults to detect presence: +// - ID, Before: default -1 (valid IDs are always positive) +// - Limit: default limitNotProvided (so explicit negative +// values like -1 are treated as unlimited, matching legacy) +// - Offset: default -1 (negative offsets clamp to 0) +type ListJobsInput struct { + ID int64 `query:"id" default:"-1" doc:"Return a single job by ID"` + Status string `query:"status" doc:"Filter by job status"` + Repo string `query:"repo" doc:"Filter by repo root path"` + GitRef string `query:"git_ref" doc:"Filter by git ref"` + Branch string `query:"branch" doc:"Filter by branch name"` + BranchIncludeEmpty string `query:"branch_include_empty" doc:"Include jobs with no branch when filtering by branch" enum:"true,false,"` + Closed string `query:"closed" doc:"Filter by review closed state" enum:"true,false,"` + JobType string `query:"job_type" doc:"Filter by job type"` + ExcludeJobType string `query:"exclude_job_type" doc:"Exclude jobs of this type"` + RepoPrefix string `query:"repo_prefix" doc:"Filter repos by path prefix"` + Limit int `query:"limit" default:"-999999" doc:"Max results (default 50, 0=unlimited, max 10000)"` + Offset int `query:"offset" default:"-1" doc:"Skip N results (requires limit>0)"` + Before int64 `query:"before" default:"-1" doc:"Cursor: return jobs with ID < this value"` +} + +// ListJobsOutput is the response for GET /api/jobs. +type ListJobsOutput struct { + Body struct { + Jobs []storage.ReviewJob `json:"jobs"` + HasMore bool `json:"has_more"` + Stats *storage.JobStats `json:"stats,omitempty"` + } +} + +// -- GET /api/review -- + +// GetReviewInput holds query parameters for fetching a review. +type GetReviewInput struct { + JobID int64 `query:"job_id" default:"-1" doc:"Look up review by job ID"` + SHA string `query:"sha" doc:"Look up review by commit SHA"` +} + +// GetReviewOutput is the response for GET /api/review. +type GetReviewOutput struct { + Body *storage.Review +} + +// -- Shared request/response types (used by Huma handlers) -- + +// CancelJobRequest is the JSON body for POST /api/job/cancel. +type CancelJobRequest struct { + JobID int64 `json:"job_id"` +} + +// RerunJobRequest is the JSON body for POST /api/job/rerun. +type RerunJobRequest struct { + JobID int64 `json:"job_id"` +} + +// AddCommentRequest is the JSON body for POST /api/comment. +type AddCommentRequest struct { + SHA string `json:"sha,omitempty"` // Legacy: link to commit by SHA + JobID int64 `json:"job_id,omitempty"` // Preferred: link to job + Commenter string `json:"commenter"` + Comment string `json:"comment"` +} + +// CloseReviewRequest is the JSON body for POST /api/review/close. +type CloseReviewRequest struct { + JobID int64 `json:"job_id"` + Closed bool `json:"closed"` +} + +// JobOutputResponse is the response for GET /api/job/output. +type JobOutputResponse struct { + JobID int64 `json:"job_id"` + Status string `json:"status"` + Lines []OutputLine `json:"lines"` + HasMore bool `json:"has_more"` +} + +// -- POST /api/job/cancel -- + +// CancelJobInput is the request body for canceling a job. +type CancelJobInput struct { + Body CancelJobRequest +} + +// CancelJobOutput is the response for POST /api/job/cancel. +type CancelJobOutput struct { + Body struct { + Success bool `json:"success"` + } +} + +// -- POST /api/job/rerun -- + +// RerunJobInput is the request body for rerunning a job. +type RerunJobInput struct { + Body RerunJobRequest +} + +// RerunJobOutput is the response for POST /api/job/rerun. +type RerunJobOutput struct { + Body struct { + Success bool `json:"success"` + } +} + +// -- POST /api/review/close -- + +// CloseReviewInput is the request body for closing/reopening a review. +type CloseReviewInput struct { + Body CloseReviewRequest +} + +// CloseReviewOutput is the response for POST /api/review/close. +type CloseReviewOutput struct { + Body struct { + Success bool `json:"success"` + } +} + +// -- POST /api/comment -- + +// AddCommentInput is the request body for adding a comment. +type AddCommentInput struct { + Body AddCommentRequest +} + +// AddCommentOutput is the response for POST /api/comment. +type AddCommentOutput struct { + Body *storage.Response +} + +// -- GET /api/comments -- + +// ListCommentsInput holds query parameters for listing comments. +type ListCommentsInput struct { + JobID int64 `query:"job_id" default:"-1" doc:"List comments by job ID"` + CommitID int64 `query:"commit_id" default:"-1" doc:"List comments by commit ID"` + SHA string `query:"sha" doc:"List comments by commit SHA"` +} + +// ListCommentsOutput is the response for GET /api/comments. +type ListCommentsOutput struct { + Body struct { + Responses []storage.Response `json:"responses"` + } +} + +// -- GET /api/repos -- + +// ListReposInput holds query parameters for listing repos. +type ListReposInput struct { + Branch string `query:"branch" doc:"Filter to repos with jobs on this branch"` + Prefix string `query:"prefix" doc:"Filter repos by path prefix"` +} + +// ListReposOutput is the response for GET /api/repos. +type ListReposOutput struct { + Body struct { + Repos []storage.RepoWithCount `json:"repos"` + TotalCount int `json:"total_count"` + } +} + +// -- GET /api/branches -- + +// ListBranchesInput holds query parameters for listing branches. +type ListBranchesInput struct { + Repo []string `query:"repo,explode" doc:"Filter to branches in these repo paths"` +} + +// ListBranchesOutput is the response for GET /api/branches. +type ListBranchesOutput struct { + Body struct { + Branches []storage.BranchWithCount `json:"branches"` + TotalCount int `json:"total_count"` + NullsRemaining int `json:"nulls_remaining"` + } +} + +// -- GET /api/status -- + +// GetStatusInput is an empty input for the status endpoint. +type GetStatusInput struct{} + +// GetStatusOutput is the response for GET /api/status. +type GetStatusOutput struct { + Body storage.DaemonStatus +} + +// -- GET /api/summary -- + +// GetSummaryInput holds query parameters for the summary endpoint. +type GetSummaryInput struct { + Since string `query:"since" doc:"Time window (e.g. 7d, 24h). Default: 7d"` + Repo string `query:"repo" doc:"Filter by repo root path"` + Branch string `query:"branch" doc:"Filter by branch name"` + All string `query:"all" doc:"Include per-repo breakdown" enum:"true,false"` +} + +// GetSummaryOutput is the response for GET /api/summary. +type GetSummaryOutput struct { + Body *storage.Summary +} diff --git a/internal/daemon/server.go b/internal/daemon/server.go index b40a8816..64aa2f3e 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -88,23 +88,14 @@ func NewServer(db *storage.DB, cfg *config.Config, configPath string) *Server { } mux := http.NewServeMux() + s.registerHumaAPI(mux) + mux.HandleFunc("/api/job/output", s.handleJobOutput) mux.HandleFunc("/api/enqueue", s.handleEnqueue) mux.HandleFunc("/api/health", s.handleHealth) mux.HandleFunc("/api/ping", s.handlePing) - mux.HandleFunc("/api/jobs", s.handleListJobs) - mux.HandleFunc("/api/job/cancel", s.handleCancelJob) - mux.HandleFunc("/api/job/output", s.handleJobOutput) mux.HandleFunc("/api/job/log", s.handleJobLog) - mux.HandleFunc("/api/job/rerun", s.handleRerunJob) mux.HandleFunc("/api/job/update-branch", s.handleUpdateJobBranch) - mux.HandleFunc("/api/repos", s.handleListRepos) mux.HandleFunc("/api/repos/register", s.handleRegisterRepo) - mux.HandleFunc("/api/branches", s.handleListBranches) - mux.HandleFunc("/api/review", s.handleGetReview) - mux.HandleFunc("/api/review/close", s.handleCloseReview) - mux.HandleFunc("/api/comment", s.handleAddComment) - mux.HandleFunc("/api/comments", s.handleListComments) - mux.HandleFunc("/api/status", s.handleStatus) mux.HandleFunc("/api/stream/events", s.handleStreamEvents) mux.HandleFunc("/api/jobs/batch", s.handleBatchJobs) mux.HandleFunc("/api/remap", s.handleRemap) @@ -115,7 +106,6 @@ func NewServer(db *storage.DB, cfg *config.Config, configPath string) *Server { mux.HandleFunc("/api/job/applied", s.handleMarkJobApplied) mux.HandleFunc("/api/job/rebased", s.handleMarkJobRebased) mux.HandleFunc("/api/activity", s.handleActivity) - mux.HandleFunc("/api/summary", s.handleSummary) s.httpServer = &http.Server{ Addr: cfg.ServerAddr, @@ -1280,177 +1270,6 @@ func reusableSessionTarget(gitRef string) string { return strings.TrimSpace(gitRef) } -func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - // Support fetching a single job by ID - if idStr := r.URL.Query().Get("id"); idStr != "" { - jobID, err := strconv.ParseInt(idStr, 10, 64) - if err != nil { - writeError(w, http.StatusBadRequest, "invalid id parameter") - return - } - job, err := s.db.GetJobByID(jobID) - if err != nil { - // Distinguish "not found" from actual DB errors - if errors.Is(err, sql.ErrNoRows) { - writeJSON(w, map[string]any{ - "jobs": []storage.ReviewJob{}, - "has_more": false, - }) - return - } - writeError(w, http.StatusInternalServerError, fmt.Sprintf("database error: %v", err)) - return - } - job.Patch = nil // Patch is only served via /api/job/patch - writeJSON(w, map[string]any{ - "jobs": []storage.ReviewJob{*job}, - "has_more": false, - }) - return - } - - status := r.URL.Query().Get("status") - repo := r.URL.Query().Get("repo") - if repo != "" { - repo = filepath.ToSlash(filepath.Clean(repo)) - } - gitRef := r.URL.Query().Get("git_ref") - repoPrefix := r.URL.Query().Get("repo_prefix") - if repoPrefix != "" { - repoPrefix = filepath.ToSlash(filepath.Clean(repoPrefix)) - } - - // Parse limit from query, default to 50, 0 means no limit - // Clamp to valid range: 0 (unlimited) or 1-10000 - const maxLimit = 10000 - limit := 50 - if limitStr := r.URL.Query().Get("limit"); limitStr != "" { - if _, err := fmt.Sscanf(limitStr, "%d", &limit); err != nil { - limit = 50 - } - } - // Clamp negative to 0, and cap at maxLimit (0 = unlimited is allowed) - if limit < 0 { - limit = 0 - } else if limit > maxLimit { - limit = maxLimit - } - - // Parse offset from query, default to 0 - // Offset is ignored when limit=0 (unlimited) since OFFSET requires LIMIT in SQL - offset := 0 - if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" { - if _, err := fmt.Sscanf(offsetStr, "%d", &offset); err != nil { - offset = 0 - } - } - if offset < 0 || limit == 0 { - offset = 0 - } - - // Fetch one extra to determine if there are more results - fetchLimit := limit - if limit > 0 { - fetchLimit = limit + 1 - } - - var listOpts []storage.ListJobsOption - if gitRef != "" { - listOpts = append(listOpts, storage.WithGitRef(gitRef)) - } - if branch := r.URL.Query().Get("branch"); branch != "" { - if r.URL.Query().Get("branch_include_empty") == "true" { - listOpts = append(listOpts, storage.WithBranchOrEmpty(branch)) - } else { - listOpts = append(listOpts, storage.WithBranch(branch)) - } - } - if closedStr := r.URL.Query().Get("closed"); closedStr == "true" || closedStr == "false" { - listOpts = append(listOpts, storage.WithClosed(closedStr == "true")) - } - if jobType := r.URL.Query().Get("job_type"); jobType != "" { - listOpts = append(listOpts, storage.WithJobType(jobType)) - } - if exJobType := r.URL.Query().Get("exclude_job_type"); exJobType != "" { - listOpts = append(listOpts, storage.WithExcludeJobType(exJobType)) - } - if repoPrefix != "" && repo == "" { - listOpts = append(listOpts, storage.WithRepoPrefix(repoPrefix)) - } - - jobs, err := s.db.ListJobs(status, repo, fetchLimit, offset, listOpts...) - if err != nil { - s.writeInternalError(w, fmt.Sprintf("list jobs: %v", err)) - return - } - - // Determine if there are more results - hasMore := false - if limit > 0 && len(jobs) > limit { - hasMore = true - jobs = jobs[:limit] // Trim to requested limit - } - - // Compute aggregate stats using same repo/branch filters (ignoring closed filter and pagination) - var statsOpts []storage.ListJobsOption - if branch := r.URL.Query().Get("branch"); branch != "" { - if r.URL.Query().Get("branch_include_empty") == "true" { - statsOpts = append(statsOpts, storage.WithBranchOrEmpty(branch)) - } else { - statsOpts = append(statsOpts, storage.WithBranch(branch)) - } - } - if repoPrefix != "" && repo == "" { - statsOpts = append(statsOpts, storage.WithRepoPrefix(repoPrefix)) - } - stats, statsErr := s.db.CountJobStats(repo, statsOpts...) - if statsErr != nil { - log.Printf("Warning: failed to count job stats: %v", statsErr) - } - - writeJSON(w, map[string]any{ - "jobs": jobs, - "has_more": hasMore, - "stats": stats, - }) -} - -func (s *Server) handleListRepos(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - branch := r.URL.Query().Get("branch") - prefix := r.URL.Query().Get("prefix") - if prefix != "" { - prefix = filepath.ToSlash(filepath.Clean(prefix)) - } - - var repoOpts []storage.ListReposOption - if prefix != "" { - repoOpts = append(repoOpts, storage.WithRepoPathPrefix(prefix)) - } - if branch != "" { - repoOpts = append(repoOpts, storage.WithRepoBranch(branch)) - } - repos, totalCount, err := s.db.ListReposWithReviewCounts(repoOpts...) - if err != nil { - writeError(w, http.StatusInternalServerError, fmt.Sprintf("list repos: %v", err)) - return - } - - writeJSON(w, map[string]any{ - "repos": repos, - "total_count": totalCount, - }) -} - func (s *Server) handleRegisterRepo(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") @@ -1490,82 +1309,18 @@ func (s *Server) handleRegisterRepo(w http.ResponseWriter, r *http.Request) { writeJSON(w, repo) } -func (s *Server) handleListBranches(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - // Optional repo filter (by path) - supports multiple values - // Filter out empty strings to treat ?repo= as no filter - var repoPaths []string - for _, p := range r.URL.Query()["repo"] { - if p != "" { - repoPaths = append(repoPaths, filepath.ToSlash(filepath.Clean(p))) - } - } - - result, err := s.db.ListBranchesWithCounts(repoPaths) - if err != nil { - writeError(w, http.StatusInternalServerError, fmt.Sprintf("list branches: %v", err)) - return - } - - writeJSON(w, map[string]any{ - "branches": result.Branches, - "total_count": result.TotalCount, - "nulls_remaining": result.NullsRemaining, - }) -} - -type CancelJobRequest struct { - JobID int64 `json:"job_id"` -} - -func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - var req CancelJobRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid request body") - return - } - - if req.JobID == 0 { - writeError(w, http.StatusBadRequest, "job_id is required") - return - } - - // Cancel in DB first (marks as canceled) - if err := s.db.CancelJob(req.JobID); err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusNotFound, "job not found or not cancellable") - return - } - writeError(w, http.StatusInternalServerError, fmt.Sprintf("cancel job: %v", err)) - return - } - - // Also cancel the running worker if job was running (kills subprocess) - s.workerPool.CancelJob(req.JobID) - - writeJSON(w, map[string]any{"success": true}) -} - -// JobOutputResponse is the response for /api/job/output -type JobOutputResponse struct { - JobID int64 `json:"job_id"` - Status string `json:"status"` - Lines []OutputLine `json:"lines"` - HasMore bool `json:"has_more"` -} - -func (s *Server) handleJobOutput(w http.ResponseWriter, r *http.Request) { +// handleJobOutput returns the in-memory output buffer for a job. +// In polling mode (default) it returns a JSON object with the +// current lines. In streaming mode (stream=1) it uses NDJSON to +// push lines in real time until the job completes or the client +// disconnects. +func (s *Server) handleJobOutput( + w http.ResponseWriter, r *http.Request, +) { if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") + writeError( + w, http.StatusMethodNotAllowed, "method not allowed", + ) return } @@ -1581,44 +1336,37 @@ func (s *Server) handleJobOutput(w http.ResponseWriter, r *http.Request) { return } - // Check job exists job, err := s.db.GetJobByID(jobID) if err != nil { writeError(w, http.StatusNotFound, "job not found") return } - // Check if streaming mode requested stream := r.URL.Query().Get("stream") == "1" if !stream { - // Return current buffer (polling mode) lines := s.workerPool.GetJobOutput(jobID) if lines == nil { lines = []OutputLine{} } - - resp := JobOutputResponse{ + writeJSON(w, JobOutputResponse{ JobID: jobID, Status: string(job.Status), Lines: lines, HasMore: job.Status == storage.JobStatusRunning, - } - writeJSON(w, resp) + }) return } - // Streaming mode via SSE - // Don't stream for non-running jobs - they have no active buffer producer - // and would hang forever waiting for data + // Streaming mode — NDJSON if job.Status != storage.JobStatusRunning { - w.Header().Set("Content-Type", "application/x-ndjson") - if !writeNDJSON(w, map[string]any{ + w.Header().Set( + "Content-Type", "application/x-ndjson", + ) + writeNDJSON(w, map[string]any{ "type": "complete", "status": string(job.Status), - }) { - return - } + }) return } @@ -1628,15 +1376,18 @@ func (s *Server) handleJobOutput(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { - writeError(w, http.StatusInternalServerError, "streaming not supported") + writeError( + w, http.StatusInternalServerError, + "streaming not supported", + ) return } - // Subscribe to output - initial, ch, cancel := s.workerPool.SubscribeJobOutput(jobID) + initial, ch, cancel := s.workerPool.SubscribeJobOutput( + jobID, + ) defer cancel() - // Send initial lines for _, line := range initial { if !writeNDJSON(w, map[string]any{ "type": "line", @@ -1649,24 +1400,20 @@ func (s *Server) handleJobOutput(w http.ResponseWriter, r *http.Request) { } flusher.Flush() - // Stream new lines until job completes or client disconnects for { select { case <-r.Context().Done(): return case line, ok := <-ch: if !ok { - // Job finished - channel closed, fetch actual status finalStatus := "done" - if finalJob, err := s.db.GetJobByID(jobID); err == nil { - finalStatus = string(finalJob.Status) + if fj, err := s.db.GetJobByID(jobID); err == nil { + finalStatus = string(fj.Status) } - if !writeNDJSON(w, map[string]any{ + writeNDJSON(w, map[string]any{ "type": "complete", "status": finalStatus, - }) { - return - } + }) flusher.Flush() return } @@ -1851,57 +1598,6 @@ func writeNDJSON(w http.ResponseWriter, v any) bool { return true } -type RerunJobRequest struct { - JobID int64 `json:"job_id"` -} - -func (s *Server) handleRerunJob(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - var req RerunJobRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid request body") - return - } - - if req.JobID == 0 { - writeError(w, http.StatusBadRequest, "job_id is required") - return - } - - job, err := s.db.GetJobByID(req.JobID) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusNotFound, "job not found or not rerunnable") - return - } - writeError(w, http.StatusInternalServerError, fmt.Sprintf("load job: %v", err)) - return - } - - model, provider, err := resolveRerunModelProvider( - job, s.configWatcher.Config(), - ) - if err != nil { - writeError(w, http.StatusBadRequest, err.Error()) - return - } - - if err := s.db.ReenqueueJob(req.JobID, storage.ReenqueueOpts{Model: model, Provider: provider}); err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusNotFound, "job not found or not rerunnable") - return - } - writeError(w, http.StatusInternalServerError, fmt.Sprintf("rerun job: %v", err)) - return - } - - writeJSON(w, map[string]any{"success": true}) -} - func (s *Server) handleUpdateJobBranch(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") @@ -1938,146 +1634,6 @@ func (s *Server) handleUpdateJobBranch(w http.ResponseWriter, r *http.Request) { }) } -func (s *Server) handleGetReview(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - var review *storage.Review - var err error - - // Support lookup by job_id (preferred) or sha - if jobIDStr := r.URL.Query().Get("job_id"); jobIDStr != "" { - jobID, parseErr := strconv.ParseInt(jobIDStr, 10, 64) - if parseErr != nil { - writeError(w, http.StatusBadRequest, "invalid job_id") - return - } - review, err = s.db.GetReviewByJobID(jobID) - } else if sha := r.URL.Query().Get("sha"); sha != "" { - review, err = s.db.GetReviewByCommitSHA(sha) - } else { - writeError(w, http.StatusBadRequest, "job_id or sha parameter required") - return - } - - if err != nil { - writeError(w, http.StatusNotFound, "review not found") - return - } - - writeJSON(w, review) -} - -type AddCommentRequest struct { - SHA string `json:"sha,omitempty"` // Legacy: link to commit by SHA - JobID int64 `json:"job_id,omitempty"` // Preferred: link to job - Commenter string `json:"commenter"` - Comment string `json:"comment"` -} - -func (s *Server) handleAddComment(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - var req AddCommentRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid request body") - return - } - - if req.Commenter == "" || req.Comment == "" { - writeError(w, http.StatusBadRequest, "commenter and comment are required") - return - } - - // Must provide either job_id or sha - if req.JobID == 0 && req.SHA == "" { - writeError(w, http.StatusBadRequest, "job_id or sha is required") - return - } - - var resp *storage.Response - var err error - - if req.JobID != 0 { - // Link to job (preferred method) - resp, err = s.db.AddCommentToJob(req.JobID, req.Commenter, req.Comment) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusNotFound, "job not found") - return - } - writeError(w, http.StatusInternalServerError, fmt.Sprintf("add comment: %v", err)) - return - } - } else { - // Legacy: link to commit by SHA - commit, err := s.db.GetCommitBySHA(req.SHA) - if err != nil { - writeError(w, http.StatusNotFound, "commit not found") - return - } - - resp, err = s.db.AddComment(commit.ID, req.Commenter, req.Comment) - if err != nil { - writeError(w, http.StatusInternalServerError, fmt.Sprintf("add comment: %v", err)) - return - } - } - - writeCreatedJSON(w, resp) -} - -func (s *Server) handleListComments(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - var responses []storage.Response - var err error - - // Support lookup by job_id (preferred), commit_id, or sha (legacy) - if jobIDStr := r.URL.Query().Get("job_id"); jobIDStr != "" { - jobID, parseErr := strconv.ParseInt(jobIDStr, 10, 64) - if parseErr != nil { - writeError(w, http.StatusBadRequest, "invalid job_id") - return - } - responses, err = s.db.GetCommentsForJob(jobID) - if err != nil { - writeError(w, http.StatusInternalServerError, fmt.Sprintf("get responses: %v", err)) - return - } - } else if cidStr := r.URL.Query().Get("commit_id"); cidStr != "" { - commitID, parseErr := strconv.ParseInt(cidStr, 10, 64) - if parseErr != nil { - writeError(w, http.StatusBadRequest, "invalid commit_id") - return - } - responses, err = s.db.GetCommentsForCommit(commitID) - if err != nil { - writeError(w, http.StatusInternalServerError, fmt.Sprintf("get responses: %v", err)) - return - } - } else if sha := r.URL.Query().Get("sha"); sha != "" { - responses, err = s.db.GetCommentsForCommitSHA(sha) - if err != nil { - writeError(w, http.StatusNotFound, "commit not found") - return - } - } else { - writeError(w, http.StatusBadRequest, "job_id, commit_id, or sha parameter required") - return - } - - writeJSON(w, map[string]any{"responses": responses}) -} - // getMachineID returns the cached machine ID, fetching it on first successful call. // Retries on each call until successful to handle transient DB errors. func (s *Server) getMachineID() string { @@ -2095,44 +1651,6 @@ func (s *Server) getMachineID() string { return s.machineID } -func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - queued, running, done, failed, canceled, applied, rebased, err := s.db.GetJobCounts() - if err != nil { - s.writeInternalError(w, fmt.Sprintf("get counts: %v", err)) - return - } - - // Get config reload time and counter - configReloadedAt := "" - if t := s.configWatcher.LastReloadedAt(); !t.IsZero() { - configReloadedAt = t.Format(time.RFC3339Nano) - } - configReloadCounter := s.configWatcher.ReloadCounter() - - status := storage.DaemonStatus{ - Version: version.Version, - QueuedJobs: queued, - RunningJobs: running, - CompletedJobs: done, - FailedJobs: failed, - CanceledJobs: canceled, - AppliedJobs: applied, - RebasedJobs: rebased, - ActiveWorkers: s.workerPool.ActiveWorkers(), - MaxWorkers: s.workerPool.MaxWorkers(), - MachineID: s.getMachineID(), - ConfigReloadedAt: configReloadedAt, - ConfigReloadCounter: configReloadCounter, - } - - writeJSON(w, status) -} - func (s *Server) handlePing(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "method not allowed") @@ -2146,57 +1664,6 @@ func (s *Server) handlePing(w http.ResponseWriter, r *http.Request) { }) } -type CloseReviewRequest struct { - JobID int64 `json:"job_id"` - Closed bool `json:"closed"` -} - -func (s *Server) handleCloseReview(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - var req CloseReviewRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - writeError(w, http.StatusBadRequest, "invalid request body") - return - } - - if req.JobID == 0 { - writeError(w, http.StatusBadRequest, "job_id is required") - return - } - - if err := s.db.MarkReviewClosedByJobID(req.JobID, req.Closed); err != nil { - if errors.Is(err, sql.ErrNoRows) { - writeError(w, http.StatusNotFound, "review not found for job") - return - } - writeError(w, http.StatusInternalServerError, fmt.Sprintf("mark closed: %v", err)) - return - } - - eventType := "review.closed" - if !req.Closed { - eventType = "review.reopened" - } - evt := Event{ - Type: eventType, - TS: time.Now(), - JobID: req.JobID, - } - if job, err := s.db.GetJobByID(req.JobID); err == nil { - evt.Repo = job.RepoPath - evt.RepoName = job.RepoName - evt.SHA = job.GitRef - evt.Agent = job.Agent - } - s.broadcaster.Broadcast(evt) - - writeJSON(w, map[string]any{"success": true}) -} - // RemapRequest is the request body for POST /api/remap. type RemapRequest struct { RepoPath string `json:"repo_path"` @@ -2777,41 +2244,6 @@ func (s *Server) handleActivity(w http.ResponseWriter, r *http.Request) { writeJSON(w, map[string]any{"entries": entries}) } -func (s *Server) handleSummary(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - writeError(w, http.StatusMethodNotAllowed, "method not allowed") - return - } - - q := r.URL.Query() - - // Parse --since duration (default 7d) - since := time.Now().Add(-7 * 24 * time.Hour) - if sinceStr := q.Get("since"); sinceStr != "" { - if d, err := parseDuration(sinceStr); err == nil { - since = time.Now().Add(-d) - } else { - writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid since value: %s", sinceStr)) - return - } - } - - opts := storage.SummaryOptions{ - RepoPath: q.Get("repo"), - Branch: q.Get("branch"), - Since: since, - AllRepos: q.Get("all") == "true", - } - - summary, err := s.db.GetSummary(opts) - if err != nil { - s.writeInternalError(w, fmt.Sprintf("get summary: %v", err)) - return - } - - writeJSON(w, summary) -} - // parseDuration parses a human-friendly duration string like "7d", "24h", "30d". func parseDuration(s string) (time.Duration, error) { if len(s) < 2 { diff --git a/internal/daemon/server_actions_test.go b/internal/daemon/server_actions_test.go index 39c60034..b273decf 100644 --- a/internal/daemon/server_actions_test.go +++ b/internal/daemon/server_actions_test.go @@ -50,7 +50,7 @@ func TestHandleStatus(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/status", nil) w := httptest.NewRecorder() - server.handleStatus(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { require.Condition(t, func() bool { @@ -73,7 +73,7 @@ func TestHandleStatus(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/status", nil) w := httptest.NewRecorder() - server.handleStatus(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusMethodNotAllowed { assert.Condition(t, func() bool { @@ -86,7 +86,7 @@ func TestHandleStatus(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/status", nil) w := httptest.NewRecorder() - server.handleStatus(w, req) + server.httpServer.Handler.ServeHTTP(w, req) var status storage.DaemonStatus testutil.DecodeJSON(t, w, &status) @@ -104,7 +104,7 @@ func TestHandleStatus(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/status", nil) w := httptest.NewRecorder() - server.handleStatus(w, req) + server.httpServer.Handler.ServeHTTP(w, req) var status storage.DaemonStatus testutil.DecodeJSON(t, w, &status) @@ -198,7 +198,7 @@ func TestHandleCancelJob(t *testing.T) { // the full code path including workerPool side-effects. req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/cancel", CancelJobRequest{JobID: job.ID}) w := httptest.NewRecorder() - server.handleCancelJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code, "first cancel should succeed") return job.ID }, @@ -225,7 +225,7 @@ func TestHandleCancelJob(t *testing.T) { request: func(t *testing.T, jobID int64) *http.Request { return testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/cancel", map[string]any{}) }, - wantStatus: http.StatusBadRequest, + wantStatus: http.StatusUnprocessableEntity, }, { name: "cancel with wrong method", @@ -265,7 +265,7 @@ func TestHandleCancelJob(t *testing.T) { req := tt.request(t, jobID) w := httptest.NewRecorder() - server.handleCancelJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) assert.Equal(t, tt.wantStatus, w.Code, "response body: %s", w.Body.String()) @@ -296,7 +296,7 @@ func TestHandleRerunJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { assert.Condition(t, func() bool { @@ -325,7 +325,7 @@ func TestHandleRerunJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { assert.Condition(t, func() bool { @@ -364,7 +364,7 @@ func TestHandleRerunJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { assert.Condition(t, func() bool { @@ -416,7 +416,7 @@ func TestHandleRerunJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) updated, err := isolatedDB.GetJobByID(job.ID) @@ -432,7 +432,7 @@ func TestHandleRerunJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusNotFound { assert.Condition(t, func() bool { @@ -472,12 +472,10 @@ func TestHandleRerunJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: job.ID}) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusBadRequest) - var resp ErrorResponse - testutil.DecodeJSON(t, w, &resp) - assert.Contains(t, resp.Error, "rerun job worktree path is stale or invalid") + assert.Contains(t, w.Body.String(), "rerun job worktree path is stale or invalid") updated, err := db.GetJobByID(job.ID) require.NoError(t, err) @@ -488,7 +486,7 @@ func TestHandleRerunJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", RerunJobRequest{JobID: 99999}) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusNotFound { assert.Condition(t, func() bool { @@ -501,12 +499,12 @@ func TestHandleRerunJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/job/rerun", map[string]any{}) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) - if w.Code != http.StatusBadRequest { + if w.Code != http.StatusUnprocessableEntity { assert.Condition(t, func() bool { return false - }, "Expected status 400 for missing job_id, got %d", w.Code) + }, "Expected status 422 for missing job_id, got %d", w.Code) } }) @@ -514,7 +512,7 @@ func TestHandleRerunJob(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/job/rerun", nil) w := httptest.NewRecorder() - server.handleRerunJob(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusMethodNotAllowed { assert.Condition(t, func() bool { @@ -741,7 +739,7 @@ func TestHandleAddCommentToJobStates(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/comment", reqData) w := httptest.NewRecorder() - server.handleAddComment(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusCreated { assert.Condition(t, func() bool { @@ -774,7 +772,7 @@ func TestHandleAddCommentToNonExistentJob(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/comment", reqData) w := httptest.NewRecorder() - server.handleAddComment(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusNotFound { assert.Condition(t, func() bool { @@ -815,7 +813,7 @@ func TestHandleAddCommentWithoutReview(t *testing.T) { req := testutil.MakeJSONRequest(t, http.MethodPost, "/api/comment", reqData) w := httptest.NewRecorder() - server.handleAddComment(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusCreated { assert.Condition(t, func() bool { @@ -861,7 +859,7 @@ func TestHandleCloseReview_BroadcastsEvent(t *testing.T) { Closed: true, }) w := httptest.NewRecorder() - server.handleCloseReview(w, req) + server.httpServer.Handler.ServeHTTP(w, req) assert.Equal(http.StatusOK, w.Code) @@ -899,7 +897,7 @@ func TestHandleCloseReview_BroadcastsReopenEvent(t *testing.T) { Closed: false, }) w := httptest.NewRecorder() - server.handleCloseReview(w, req) + server.httpServer.Handler.ServeHTTP(w, req) assert.Equal(http.StatusOK, w.Code) @@ -940,7 +938,7 @@ func TestHandleCloseReview_RepoFilteredSubscriber(t *testing.T) { Closed: true, }) w := httptest.NewRecorder() - server.handleCloseReview(w, req) + server.httpServer.Handler.ServeHTTP(w, req) assert.Equal(http.StatusOK, w.Code) // Filtered subscriber receives the event @@ -996,5 +994,12 @@ func TestHandleEnqueue_BroadcastsEvent(t *testing.T) { func TestHandleListCommentsJobIDParsing(t *testing.T) { server, _, _ := newTestServer(t) - testInvalidIDParsing(t, server.handleListComments, "/api/comments?job_id=%s") + for _, id := range []string{"abc", "10abc", "1.5"} { + t.Run("invalid_id_"+id, func(t *testing.T) { + rr := serveHuma(t, server, http.MethodGet, + "/api/comments?job_id="+id, nil) + assert.GreaterOrEqual(t, rr.Code, 400, + "expected client error for invalid id %q", id) + }) + } } diff --git a/internal/daemon/server_jobs_test.go b/internal/daemon/server_jobs_test.go index 425f648f..94190548 100644 --- a/internal/daemon/server_jobs_test.go +++ b/internal/daemon/server_jobs_test.go @@ -28,13 +28,13 @@ type listJobsResponse struct { HasMore bool `json:"has_more"` } -// fetchJobs calls handleListJobs with the given query string, asserts HTTP 200, +// fetchJobs calls GET /api/jobs via the mux, asserts HTTP 200, // decodes the JSON body, and returns the parsed response. func fetchJobs(t *testing.T, server *Server, query string) listJobsResponse { t.Helper() req := httptest.NewRequest(http.MethodGet, "/api/jobs?"+query, nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code, "GET /api/jobs?%s: %s", query, w.Body.String()) var resp listJobsResponse testutil.DecodeJSON(t, w, &resp) @@ -61,7 +61,6 @@ func TestHandleListJobsWithFilter(t *testing.T) { {"repo filter with limit", "repo=" + url.QueryEscape(repo1.RootPath) + "&limit=2", 2, "repo1"}, {"negative limit treated as unlimited", "limit=-1", 5, ""}, {"very large limit capped to max", "limit=999999", 5, ""}, - {"invalid limit uses default", "limit=abc", 5, ""}, } for _, tt := range tests { @@ -1566,7 +1565,7 @@ func TestHandleListJobsByID(t *testing.T) { // Request job 1 specifically req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/jobs?id=%d", job1ID), nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { require.Condition(t, func() bool { @@ -1596,7 +1595,7 @@ func TestHandleListJobsByID(t *testing.T) { // Request job 2 specifically (the middle job) req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/jobs?id=%d", job2ID), nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { require.Condition(t, func() bool { @@ -1625,7 +1624,7 @@ func TestHandleListJobsByID(t *testing.T) { // Request a job ID that doesn't exist req := httptest.NewRequest(http.MethodGet, "/api/jobs?id=99999", nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { require.Condition(t, func() bool { @@ -1648,20 +1647,18 @@ func TestHandleListJobsByID(t *testing.T) { t.Run("returns error for invalid job ID", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/jobs?id=notanumber", nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) - if w.Code != http.StatusBadRequest { - assert.Condition(t, func() bool { - return false - }, "Expected 400, got %d: %s", w.Code, w.Body.String()) - } + assert.GreaterOrEqual(t, w.Code, 400, + "expected client error for invalid id, got %d: %s", + w.Code, w.Body.String()) }) t.Run("without id param returns all jobs", func(t *testing.T) { // Request without id param should return all jobs req := httptest.NewRequest(http.MethodGet, "/api/jobs", nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { require.Condition(t, func() bool { @@ -2254,7 +2251,14 @@ func TestHandleEnqueueRangeNonCommitObjectRejects(t *testing.T) { func TestHandleListJobsIDParsing(t *testing.T) { server, _, _ := newTestServer(t) - testInvalidIDParsing(t, server.handleListJobs, "/api/jobs?id=%s") + for _, id := range []string{"abc", "10abc", "1.5"} { + t.Run("invalid_id_"+id, func(t *testing.T) { + rr := serveHuma(t, server, http.MethodGet, + "/api/jobs?id="+id, nil) + assert.GreaterOrEqual(t, rr.Code, 400, + "expected client error for invalid id %q", id) + }) + } } func TestHandleListJobsJobTypeFilter(t *testing.T) { @@ -2290,7 +2294,7 @@ func TestHandleListJobsJobTypeFilter(t *testing.T) { http.MethodGet, "/api/jobs?job_type=fix", nil, ) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { require.Condition(t, func() bool { @@ -2321,7 +2325,7 @@ func TestHandleListJobsJobTypeFilter(t *testing.T) { http.MethodGet, "/api/jobs", nil, ) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { require.Condition(t, func() bool { @@ -2346,7 +2350,7 @@ func TestHandleListJobsJobTypeFilter(t *testing.T) { http.MethodGet, "/api/jobs?exclude_job_type=fix", nil, ) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusOK { require.Condition(t, func() bool { @@ -2384,7 +2388,7 @@ func TestHandleListJobsRepoPrefixFilter(t *testing.T) { t.Run("repo_prefix returns only child repos", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/jobs?repo_prefix="+url.QueryEscape(workspace), nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var resp struct { @@ -2413,7 +2417,7 @@ func TestHandleListJobsRepoPrefixFilter(t *testing.T) { seedRepoWithJobs(t, db, workspace, 1, "exact") req := httptest.NewRequest(http.MethodGet, "/api/jobs?repo_prefix="+url.QueryEscape(workspace), nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var resp struct { @@ -2433,7 +2437,7 @@ func TestHandleListJobsRepoPrefixFilter(t *testing.T) { exactRepo := filepath.Join(workspace, "repo-a") req := httptest.NewRequest(http.MethodGet, "/api/jobs?repo="+url.QueryEscape(exactRepo)+"&repo_prefix="+url.QueryEscape(workspace), nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var resp struct { @@ -2451,7 +2455,7 @@ func TestHandleListJobsRepoPrefixFilter(t *testing.T) { t.Run("repo_prefix trailing slash is normalized", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/jobs?repo_prefix="+url.QueryEscape(workspace+"/"), nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var resp struct { @@ -2471,7 +2475,7 @@ func TestHandleListJobsRepoPrefixFilter(t *testing.T) { dotdotPrefix := workspace + "/../" + filepath.Base(workspace) req := httptest.NewRequest(http.MethodGet, "/api/jobs?repo_prefix="+url.QueryEscape(dotdotPrefix), nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var resp struct { @@ -2840,7 +2844,7 @@ func TestHandleListJobsSlashNormalization(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/jobs?repo_prefix="+url.QueryEscape(ws), nil) w := httptest.NewRecorder() - server.handleListJobs(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var resp struct { diff --git a/internal/daemon/server_ops_test.go b/internal/daemon/server_ops_test.go index e048a3c2..4d3ed98b 100644 --- a/internal/daemon/server_ops_test.go +++ b/internal/daemon/server_ops_test.go @@ -460,12 +460,12 @@ func TestHandleGetReviewJobIDParsing(t *testing.T) { { "non-numeric job_id", "job_id=abc", - http.StatusBadRequest, + http.StatusUnprocessableEntity, }, { "partial numeric job_id", "job_id=10abc", - http.StatusBadRequest, + http.StatusUnprocessableEntity, }, { "not found job_id", @@ -483,7 +483,7 @@ func TestHandleGetReviewJobIDParsing(t *testing.T) { req := httptest.NewRequest(http.MethodGet, url, nil) w := httptest.NewRecorder() - server.handleGetReview(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != tt.wantStatus { assert.Condition(t, func() bool { diff --git a/internal/daemon/server_output_test.go b/internal/daemon/server_output_test.go index b825c26e..37e8f30c 100644 --- a/internal/daemon/server_output_test.go +++ b/internal/daemon/server_output_test.go @@ -16,7 +16,7 @@ import ( "testing" ) -// jobOutputResponse covers the union of fields returned by handleJobOutput +// jobOutputResponse covers the union of fields returned by GET /api/job/output // in both polling and streaming modes. type jobOutputResponse struct { JobID int64 `json:"job_id"` @@ -36,7 +36,7 @@ func TestHandleJobOutput(t *testing.T) { t.Run("missing job_id", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/job/output", nil) w := httptest.NewRecorder() - server.handleJobOutput(w, req) + server.httpServer.Handler.ServeHTTP(w, req) require.Equal(t, http.StatusBadRequest, w.Code, "body: %s", w.Body.String()) }) @@ -44,7 +44,7 @@ func TestHandleJobOutput(t *testing.T) { t.Run("invalid job_id", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/job/output?job_id=notanumber", nil) w := httptest.NewRecorder() - server.handleJobOutput(w, req) + server.httpServer.Handler.ServeHTTP(w, req) require.Equal(t, http.StatusBadRequest, w.Code, "body: %s", w.Body.String()) }) @@ -52,7 +52,7 @@ func TestHandleJobOutput(t *testing.T) { t.Run("nonexistent job", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/job/output?job_id=99999", nil) w := httptest.NewRecorder() - server.handleJobOutput(w, req) + server.httpServer.Handler.ServeHTTP(w, req) require.Equal(t, http.StatusNotFound, w.Code, "body: %s", w.Body.String()) }) @@ -63,7 +63,7 @@ func TestHandleJobOutput(t *testing.T) { req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d", job.ID), nil) w := httptest.NewRecorder() - server.handleJobOutput(w, req) + server.httpServer.Handler.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code, "body: %s", w.Body.String()) @@ -81,7 +81,7 @@ func TestHandleJobOutput(t *testing.T) { req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d", job.ID), nil) w := httptest.NewRecorder() - server.handleJobOutput(w, req) + server.httpServer.Handler.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code, "body: %s", w.Body.String()) @@ -92,16 +92,16 @@ func TestHandleJobOutput(t *testing.T) { assert.False(t, resp.HasMore, "expected has_more=false for completed job") }) - t.Run("streaming completed job", func(t *testing.T) { + t.Run("stream completed job returns NDJSON complete", func(t *testing.T) { job := createTestJob(t, db, filepath.Join(tmpDir, "test-repo-stream"), "abc123", "test-agent") setJobStatus(t, db, job.ID, storage.JobStatusDone) req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/job/output?job_id=%d&stream=1", job.ID), nil) w := httptest.NewRecorder() - server.handleJobOutput(w, req) + server.httpServer.Handler.ServeHTTP(w, req) - // Should return immediately with complete message, not hang require.Equal(t, http.StatusOK, w.Code, "body: %s", w.Body.String()) + assert.Equal(t, "application/x-ndjson", w.Header().Get("Content-Type")) var resp jobOutputResponse testutil.DecodeJSON(t, w, &resp) @@ -113,7 +113,14 @@ func TestHandleJobOutput(t *testing.T) { func TestHandleJobOutputIDParsing(t *testing.T) { server, _, _ := newTestServer(t) - testInvalidIDParsing(t, server.handleJobOutput, "/api/job-output?job_id=%s") + for _, id := range []string{"abc", "10abc", "1.5"} { + t.Run("invalid_id_"+id, func(t *testing.T) { + rr := serveHuma(t, server, http.MethodGet, + "/api/job/output?job_id="+id, nil) + assert.GreaterOrEqual(t, rr.Code, 400, + "expected client error for invalid id %q", id) + }) + } } func TestHandleJobLog(t *testing.T) { diff --git a/internal/daemon/server_repos_test.go b/internal/daemon/server_repos_test.go index 4f62fb8d..eb01a18f 100644 --- a/internal/daemon/server_repos_test.go +++ b/internal/daemon/server_repos_test.go @@ -22,7 +22,7 @@ func TestHandleListRepos(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos", nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) @@ -52,7 +52,7 @@ func TestHandleListRepos(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos", nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) @@ -96,7 +96,7 @@ func TestHandleListRepos(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/repos", nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusMethodNotAllowed { assert.Condition(t, func() bool { @@ -128,7 +128,7 @@ func TestHandleListReposWithBranchFilter(t *testing.T) { t.Run("filter by main branch", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?branch=main", nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -148,7 +148,7 @@ func TestHandleListReposWithBranchFilter(t *testing.T) { t.Run("filter by feature branch", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?branch=feature", nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -168,7 +168,7 @@ func TestHandleListReposWithBranchFilter(t *testing.T) { t.Run("nonexistent branch returns empty", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?branch=nonexistent", nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -198,7 +198,7 @@ func TestHandleListReposWithPrefixFilter(t *testing.T) { t.Run("prefix returns only child repos", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?prefix="+url.QueryEscape(workspace), nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -218,7 +218,7 @@ func TestHandleListReposWithPrefixFilter(t *testing.T) { t.Run("prefix excludes non-matching repos", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?prefix="+url.QueryEscape(filepath.Join(tmpDir, "nonexistent")), nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -241,7 +241,7 @@ func TestHandleListReposWithPrefixFilter(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?prefix="+url.QueryEscape(workspace)+"&branch=main", nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -262,7 +262,7 @@ func TestHandleListReposWithPrefixFilter(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?prefix="+url.QueryEscape(workspace)+"&branch=feature", nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -283,7 +283,7 @@ func TestHandleListReposWithPrefixFilter(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?prefix="+url.QueryEscape(workspace+"/"), nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -300,7 +300,7 @@ func TestHandleListReposWithPrefixFilter(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?prefix="+url.QueryEscape(dotdotPrefix), nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -335,7 +335,7 @@ func TestHandleListReposSlashNormalization(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/repos?prefix="+url.QueryEscape(ws), nil) w := httptest.NewRecorder() - server.handleListRepos(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response reposResponse @@ -392,7 +392,7 @@ func TestHandleListBranches(t *testing.T) { t.Run("list all branches", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/branches", nil) w := httptest.NewRecorder() - server.handleListBranches(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response branchesResponse @@ -418,7 +418,7 @@ func TestHandleListBranches(t *testing.T) { repoPath := filepath.Join(tmpDir, "repo1") req := httptest.NewRequest(http.MethodGet, "/api/branches?repo="+repoPath, nil) w := httptest.NewRecorder() - server.handleListBranches(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response branchesResponse @@ -438,9 +438,10 @@ func TestHandleListBranches(t *testing.T) { t.Run("filter by multiple repos", func(t *testing.T) { repo1Path := filepath.Join(tmpDir, "repo1") repo2Path := filepath.Join(tmpDir, "repo2") - req := httptest.NewRequest(http.MethodGet, "/api/branches?repo="+repo1Path+"&repo="+repo2Path, nil) + req := httptest.NewRequest(http.MethodGet, + "/api/branches?repo="+url.QueryEscape(repo1Path)+"&repo="+url.QueryEscape(repo2Path), nil) w := httptest.NewRecorder() - server.handleListBranches(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response branchesResponse @@ -460,7 +461,7 @@ func TestHandleListBranches(t *testing.T) { t.Run("empty repo param treated as no filter", func(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/api/branches?repo=", nil) w := httptest.NewRecorder() - server.handleListBranches(w, req) + server.httpServer.Handler.ServeHTTP(w, req) testutil.AssertStatusCode(t, w, http.StatusOK) var response branchesResponse @@ -481,7 +482,7 @@ func TestHandleListBranches(t *testing.T) { req := httptest.NewRequest(http.MethodPost, "/api/branches", nil) w := httptest.NewRecorder() - server.handleListBranches(w, req) + server.httpServer.Handler.ServeHTTP(w, req) if w.Code != http.StatusMethodNotAllowed { assert.Condition(t, func() bool { diff --git a/internal/daemon/server_test.go b/internal/daemon/server_test.go index 3b87d581..8ed1f00f 100644 --- a/internal/daemon/server_test.go +++ b/internal/daemon/server_test.go @@ -143,24 +143,6 @@ func createTestJob(t *testing.T, db *storage.DB, dir, gitRef, agent string) *sto return job } -// testInvalidIDParsing is a generic helper to test invalid ID query parameters. -func testInvalidIDParsing(t *testing.T, handler http.HandlerFunc, urlTemplate string) { - t.Helper() - tests := []string{"abc", "10abc", "1.5"} - for _, invalidID := range tests { - t.Run("invalid_id_"+invalidID, func(t *testing.T) { - req := httptest.NewRequest(http.MethodGet, fmt.Sprintf(urlTemplate, invalidID), nil) - w := httptest.NewRecorder() - handler(w, req) - if w.Code != http.StatusBadRequest { - assert.Condition(t, func() bool { - return false - }, "expected status %d for id %s, got %d", http.StatusBadRequest, invalidID, w.Code) - } - }) - } -} - func newTestServer(t *testing.T) (*Server, *storage.DB, string) { t.Helper() db, tmpDir := testutil.OpenTestDBWithDir(t) diff --git a/internal/storage/db_filter_test.go b/internal/storage/db_filter_test.go index c7a31401..13cfa3e1 100644 --- a/internal/storage/db_filter_test.go +++ b/internal/storage/db_filter_test.go @@ -945,3 +945,72 @@ func TestListReposWithCombinedPrefixAndBranch(t *testing.T) { assert.Equal(t, 4, total) }) } + +func TestListJobsWithBeforeCursor(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + _, jobs := seedJobs(t, db, "/tmp/cursor-repo", 5) + + t.Run("before cursor returns only older jobs", func(t *testing.T) { + // Use the middle job's ID as cursor + cursor := jobs[2].ID + result, err := db.ListJobs("", "", 50, 0, WithBeforeCursor(cursor)) + require.NoError(t, err, "ListJobs failed") + + for _, j := range result { + assert.Less(t, j.ID, cursor, + "all returned job IDs should be less than cursor") + } + }) + + t.Run("cursor at lowest ID returns empty", func(t *testing.T) { + cursor := jobs[0].ID + result, err := db.ListJobs("", "", 50, 0, WithBeforeCursor(cursor)) + require.NoError(t, err, "ListJobs failed") + + assert.Empty(t, result) + }) + + t.Run("cursor above highest ID returns all jobs", func(t *testing.T) { + cursor := jobs[len(jobs)-1].ID + 100 + result, err := db.ListJobs("", "", 50, 0, WithBeforeCursor(cursor)) + require.NoError(t, err, "ListJobs failed") + + assert.Len(t, result, 5) + }) + + t.Run("cursor with limit returns correct page", func(t *testing.T) { + cursor := jobs[4].ID + result, err := db.ListJobs("", "", 2, 0, WithBeforeCursor(cursor)) + require.NoError(t, err, "ListJobs failed") + + assert.Len(t, result, 2) + for _, j := range result { + assert.Less(t, j.ID, cursor) + } + // Results ordered by ID DESC, so first result has highest ID + assert.Greater(t, result[0].ID, result[1].ID) + }) + + t.Run("cursor combined with other filters", func(t *testing.T) { + // Set branch on some jobs for combined filtering + setJobBranch(t, db, jobs[0].ID, "main") + setJobBranch(t, db, jobs[1].ID, "main") + setJobBranch(t, db, jobs[2].ID, "feature") + setJobBranch(t, db, jobs[3].ID, "main") + setJobBranch(t, db, jobs[4].ID, "main") + + cursor := jobs[4].ID + result, err := db.ListJobs("", "", 50, 0, + WithBeforeCursor(cursor), WithBranch("main")) + require.NoError(t, err, "ListJobs failed") + + for _, j := range result { + assert.Less(t, j.ID, cursor) + assert.Equal(t, "main", j.Branch) + } + // jobs[0], jobs[1], jobs[3] have branch=main and ID < jobs[4].ID + assert.Len(t, result, 3) + }) +} diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index 8298d06c..db8b1b40 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -676,6 +676,7 @@ type listJobsOptions struct { jobType string excludeJobType string repoPrefix string + beforeCursor *int64 } // WithGitRef filters jobs by git ref. @@ -712,6 +713,11 @@ func WithExcludeJobType(jobType string) ListJobsOption { return func(o *listJobsOptions) { o.excludeJobType = jobType } } +// WithBeforeCursor filters jobs to those with ID < cursor (for cursor pagination). +func WithBeforeCursor(id int64) ListJobsOption { + return func(o *listJobsOptions) { o.beforeCursor = &id } +} + // WithRepoPrefix filters jobs to repos whose root_path starts with the given prefix. func WithRepoPrefix(prefix string) ListJobsOption { return func(o *listJobsOptions) { @@ -782,6 +788,10 @@ func buildJobFilterClause(statusFilter, repoFilter string, o listJobsOptions) (s conditions = append(conditions, "j.job_type != ?") args = append(args, o.excludeJobType) } + if o.beforeCursor != nil { + conditions = append(conditions, "j.id < ?") + args = append(args, *o.beforeCursor) + } if len(conditions) == 0 { return "", args