Conversation
Important: Review skipped. Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration. Configuration used: Organization UI | Review profile: CHILL | Plan: Pro

Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough

Adds a PostgreSQL control-plane store, config/schema and loader support, runtime wiring (CLI/engine/frontend/scheduler), queue leasing, UUIDv7 identifiers, development docker-compose/Make targets, a CI job, and extensive unit/integration tests.

Changes
Control-plane store and runtime integration

Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Actionable comments posted: 8
🧹 Nitpick comments (6)
compose.postgres.yaml (1)
10-11: 💤 Low value
Consider pinning the volume mount to the PostgreSQL data directory.

The official `postgres` Docker image stores data under `/var/lib/postgresql/data`, not `/var/lib/postgresql`. While data is still persisted (since `/data` is a subdirectory of the mount point), pinning the mount to the actual data directory avoids inadvertently capturing other runtime files (e.g., run scripts under `/var/lib/postgresql/`) and aligns with the documented convention.

🛠️ Suggested change
```diff
-      - dagu-dev-postgres-data:/var/lib/postgresql
+      - dagu-dev-postgres-data:/var/lib/postgresql/data
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@compose.postgres.yaml` around lines 10 - 11, The volumes mount currently pins dagu-dev-postgres-data to /var/lib/postgresql; change the target path to the PostgreSQL data directory /var/lib/postgresql/data to match the official image convention and avoid capturing unrelated runtime files; update the volumes entry that references dagu-dev-postgres-data under the volumes key to mount at /var/lib/postgresql/data instead of /var/lib/postgresql.

internal/persis/dagrunstore/postgres/attempt.go (1)
58-63: ⚡ Quick win
Silent JSON unmarshal failure leaves `att.dag` nil with no diagnostic.

If `row.DagData` is non-empty but malformed, the error is dropped and `att.dag` stays nil; later `Open` won't repersist a DAG (it skips the `att.dag != nil` branch) and `eventData` quietly returns nil, dropping the `DAGFileNameDataKey` from emitted events. At minimum, log the failure so corrupted rows surface in logs.

♻️ Proposed change
```diff
 if len(row.DagData) > 0 {
 	var dag core.DAG
-	if err := json.Unmarshal(row.DagData, &dag); err == nil {
+	if err := json.Unmarshal(row.DagData, &dag); err != nil {
+		logger.Warn(context.Background(), "Failed to unmarshal stored DAG definition",
+			tag.AttemptID(row.AttemptID), tag.Error(err))
+	} else {
 		att.dag = &dag
 	}
 }
```
Or, since `newAttempt` is only called with a fresh row from a query, propagate the error via the constructor signature.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/dagrunstore/postgres/attempt.go` around lines 58 - 63, The JSON unmarshal of row.DagData currently swallows errors causing att.dag to remain nil and later drop DAGFileNameDataKey from events; update newAttempt to surface failures by changing its signature to return (attempt, error) instead of silently ignoring json.Unmarshal errors on row.DagData, return the unmarshal error (or a wrapped error including row identifiers) so callers can handle/log it, and update callers of newAttempt to propagate/handle that error (or log it) so malformed DAG rows are visible in logs/metrics.

internal/persis/dagrunstore/postgres/migration_test.go (1)
16-28: 💤 Low value
Minor: `assert.Contains` on raw SQL is brittle to formatting.

Whitespace, quoting, or harmless reordering in the migration will break this test even though the constraints remain semantically equivalent. Consider tightening to a regex (`assert.Regexp`) or, better, exercising the constraints behaviorally against an ephemeral Postgres instance via testcontainers (you already have docker compose tooling for dev pg).
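For the regex route, a minimal sketch (the migration file name and `migrations.FS` come from the existing test; the two patterns are illustrative rather than the test's actual constraint set, and `regexp`, `require`, and `assert` imports are assumed):

```go
sql, err := migrations.FS.ReadFile("20260506000000_create_dag_run_attempts.sql")
require.NoError(t, err)

// Tolerates whitespace changes around the length constraint.
assert.Regexp(t, regexp.MustCompile(`char_length\s*\(`), string(sql))
// Survives reformatting of the reserved-name exclusion list.
assert.Regexp(t, regexp.MustCompile(`(?i)lower\s*\([^)]*\)\s+not\s+in`), string(sql))
```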
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/dagrunstore/postgres/migration_test.go` around lines 16 - 28, The test TestMigrationUsesExistingIdentifierConstraints is brittle because assert.Contains checks raw SQL substrings; replace those with assert.Regexp calls using robust regexes that tolerate whitespace/quoting (targeting migrations.FS.ReadFile("20260506000000_create_dag_run_attempts.sql") and the same semantic patterns), e.g. compile regexes for each constraint (UUID regex, char_length checks, lower(...) NOT IN, and allowed character classes) and assert.Regexp(t, re, sql) for each expected constraint; alternatively, for stronger coverage, convert the test to run the migration against an ephemeral Postgres (using your docker/testcontainers setup) and validate the constraints behaviorally by attempting inserts that should pass/fail.

internal/cmn/schema/config.schema.json (1)
939-973: 💤 Low value
Optional: schema could enforce `dsn` when `backend: postgres`.

The runtime loader already validates this and produces `dag_run_store.postgres.dsn is required`, so users get a clear error. However, IDE/CI schema validators won't catch a missing DSN until the loader runs. If you want pre-runtime feedback, add an `allOf`/`if-then` constraint mirroring the `BotsDef` pattern already in this file.

♻️ Example schema constraint
"DAGRunStoreDef": { "type": "object", "description": "DAG-run status persistence configuration.", "additionalProperties": false, "properties": { "backend": { "type": "string", "description": "DAG-run status persistence backend. Default: file.", "enum": ["file", "postgres"], "default": "file" }, "postgres": { "$ref": "#/definitions/DAGRunStorePostgresDef" } - } + }, + "allOf": [ + { + "if": { + "properties": { "backend": { "const": "postgres" } }, + "required": ["backend"] + }, + "then": { + "required": ["postgres"], + "properties": { + "postgres": { + "required": ["dsn"], + "properties": { "dsn": { "type": "string", "minLength": 1 } } + } + } + } + } + ] },🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/cmn/schema/config.schema.json` around lines 939 - 973, Add a conditional schema rule so that when DAGRunStoreDef.backend == "postgres" the DAGRunStorePostgresDef.dsn property is required; update the DAGRunStoreDef definition to include an if/then (or allOf with a conditional) that checks {properties: {backend: {const: "postgres"}}} and then requires the path dag_run_store.postgres.dsn, mirroring the existing BotsDef pattern in this file; reference DAGRunStoreDef and DAGRunStorePostgresDef to locate where to insert the if/then constraint.

internal/persis/dagrunstore/postgres/queries/attempts.sql (2)
237-243: 💤 Low value
`RenameDAGRuns` may touch unrelated sub-DAG rows.

The WHERE clause matches rows where `dag_name = old_name` (including sub-DAG rows whose own `dag_name` happens to equal the renamed root DAG's old name, but which aren't conceptually related to this rename). For those rows the SET is effectively a no-op for `dag_name` (guarded by `is_root`) and only changes `root_dag_name` if it also matches, but `updated_at` is bumped regardless. If you intend the rename to be strictly scoped to runs whose root is `old_name`, consider tightening the WHERE clause (e.g., `WHERE root_dag_name = old_name OR (is_root AND dag_name = old_name)`).
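As a sketch, with a stand-in SET clause mirroring the guarded logic described above (not the query's verbatim body):

```sql
UPDATE dagu_dag_run_attempts
SET dag_name      = CASE WHEN is_root THEN sqlc.arg(new_name)::text ELSE dag_name END,
    root_dag_name = CASE WHEN root_dag_name = sqlc.arg(old_name)::text
                         THEN sqlc.arg(new_name)::text ELSE root_dag_name END,
    updated_at    = now()
WHERE root_dag_name = sqlc.arg(old_name)::text
   OR (is_root AND dag_name = sqlc.arg(old_name)::text);
```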
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/dagrunstore/postgres/queries/attempts.sql` around lines 237 - 243, The RenameDAGRuns UPDATE currently matches any row with dag_name = old_name and thus can bump updated_at on unrelated sub-DAG rows; change the WHERE to restrict to rows whose root is old_name or to root runs matching old_name (e.g., WHERE root_dag_name::text = sqlc.arg(old_name)::text OR (is_root AND dag_name::text = sqlc.arg(old_name)::text)) so only runs conceptually related to the renamed root DAG in table dagu_dag_run_attempts are touched, leaving the existing SET logic for dag_name, root_dag_name and updated_at intact.
1-2: 💤 Low value
Advisory lock keyspace remains effectively 32-bit despite casting to `bigint`.

`hashtext(...)` returns `int4` (32-bit signed integer). Casting to `bigint` only sign-extends the value without expanding the collision space; collisions remain at the int32 level (~1 in 4B). For DAG-run scoped locking this is usually acceptable, but if you ever lock per `(dag_name, dag_run_id)` with high cardinality, consider widening the key using either the two-argument form `pg_advisory_xact_lock(int, int)` or combining multiple hash outputs.
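For example, a two-argument variant, assuming the caller can supply both key components separately:

```sql
-- name: LockDAGRunKey :exec
-- Two independent 32-bit hashes give an effectively 64-bit lock key.
SELECT pg_advisory_xact_lock(
    hashtext(sqlc.arg(dag_name)::text),
    hashtext(sqlc.arg(dag_run_id)::text)
);
```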
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/dagrunstore/postgres/queries/attempts.sql` around lines 1 - 2, The advisory lock is effectively 32-bit because hashtext(...) returns int4; update the LockDAGRunKey query to use the two-argument advisory-lock form so you get a 64-bit keyspace instead of sign-extending a single int4: replace the single pg_advisory_xact_lock(hashtext(sqlc.arg(lock_key)::text)::bigint) call with pg_advisory_xact_lock(<first_int4>, <second_int4>) where <first_int4> and <second_int4> are two distinct 32-bit hashes derived from the lock components (e.g., hashtext of dag_name and hashtext of dag_run_id or two different hash functions on lock_key), casting each hashtext(...) result to int; ensure you reference the SQL comment name LockDAGRunKey and the existing sqlc.arg(lock_key) expression when extracting the components.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/engine/engine.go`:
- Around line 72-80: Engine/context teardown never closes the concrete
dagRunStore Postgres resources because exec.DAGRunStore lacks Close; add cleanup
by either (A) adding Close() error to the exec.DAGRunStore interface and
implement it on postgres.Store, then call e.dagRunStore.Close() in Engine.Close
(and the context shutdown in cmd/context.go), or (B) if you prefer minimal
change, perform a type-assertion where the store is created/closed: if closer,
ok := dagRunStore.(interface{ Close() error }); ok { _ = closer.Close() } (do
this in Engine.Close and the context shutdown paths referenced in
cmd/context.go) so the Postgres pool is released; ensure error handling/logging
around the Close call.
In `@internal/persis/dagrunstore/postgres/attempt.go`:
- Around line 231-254: WriteStepMessages currently does an unsafe
read-modify-write by calling GetAttempt, unmarshalling messages_data, mutating
in memory and calling UpdateAttemptMessages, which races with concurrent
writers; fix by performing the merge atomically in the DB instead of
client-side: add a new SQL exec query (e.g. MergeAttemptStepMessages) that uses
jsonb_set(coalesce(messages_data,'{}'::jsonb), ARRAY[sqlc.arg(step_name)::text],
sqlc.arg(messages)::jsonb, true) and updates updated_at, then replace the
GetAttempt + Marshal/Unmarshal + UpdateAttemptMessages sequence in
Attempt.WriteStepMessages to call this new query (or alternatively wrap
GetAttempt/UpdateAttemptMessages in a transaction and SELECT ... FOR UPDATE) to
prevent lost concurrent writes.
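A sketch of that merge query (the `jsonb_set` expression is quoted from the comment above; the table, `attempt_id` predicate, and `updated_at` handling are assumed to match the existing schema):

```sql
-- name: MergeAttemptStepMessages :exec
UPDATE dagu_dag_run_attempts
SET messages_data = jsonb_set(
        coalesce(messages_data, '{}'::jsonb),
        ARRAY[sqlc.arg(step_name)::text],
        sqlc.arg(messages)::jsonb,
        true),
    updated_at = now()
WHERE attempt_id = sqlc.arg(attempt_id);
```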
In `@internal/persis/dagrunstore/postgres/cursor.go`:
- Around line 119-121: Replace the ignored json.Marshal error with proper
handling: change the marshal call to capture the error (data, err :=
json.Marshal(normalized)) and add a short check (if err != nil { panic(err) } or
propagate/handle per surrounding conventions) before computing the sha256 and
calling hex.EncodeToString; this touches the json.Marshal call that produces
data, the variable normalized, and the subsequent
sha256.Sum256/hex.EncodeToString use.
In `@internal/persis/dagrunstore/postgres/db/attempts.sql.go`:
- Around line 1-13: The sqlc-generated Go files in package db (files starting
with the "Code generated by sqlc" header) are not excluded by the current tool
patterns, so update the tooling: either add the sqlc output directory
(internal/persis/dagrunstore/postgres/db) to the exclusion lists in the Makefile
`addlicense` target and .golangci.yml (matching the "Code generated by sqlc"
files), or change the sqlc config to emit generated code into a `gen/`
subdirectory so existing `-ignore "./**/gen/**"` rules apply; ensure the change
prevents addlicense from modifying files with the sqlc header.
In `@internal/persis/dagrunstore/postgres/store.go`:
- Around line 489-495: When iterating rows, don't silently drop decode errors
from statusFromListRow: call statusFromListRow(row) as before but if err != nil
then if errors.Is(err, exec.ErrNoStatusData) continue; otherwise log the
unexpected decode error (including err and identifying row keys like run ID)
using the package/store logger before continuing so schema drift/corruption is
visible; keep the existing label filter check
(core.NewLabels(status.Labels).MatchesFilters) unchanged.
- Around line 264-281: The RecentAttempts method currently swallows DB errors
(the error returned from s.queries.RecentAttemptsByName) and returns nil; change
RecentAttempts in type Store to log the error instead of silently dropping it:
capture err from s.queries.RecentAttemptsByName and call the Store logger (or a
package logger on Store, e.g. s.logger or s.log) at warn/debug level including
the dag name and the error (and keep returning nil/empty as the function
signature requires), so failures like transient Postgres outages are visible in
logs; ensure the log message references the DagName and the underlying err from
RecentAttemptsByName.
- Around line 32-38: PoolConfig currently declares MaxIdleConns but
applyPoolConfig doesn't use it; map it to pgxpool.Config.MinIdleConns (not
MaxConns) in the applyPoolConfig function: when PoolConfig.MaxIdleConns > 0 set
cfg.MinIdleConns = PoolConfig.MaxIdleConns but cap it at cfg.MaxConns (or
PoolConfig.MaxOpenConns if that’s used to set cfg.MaxConns) to avoid
MinIdleConns > MaxConns; alternatively remove MaxIdleConns from
PoolConfig/schema if you prefer not to support this mapping. Ensure references:
PoolConfig, MaxIdleConns, applyPoolConfig, and pgxpool.Config.MinIdleConns are
updated accordingly.
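A sketch of that mapping (assuming `PoolConfig` carries `MaxOpenConns`/`MaxIdleConns` as described, and a pgx version whose `pgxpool.Config` exposes `MinIdleConns`):

```go
func applyPoolConfig(cfg *pgxpool.Config, pc PoolConfig) {
	if pc.MaxOpenConns > 0 {
		cfg.MaxConns = int32(pc.MaxOpenConns)
	}
	if pc.MaxIdleConns > 0 {
		minIdle := int32(pc.MaxIdleConns)
		if cfg.MaxConns > 0 && minIdle > cfg.MaxConns {
			minIdle = cfg.MaxConns // never keep more idle conns than the pool allows
		}
		cfg.MinIdleConns = minIdle
	}
}
```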
In `@sqlc.yaml`:
- Around line 22-23: The sqlc override maps dagu_workspace_name to a
non-nullable go_type causing sqlc to emit interface{} + WorkspaceValid; update
sqlc.yaml to use a nullable-compatible Go type (e.g.,
"github.com/guregu/null.String" or "sql.NullString" or a pointer "string" via a
nullable override) for the field dagu_workspace_name so generated models.go has
Workspace as a proper nullable string type and removes the interface{} pattern;
after updating sqlc.yaml, regenerate code (sqlc generate) and verify models.go
contains the chosen nullable type and that existing code using attempt.Workspace
is adjusted to the new type (or add nil/Valid checks if using sql.NullString).
---
Nitpick comments:
In `@compose.postgres.yaml`:
- Around line 10-11: The volumes mount currently pins dagu-dev-postgres-data to
/var/lib/postgresql; change the target path to the PostgreSQL data directory
/var/lib/postgresql/data to match the official image convention and avoid
capturing unrelated runtime files—update the volumes entry that references
dagu-dev-postgres-data under the volumes key to mount at
/var/lib/postgresql/data instead of /var/lib/postgresql.
In `@internal/cmn/schema/config.schema.json`:
- Around line 939-973: Add a conditional schema rule so that when
DAGRunStoreDef.backend == "postgres" the DAGRunStorePostgresDef.dsn property is
required; update the DAGRunStoreDef definition to include an if/then (or allOf
with a conditional) that checks {properties: {backend: {const: "postgres"}}} and
then requires the path dag_run_store.postgres.dsn, mirroring the existing
BotsDef pattern in this file; reference DAGRunStoreDef and
DAGRunStorePostgresDef to locate where to insert the if/then constraint.
In `@internal/persis/dagrunstore/postgres/attempt.go`:
- Around line 58-63: The JSON unmarshal of row.DagData currently swallows errors
causing att.dag to remain nil and later drop DAGFileNameDataKey from events;
update newAttempt to surface failures by changing its signature to return
(attempt, error) instead of silently ignoring json.Unmarshal errors on
row.DagData, return the unmarshal error (or a wrapped error including row
identifiers) so callers can handle/log it, and update callers of newAttempt to
propagate/handle that error (or log it) so malformed DAG rows are visible in
logs/metrics.
In `@internal/persis/dagrunstore/postgres/migration_test.go`:
- Around line 16-28: The test TestMigrationUsesExistingIdentifierConstraints is
brittle because assert.Contains checks raw SQL substrings; replace those with
assert.Regexp calls using robust regexes that tolerate whitespace/quoting
(targeting migrations.FS.ReadFile("20260506000000_create_dag_run_attempts.sql")
and the same semantic patterns), e.g. compile regexes for each constraint (UUID
regex, char_length checks, lower(...) NOT IN, and allowed character classes) and
assert.Regexp(t, re, sql) for each expected constraint; alternatively, for
stronger coverage, convert the test to run the migration against an ephemeral
Postgres (using your docker/testcontainers setup) and validate the constraints
behaviorally by attempting inserts that should pass/fail.
In `@internal/persis/dagrunstore/postgres/queries/attempts.sql`:
- Around line 237-243: The RenameDAGRuns UPDATE currently matches any row with
dag_name = old_name and thus can bump updated_at on unrelated sub-DAG rows;
change the WHERE to restrict to rows whose root is old_name or to root runs
matching old_name (e.g., WHERE root_dag_name::text = sqlc.arg(old_name)::text OR
(is_root AND dag_name::text = sqlc.arg(old_name)::text)) so only runs
conceptually related to the renamed root DAG in table dagu_dag_run_attempts are
touched, leaving the existing SET logic for dag_name, root_dag_name and
updated_at intact.
- Around line 1-2: The advisory lock is effectively 32-bit because hashtext(...)
returns int4; update the LockDAGRunKey query to use the two-argument
advisory-lock form so you get a 64-bit keyspace instead of sign-extending a
single int4: replace the single
pg_advisory_xact_lock(hashtext(sqlc.arg(lock_key)::text)::bigint) call with
pg_advisory_xact_lock(<first_int4>, <second_int4>) where <first_int4> and
<second_int4> are two distinct 32-bit hashes derived from the lock components
(e.g., hashtext of dag_name and hashtext of dag_run_id or two different hash
functions on lock_key), casting each hashtext(...) result to int; ensure you
reference the SQL comment name LockDAGRunKey and the existing sqlc.arg(lock_key)
expression when extracting the components.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 832bded3-7795-4463-adab-e5d4ba7b908d
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (27)
`Makefile`, `compose.postgres.yaml`, `go.mod`, `internal/cmd/context.go`, `internal/cmn/config/config.go`, `internal/cmn/config/definition.go`, `internal/cmn/config/key_hints.go`, `internal/cmn/config/loader.go`, `internal/cmn/config/loader_test.go`, `internal/cmn/schema/config.schema.json`, `internal/core/exec/dagrun.go`, `internal/engine/engine.go`, `internal/persis/dagrunstore/postgres/attempt.go`, `internal/persis/dagrunstore/postgres/cursor.go`, `internal/persis/dagrunstore/postgres/db/attempts.sql.go`, `internal/persis/dagrunstore/postgres/db/db.go`, `internal/persis/dagrunstore/postgres/db/models.go`, `internal/persis/dagrunstore/postgres/migrate.go`, `internal/persis/dagrunstore/postgres/migration_test.go`, `internal/persis/dagrunstore/postgres/migrations/20260506000000_create_dag_run_attempts.sql`, `internal/persis/dagrunstore/postgres/migrations/embed.go`, `internal/persis/dagrunstore/postgres/queries/attempts.sql`, `internal/persis/dagrunstore/postgres/store.go`, `internal/persis/dagrunstore/store.go`, `internal/persis/filedagrun/query_cursor.go`, `internal/service/frontend/api/v1/dagruns.go`, `sqlc.yaml`
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 6
🧹 Nitpick comments (3)
internal/persis/dagrunstore/postgres/attempt.go (2)
96-99: 💤 Low value
Silently swallowing `ReadStatus` errors during `Open` initialization.

`lastPersistedEventTypeLocked` returns `""` on any `ReadStatus` failure (including transient DB errors), and `Open` then proceeds with an empty `lastEmittedEventType`. That can lead to duplicate-emitted transition events on a subsequent `Write` (since the emitter compares against an empty baseline). Logging a warning preserves diagnosability without changing behavior.

📝 Log on read failure
```diff
 func (att *Attempt) lastPersistedEventTypeLocked(ctx context.Context) eventstore.EventType {
 	status, err := att.ReadStatus(ctx)
 	if err != nil {
+		logger.Warn(ctx, "Failed to read persisted DAG-run status during Open", tag.Error(err))
 		return ""
 	}
 	eventType, ok := eventstore.PersistedDAGRunEventTypeForStatus(status.Status)
 	if !ok {
 		return ""
 	}
 	return eventType
 }
```

Also applies to: 267-277
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/dagrunstore/postgres/attempt.go` around lines 96 - 99, The Open initialization currently swallows ReadStatus errors because lastPersistedEventTypeLocked(ctx) returns "" on failure, so update Open (and the similar block around the other occurrence at the 267-277 region) to capture the error from lastPersistedEventTypeLocked/ReadStatus and, if non-nil, log a warning via the package logger (including context and the error) before falling back to the empty string; keep the existing behavior (do not change the return flow), but ensure att.lastEmittedEventType assignment still happens and att.opened.Store(true) is executed after logging so transient DB read errors are at least recorded for diagnostics.
73-99: 💤 Low value
`Open` re-persists `dag_data` on every invocation.

If `newAttempt` already populated `att.dag` from a stored row (lines 58-65), `Open` will re-serialize and re-`UPDATE` the same DAG bytes, a wasteful round-trip on warm reopen. Consider only persisting when `dag_data` was previously empty or when `SetDAG` was called after construction. Not a correctness issue.
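A rough sketch of the guard (the `dagDirty` field, the `db.UpdateAttemptDAGParams` shape, and `att.id` are assumptions about the surrounding code; `SetDAG` would set the flag):

```go
// In Open: persist only when the DAG changed since it was loaded or set.
if att.dag != nil && att.dagDirty {
	data, err := json.Marshal(att.dag)
	if err != nil {
		return err
	}
	if err := att.queries.UpdateAttemptDAG(ctx, db.UpdateAttemptDAGParams{
		AttemptID: att.id,
		DagData:   data,
	}); err != nil {
		return err
	}
	att.dagDirty = false // cleared so warm reopens skip the UPDATE
}
```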
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/dagrunstore/postgres/attempt.go` around lines 73 - 99, Attempt.Open currently re-serializes and always UPDATEs dag_data when att.dag is non-nil, causing unnecessary DB writes for attempts initialized from stored rows; modify Open (method Attempt.Open) to only persist the DAG when the persisted dag_data was previously empty or when SetDAG was called after construction (i.e., track a flag like dagDirty or check a nil/empty persistedDAG marker populated in newAttempt), so Open checks that flag (or persistedDAG state) before calling att.queries.UpdateAttemptDAG; ensure SetDAG sets the dirty flag and Open clears it after a successful update.

internal/persis/dagrunstore/postgres/queries/attempts.sql (1)
245-249: 💤 Low value
`DeleteDAGRunRows` can return duplicate `dag_run_id` values.

Because each `dag_run_id` may have multiple attempt rows (and root runs have a separate `dag_run_id` from sub-runs but all share the same `root_dag_run_id`), `RETURNING dag_run_id` will emit one row per deleted attempt, so callers iterating these for filesystem/working-dir cleanup will repeat the same path. Either dedupe in SQL or trust the caller to dedupe; flagging in case the caller doesn't.

♻️ Dedupe at the source
```diff
 -- name: DeleteDAGRunRows :many
-DELETE FROM dagu_dag_run_attempts
-WHERE root_dag_name = sqlc.arg(root_dag_name)
-  AND root_dag_run_id = sqlc.arg(root_dag_run_id)
-RETURNING dag_run_id;
+WITH deleted AS (
+  DELETE FROM dagu_dag_run_attempts
+  WHERE root_dag_name = sqlc.arg(root_dag_name)
+    AND root_dag_run_id = sqlc.arg(root_dag_run_id)
+  RETURNING dag_run_id
+)
+SELECT DISTINCT dag_run_id FROM deleted;
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/dagrunstore/postgres/queries/attempts.sql` around lines 245 - 249, The DeleteDAGRunRows query currently returns one row per deleted attempt, causing duplicate dag_run_id values; change the query that deletes from dagu_dag_run_attempts (matching root_dag_name and root_dag_run_id) to return unique dag_run_id values by using a DISTINCT return (e.g., use RETURNING DISTINCT dag_run_id or wrap the DELETE in a CTE and SELECT DISTINCT dag_run_id from the deleted rows) so callers iterating results get each dag_run_id only once.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/cmd/context.go`:
- Around line 671-680: The dagrun store created by dagrunstore.New (assigned to
schedulerRunStore) must be closed if subsequent scheduler.New(...) fails; add a
short-lived defer to close schedulerRunStore (calling its Close/Release method)
immediately after successful creation and cancel that defer/transfer ownership
right after scheduler.New returns the sched instance successfully so that if
scheduler construction errors the Postgres pool is released; do the same pattern
for the other similar creation at the 683-686 site.
- Around line 907-911: Direct NewContext callers (e.g., places that call
NewContext directly instead of via NewCommand) are creating resource-owning
Contexts without invoking the new Context.Close contract, causing leaks for
Postgres-backed run stores; update each direct NewContext usage (notably the
callers referenced in the review) to ensure Close is always run by adding a
deferred close like defer ctx.Close(ctx) immediately after NewContext returns or
by refactoring the call site to go through the existing wrapper used by
NewCommand so cleanup happens automatically; look for symbol NewContext and
ensure Context.Close(ctx) is deferred (or route through NewCommand's wrapper) at
those call sites.
In `@internal/persis/dagrunstore/postgres/cursor.go`:
- Around line 95-110: The queryFilterHash implementation currently builds a
local normalized struct (named normalized) but omits opts.WorkspaceFilter, so
include a stable representation of the workspace filter in that struct before
computing the hash: add a field like WorkspaceFilter []string
`json:"workspace_filter,omitempty"` (or a single string if the filter is scalar)
to the normalized literal and initialize it from opts.WorkspaceFilter (guarding
nil -> empty slice/string for stability) inside queryFilterHash so the workspace
scope is part of the hashed cursor fingerprint.
In `@internal/persis/dagrunstore/postgres/store.go`:
- Around line 635-680: The RejectActive check in RemoveDAGRun is a TOCTOU bug
because it runs before acquiring the advisory lock; move the FindAttempt +
status check inside the existing withTx callback immediately after calling
q.LockDAGRunKey so the status is read under the same lock (mirror how
CompareAndSwapLatestAttemptStatus does it). Concretely, inside
Store.RemoveDAGRun's withTx closure, after q.LockDAGRunKey(...) call, call
s.FindAttempt(ctx, dagRun) (or query the attempt/status via the transaction if
available) and then attempt.ReadStatus(ctx); if status.Status.IsActive() return
exec.ErrDAGRunActive (formatted the same way the code currently uses) before
calling q.DeleteDAGRunRows, so deletion cannot proceed for active runs.
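A sketch of the reordered flow (`withTx`, `lockKey`, and `deleteParams` stand in for the store's actual transaction helper and parameters; the other calls are named in the comment above):

```go
return s.withTx(ctx, func(q *db.Queries) error {
	if err := q.LockDAGRunKey(ctx, lockKey); err != nil {
		return err
	}
	// Read the status under the same advisory lock that guards deletion.
	attempt, err := s.FindAttempt(ctx, dagRun)
	if err != nil {
		return err
	}
	status, err := attempt.ReadStatus(ctx)
	if err != nil {
		return err
	}
	if status.Status.IsActive() {
		return exec.ErrDAGRunActive
	}
	_, err = q.DeleteDAGRunRows(ctx, deleteParams)
	return err
})
```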
In `@internal/service/scheduler/scheduler.go`:
- Around line 769-771: When startup fails, ensure the DAG-run store is closed in
the cleanupFailedStartup path just like in Stop(): call
exec.CloseDAGRunStore(ctx, s.dagRunStore) from cleanupFailedStartup and log any
error with logger.Warn(ctx, "Failed to close scheduler DAG-run store",
tag.Error(err)); this reuses the same close logic used in Stop() so
s.dagRunStore isn't left open on Start() failures.
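As a sketch (receiver and method shape assumed; the call and log line are quoted from the comment above):

```go
func (s *Scheduler) cleanupFailedStartup(ctx context.Context) {
	// Mirror Stop(): release the DAG-run store so a failed Start() doesn't leak the pool.
	if err := exec.CloseDAGRunStore(ctx, s.dagRunStore); err != nil {
		logger.Warn(ctx, "Failed to close scheduler DAG-run store", tag.Error(err))
	}
	// ... existing failed-startup cleanup ...
}
```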
In `@Makefile`:
- Line 295: Remove the exemption for internal/persis/dagrunstore/postgres/db/**
from the addlicense ignore list in the Makefile so Go sources there are
processed by the addlicense target; if those files are generated, modify the
generator that writes files into internal/persis/dagrunstore/postgres/db (or the
sqlc post-generation step) to insert the required GPL v3 header into each *.go
file at generation time instead of keeping the directory excluded.
---
Nitpick comments:
In `@internal/persis/dagrunstore/postgres/attempt.go`:
- Around line 96-99: The Open initialization currently swallows ReadStatus
errors because lastPersistedEventTypeLocked(ctx) returns "" on failure, so
update Open (and the similar block around the other occurrence at the 267-277
region) to capture the error from lastPersistedEventTypeLocked/ReadStatus and,
if non-nil, log a warning via the package logger (including context and the
error) before falling back to the empty string; keep the existing behavior (do
not change the return flow), but ensure att.lastEmittedEventType assignment
still happens and att.opened.Store(true) is executed after logging so transient
DB read errors are at least recorded for diagnostics.
- Around line 73-99: Attempt.Open currently re-serializes and always UPDATEs
dag_data when att.dag is non-nil, causing unnecessary DB writes for attempts
initialized from stored rows; modify Open (method Attempt.Open) to only persist
the DAG when the persisted dag_data was previously empty or when SetDAG was
called after construction (i.e., track a flag like dagDirty or check a nil/empty
persistedDAG marker populated in newAttempt), so Open checks that flag (or
persistedDAG state) before calling att.queries.UpdateAttemptDAG; ensure SetDAG
sets the dirty flag and Open clears it after a successful update.
In `@internal/persis/dagrunstore/postgres/queries/attempts.sql`:
- Around line 245-249: The DeleteDAGRunRows query currently returns one row per
deleted attempt, causing duplicate dag_run_id values; change the query that
deletes from dagu_dag_run_attempts (matching root_dag_name and root_dag_run_id)
to return unique dag_run_id values by using a DISTINCT return (e.g., use
RETURNING DISTINCT dag_run_id or wrap the DELETE in a CTE and SELECT DISTINCT
dag_run_id from the deleted rows) so callers iterating results get each
dag_run_id only once.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 89d98aa6-fbcc-4c2f-aad6-3794cd1b68f5
📒 Files selected for processing (17)
`.golangci.yml`, `Makefile`, `compose.postgres.yaml`, `internal/cmd/context.go`, `internal/cmn/schema/config.schema.json`, `internal/core/exec/dagrun.go`, `internal/engine/engine.go`, `internal/persis/dagrunstore/postgres/attempt.go`, `internal/persis/dagrunstore/postgres/attempt_test.go`, `internal/persis/dagrunstore/postgres/cursor.go`, `internal/persis/dagrunstore/postgres/db/attempts.sql.go`, `internal/persis/dagrunstore/postgres/db/models.go`, `internal/persis/dagrunstore/postgres/migration_test.go`, `internal/persis/dagrunstore/postgres/queries/attempts.sql`, `internal/persis/dagrunstore/postgres/store.go`, `internal/service/scheduler/scheduler.go`, `sqlc.yaml`
@coderabbitai review

✅ Actions performed: Review triggered.

@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/persis/dagrunstore/postgres/queries/attempts.sql`:
- Around line 262-268: The RenameDAGRuns update currently only changes
relational columns; modify the UPDATE for RenameDAGRuns to also update the JSONB
status_data->>'name' for affected rows so partial-name searches remain
consistent: when root_dag_name or (is_root and dag_name) matches
sqlc.arg(old_name), use jsonb_set on status_data to replace the top-level "name"
value with sqlc.arg(new_name); keep the same WHERE logic and updated_at update
and ensure only rows whose name equals the old_name are changed (use the same
predicates used for dag_name/root_dag_name checks).
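A sketch of the extra JSONB rewrite to fold into `RenameDAGRuns` (predicates repeat the WHERE logic above; the `{name}` path assumes the top-level key described in the comment):

```sql
UPDATE dagu_dag_run_attempts
SET status_data = jsonb_set(status_data, '{name}', to_jsonb(sqlc.arg(new_name)::text)),
    updated_at  = now()
WHERE (root_dag_name = sqlc.arg(old_name)::text
       OR (is_root AND dag_name = sqlc.arg(old_name)::text))
  AND status_data->>'name' = sqlc.arg(old_name)::text;
```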
- Around line 256-260: The DeleteDAGRunRows query currently returns one
dag_run_id per deleted attempt, producing duplicates when a run had multiple
attempts; change the SQL to deduplicate the returned IDs by wrapping the DELETE
in a CTE and selecting DISTINCT dag_run_id from that CTE (so keep the query name
DeleteDAGRunRows and the DELETE against dagu_dag_run_attempts with filters
root_dag_name and root_dag_run_id, but return unique dag_run_id values). Ensure
the query still uses the same sqlc :many shape but emits each dag_run_id only
once.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e79f6a70-3c9d-4ac5-b227-b22576ef599a
📒 Files selected for processing (4)
`internal/persis/dagrunstore/postgres/db/attempts.sql.go`, `internal/persis/dagrunstore/postgres/migrations/20260506000001_add_cleanup_latest_index.sql`, `internal/persis/dagrunstore/postgres/queries/attempts.sql`, `internal/persis/dagrunstore/postgres/retention_query_test.go`
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 5
🧹 Nitpick comments (1)
internal/cmn/config/loader_test.go (1)
427-567: ⚡ Quick win
Add the missing `backend: postgres` / empty-DSN regression case.

This suite never asserts the failure mode where Postgres is selected but every role DSN is empty, so the loader can keep accepting an unusable DAG-run store config and still leave this test green.
Suggested patch
t.Run("InvalidBackend", func(t *testing.T) { err := loadWithErrorFromYAML(t, ` dag_run_store: backend: cassandra `) require.Error(t, err) assert.Contains(t, err.Error(), "invalid dag_run_store.backend") }) + + t.Run("PostgresRequiresAtLeastOneDSN", func(t *testing.T) { + err := loadWithErrorFromYAML(t, ` +dag_run_store: + backend: postgres +`) + + require.Error(t, err) + assert.Contains(t, err.Error(), "dag_run_store.postgres") + }) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/cmn/config/loader_test.go` around lines 427 - 567, Add a new subtest inside TestLoad_DAGRunStore (e.g., "PostgresMissingDSNs") that uses loadWithErrorFromYAML to load YAML with dag_run_store.backend: postgres but no postgres role DSNs provided, then require.Error on the result and assert the error message indicates missing/empty DSNs (assert.Contains(err.Error(), "dsn")). This ensures the loader rejects a postgres backend when all role DSNs are empty; reference the TestLoad_DAGRunStore function and the helper loadWithErrorFromYAML to implement the check.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/cmd/context.go`:
- Around line 932-936: RunE currently calls ctx.Close(ctx) after runFunc
returns, but that reuses a possibly canceled command context; change the Close
call to use a non-cancelable context (e.g., context.Background() or a short
timeout derived from background) so cleanup (ctx.Close / exec.CloseDAGRunStore)
runs reliably; update the call site where runFunc is invoked and replace the ctx
parameter passed into ctx.Close(...) with a new non-cancelable context while
keeping the existing error-join logic around runErr and closeErr.
In `@internal/cmd/dagrunstore_role_test.go`:
- Around line 16-33: Add a test entry covering the "exec" command mapping:
update the test table in internal/cmd/dagrunstore_role_test.go to include {name:
"exec", want: dagrunstore.RoleAgent} so dagRunStoreRoleForCommand (the function
under test) exercises the explicit exec → RoleAgent branch (defined in
internal/cmd/context.go).
In `@internal/cmn/config/config.go`:
- Around line 637-645: validateDAGRunStore currently accepts backend=postgres
even when the Postgres connection block is empty; modify validateDAGRunStore so
that when c.DAGRunStore.Backend == DAGRunStoreBackendPostgres it verifies the
Postgres role DSN fields on c.DAGRunStore (the role DSN fields used to connect:
e.g. primary/writer/reader/replica DSNs) are not all empty and return a
descriptive fmt.Errorf (e.g. "invalid dag_run_store.postgres: all role DSNs are
empty") if they are; keep the existing switch and error path for unknown
backends and only return nil when a usable Postgres DSN is present.
In `@internal/cmn/config/key_hints.go`:
- Around line 125-154: The existing key-hints map includes legacy keys for
"dagrunstore.postgres.<role>.automigrate" but is missing the corresponding
"dag_run_store.postgres.<role>.automigrate" variants, so add entries mapping
"dag_run_store.postgres.server.automigrate" ->
"dag_run_store.postgres.server.auto_migrate",
"dag_run_store.postgres.scheduler.automigrate" ->
"dag_run_store.postgres.scheduler.auto_migrate", and
"dag_run_store.postgres.agent.automigrate" ->
"dag_run_store.postgres.agent.auto_migrate" (matching the pattern used for pool
keys) so viper-lowercased "automigrate" is recognized and routed to the existing
auto_migrate config names.
In `@internal/engine/engine.go`:
- Around line 72-81: After creating dagRunStore in the engine constructor,
ensure the store is closed if later initialization fails: add a deferred cleanup
immediately after dagRunStore, e.g. defer func() { if dagRunStore != nil { _ =
dagRunStore.Close() } }() so any early return (e.g. from newDAGStore or other
failures in New) will release the Postgres pool; then, once the engine is
successfully constructed, set dagRunStore = nil (or otherwise cancel the defer)
before returning to avoid closing the live store. Reference dagRunStore,
newDAGStore and the constructor function (New) when applying this change.
---
Nitpick comments:
In `@internal/cmn/config/loader_test.go`:
- Around line 427-567: Add a new subtest inside TestLoad_DAGRunStore (e.g.,
"PostgresMissingDSNs") that uses loadWithErrorFromYAML to load YAML with
dag_run_store.backend: postgres but no postgres role DSNs provided, then
require.Error on the result and assert the error message indicates missing/empty
DSNs (assert.Contains(err.Error(), "dsn")). This ensures the loader rejects a
postgres backend when all role DSNs are empty; reference the
TestLoad_DAGRunStore function and the helper loadWithErrorFromYAML to implement
the check.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 576b3d9d-ac6b-4a0c-a41b-a6f985422d1d
📒 Files selected for processing (12)
`Makefile`, `internal/cmd/context.go`, `internal/cmd/dagrunstore_role_test.go`, `internal/cmn/config/config.go`, `internal/cmn/config/definition.go`, `internal/cmn/config/key_hints.go`, `internal/cmn/config/loader.go`, `internal/cmn/config/loader_test.go`, `internal/cmn/schema/config.schema.json`, `internal/engine/engine.go`, `internal/persis/dagrunstore/store.go`, `internal/persis/dagrunstore/store_test.go`
@coderabbitai review

✅ Actions performed: Review triggered.
♻️ Duplicate comments (5)
internal/engine/engine.go (1)
72-87: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Close the DAG-run store on constructor rollback.

If `dagrunstore.New(...)` succeeds and `newDAGStore(...)` fails, the store is leaked because cleanup only happens in `(*Engine).Close`. Add a deferred rollback cleanup immediately after construction and cancel it before the successful return.

Suggested patch
```diff
 	dagRunStore, err := dagrunstore.New(
 		ctx,
 		cfg,
 		dagrunstore.WithRole(dagrunstore.RoleAgent),
 		dagrunstore.WithLatestStatusToday(false),
 		dagrunstore.WithLocation(cfg.Core.Location),
 	)
 	if err != nil {
 		return nil, fmt.Errorf("initialize DAG-run store: %w", err)
 	}
+	cleanupDAGRunStore := true
+	defer func() {
+		if cleanupDAGRunStore {
+			_ = coreexec.CloseDAGRunStore(context.Background(), dagRunStore)
+		}
+	}()

 	serviceRegistry := fileserviceregistry.New(cfg.Paths.ServiceRegistryDir)
 	dagRunMgr := runtime.NewManager(dagRunStore, procStore, cfg)

 	dagStore, err := newDAGStore(cfg, nil, false)
 	if err != nil {
 		return nil, err
 	}
+	cleanupDAGRunStore = false
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/engine/engine.go` around lines 72 - 87, After creating dagRunStore via dagrunstore.New in the constructor, add a deferred rollback that closes/releases dagRunStore if subsequent initialization fails (e.g., when newDAGStore returns an error); implement this by deferring a cleanup closure right after dagRunStore is assigned that checks whether the constructor is still failing and calls the appropriate dagRunStore.Close/Release method, and ensure you cancel/disable that deferred rollback (or set err=nil / flip a success flag) just before returning the successfully-constructed Engine so the store is not closed on success. Reference dagrunstore.New, dagRunStore, newDAGStore and the constructor that currently defers cleanup only in Engine.Close to locate where to add and cancel the rollback.

internal/cmn/config/key_hints.go (1)
125-154: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Add the `dag_run_store.*.automigrate` aliases too.

This block already remaps `dagrunstore.postgres.<role>.automigrate`, but it still misses the partially migrated `dag_run_store.postgres.<role>.automigrate` forms. With Viper lowercasing `autoMigrate` to `automigrate`, those configs silently fall back to the default `auto_migrate` value instead of honoring the user's setting.

Suggested patch
"dagrunstore.postgres.agent.pool.connmaxlifetime": "dag_run_store.postgres.agent.pool.conn_max_lifetime", "dagrunstore.postgres.agent.pool.connmaxidletime": "dag_run_store.postgres.agent.pool.conn_max_idle_time", + "dag_run_store.postgres.server.automigrate": "dag_run_store.postgres.server.auto_migrate", + "dag_run_store.postgres.scheduler.automigrate": "dag_run_store.postgres.scheduler.auto_migrate", + "dag_run_store.postgres.agent.automigrate": "dag_run_store.postgres.agent.auto_migrate", "dag_run_store.postgres.server.pool.maxopenconns": "dag_run_store.postgres.server.pool.max_open_conns",🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/cmn/config/key_hints.go` around lines 125 - 154, The automigrate aliases for the partially-migrated keys are missing: add mappings for "dag_run_store.postgres.server.automigrate", "dag_run_store.postgres.scheduler.automigrate", and "dag_run_store.postgres.agent.automigrate" and point each to the canonical snake_case target (e.g. map "dag_run_store.postgres.server.automigrate" -> "dag_run_store.postgres.server.auto_migrate"); mirror the existing "dagrunstore.postgres.*.automigrate" entries so Viper lowercased "automigrate" settings are honored in the dag_run_store namespace (update file key_hints.go, alongside the existing dagrunstore and dag_run_store pool mappings).

internal/cmn/config/config.go (1)
637-645: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Reject `backend=postgres` when every role DSN is empty.

`validateDAGRunStore()` currently accepts an unusable Postgres config and defers the failure until `dagrunstore.New(...)`. That makes a bad config pass `Load()` and fail only at runtime.

Suggested patch
```diff
 func (c *Config) validateDAGRunStore() error {
 	switch c.DAGRunStore.Backend {
 	case "", DAGRunStoreBackendFile:
 		return nil
 	case DAGRunStoreBackendPostgres:
+		if c.DAGRunStore.Postgres.Server.DSN == "" &&
+			c.DAGRunStore.Postgres.Scheduler.DSN == "" &&
+			c.DAGRunStore.Postgres.Agent.DSN == "" {
+			return fmt.Errorf("invalid dag_run_store.postgres: all role DSNs are empty")
+		}
 		return nil
 	default:
 		return fmt.Errorf("invalid dag_run_store.backend: %q", c.DAGRunStore.Backend)
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/cmn/config/config.go` around lines 637 - 645, validateDAGRunStore() currently accepts DAGRunStoreBackendPostgres even when no Postgres role DSNs are configured; update validateDAGRunStore to, when c.DAGRunStore.Backend == DAGRunStoreBackendPostgres, inspect the Postgres role DSN fields (e.g. ReaderDSN, WriterDSN, MigratorDSN or the equivalent fields on c.DAGRunStore.Postgres) and return an error if all of them are empty (use a clear message like "invalid dag_run_store.backend: postgres selected but no role DSNs configured"); keep the existing behavior for the file backend and other checks intact.

internal/cmd/dagrunstore_role_test.go (1)
16-28: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Test table still missing the `exec` mapping.

The previous review asked for an `exec → RoleAgent` row, and that comment was marked addressed, but the table here still omits it. Since `dagRunStoreRoleForCommand` in `context.go` (line 567) explicitly handles `exec`, this branch remains untested.

🔧 Proposed fix
{name: "start", want: dagrunstore.RoleAgent}, {name: "restart", want: dagrunstore.RoleAgent}, {name: "retry", want: dagrunstore.RoleAgent}, {name: "dry", want: dagrunstore.RoleAgent}, + {name: "exec", want: dagrunstore.RoleAgent}, {name: "worker", want: dagrunstore.RoleAgent},🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/cmd/dagrunstore_role_test.go` around lines 16 - 28, The test table in internal/cmd/dagrunstore_role_test.go is missing the "exec" case so dagRunStoreRoleForCommand in context.go isn't covered; add a test row with name: "exec" and want: dagrunstore.RoleAgent to the existing tests slice (alongside entries like "start" and "worker") so the exec → RoleAgent mapping is exercised by the test suite.

internal/cmd/context.go (1)
932-936: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Cleanup still uses a potentially-canceled context.

`ctx.Close(ctx)` forwards the embedded command `context.Context` into `exec.CloseDAGRunStore`. By the time `RunE` returns (e.g., after SIGINT/SIGTERM or a deadline-driven cancellation), that context can already be `Done`, and any context-aware cleanup (notably the Postgres backend) will short-circuit precisely when resources need to be released. Use a non-cancelable context (or a fresh bounded one derived from `context.Background()`).

🔧 Proposed fix
```diff
-	runErr := runFunc(ctx, args)
-	if closeErr := ctx.Close(ctx); closeErr != nil {
-		return errors.Join(runErr, closeErr)
-	}
-	return runErr
+	runErr := runFunc(ctx, args)
+	closeCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	if closeErr := ctx.Close(closeCtx); closeErr != nil {
+		return errors.Join(runErr, closeErr)
+	}
+	return runErr
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/cmd/context.go` around lines 932 - 936, The cleanup is calling ctx.Close(ctx) using the possibly-canceled command context; change it to call Close with a fresh non-cancelable (or short bounded) context derived from context.Background() so cleanup (e.g., exec.CloseDAGRunStore) can complete even after the original ctx is Done; locate the call site around runFunc and ctx.Close in RunE and replace passing the embedded command context with a new background or context.WithTimeout(context.Background(), ...) context, adding necessary imports (context and optionally time).
🧹 Nitpick comments (1)
Makefile (1)
129-157: ⚡ Quick win
Split `dev-pg` into smaller targets to reduce maintenance/lint friction.

Line 131 onward packs orchestration, readiness checks, and app startup into one long recipe. Extracting readiness/startup into helper targets (or a small script) will make failures easier to debug and addresses the current checkmake `maxbodylength` warning.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@Makefile` around lines 129 - 157, The dev-pg Makefile target packs container orchestration, readiness polling, and app startup into one long recipe; split it into smaller targets (e.g., dev-pg-up to run the docker compose up postgres step, dev-pg-wait to run the pg_isready loop and error handling, and dev-pg-start to set the DAGU_* env vars and run go run ./cmd start-all) and make dev-pg depend on those helpers (dev-pg: ${FE_BUNDLE_JS} dev-pg-up dev-pg-wait dev-pg-start) so each step is isolated, easier to lint, test and debug and the checkmake maxbodylength warning is resolved.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@internal/cmd/context.go`:
- Around line 932-936: The cleanup is calling ctx.Close(ctx) using the
possibly-canceled command context; change it to call Close with a fresh
non-cancelable (or short bounded) context derived from context.Background() so
cleanup (e.g., exec.CloseDAGRunStore) can complete even after the original ctx
is Done; locate the call site around runFunc and ctx.Close in RunE and replace
passing the embedded command context with a new background or
context.WithTimeout(context.Background(), ...) context, adding necessary imports
(context and optionally time).
In `@internal/cmd/dagrunstore_role_test.go`:
- Around line 16-28: The test table in internal/cmd/dagrunstore_role_test.go is
missing the "exec" case so dagRunStoreRoleForCommand in context.go isn't
covered; add a test row with name: "exec" and want: dagrunstore.RoleAgent to the
existing tests slice (alongside entries like "start" and "worker") so the exec →
RoleAgent mapping is exercised by the test suite.
In `@internal/cmn/config/config.go`:
- Around line 637-645: validateDAGRunStore() currently accepts
DAGRunStoreBackendPostgres even when no Postgres role DSNs are configured;
update validateDAGRunStore to, when c.DAGRunStore.Backend ==
DAGRunStoreBackendPostgres, inspect the Postgres role DSN fields (e.g.
ReaderDSN, WriterDSN, MigratorDSN or the equivalent fields on
c.DAGRunStore.Postgres) and return an error if all of them are empty (use a
clear message like "invalid dag_run_store.backend: postgres selected but no role
DSNs configured"); keep the existing behavior for the file backend and other
checks intact.
In `@internal/cmn/config/key_hints.go`:
- Around line 125-154: The automigrate aliases for the partially-migrated keys
are missing: add mappings for "dag_run_store.postgres.server.automigrate",
"dag_run_store.postgres.scheduler.automigrate", and
"dag_run_store.postgres.agent.automigrate" and point each to the canonical
snake_case target (e.g. map "dag_run_store.postgres.server.automigrate" ->
"dag_run_store.postgres.server.auto_migrate"); mirror the existing
"dagrunstore.postgres.*.automigrate" entries so Viper lowercased "automigrate"
settings are honored in the dag_run_store namespace (update file key_hints.go,
alongside the existing dagrunstore and dag_run_store pool mappings).
In `@internal/engine/engine.go`:
- Around line 72-87: After creating dagRunStore via dagrunstore.New in the
constructor, add a deferred rollback that closes/releases dagRunStore if
subsequent initialization fails (e.g., when newDAGStore returns an error);
implement this by deferring a cleanup closure right after dagRunStore is
assigned that checks whether the constructor is still failing and calls the
appropriate dagRunStore.Close/Release method, and ensure you cancel/disable that
deferred rollback (or set err=nil / flip a success flag) just before returning
the successfully-constructed Engine so the store is not closed on success.
Reference dagrunstore.New, dagRunStore, newDAGStore and the constructor that
currently defers cleanup only in Engine.Close to locate where to add and cancel
the rollback.
---
Nitpick comments:
In `@Makefile`:
- Around line 129-157: The dev-pg Makefile target packs container orchestration,
readiness polling, and app startup into one long recipe; split it into smaller
targets (e.g., dev-pg-up to run the docker compose up postgres step, dev-pg-wait
to run the pg_isready loop and error handling, and dev-pg-start to set the
DAGU_* env vars and run go run ./cmd start-all) and make dev-pg depend on those
helpers (dev-pg: ${FE_BUNDLE_JS} dev-pg-up dev-pg-wait dev-pg-start) so each
step is isolated, easier to lint, test and debug and the checkmake maxbodylength
warning is resolved.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9b84c8f5-a05e-4160-9eb1-2cc3b3757753
📒 Files selected for processing (13)
`Makefile`, `internal/cmd/context.go`, `internal/cmd/dagrunstore_role_test.go`, `internal/cmn/config/config.go`, `internal/cmn/config/definition.go`, `internal/cmn/config/key_hints.go`, `internal/cmn/config/loader.go`, `internal/cmn/config/loader_test.go`, `internal/cmn/schema/config.schema.json`, `internal/engine/engine.go`, `internal/persis/dagrunstore/postgres/retention_query_test.go`, `internal/persis/dagrunstore/store.go`, `internal/persis/dagrunstore/store_test.go`
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/cmn/config/definition.go
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/intg/queue/queue_test.go (1)
120-137: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Mirror the Windows timing slack in this FIFO assertion.

This now uses the portable sleep step, but the assertion still assumes ~1s start-time precision. The same file already widens timing budgets for Windows because persisted queue timestamps are second-granularity there; keeping `time.Second` plus `900*time.Millisecond` here can still flake on Windows or race builds when two sequential starts round into the same second. Reusing the widened duration/threshold logic from `TestGlobalConcurrency` would make this check much steadier.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/intg/queue/queue_test.go` around lines 120 - 137, TestLocalQueueFIFOProcessing currently asserts sequential starts with a hardcoded 900ms threshold which can flake on Windows; update the test to mirror the widened timing/slack logic used in TestGlobalConcurrency by introducing and reusing the same sleep duration and threshold variables (the ones passed into directSleepStepYAML and used to compute the allowed inter-start interval) instead of the literal 900*time.Millisecond, and use that threshold in the require.GreaterOrEqual assertion; reference TestLocalQueueFIFOProcessing, directSleepStepYAML, newFixture, WaitDrain, collectStartTimes and copy the duration/threshold calculation from TestGlobalConcurrency so the timing budget matches across tests.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/cmd/context.go`:
- Around line 361-364: NewContext eagerly calls dagrunstore.New which forces DB
access for every local/static command and duplicates stores when NewScheduler
also opens its own; change this by deferring or gating dagrunstore
initialization: detect whether the current command path requires runtime DAG-run
persistence (e.g., scheduler/commands that operate on DAG-run state) and only
call dagrunstore.New in those code paths, or convert the store into a
lazily-initialized singleton accessed via a getter that creates the store on
first use; also update NewScheduler to accept an optional store (or use the
shared lazy getter) so it does not open a second pool.
In `@internal/cmn/config/config.go`:
- Around line 637-647: The validateDAGRunStore function currently only checks
for empty DSNs but does not validate the Postgres pool subfields; add validation
of DAGRunStore.Postgres.Server.Pool, .Scheduler.Pool and .Agent.Pool by reusing
a shared helper (e.g. validatePostgresPool or validatePoolConfig) that enforces
allowed ranges/types (non-negative/max open/conns >= 0, min idle >= 0 and <=
max, connection max lifetime/dial timeout > 0 where required, etc.); call that
helper from validateDAGRunStore for each non-nil role pool and ensure the same
helper is used by worker.postgres_pool validation to keep behavior consistent (a
hedged sketch of such a helper appears right after this prompt block).
In `@Makefile`:
- Around line 147-156: The Makefile currently hardcodes a 30-second retry loop
for PostgreSQL readiness; make it configurable by adding variables (e.g.,
DEV_PG_WAIT_RETRIES with a default of 30 and DEV_PG_WAIT_SLEEP with a default of
1) and replace the literal 30 and sleep 1 in the loop with those variables (use
$$DEV_PG_WAIT_RETRIES and $$DEV_PG_WAIT_SLEEP in the shell block) so callers can
override them from the environment or make command line; ensure the final check
uses the configured retries (if [ $$i -ge $$DEV_PG_WAIT_RETRIES ]) and preserve
existing logging using COLOR_RED and COLOR_RESET.
---
Outside diff comments:
In `@internal/intg/queue/queue_test.go`:
- Around line 120-137: TestLocalQueueFIFOProcessing currently asserts sequential
starts with a hardcoded 900ms threshold which can flake on Windows; update the
test to mirror the widened timing/slack logic used in TestGlobalConcurrency by
introducing and reusing the same sleep duration and threshold variables (the
ones passed into directSleepStepYAML and used to compute the allowed inter-start
interval) instead of the literal 900*time.Millisecond, and use that threshold in
the require.GreaterOrEqual assertion; reference TestLocalQueueFIFOProcessing,
directSleepStepYAML, newFixture, WaitDrain, collectStartTimes and copy the
duration/threshold calculation from TestGlobalConcurrency so the timing budget
matches across tests.
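For the `@internal/cmn/config/config.go` item above, here is a minimal sketch of a shared pool-validation helper. The `PostgresPool` fields and the `validatePostgresPool` name are assumptions about the config shape, not the repository's actual types.

```go
package config

import (
	"fmt"
	"time"
)

// PostgresPool mirrors the pool knobs described in the review; the field
// names here are illustrative.
type PostgresPool struct {
	MaxOpenConns    int
	MinIdleConns    int
	ConnMaxLifetime time.Duration
}

// validatePostgresPool enforces the same bounds for every role's pool
// (server, scheduler, agent, worker).
func validatePostgresPool(role string, p *PostgresPool) error {
	if p == nil {
		return nil // role inherits defaults
	}
	if p.MaxOpenConns < 0 || p.MinIdleConns < 0 {
		return fmt.Errorf("%s pool: connection counts must be non-negative", role)
	}
	if p.MaxOpenConns > 0 && p.MinIdleConns > p.MaxOpenConns {
		return fmt.Errorf("%s pool: min idle conns (%d) exceed max open conns (%d)",
			role, p.MinIdleConns, p.MaxOpenConns)
	}
	if p.ConnMaxLifetime < 0 {
		return fmt.Errorf("%s pool: connection max lifetime must not be negative", role)
	}
	return nil
}
```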
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 519c83aa-b563-4495-9758-ff4916d6b490
📒 Files selected for processing (12)
Makefile, internal/cmd/context.go, internal/cmd/dagrunstore_role_test.go, internal/cmd/validate.go, internal/cmn/config/config.go, internal/cmn/config/key_hints.go, internal/cmn/config/loader_test.go, internal/engine/engine.go, internal/intg/queue/queue_test.go, internal/persis/dagrunstore/postgres/db/attempts.sql.go, internal/persis/dagrunstore/postgres/queries/attempts.sql, internal/persis/dagrunstore/postgres/retention_query_test.go
@coderabbitai review
✅ Actions performed: Review triggered.
🧹 Nitpick comments (1)
Makefile (1)
131-165: 💤 Low value
Consider adding a `dev-pg-down` companion target.
The new `dev-pg*` targets bring the Postgres container up but never tear it down. `make dev-pg` runs `go run ./cmd start-all` in the foreground; on Ctrl-C the container is left running, and there is no documented `make` entrypoint to stop/clean it. Adding a small symmetric target would improve developer ergonomics without affecting the start path.
♻️ Suggested addition
```diff
 .PHONY: dev-pg dev-pg-up dev-pg-wait dev-pg-start
 dev-pg: dev-pg-start
+
+.PHONY: dev-pg-down
+dev-pg-down:
+	@printf '%b\n' "${COLOR_GREEN}Stopping PostgreSQL DAG-run store dev environment...${COLOR_RESET}"
+	@docker compose -p ${DEV_PG_COMPOSE_PROJECT} -f ${DEV_PG_COMPOSE_FILE} down
```
If preserving the data volume across runs is desired, leave `down` without `-v`; otherwise document `-v` for a full reset.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@Makefile` around lines 131 - 165, Add a symmetric "dev-pg-down" Makefile target to stop and remove the PostgreSQL container started by dev-pg/dev-pg-up: create a PHONY dev-pg-down target that runs docker compose -p ${DEV_PG_COMPOSE_PROJECT} -f ${DEV_PG_COMPOSE_FILE} down (optionally with -v if you want to remove volumes) and prints a clear message; reference the existing dev-pg, dev-pg-up, dev-pg-start, and dev-pg-wait targets so maintainers can call it from docs or CI and choose whether to preserve volumes by default or document using -v for full reset.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@Makefile`:
- Around line 131-165: Add a symmetric "dev-pg-down" Makefile target to stop and
remove the PostgreSQL container started by dev-pg/dev-pg-up: create a PHONY
dev-pg-down target that runs docker compose -p ${DEV_PG_COMPOSE_PROJECT} -f
${DEV_PG_COMPOSE_FILE} down (optionally with -v if you want to remove volumes)
and prints a clear message; reference the existing dev-pg, dev-pg-up,
dev-pg-start, and dev-pg-wait targets so maintainers can call it from docs or CI
and choose whether to preserve volumes by default or document using -v for full
reset.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5233d196-0485-4eea-b62a-b6b76163acd2
📒 Files selected for processing (6)
Makefile, internal/cmd/context.go, internal/cmd/context_hardening_test.go, internal/cmn/config/config.go, internal/cmn/config/loader.go, internal/cmn/config/loader_test.go
👮 Files not reviewed due to content moderation or server errors (5)
- internal/cmn/config/loader.go
- internal/cmd/context.go
- internal/cmd/context_hardening_test.go
- internal/cmn/config/loader_test.go
- internal/cmn/config/config.go
@coderabbitai review
✅ Actions performed: Review triggered.
Add an opt-in PostgreSQL control-plane store with goose migrations, sqlc queries, UUIDv7 identifiers, role-specific connection settings, and PostgreSQL-backed DAG runs, queues, distributed state, auth, audit, workspace, session, webhook, and event stores. Keep file storage as the default, require explicit agent direct_access for local development, add dev-pg wiring with postgres:18, and add a separate PostgreSQL integration CI job.
@coderabbitai review
✅ Actions performed: Review triggered.
1 similar comment
✅ Actions performed: Review triggered.
Actionable comments posted: 17
🧹 Nitpick comments (5)
internal/runtime/builtin/redis/lock.go (2)
37-43: ⚖️ Poor tradeoff
`uuid.Must` panics at the `NewLockManager` call-site on entropy failure. This is acceptable at initialization time (entropy exhaustion is genuinely unrecoverable), but the caller gets no chance to handle it. Consider returning `(*LockManager, error)` if the rest of the codebase prefers explicit error propagation at construction.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/builtin/redis/lock.go` around lines 37 - 43, NewLockManager currently uses uuid.Must(uuid.NewV7()) which will panic on entropy failure; change NewLockManager signature to return (*LockManager, error), call uuid.NewV7() directly, check the returned error, and propagate it (return nil, err) instead of panicking; on success construct the LockManager with lockVal set from the generated UUID and return (manager, nil), and update callers to handle the error from NewLockManager.
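A minimal sketch of the error-returning constructor, assuming the go-redis v9 client; the `LockManager` fields and package name below are stand-ins for the file's real layout.

```go
package redislock

import (
	"fmt"

	"github.com/google/uuid"
	goredis "github.com/redis/go-redis/v9"
)

// LockManager stub mirroring the struct described above; real fields may differ.
type LockManager struct {
	client  *goredis.Client
	lockKey string
	lockVal string
}

// NewLockManager seeds the lock value with a fresh UUIDv7 and propagates
// entropy failures to the caller instead of panicking.
func NewLockManager(client *goredis.Client, lockKey string) (*LockManager, error) {
	val, err := uuid.NewV7()
	if err != nil {
		return nil, fmt.Errorf("generate lock value: %w", err)
	}
	return &LockManager{
		client:  client,
		lockKey: lockKey,
		lockVal: val.String(),
	}, nil
}
```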
126-146: ⚖️ Poor tradeoff
`Extend` has a non-atomic TOCTOU gap between the ownership check and `Expire`.
`Get` + `Expire` are two separate Redis calls. If the lock expires between them, another process can acquire it and then `Expire` will silently extend the new holder's lock. This is a pre-existing issue, but worth flagging since `Extend` is the primary lease-renewal path. A Lua script (similar to `unlockScript`) can make this atomic:
```lua
if redis.call("GET", KEYS[1]) == ARGV[1] then
  return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
  return 0
end
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/builtin/redis/lock.go` around lines 126 - 146, Extend has a TOCTOU between Get and Expire allowing another holder to be extended; replace the two-call sequence in LockManager.Extend with an atomic Redis EVAL using a Lua script (like the suggested snippet) that checks GET KEYS[1] == ARGV[1] and then PEXPIRE KEYS[1] with ARGV[2], invoke it via m.client.Eval with KEYS = []string{m.lockKey} and ARGV = {m.lockVal, duration in milliseconds}, and treat a non-zero return as success and zero as "lock not owned" (return an error); you can store the script as extendScript (analogous to unlockScript) for reuse.
internal/persis/controlplanestore/postgres/cursor.go (1)
136-141: ⚡ Quick win
Avoid `panic` in `queryFilterHash`; propagate an error instead.
`json.Marshal` on a struct of plain Go types cannot fail in practice, but using `panic` to signal the impossible case means any future change to `normalized`'s type set (e.g. adding a map or interface field that can contain a cycle) silently turns a data error into a goroutine crash. Changing the function to return `(string, error)` and threading that error through both callers (`encodeQueryCursor` and `decodeQueryCursor`) is a small, localised change.
♻️ Proposed change
```diff
-func queryFilterHash(opts exec.ListDAGRunStatusesOptions) string {
+func queryFilterHash(opts exec.ListDAGRunStatusesOptions) (string, error) {
 	...
 	data, err := json.Marshal(normalized)
 	if err != nil {
-		panic(fmt.Errorf("marshal query cursor filter: %w", err))
+		return "", fmt.Errorf("marshal query cursor filter: %w", err)
 	}
 	sum := sha256.Sum256(data)
-	return hex.EncodeToString(sum[:])
+	return hex.EncodeToString(sum[:]), nil
 }
```
Then in `encodeQueryCursor` and `decodeQueryCursor`, propagate the returned error.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/controlplanestore/postgres/cursor.go` around lines 136 - 141, queryFilterHash currently panics on json.Marshal errors; change its signature to return (string, error) and return an error instead of calling panic when json.Marshal fails, then update both encodeQueryCursor and decodeQueryCursor to accept and propagate that error (i.e., call queryFilterHash, check the returned error, and return or wrap it as appropriate) so serialization failures are handled without crashing the goroutine.
internal/persis/controlplanestore/postgres/queue.go (1)
217-223: 💤 Low value
`QueueWatcher` ignores its context argument.
`s.QueueWatcher(ctx)` accepts `context.Context` but discards it; the goroutine started in `Start(ctx)` only observes the Start-time context. If callers expect the watcher to also be torn down when the factory context is cancelled (a common pattern for store-scoped lifecycles), this silently violates that contract. If the discard is intentional — i.e., the watcher's lifetime is solely owned by `Start`/`Stop` — consider naming the parameter `_ context.Context` to make the contract explicit, or document it on the method.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/persis/controlplanestore/postgres/queue.go` around lines 217 - 223, The QueueWatcher function currently discards its context parameter; update QueueWatcher(ctx) to store the incoming context on the returned postgresQueueWatcher (e.g., add a parentCtx/context field to postgresQueueWatcher) and then have the watcher’s Start method observe both the factory parentCtx and the Start-time ctx (for example by selecting on parentCtx.Done() and startCtx.Done(), or by deriving a combined context via context.WithCancel that is cancelled when either parentCtx or startCtx is done); alternatively, if the watcher lifetime is intentionally only owned by Start/Stop, rename the parameter to _ context.Context and document that behavior — change the QueueWatcher, postgresQueueWatcher struct, and Start/Stop usage accordingly so the factory context is not silently ignored.
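A hedged sketch of the combined-context option; the `postgresQueueWatcher` fields and the `poll` loop below are stand-ins for the real implementation, not its actual shape.

```go
package postgres

import (
	"context"
	"time"
)

// postgresQueueWatcher stub: parentCtx is the context captured by QueueWatcher(ctx).
type postgresQueueWatcher struct {
	parentCtx context.Context
	cancel    context.CancelFunc
}

// Start derives a context that ends when either the factory (parent)
// context or the Start-time context is cancelled.
func (w *postgresQueueWatcher) Start(startCtx context.Context) {
	ctx, cancel := context.WithCancel(startCtx)
	w.cancel = cancel
	go func() {
		select {
		case <-w.parentCtx.Done():
			cancel() // factory teardown stops the watcher too
		case <-ctx.Done():
		}
	}()
	go w.poll(ctx)
}

func (w *postgresQueueWatcher) Stop() {
	if w.cancel != nil {
		w.cancel()
	}
}

func (w *postgresQueueWatcher) poll(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Queue polling elided.
		}
	}
}
```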
internal/cmd/context.go (1)
287-298: ⚡ Quick win
Avoid building a file event store when it will be replaced by the control-plane store.
`fileeventstore.New(cfg.Paths.EventStoreDir)` runs first whenever `cfg.EventStore.Enabled` is true (Lines 287-298), creating directories and possibly opening files. Then, once the control-plane store is constructed (Lines 391-397), `eventSvc` is replaced with `eventstore.New(controlStore.Events())` and the file-backed instance is dropped without any cleanup. For postgres backends this is wasted I/O on every command startup, and there is no `Close` on the discarded service to release whatever the file collector held. Consider deferring the file-store branch until after `controlStore` is resolved, or skipping it entirely when `shouldOpenContextControlPlaneStore(cmd, cfg)` will produce a control-plane-backed event store.
Also applies to: 391-397
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/cmd/context.go` around lines 287 - 298, The code currently constructs a file-backed event store unconditionally when cfg.EventStore.Enabled is true, then later replaces it with a control-plane-backed store; avoid that wasted I/O by first checking whether a control-plane store will be used via shouldOpenContextControlPlaneStore(cmd, cfg) and only call fileeventstore.New(cfg.Paths.EventStoreDir) and set eventSvc when that check is false. Alternatively, move the file-store branch to after the controlStore resolution and only create the file-backed store if controlStore is not being opened; if you must construct a temporary file-backed store and later replace it, ensure the discarded resource is closed (add a Close call on the file store or eventSvc before replacement). Use the symbols fileeventstore.New, cfg.EventStore.Enabled, shouldOpenContextControlPlaneStore, controlStore, and eventstore.New(controlStore.Events()) to locate and change the logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/agent/api.go`:
- Line 1104: The current use of uuid.Must(uuid.NewV7()) (e.g., the id assignment
at the shown location and the similar usages at the other occurrences around
lines 1164 and 1279) panics instead of returning errors; replace these with
non-panicking generation by calling uuid.NewV7(), checking the returned error,
and propagating or wrapping that error via the function's existing error return
(or returning a descriptive error) where id is assigned, so the caller can
handle failures; alternatively, if the team prefers to keep uuid.Must(),
document and apply that decision consistently across all production UUID usages
rather than changing only these three sites.
In `@internal/cmd/context.go`:
- Around line 862-913: The newly-created schedulerControlPlaneStore from
controlplanestore.New is never closed; add ownership tracking and proper cleanup
similar to schedulerRunStore: set a bool (e.g., ownsSchedulerControlPlaneStore)
to true when you create schedulerControlPlaneStore in the else branch, add a
defer that calls schedulerControlPlaneStore.Close() (or the appropriate close
method) if ownsSchedulerControlPlaneStore is true so it will be closed on early
returns (including on scheduler.New error), and ensure you clear the flag (set
to false) only when ownership is intentionally transferred to the longer-lived
shutdown path (e.g., register it so Context.Close or the scheduler shutdown will
call schedulerControlPlaneStore.Close()); reference schedulerControlPlaneStore,
controlplanestore.New, ownsSchedulerRunStore/exec.CloseDAGRunStore,
scheduler.New, and Context.Close when making these changes.
In `@internal/cmn/schema/config.schema.json`:
- Around line 1011-1031: The schema currently allows
ControlPlaneStorePostgresAgentDef objects with direct_access: true but no dsn;
update the ControlPlaneStorePostgresAgentDef definition to enforce that when
"direct_access" is true the "dsn" property is required by adding a JSON Schema
conditional (an "if" that checks properties.direct_access.const true and a
matching "then" that adds required: ["dsn"]) so validation fails unless dsn is
present; reference the ControlPlaneStorePostgresAgentDef object and its
properties "direct_access" and "dsn" when making the change.
In `@internal/persis/controlplanestore/postgres/audit_store.go`:
- Around line 95-96: Clamp the pagination integers before converting to int32:
ensure `limit` is bounded (e.g., min 0, max 500 or math.MaxInt32 if you want a
hard upper clamp) and ensure `offset` is non-negative and not greater than
math.MaxInt32, then cast to int32 when assigning `RowLimit` and `RowOffset` in
the code that builds the query (the fields named RowLimit and RowOffset in
audit_store.go). Replace the direct casts int32(limit)/int32(offset) with a
small helper or inline clamping logic that applies bounds and then casts to
int32 to prevent overflow/negative values from internal callers.
In `@internal/persis/controlplanestore/postgres/auth_store.go`:
- Around line 412-424: Update currently skips the same DAG name and token-hash
validation done in Create, allowing invalid/empty values to be persisted; modify
webhookStore.Update (around parseUUIDv7, updateWebhookParams, and before
s.store.queries.UpdateWebhook) to run the same validation logic used by Create:
validate the webhook.DagName with the same validate function (or logic) used in
Create and reject invalid DAG names, and ensure webhook.TokenHash is non-empty
(return the same error values Create returns). Keep the checks before calling
updateWebhookParams and queries.UpdateWebhook so invalid updates are rejected
early.
- Around line 155-189: The Update method can write a user with only one of
OIDCIssuer / OIDCSubject set, breaking the invariant used by GetByOIDCIdentity;
before marshalling or calling s.store.queries.UpdateUser in userStore.Update,
validate that either both user.OIDCIssuer and user.OIDCSubject are empty or both
are non-empty, and if that check fails return a suitable error (e.g.
auth.ErrInvalidOIDCIdentity or another existing auth error) so Update rejects
partial OIDC identity updates just like Create does.
In `@internal/persis/controlplanestore/postgres/dagrun_query.go`:
- Around line 90-108: latestRootAttempt currently treats any non-nil error from
s.queries.FindRootRun as a "not found" (exec.ErrDAGRunIDNotFound), masking real
DB errors; change the logic in latestRootAttempt so that after LatestRootAttempt
returns pgx.ErrNoRows you call FindRootRun and: if anyErr == nil return
exec.ErrNoStatusData, else if errors.Is(anyErr, pgx.ErrNoRows) return
exec.ErrDAGRunIDNotFound, else return the actual anyErr (propagate the real DB
error). Ensure you use errors.Is and reference LatestRootAttempt, FindRootRun,
exec.ErrNoStatusData and exec.ErrDAGRunIDNotFound so callers and alerts receive
real DB errors instead of a false "missing run".
In `@internal/persis/controlplanestore/postgres/distributed.go`:
- Around line 411-430: The JSON blob is marshaled before the synthesized root is
written back to the record, so the DB row columns get the defaulted root but
row.Data keeps an empty Root; modify the block in UpsertActiveDistributedRun so
that after you compute root (the variable root and the zero check) you assign
record.Root = root (or a shallow copy) before calling json.Marshal(record), then
proceed with the UpsertActiveDistributedRun call so the serialized Data contains
the persisted defaulted root.
- Around line 325-349: The code normalizes root into the local variable root but
still marshals the original lease (so Data keeps a zero Root); before calling
json.Marshal(lease) assign the normalized root back into the lease struct (e.g.,
lease.Root = root) so the serialized Data matches the typed columns used in
UpsertDAGRunLease and reads observe the fallback Root.DAGRun value.
In `@internal/persis/controlplanestore/postgres/queries/attempts.sql`:
- Around line 39-46: The FindSubRun SQL query currently omits dag_name which
allows collisions when different child DAGs share the same dag_run_id; update
the query (named FindSubRun) to add "AND dag_name = sqlc.arg(dag_name)" to the
WHERE clause so the lookup keys are (root_dag_name, root_dag_run_id, dag_name,
dag_run_id), then regenerate sqlc bindings and update the call site
createSubAttempt to pass the new parameter (e.g., DagName: dag.Name) so the
newly created attempt is attached to the correct sub-run.
In `@internal/persis/controlplanestore/postgres/queries/auth.sql`:
- Around line 127-132: The JSONB denormalization keys are inconsistent:
UpdateAPIKeyLastUsed writes '{last_used_at}' while UpdateWebhookLastUsed uses
'{lastUsedAt}'; update the webhook-related SQL (functions/queries named
UpdateWebhookLastUsed and any other occurrences around the referenced block(s))
to use the snake_case key '{last_used_at}' (and likewise change any other
occurrences at lines ~200-205) so the JSONB key matches UpdateAPIKeyLastUsed and
the Go struct JSON tags; ensure jsonb_set/to_jsonb calls and any JSON-path
expressions use the unified '{last_used_at}' key.
In `@internal/persis/controlplanestore/postgres/queries/session.sql`:
- Around line 31-41: The two SQLC queries ListAgentSessionsByUser and
ListAgentSubSessions currently return unbounded result sets; add a hard safety
cap (e.g. append "LIMIT sqlc.arg(limit)" and validate/clip the incoming limit to
a safe maximum such as 1000) and update the store API to accept pagination
parameters (limit and optional offset) so session_store.go callers can page
results; ensure the store implementation enforces a maxLimit constant (1000)
before passing to the SQL queries to prevent OOMs even if callers request larger
values.
In `@internal/persis/controlplanestore/postgres/registry.go`:
- Around line 89-97: The loop currently ignores errors from
s.queries.DeleteServiceInstance after canceling reg.cancel and waiting reg.wg,
so deletions can silently fail (especially if ctx is canceled). Change the
delete call to run with a fresh bounded context (e.g., use context.Background()
or context.WithTimeout when ctx is already canceled) and capture the returned
error instead of discarding it; if deletion fails log the error (or retry once)
including reg.serviceName and reg.hostInfo.ID so the failure is observable and
not left to serviceStaleTimeout. Ensure you update the code around
registrations, reg.cancel, reg.wg.Wait and s.queries.DeleteServiceInstance to
use the new ctx and handle/log the error.
In `@internal/persis/controlplanestore/postgres/session_store.go`:
- Around line 187-241: The AddMessage transaction reads the session with
q.GetAgentSession without a row-level lock, allowing lost-update races; change
the sqlc query to add FOR UPDATE (e.g., create "GetAgentSessionForUpdate :one"
selecting FROM dagu_agent_sessions ... FOR UPDATE) and replace the call to
q.GetAgentSession with q.GetAgentSessionForUpdate inside the withTx block so
concurrent AddMessage calls serialize updates to Title and UpdatedAt; ensure the
new query is used in the same transaction context (withTx) and update any error
handling references accordingly.
- Around line 170-177: Currently the code silently replaces any invalid msg.ID
by calling ensureUUIDv7String and falling back to newUUIDv7String; change this
so you only auto-generate a new UUID when msg.ID is empty and otherwise validate
the provided ID: call ensureUUIDv7String only to validate a non-empty msg.ID and
return an explicit error if it is invalid, and call newUUIDv7String only when
msg.ID == "" to populate msg.ID; update the block around
ensureUUIDv7String/newUUIDv7String and the assignment to msg.ID accordingly.
In `@internal/persis/filedistributed/dispatch_task_store.go`:
- Line 81: The code currently uses uuid.Must(uuid.NewV7()) which panics on
failure; change both places (in Enqueue where fileName :=
fmt.Sprintf("task_%020d_%s.json", enqueuedAt.UnixMilli(),
uuid.Must(uuid.NewV7()).String()) and in ClaimNext where uuid.Must is used) to
call uuid.NewV7() directly, check the returned error, and propagate a wrapped
error (return fmt.Errorf(...): include context like "generating task UUID" or
"generating claim UUID") instead of panicking; update the fileName construction
to use the validated uuid string and ensure both Enqueue and ClaimNext return
that error to the caller.
In `@internal/service/oidcprovision/service.go`:
- Around line 159-161: Replace the panic-prone uuid.Must call in the request
path by calling uuid.NewV7() directly, checking its returned error, and
returning a graceful error response from the OIDC provisioning handler instead
of allowing a panic; update the code that constructs the auth.User (where ID is
set) to use the validated UUID string only when err == nil, and propagate a
suitable error/HTTP response from the function that creates the user when
uuid.NewV7() fails.
---
Nitpick comments:
In `@internal/cmd/context.go`:
- Around line 287-298: The code currently constructs a file-backed event store
unconditionally when cfg.EventStore.Enabled is true, then later replaces it with
a control-plane-backed store; avoid that wasted I/O by first checking whether a
control-plane store will be used via shouldOpenContextControlPlaneStore(cmd,
cfg) and only call fileeventstore.New(cfg.Paths.EventStoreDir) and set eventSvc
when that check is false. Alternatively, move the file-store branch to after the
controlStore resolution and only create the file-backed store if controlStore is
not being opened; if you must construct a temporary file-backed store and later
replace it, ensure the discarded resource is closed (add a Close call on the
file store or eventSvc before replacement). Use the symbols fileeventstore.New,
cfg.EventStore.Enabled, shouldOpenContextControlPlaneStore, controlStore, and
eventstore.New(controlStore.Events()) to locate and change the logic.
In `@internal/persis/controlplanestore/postgres/cursor.go`:
- Around line 136-141: queryFilterHash currently panics on json.Marshal errors;
change its signature to return (string, error) and return an error instead of
calling panic when json.Marshal fails, then update both encodeQueryCursor and
decodeQueryCursor to accept and propagate that error (i.e., call
queryFilterHash, check the returned error, and return or wrap it as appropriate)
so serialization failures are handled without crashing the goroutine.
In `@internal/persis/controlplanestore/postgres/queue.go`:
- Around line 217-223: The QueueWatcher function currently discards its context
parameter; update QueueWatcher(ctx) to store the incoming context on the
returned postgresQueueWatcher (e.g., add a parentCtx/context field to
postgresQueueWatcher) and then have the watcher’s Start method observe both the
factory parentCtx and the Start-time ctx (for example by selecting on
parentCtx.Done() and startCtx.Done(), or by deriving a combined context via
context.WithCancel that is cancelled when either parentCtx or startCtx is done);
alternatively, if the watcher lifetime is intentionally only owned by
Start/Stop, rename the parameter to _ context.Context and document that behavior
— change the QueueWatcher, postgresQueueWatcher struct, and Start/Stop usage
accordingly so the factory context is not silently ignored.
In `@internal/runtime/builtin/redis/lock.go`:
- Around line 37-43: NewLockManager currently uses uuid.Must(uuid.NewV7()) which
will panic on entropy failure; change NewLockManager signature to return
(*LockManager, error), call uuid.NewV7() directly, check the returned error, and
propagate it (return nil, err) instead of panicking; on success construct the
LockManager with lockVal set from the generated UUID and return (manager, nil),
and update callers to handle the error from NewLockManager.
- Around line 126-146: Extend has a TOCTOU between Get and Expire allowing
another holder to be extended; replace the two-call sequence in
LockManager.Extend with an atomic Redis EVAL using a Lua script (like the
suggested snippet) that checks GET KEYS[1] == ARGV[1] and then PEXPIRE KEYS[1]
with ARGV[2], invoke it via m.client.Eval with KEYS = []string{m.lockKey} and
ARGV = {m.lockVal, duration in milliseconds}, and treat a non-zero return as
success and zero as "lock not owned" (return an error); you can store the script
as extendScript (analogous to unlockScript) for reuse.
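To make the `Extend` item above concrete, here is a hedged sketch using go-redis v9's `Script` helper (assuming that is the client library in use); the package name, struct stub, and method signature are illustrative.

```go
package redislock

import (
	"context"
	"errors"
	"fmt"
	"time"

	goredis "github.com/redis/go-redis/v9"
)

type LockManager struct {
	client  *goredis.Client
	lockKey string
	lockVal string
}

// extendScript atomically re-arms the TTL only while we still own the lock,
// mirroring the unlockScript pattern.
var extendScript = goredis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
  return redis.call("PEXPIRE", KEYS[1], ARGV[2])
else
  return 0
end`)

// Extend renews the lease atomically, returning an error if the lock is
// no longer owned by this manager.
func (m *LockManager) Extend(ctx context.Context, ttl time.Duration) error {
	owned, err := extendScript.Run(ctx, m.client, []string{m.lockKey},
		m.lockVal, ttl.Milliseconds()).Int64()
	if err != nil {
		return fmt.Errorf("extend lock: %w", err)
	}
	if owned == 0 {
		return errors.New("lock not owned; refusing to extend")
	}
	return nil
}
```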
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2c048cca-d01c-4c47-962b-c1aee6958136
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (93)
.github/workflows/ci.yaml, .golangci.yml, Makefile, go.mod, internal/agent/api.go, internal/agent/approval.go, internal/agent/ask_user.go, internal/agent/delegate.go, internal/agent/inputspill.go, internal/agent/remote_agent.go, internal/agent/session.go, internal/auth/apikey.go, internal/auth/user.go, internal/auth/webhook.go, internal/cmd/context.go, internal/cmd/context_hardening_test.go, internal/cmd/dagrunstore_role_test.go, internal/cmd/start.go, internal/cmd/validate.go, internal/cmn/config/config.go, internal/cmn/config/definition.go, internal/cmn/config/key_hints.go, internal/cmn/config/loader.go, internal/cmn/config/loader_test.go, internal/cmn/schema/config.schema.json, internal/core/exec/queue.go, internal/engine/engine.go, internal/persis/controlplanestore/file.go, internal/persis/controlplanestore/postgres/attempt.go, internal/persis/controlplanestore/postgres/attempt_test.go, internal/persis/controlplanestore/postgres/audit_store.go, internal/persis/controlplanestore/postgres/auth_store.go, internal/persis/controlplanestore/postgres/cursor.go, internal/persis/controlplanestore/postgres/cursor_test.go, internal/persis/controlplanestore/postgres/dagrun_convert.go, internal/persis/controlplanestore/postgres/dagrun_create.go, internal/persis/controlplanestore/postgres/dagrun_list.go, internal/persis/controlplanestore/postgres/dagrun_query.go, internal/persis/controlplanestore/postgres/dagrun_retention.go, internal/persis/controlplanestore/postgres/db/attempts.sql.go, internal/persis/controlplanestore/postgres/db/audit.sql.go, internal/persis/controlplanestore/postgres/db/auth.sql.go, internal/persis/controlplanestore/postgres/db/db.go, internal/persis/controlplanestore/postgres/db/distributed.sql.go, internal/persis/controlplanestore/postgres/db/event.sql.go, internal/persis/controlplanestore/postgres/db/models.go, internal/persis/controlplanestore/postgres/db/queue.sql.go, internal/persis/controlplanestore/postgres/db/registry.sql.go, internal/persis/controlplanestore/postgres/db/session.sql.go, internal/persis/controlplanestore/postgres/db/workspace.sql.go, internal/persis/controlplanestore/postgres/distributed.go, internal/persis/controlplanestore/postgres/event_store.go, internal/persis/controlplanestore/postgres/id.go, internal/persis/controlplanestore/postgres/integration_test.go, internal/persis/controlplanestore/postgres/migrate.go, internal/persis/controlplanestore/postgres/migration_test.go, internal/persis/controlplanestore/postgres/migrations/20260506000000_create_control_plane_store.sql, internal/persis/controlplanestore/postgres/migrations/embed.go, internal/persis/controlplanestore/postgres/queries/attempts.sql, internal/persis/controlplanestore/postgres/queries/audit.sql, internal/persis/controlplanestore/postgres/queries/auth.sql, internal/persis/controlplanestore/postgres/queries/distributed.sql, internal/persis/controlplanestore/postgres/queries/event.sql, internal/persis/controlplanestore/postgres/queries/queue.sql, internal/persis/controlplanestore/postgres/queries/registry.sql, internal/persis/controlplanestore/postgres/queries/session.sql, internal/persis/controlplanestore/postgres/queries/workspace.sql, internal/persis/controlplanestore/postgres/queue.go, internal/persis/controlplanestore/postgres/registry.go, internal/persis/controlplanestore/postgres/retention_query_test.go, internal/persis/controlplanestore/postgres/session_store.go, internal/persis/controlplanestore/postgres/store.go, internal/persis/controlplanestore/postgres/workspace_store.go, internal/persis/controlplanestore/store.go, internal/persis/controlplanestore/store_test.go, internal/persis/dagrunstore/store.go, internal/persis/dagrunstore/store_test.go, internal/persis/filedistributed/dispatch_task_store.go, internal/persis/fileeventstore/store.go, internal/remotenode/remotenode.go, internal/runtime/builtin/redis/lock.go, internal/service/audit/entry.go, internal/service/coordinator/handler.go, internal/service/frontend/api/v1/dagruns.go, internal/service/frontend/server.go, internal/service/frontend/server_test.go, internal/service/frontend/sse/multiplex.go, internal/service/frontend/terminal/connection.go, internal/service/oidcprovision/service.go, internal/service/scheduler/queue_processor.go, internal/service/worker/poller.go, internal/workspace/workspace.go, sqlc.yaml
✅ Files skipped from review due to trivial changes (19)
- internal/service/frontend/sse/multiplex.go
- .golangci.yml
- internal/service/frontend/terminal/connection.go
- internal/persis/controlplanestore/postgres/queries/workspace.sql
- internal/service/coordinator/handler.go
- internal/persis/controlplanestore/postgres/db/registry.sql.go
- internal/persis/controlplanestore/postgres/migrations/embed.go
- internal/persis/controlplanestore/postgres/db/workspace.sql.go
- internal/persis/controlplanestore/postgres/db/event.sql.go
- internal/persis/controlplanestore/postgres/queries/distributed.sql
- internal/persis/controlplanestore/postgres/db/db.go
- internal/persis/controlplanestore/postgres/queries/queue.sql
- internal/persis/controlplanestore/postgres/db/queue.sql.go
- internal/persis/controlplanestore/postgres/attempt.go
- internal/persis/controlplanestore/postgres/db/session.sql.go
- internal/persis/controlplanestore/postgres/db/distributed.sql.go
- internal/persis/controlplanestore/postgres/db/models.go
- internal/persis/controlplanestore/postgres/db/attempts.sql.go
- internal/persis/controlplanestore/postgres/db/auth.sql.go
🚧 Files skipped from review as they are similar to previous changes (8)
- internal/service/frontend/api/v1/dagruns.go
- internal/cmd/start.go
- internal/engine/engine.go
- internal/cmn/config/definition.go
- internal/cmd/validate.go
- internal/cmn/config/loader.go
- go.mod
- internal/cmn/config/config.go
```diff
 	}
 } else {
-	id = uuid.New().String()
+	id = uuid.Must(uuid.NewV7()).String()
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Expectation:
# - No runtime/session creation paths should use panic-based UUID creation.
# - `uuid.NewV7()` should be used with explicit error handling.
rg -nP 'uuid\.Must\(\s*uuid\.NewV7\(\)\s*\)' --type go -C2
rg -nP 'uuid\.NewV7\(\)' --type go -C2
```
Repository: dagucloud/dagu
Length of output: 50371
🏁 Script executed:
```bash
# Read the file around the flagged lines
wc -l internal/agent/api.go
```
Repository: dagucloud/dagu
Length of output: 85
🏁 Script executed:
```bash
# Get the specific lines and surrounding context
sed -n '1100,1110p; 1160,1170p; 1275,1285p' internal/agent/api.go | cat -n
```
Repository: dagucloud/dagu
Length of output: 1289
🏁 Script executed:
```bash
# Get function signatures for the three methods mentioned
sed -n '1050,1115p; 1150,1175p; 1260,1290p' internal/agent/api.go | cat -n
```
Repository: dagucloud/dagu
Length of output: 5032
🏁 Script executed:
```bash
# Check if there are any uuid.NewV7() calls with error handling in the codebase
rg 'uuid\.NewV7\(\)' --type go -B2 -A2 | grep -E '(err|error)' | head -20
```
Repository: dagucloud/dagu
Length of output: 1489
🏁 Script executed:
```bash
# Search for error handling patterns with uuid generation
rg 'if err.*uuid' --type go -A2 -B2
```
Repository: dagucloud/dagu
Length of output: 40
🏁 Script executed:
```bash
# Found it! Let's look at the uuid.NewV7() error handling in manager.go
sed -n '1,50p' internal/runtime/manager.go | rg -A5 'GenDAGRunID'
```
Repository: dagucloud/dagu
Length of output: 40
🏁 Script executed:
```bash
# Get the full function that handles uuid.NewV7() errors
sed -n '/func.*GenDAGRunID/,/^}/p' internal/runtime/manager.go | head -20
```
Repository: dagucloud/dagu
Length of output: 263
🏁 Script executed:
```bash
# Check how many uuid.NewV7() calls actually handle errors vs use Must
rg 'uuid\.NewV7\(\)' --type go | wc -l
rg 'uuid\.Must.*uuid\.NewV7' --type go | wc -l
```
Repository: dagucloud/dagu
Length of output: 66
🏁 Script executed:
```bash
# Verify the three functions in api.go have error return types
sed -n '1105,1120p; 1160,1175p; 1275,1290p' internal/agent/api.go | grep -E '(func|return)'
```
Repository: dagucloud/dagu
Length of output: 372
🏁 Script executed:
```bash
# Let's verify the actual line numbers more precisely for api.go
grep -n 'uuid.Must(uuid.NewV7())' internal/agent/api.go
```
Repository: dagucloud/dagu
Length of output: 196
🏁 Script executed:
```bash
# Check the context around those lines to confirm they're in the functions mentioned
sed -n '1100,1107p; 1160,1167p; 1275,1282p' internal/agent/api.go | cat -n
```
Repository: dagucloud/dagu
Length of output: 896
Avoid panic-based UUID generation in functions with error returns—but address this consistently across the codebase.
Lines 1104, 1164, and 1279 use uuid.Must(uuid.NewV7()). While these functions return error types and could propagate UUID generation failures, this pattern is used in 145+ places across the codebase (including other production request/session paths). Implementing error handling for only these three functions creates inconsistency. If the team intends to improve UUID error handling, apply it consistently across all production uses, or document a codebase-wide decision to rely on uuid.Must() for its extremely low failure rate.
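If the team opts for consistent error propagation, one low-churn route is a tiny helper like the hypothetical `newV7ID` below (not an existing function in the repository) that call sites can adopt incrementally.

```go
package agent

import (
	"fmt"

	"github.com/google/uuid"
)

// newV7ID wraps uuid.NewV7 so callers can propagate generation failures
// through their existing error returns instead of panicking.
func newV7ID() (string, error) {
	id, err := uuid.NewV7()
	if err != nil {
		return "", fmt.Errorf("generate UUIDv7: %w", err)
	}
	return id.String(), nil
}
```

Call sites then become `id, err := newV7ID()` with an early return, while genuinely unrecoverable init-time paths could keep `uuid.Must` under a documented policy.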
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/agent/api.go` at line 1104, The current use of
uuid.Must(uuid.NewV7()) (e.g., the id assignment at the shown location and the
similar usages at the other occurrences around lines 1164 and 1279) panics
instead of returning errors; replace these with non-panicking generation by
calling uuid.NewV7(), checking the returned error, and propagating or wrapping
that error via the function's existing error return (or returning a descriptive
error) where id is assigned, so the caller can handle failures; alternatively,
if the team prefers to keep uuid.Must(), document and apply that decision
consistently across all production UUID usages rather than changing only these
three sites.
```diff
 	if c.Config.ControlPlaneStore.Backend == config.ControlPlaneStoreBackendPostgres {
 		if c.ControlPlaneStore != nil && controlPlaneStoreRoleForCommand(c.Command) == controlplanestore.RoleScheduler {
 			schedulerControlPlaneStore = c.ControlPlaneStore
 		} else {
 			schedulerControlPlaneStore, err = controlplanestore.New(
 				c,
 				c.Config,
 				controlplanestore.WithRole(controlplanestore.RoleScheduler),
 				controlplanestore.WithLatestStatusToday(false),
 				controlplanestore.WithLocation(c.Config.Core.Location),
 				controlplanestore.WithHistoryFileCache(statusCache),
 			)
 			if err != nil {
 				return nil, fmt.Errorf("failed to initialize scheduler control-plane store: %w", err)
 			}
 		}
 		if schedulerControlPlaneStore != nil {
 			schedulerRunStore = schedulerControlPlaneStore.DAGRuns()
 			schedulerQueueStore = schedulerControlPlaneStore.Queue()
 			schedulerServiceRegistry = schedulerControlPlaneStore.Services()
 			schedulerDAGRunLeaseStore = schedulerControlPlaneStore.DAGRunLeases()
 			schedulerDispatchTaskStore = schedulerControlPlaneStore.DispatchTasks()
 		}
 	}
 	if schedulerRunStore == nil {
 		schedulerRunStore, err = dagrunstore.New(
 			c,
 			c.Config,
 			dagrunstore.WithRole(dagrunstore.RoleScheduler),
 			dagrunstore.WithLatestStatusToday(false),
 			dagrunstore.WithLocation(c.Config.Core.Location),
 			dagrunstore.WithHistoryFileCache(statusCache),
 		)
 		if err != nil {
 			return nil, fmt.Errorf("failed to initialize scheduler DAG-run store: %w", err)
 		}
 	}
 	ownsSchedulerRunStore := true
 	defer func() {
 		if ownsSchedulerRunStore {
 			if closeErr := exec.CloseDAGRunStore(context.Background(), schedulerRunStore); closeErr != nil {
 				logger.Warn(c, "Failed to close scheduler DAG-run store", tag.Error(closeErr))
 			}
 		}
 	}()
 	schedulerRunMgr := runtime.NewManager(schedulerRunStore, c.ProcStore, c.Config)
 
-	sched, err := scheduler.New(c.Config, m, schedulerRunMgr, schedulerRunStore, c.QueueStore, c.ProcStore, c.ServiceRegistry, coordinatorCli, wmStore)
+	sched, err := scheduler.New(c.Config, m, schedulerRunMgr, schedulerRunStore, schedulerQueueStore, c.ProcStore, schedulerServiceRegistry, coordinatorCli, wmStore)
 	if err != nil {
 		return nil, err
 	}
 	ownsSchedulerRunStore = false
```
Newly-opened schedulerControlPlaneStore is never closed.
When the postgres backend is selected and c.ControlPlaneStore is either nil or assigned to a different role, the else branch (Lines 865-877) opens a brand-new control-plane store with controlplanestore.New(...). Its DAG-run sub-store is then assigned to schedulerRunStore, but the existing defer at Lines 900-906 only calls exec.CloseDAGRunStore(...) on the sub-store. That does not release the underlying pgx pool, which only the parent's schedulerControlPlaneStore.Close() can do — so:
- On scheduler.New failure (Lines 909-912): the new control-plane store leaks (in addition to the dag-run sub-store close being effectively a no-op for the pool).
- On success: the scheduler holds schedulerRunStore etc., but no shutdown path ever invokes schedulerControlPlaneStore.Close() — the pool lives until process exit.
This is the same ownership-transfer pattern fixed previously for schedulerRunStore; it needs to be extended to the control-plane store when this command opens it.
🔧 Sketch of the ownership pattern
```diff
 if c.Config.ControlPlaneStore.Backend == config.ControlPlaneStoreBackendPostgres {
+	ownsSchedulerControlPlaneStore := false
 	if c.ControlPlaneStore != nil && controlPlaneStoreRoleForCommand(c.Command) == controlplanestore.RoleScheduler {
 		schedulerControlPlaneStore = c.ControlPlaneStore
 	} else {
 		schedulerControlPlaneStore, err = controlplanestore.New(
 			c, c.Config,
 			controlplanestore.WithRole(controlplanestore.RoleScheduler),
 			controlplanestore.WithLatestStatusToday(false),
 			controlplanestore.WithLocation(c.Config.Core.Location),
 			controlplanestore.WithHistoryFileCache(statusCache),
 		)
 		if err != nil {
 			return nil, fmt.Errorf("failed to initialize scheduler control-plane store: %w", err)
 		}
+		ownsSchedulerControlPlaneStore = true
+		defer func() {
+			if ownsSchedulerControlPlaneStore {
+				if closeErr := schedulerControlPlaneStore.Close(); closeErr != nil {
+					logger.Warn(c, "Failed to close scheduler control-plane store", tag.Error(closeErr))
+				}
+			}
+		}()
 	}
 	// ... extract sub-stores ...
 }
 // ... open dagrun store fallback, scheduler.New, etc. ...
 ownsSchedulerRunStore = false
+// Hand ownership of the control-plane store to the scheduler/Context lifecycle.
+ownsSchedulerControlPlaneStore = false
```
You will also need a place — likely the scheduler shutdown or Context.Close — to actually invoke schedulerControlPlaneStore.Close(); otherwise the pool simply lives for the process lifetime.
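One hedged way to give the pool an owner is a closer list on the command context; the `closers` field and `RegisterCloser` are hypothetical additions, and the struct below is a stripped-down stand-in for the real `cmd.Context`.

```go
package cmd

import "log"

// Context stand-in: the real cmd.Context carries far more state.
type Context struct {
	closers []func() error
}

// RegisterCloser records a resource to release when the process shuts down.
func (c *Context) RegisterCloser(f func() error) {
	c.closers = append(c.closers, f)
}

// Close runs closers in reverse registration order so dependents shut down
// before the stores they borrow from.
func (c *Context) Close() {
	for i := len(c.closers) - 1; i >= 0; i-- {
		if err := c.closers[i](); err != nil {
			log.Printf("close: %v", err)
		}
	}
}
```

After `scheduler.New` succeeds, `c.RegisterCloser(schedulerControlPlaneStore.Close)` would replace clearing the ownership flag, leaving the defer to fire only on early-return paths.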
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/cmd/context.go` around lines 862 - 913, The newly-created
schedulerControlPlaneStore from controlplanestore.New is never closed; add
ownership tracking and proper cleanup similar to schedulerRunStore: set a bool
(e.g., ownsSchedulerControlPlaneStore) to true when you create
schedulerControlPlaneStore in the else branch, add a defer that calls
schedulerControlPlaneStore.Close() (or the appropriate close method) if
ownsSchedulerControlPlaneStore is true so it will be closed on early returns
(including on scheduler.New error), and ensure you clear the flag (set to false)
only when ownership is intentionally transferred to the longer-lived shutdown
path (e.g., register it so Context.Close or the scheduler shutdown will call
schedulerControlPlaneStore.Close()); reference schedulerControlPlaneStore,
controlplanestore.New, ownsSchedulerRunStore/exec.CloseDAGRunStore,
scheduler.New, and Context.Close when making these changes.
| "ControlPlaneStorePostgresAgentDef": { | ||
| "type": "object", | ||
| "description": "PostgreSQL control-plane store configuration for DAG execution processes.", | ||
| "additionalProperties": false, | ||
| "properties": { | ||
| "dsn": { | ||
| "type": "string", | ||
| "description": "PostgreSQL connection string used by DAG execution processes when direct_access is enabled." | ||
| }, | ||
| "auto_migrate": { | ||
| "type": "boolean", | ||
| "description": "Run control-plane store migrations when an agent process opens the store. Default: false." | ||
| }, | ||
| "direct_access": { | ||
| "type": "boolean", | ||
| "description": "Allow DAG execution processes to open the PostgreSQL control-plane store directly. Default: false; production PostgreSQL deployments should use coordinator/shared-nothing worker execution instead." | ||
| }, | ||
| "pool": { | ||
| "$ref": "#/definitions/PostgresPoolDef" | ||
| } | ||
| } |
Require dsn when agent direct access is enabled.
Right now agent: { direct_access: true } passes schema validation without a connection string, but that configuration is not usable at runtime.
Suggested change
"ControlPlaneStorePostgresAgentDef": {
"type": "object",
"description": "PostgreSQL control-plane store configuration for DAG execution processes.",
"additionalProperties": false,
"properties": {
"dsn": {
"type": "string",
"description": "PostgreSQL connection string used by DAG execution processes when direct_access is enabled."
},
"auto_migrate": {
"type": "boolean",
"description": "Run control-plane store migrations when an agent process opens the store. Default: false."
},
"direct_access": {
"type": "boolean",
"description": "Allow DAG execution processes to open the PostgreSQL control-plane store directly. Default: false; production PostgreSQL deployments should use coordinator/shared-nothing worker execution instead."
},
"pool": {
"$ref": "#/definitions/PostgresPoolDef"
}
- }
+ },
+ "allOf": [
+ {
+ "if": {
+ "properties": {
+ "direct_access": { "const": true }
+ },
+ "required": ["direct_access"]
+ },
+ "then": {
+ "required": ["dsn"]
+ }
+ }
+ ]
},📝 Committable suggestion
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/cmn/schema/config.schema.json` around lines 1011 - 1031, The schema
currently allows ControlPlaneStorePostgresAgentDef objects with direct_access:
true but no dsn; update the ControlPlaneStorePostgresAgentDef definition to
enforce that when "direct_access" is true the "dsn" property is required by
adding a JSON Schema conditional (an "if" that checks
properties.direct_access.const true and a matching "then" that adds required:
["dsn"]) so validation fails unless dsn is present; reference the
ControlPlaneStorePostgresAgentDef object and its properties "direct_access" and
"dsn" when making the change.
```go
RowLimit:  int32(limit),  //nolint:gosec
RowOffset: int32(offset), //nolint:gosec
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Verify int-based pagination fields and narrowing to int32 in the Postgres mapper.
rg -n -C2 'type QueryFilter|Limit|Offset' --type go internal
rg -n -C3 'auditQueryParams|RowLimit|RowOffset|int32\(limit\)|int32\(offset\)' internal/persis/controlplanestore/postgres/audit_store.go
```
Repository: dagucloud/dagu
Length of output: 50371
🏁 Script executed:
```bash
cat -n internal/persis/controlplanestore/postgres/audit_store.go | head -120
```
Repository: dagucloud/dagu
Length of output: 4051
🏁 Script executed:
```bash
rg -n "type QueryFilter" --type go -A 15 internal/service/audit/
```
Repository: dagucloud/dagu
Length of output: 1126
🏁 Script executed:
```bash
rg -n "auditQueryParams" --type go internal/
```
Repository: dagucloud/dagu
Length of output: 327
🏁 Script executed:
```bash
rg -n "\.Query\(|\.Audit\(\)" --type go internal/service/frontend/api | grep -i audit
```
Repository: dagucloud/dagu
Length of output: 153
🏁 Script executed:
```bash
rg -n "audit.Query|auditStore.Query|AuditStore.*Query" --type go internal/
```
Repository: dagucloud/dagu
Length of output: 2081
🏁 Script executed:
```bash
cat -n internal/service/frontend/api/v1/audit.go | head -100
```
Repository: dagucloud/dagu
Length of output: 3248
🏁 Script executed:
```bash
rg -n "filter.Offset|filter.Limit" --type go internal/service/frontend/api/v1/audit.go
```
Repository: dagucloud/dagu
Length of output: 323
🏁 Script executed:
```bash
# Check if there are other similar direct int to int32 casts for pagination
rg -n "int32\(" --type go internal/persis/controlplanestore/postgres/ | grep -i "limit\|offset"
```
Repository: dagucloud/dagu
Length of output: 935
Clamp pagination values before narrowing to int32.
Lines 95-96 cast int values directly to int32 without upper bounds checking. While the API handler caps limit to 500, offset has no upper bound and can overflow to invalid negative values when cast to int32. Defensive clamping protects against direct internal callers that bypass API validation.
Proposed fix
```diff
 func auditQueryParams(filter audit.QueryFilter, limit, offset int) db.QueryAuditEntriesParams {
+	const maxInt32 = int(^uint32(0) >> 1)
+	if limit > maxInt32 {
+		limit = maxInt32
+	}
+	if offset > maxInt32 {
+		offset = maxInt32
+	}
 	return db.QueryAuditEntriesParams{
 		HasCategory: filter.Category != "",
 		Category:    string(filter.Category),
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/controlplanestore/postgres/audit_store.go` around lines 95 -
96, Clamp the pagination integers before converting to int32: ensure `limit` is
bounded (e.g., min 0, max 500 or math.MaxInt32 if you want a hard upper clamp)
and ensure `offset` is non-negative and not greater than math.MaxInt32, then
cast to int32 when assigning `RowLimit` and `RowOffset` in the code that builds
the query (the fields named RowLimit and RowOffset in audit_store.go). Replace
the direct casts int32(limit)/int32(offset) with a small helper or inline
clamping logic that applies bounds and then casts to int32 to prevent
overflow/negative values from internal callers.
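A minimal sketch of the clamping helper suggested above; `clampToInt32` is a hypothetical name.

```go
package postgres

import "math"

// clampToInt32 bounds an int into [0, math.MaxInt32] before narrowing,
// so oversized or negative pagination values cannot overflow.
func clampToInt32(v int) int32 {
	if v < 0 {
		return 0
	}
	if v > math.MaxInt32 {
		return math.MaxInt32
	}
	return int32(v)
}
```

The params builder would then use `RowLimit: clampToInt32(limit)` and `RowOffset: clampToInt32(offset)`, and the `//nolint:gosec` annotations could be dropped.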
```go
func (s *userStore) Update(ctx context.Context, user *auth.User) error {
	if user == nil {
		return errors.New("postgres user store: user cannot be nil")
	}
	id, err := parseUUIDv7(user.ID)
	if err != nil {
		return auth.ErrInvalidUserID
	}
	if user.Username == "" {
		return auth.ErrInvalidUsername
	}
	if !user.Role.Valid() {
		return auth.ErrInvalidRole
	}
	workspaceAccess, err := marshalWorkspaceAccess(user.WorkspaceAccess)
	if err != nil {
		return err
	}
	data, err := json.Marshal(user.ToStorage())
	if err != nil {
		return fmt.Errorf("marshal user: %w", err)
	}
	rows, err := s.store.queries.UpdateUser(ctx, db.UpdateUserParams{
		Username:        user.Username,
		PasswordHash:    user.PasswordHash,
		Role:            string(user.Role),
		AuthProvider:    user.AuthProvider,
		OidcIssuer:      user.OIDCIssuer,
		OidcSubject:     user.OIDCSubject,
		IsDisabled:      user.IsDisabled,
		WorkspaceAccess: workspaceAccess,
		Data:            data,
		UpdatedAt:       timestamptz(user.UpdatedAt),
		ID:              id,
	})
```
Keep the OIDC identity invariant on update.
Create rejects rows where only one of OIDCIssuer / OIDCSubject is set, but Update can write exactly that state. That leaves a user record that GetByOIDCIdentity cannot match correctly.
Suggested change
```diff
 	if user.Username == "" {
 		return auth.ErrInvalidUsername
 	}
 	if !user.Role.Valid() {
 		return auth.ErrInvalidRole
 	}
+	if (user.OIDCIssuer == "") != (user.OIDCSubject == "") {
+		return auth.ErrOIDCIdentityNotFound
+	}
 	workspaceAccess, err := marshalWorkspaceAccess(user.WorkspaceAccess)
```
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/controlplanestore/postgres/auth_store.go` around lines 155 -
189, The Update method can write a user with only one of OIDCIssuer /
OIDCSubject set, breaking the invariant used by GetByOIDCIdentity; before
marshalling or calling s.store.queries.UpdateUser in userStore.Update, validate
that either both user.OIDCIssuer and user.OIDCSubject are empty or both are
non-empty, and if that check fails return a suitable error (e.g.
auth.ErrInvalidOIDCIdentity or another existing auth error) so Update rejects
partial OIDC identity updates just like Create does.
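The invariant is small enough to pin down with a table-driven check. A minimal sketch — `oidcIdentityConsistent` is a hypothetical helper, not an existing function in the package:

```go
package postgres_test

import "testing"

// oidcIdentityConsistent reports whether issuer/subject are both set (an
// OIDC-provisioned user) or both empty (a local user); a half-set pair is
// exactly the state GetByOIDCIdentity can never match.
func oidcIdentityConsistent(issuer, subject string) bool {
	return (issuer == "") == (subject == "")
}

func TestOIDCIdentityInvariant(t *testing.T) {
	cases := []struct {
		issuer, subject string
		ok              bool
	}{
		{"", "", true},                              // local user
		{"https://issuer.example", "sub-123", true}, // OIDC user
		{"https://issuer.example", "", false},       // half-set: reject
		{"", "sub-123", false},                      // half-set: reject
	}
	for _, c := range cases {
		if got := oidcIdentityConsistent(c.issuer, c.subject); got != c.ok {
			t.Errorf("(%q, %q) = %v, want %v", c.issuer, c.subject, got, c.ok)
		}
	}
}
```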
```go
for _, reg := range registrations {
	if reg.cancel != nil {
		reg.cancel()
	}
	reg.wg.Wait()
	_, _ = s.queries.DeleteServiceInstance(ctx, db.DeleteServiceInstanceParams{
		ServiceName: string(reg.serviceName),
		InstanceID:  reg.hostInfo.ID,
	})
```
Don’t silently drop unregister cleanup failures.
Once the heartbeat is canceled, a failed DeleteServiceInstance leaves the row discoverable until serviceStaleTimeout elapses. This is especially easy to hit during shutdown if ctx is already canceled.
Suggested change

```diff
 for _, reg := range registrations {
 	if reg.cancel != nil {
 		reg.cancel()
 	}
 	reg.wg.Wait()
-	_, _ = s.queries.DeleteServiceInstance(ctx, db.DeleteServiceInstanceParams{
+	cleanupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
+	_, err := s.queries.DeleteServiceInstance(cleanupCtx, db.DeleteServiceInstanceParams{
 		ServiceName: string(reg.serviceName),
 		InstanceID:  reg.hostInfo.ID,
 	})
+	cancel()
+	if err != nil {
+		// surface this through logging/metrics instead of dropping it silently
+	}
 }
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for _, reg := range registrations { | |
| if reg.cancel != nil { | |
| reg.cancel() | |
| } | |
| reg.wg.Wait() | |
| _, _ = s.queries.DeleteServiceInstance(ctx, db.DeleteServiceInstanceParams{ | |
| ServiceName: string(reg.serviceName), | |
| InstanceID: reg.hostInfo.ID, | |
| }) | |
| for _, reg := range registrations { | |
| if reg.cancel != nil { | |
| reg.cancel() | |
| } | |
| reg.wg.Wait() | |
| cleanupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) | |
| _, err := s.queries.DeleteServiceInstance(cleanupCtx, db.DeleteServiceInstanceParams{ | |
| ServiceName: string(reg.serviceName), | |
| InstanceID: reg.hostInfo.ID, | |
| }) | |
| cancel() | |
| if err != nil { | |
| // surface this through logging/metrics instead of dropping it silently | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/controlplanestore/postgres/registry.go` around lines 89 - 97,
The loop currently ignores errors from s.queries.DeleteServiceInstance after
canceling reg.cancel and waiting reg.wg, so deletions can silently fail
(especially if ctx is canceled). Change the delete call to run with a fresh
bounded context (e.g., use context.Background() or context.WithTimeout when ctx
is already canceled) and capture the returned error instead of discarding it; if
deletion fails log the error (or retry once) including reg.serviceName and
reg.hostInfo.ID so the failure is observable and not left to
serviceStaleTimeout. Ensure you update the code around registrations,
reg.cancel, reg.wg.Wait and s.queries.DeleteServiceInstance to use the new ctx
and handle/log the error.
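The essential move in the diff is detaching cleanup from a caller context that may already be canceled while still bounding its runtime. A standalone sketch, assuming Go 1.21+ for `context.WithoutCancel`; `bestEffortCleanup` and `deleteInstance` are illustrative names standing in for the real unregister path:

```go
package main

import (
	"context"
	"log"
	"time"
)

// bestEffortCleanup runs deleteInstance even when ctx is already done,
// but never for longer than five seconds, and logs failures rather than
// discarding them.
func bestEffortCleanup(ctx context.Context, deleteInstance func(context.Context) error) {
	cleanupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
	defer cancel()
	if err := deleteInstance(cleanupCtx); err != nil {
		log.Printf("unregister cleanup failed: %v", err)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown: the parent context is already canceled
	bestEffortCleanup(ctx, func(c context.Context) error {
		return c.Err() // nil: the detached context is still live
	})
}
```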
```go
msgIDString, msgUUID, err := ensureUUIDv7String(msg.ID)
if err != nil {
	msgIDString, msgUUID, err = newUUIDv7String()
	if err != nil {
		return fmt.Errorf("generate message id: %w", err)
	}
}
msg.ID = msgIDString
```
Silent replacement of a non-empty invalid msg.ID is unexpected.
ensureUUIDv7String returns an error for any non-UUID v7 string, including a non-empty but wrongly-formatted ID. When that happens the code silently generates a fresh UUID and overwrites msg.ID on the caller's object. Callers that set an explicit ID (e.g. for idempotent inserts) would have their ID silently changed without any indication.
A cleaner contract: only auto-generate when msg.ID == ""; reject a non-empty invalid ID with an explicit error.
🛡️ Proposed fix

```diff
-	msgIDString, msgUUID, err := ensureUUIDv7String(msg.ID)
-	if err != nil {
-		msgIDString, msgUUID, err = newUUIDv7String()
-		if err != nil {
-			return fmt.Errorf("generate message id: %w", err)
-		}
-	}
+	var msgIDString string
+	var msgUUID uuid.UUID
+	if msg.ID == "" {
+		var err error
+		msgIDString, msgUUID, err = newUUIDv7String()
+		if err != nil {
+			return fmt.Errorf("generate message id: %w", err)
+		}
+	} else {
+		var err error
+		msgIDString, msgUUID, err = ensureUUIDv7String(msg.ID)
+		if err != nil {
+			return agent.ErrInvalidSessionID
+		}
+	}
```
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/controlplanestore/postgres/session_store.go` around lines 170
- 177, Currently the code silently replaces any invalid msg.ID by calling
ensureUUIDv7String and falling back to newUUIDv7String; change this so you only
auto-generate a new UUID when msg.ID is empty and otherwise validate the
provided ID: call ensureUUIDv7String only to validate a non-empty msg.ID and
return an explicit error if it is invalid, and call newUUIDv7String only when
msg.ID == "" to populate msg.ID; update the block around
ensureUUIDv7String/newUUIDv7String and the assignment to msg.ID accordingly.
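The cleaner contract fits in one helper: generate only for empty IDs, reject malformed non-empty ones. A sketch assuming github.com/google/uuid; `resolveMessageID` is a hypothetical name and the error values are placeholders for whatever sentinel the package standardizes on:

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

// resolveMessageID mints a fresh UUIDv7 only when id is empty and rejects
// any non-empty value that is not already a valid UUIDv7, so a
// caller-supplied ID is never silently replaced.
func resolveMessageID(id string) (string, uuid.UUID, error) {
	if id == "" {
		u, err := uuid.NewV7()
		if err != nil {
			return "", uuid.Nil, fmt.Errorf("generate message id: %w", err)
		}
		return u.String(), u, nil
	}
	u, err := uuid.Parse(id)
	if err != nil || u.Version() != 7 {
		return "", uuid.Nil, fmt.Errorf("invalid message id %q: not a UUIDv7", id)
	}
	return u.String(), u, nil
}

func main() {
	if s, _, err := resolveMessageID(""); err == nil {
		fmt.Println("generated:", s)
	}
	if _, _, err := resolveMessageID("not-a-uuid"); err != nil {
		fmt.Println("rejected:", err) // explicit error, no silent overwrite
	}
}
```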
```go
return s.store.withTx(ctx, func(q *db.Queries) error {
	row, err := q.GetAgentSession(ctx, sessionUUID)
	if errors.Is(err, pgx.ErrNoRows) {
		return agent.ErrSessionNotFound
	}
	if err != nil {
		return err
	}
	if err := q.AddAgentSessionMessage(ctx, db.AddAgentSessionMessageParams{
		ID:          msgUUID,
		SessionID:   sessionUUID,
		MessageType: string(msg.Type),
		SequenceID:  msg.SequenceID,
		CreatedAt:   timestamptz(msg.CreatedAt),
		Data:        data,
	}); err != nil {
		return err
	}

	updatedAt := time.Now().UTC()
	sess, err := sessionFromRow(row)
	if err != nil {
		return err
	}
	if sess.Title == "" && msg.Type == agent.MessageTypeUser && msg.Content != "" {
		sess.Title = truncateAgentSessionTitle(msg.Content)
	}
	sess.UpdatedAt = updatedAt
	parentID, err := nullUUIDv7(sess.ParentSessionID)
	if err != nil {
		return err
	}
	sessionData, err := json.Marshal(sessionStorageFromSession(sess, nil))
	if err != nil {
		return fmt.Errorf("marshal session: %w", err)
	}
	rows, err := q.UpdateAgentSession(ctx, db.UpdateAgentSessionParams{
		ID:              sessionUUID,
		UserID:          sess.UserID,
		DagName:         sess.DAGName,
		Title:           sess.Title,
		Model:           sess.Model,
		ParentSessionID: parentID,
		DelegateTask:    sess.DelegateTask,
		Data:            sessionData,
		UpdatedAt:       timestamptz(updatedAt),
	})
	if err != nil {
		return err
	}
	if rows == 0 {
		return agent.ErrSessionNotFound
	}
	return nil
})
```
Lost-update race in AddMessage: missing row-level lock on session read.
Inside the transaction, GetAgentSession reads the session at READ COMMITTED isolation with no locking. Two concurrent AddMessage calls for the same session can both observe an empty title, both derive a title from their respective first user-message content, and the last writer silently overwrites the earlier commit. The same race applies to UpdatedAt.
Add SELECT ... FOR UPDATE (or a FOR UPDATE variant in the sqlc query) when fetching the session row to serialise concurrent writers.
```sql
-- name: GetAgentSessionForUpdate :one
SELECT *
FROM dagu_agent_sessions
WHERE id = sqlc.arg(id)
FOR UPDATE;
```

Then use `q.GetAgentSessionForUpdate` inside `withTx`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/controlplanestore/postgres/session_store.go` around lines 187
- 241, The AddMessage transaction reads the session with q.GetAgentSession
without a row-level lock, allowing lost-update races; change the sqlc query to
add FOR UPDATE (e.g., create "GetAgentSessionForUpdate :one" selecting FROM
dagu_agent_sessions ... FOR UPDATE) and replace the call to q.GetAgentSession
with q.GetAgentSessionForUpdate inside the withTx block so concurrent AddMessage
calls serialize updates to Title and UpdatedAt; ensure the new query is used in
the same transaction context (withTx) and update any error handling references
accordingly.
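For context, this is roughly what the locking read looks like at the pgx level once such a query exists. A sketch assuming a `pgxpool.Pool` and the table above; `updateTitleLocked`, the column handling, and the first-writer-wins rule are illustrative, not the store's actual logic:

```go
package sessionlock

import (
	"context"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgxpool"
)

// updateTitleLocked serializes concurrent writers on a single session row:
// the FOR UPDATE read blocks a second transaction until the first commits,
// so the second observes the committed title instead of racing past it.
func updateTitleLocked(ctx context.Context, pool *pgxpool.Pool, sessionID uuid.UUID, title string) error {
	tx, err := pool.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // harmless no-op after a successful commit

	var current string
	if err := tx.QueryRow(ctx,
		`SELECT title FROM dagu_agent_sessions WHERE id = $1 FOR UPDATE`,
		sessionID).Scan(&current); err != nil {
		return err
	}
	if current == "" { // only the first writer derives the title
		if _, err := tx.Exec(ctx,
			`UPDATE dagu_agent_sessions SET title = $2, updated_at = now() WHERE id = $1`,
			sessionID, title); err != nil {
			return err
		}
	}
	return tx.Commit(ctx)
}
```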
```diff
 enqueuedAt := time.Now().UTC()
-fileName := fmt.Sprintf("task_%020d_%s.json", enqueuedAt.UnixMilli(), uuid.NewString())
+fileName := fmt.Sprintf("task_%020d_%s.json", enqueuedAt.UnixMilli(), uuid.Must(uuid.NewV7()).String())
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Verify all panic-based UUIDv7 call sites and google/uuid version usage.
set -euo pipefail
echo "== go.mod / go.sum references =="
rg -n --type=go 'github.com/google/uuid|^go [0-9]+\.[0-9]+' go.mod go.sum || true
echo
echo "== panic-based UUIDv7 call sites =="
rg -n --type=go 'uuid\.Must\s*\(\s*uuid\.NewV7\(\)\s*\)' internal
```

Length of output: 14833
Handle UUID generation errors instead of panicking in dispatch task store
Lines 81 and 117 use uuid.Must(uuid.NewV7()), which panics on UUID generation failures. Since both Enqueue and ClaimNext already return errors, handle generation failures explicitly to avoid process crashes.
Proposed fix

```diff
-	fileName := fmt.Sprintf("task_%020d_%s.json", enqueuedAt.UnixMilli(), uuid.Must(uuid.NewV7()).String())
+	taskID, err := uuid.NewV7()
+	if err != nil {
+		return fmt.Errorf("generate dispatch task id: %w", err)
+	}
+	fileName := fmt.Sprintf("task_%020d_%s.json", enqueuedAt.UnixMilli(), taskID.String())
```

```diff
-	claimToken := uuid.Must(uuid.NewV7()).String()
+	claimID, err := uuid.NewV7()
+	if err != nil {
+		return nil, fmt.Errorf("generate dispatch claim token: %w", err)
+	}
+	claimToken := claimID.String()
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fileName := fmt.Sprintf("task_%020d_%s.json", enqueuedAt.UnixMilli(), uuid.Must(uuid.NewV7()).String()) | |
| taskID, err := uuid.NewV7() | |
| if err != nil { | |
| return fmt.Errorf("generate dispatch task id: %w", err) | |
| } | |
| fileName := fmt.Sprintf("task_%020d_%s.json", enqueuedAt.UnixMilli(), taskID.String()) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/filedistributed/dispatch_task_store.go` at line 81, The code
currently uses uuid.Must(uuid.NewV7()) which panics on failure; change both
places (in Enqueue where fileName := fmt.Sprintf("task_%020d_%s.json",
enqueuedAt.UnixMilli(), uuid.Must(uuid.NewV7()).String()) and in ClaimNext where
uuid.Must is used) to call uuid.NewV7() directly, check the returned error, and
propagate a wrapped error (return fmt.Errorf(...): include context like
"generating task UUID" or "generating claim UUID") instead of panicking; update
the fileName construction to use the validated uuid string and ensure both
Enqueue and ClaimNext return that error to the caller.
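Both call sites want the same thing — a string ID whose generation failure can propagate — so a tiny shared helper avoids repeating the fix. A sketch assuming github.com/google/uuid; `newV7String` is a hypothetical name here:

```go
package filedistributed

import (
	"fmt"

	"github.com/google/uuid"
)

// newV7String wraps uuid.NewV7 so callers can propagate generation
// failures instead of panicking through uuid.Must.
func newV7String() (string, error) {
	u, err := uuid.NewV7()
	if err != nil {
		return "", fmt.Errorf("generate uuidv7: %w", err)
	}
	return u.String(), nil
}
```

Enqueue would then check the helper's error before formatting the task file name, and ClaimNext would do the same for its claim token.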
```diff
 user = &auth.User{
-	ID:       uuid.New().String(),
+	ID:       uuid.Must(uuid.NewV7()).String(),
 	Username: username,
```
Replace uuid.Must() with explicit error handling in the request path.
NewV7 returns Nil and an error on failure (when crypto/rand is unavailable). Wrapping it in uuid.Must() inside a per-request handler causes a panic instead of a graceful error return, which would crash the goroutine serving this OIDC login.
🛡️ Proposed fix

```diff
+	id, err := uuid.NewV7()
+	if err != nil {
+		return nil, false, fmt.Errorf("failed to generate user ID: %w", err)
+	}
 	user = &auth.User{
-		ID:       uuid.Must(uuid.NewV7()).String(),
+		ID:       id.String(),
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| user = &auth.User{ | |
| ID: uuid.New().String(), | |
| ID: uuid.Must(uuid.NewV7()).String(), | |
| Username: username, | |
| id, err := uuid.NewV7() | |
| if err != nil { | |
| return nil, false, fmt.Errorf("failed to generate user ID: %w", err) | |
| } | |
| user = &auth.User{ | |
| ID: id.String(), | |
| Username: username, |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/service/oidcprovision/service.go` around lines 159 - 161, Replace
the panic-prone uuid.Must call in the request path by calling uuid.NewV7()
directly, checking its returned error, and returning a graceful error response
from the OIDC provisioning handler instead of allowing a panic; update the code
that constructs the auth.User (where ID is set) to use the validated UUID string
only when err == nil, and propagate a suitable error/HTTP response from the
function that creates the user when uuid.NewV7() fails.
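The failure mode is also testable: github.com/google/uuid exposes `SetRand` for swapping the entropy source, which makes `NewV7` return an error — the condition under which `uuid.Must` would panic mid-request. A test-only sketch (`SetRand` mutates package-level state, so it does not belong in production code):

```go
package oidcprovision_test

import (
	"errors"
	"testing"
	"testing/iotest"

	"github.com/google/uuid"
)

// TestNewV7CanFail shows uuid.NewV7 surfacing a rand failure as an error;
// wrapped in uuid.Must, the same condition panics the serving goroutine.
func TestNewV7CanFail(t *testing.T) {
	uuid.SetRand(iotest.ErrReader(errors.New("entropy exhausted")))
	defer uuid.SetRand(nil) // nil restores the crypto/rand default

	if _, err := uuid.NewV7(); err == nil {
		t.Fatal("expected NewV7 to return the rand error")
	}
}
```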
Summary

- `make dev-pg` with a PostgreSQL 18 compose service

Testing

- `go tool sqlc generate`
- `go test ./internal/cmn/config ./internal/cmn/schema ./internal/cmd ./internal/engine`
- `go test ./internal/core/exec ./internal/persis/filedagrun ./internal/service/frontend/api/v1`
- `go test ./internal/persis/dagrunstore/...`
- `docker compose -p dagu-dev-pg-verify-18 -f compose.postgres.yaml config`
- `docker compose -p dagu-dev-pg-verify-18 -f compose.postgres.yaml up -d postgres` + `pg_isready -U dagu -d dagu`

Summary by CodeRabbit
Release Notes
New Features
Development