Motivation
github.com/storacha/go-libstoracha/jobqueue is used in exactly two places in sprue:
- pkg/store/agent/aws/store.go
- pkg/store/agent/postgres/store.go
Both use it for the same purpose: bounding concurrency (25) on the fan-out of writes in agent.Store.Write (one S3 payload put + N metadata index writes). No other store uses it. The jobqueue is solving a concurrency-limiting problem with an async-background-work tool, and the impedance mismatch introduces real data-durability risks for little gain.
Problems with the current design
1. No atomicity across the fan-out
A single Write submits 1 S3 put + N independent index writes. There is no transaction wrapping them. Partial failures leave:
- S3 payload written, index failed → the CAR is in S3 but GetInvocation/GetReceipt never find it. Effectively lost.
- Index written, S3 failed → dangling pointer; getByTask resolves to a missing S3 key.
- Partial index → some invocations/receipts in one agent message are queryable, others silently missing.
The Postgres backend is worse: each index insert is a separate INSERT outside a transaction. A mid-batch pool failure commits some rows and drops others with no rollback.
2. Cancellation leaks writes past Write's return
jobqueue executes jobs on an internal context.Background() (jobqueue.go:169). The caller's ctx is only used for the queueing step, not for the job itself. Consequences:
- Caller cancels mid-Write: already-queued jobs finish on the background ctx (bounded only by the 30s job timeout) and land in S3/Postgres after Write has returned an error.
- Write does wg.Add(1) before Queue(). If Queue returns ctx.Err(), the matching wg.Done is never called, and Write bails immediately on the queue error without draining the jobs already in flight. Those writes keep racing to commit while the caller walks away.
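A minimal, self-contained sketch of the leak described above. This is not sprue or jobqueue code: commitLog, runJob and demo are hypothetical. One job runs on context.Background(), mirroring how jobqueue executes jobs; the other runs on the caller's context, as the proposal suggests. The caller cancels before the simulated backend becomes ready.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// commitLog stands in for S3/Postgres: it records which writes landed.
type commitLog struct {
	mu      sync.Mutex
	commits map[string]bool
}

func (l *commitLog) commit(name string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.commits[name] = true
}

func (l *commitLog) has(name string) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.commits[name]
}

// runJob waits for the backend to be ready, then commits unless ctx is done.
func runJob(ctx context.Context, name string, ready <-chan struct{}, log *commitLog) {
	select {
	case <-ctx.Done():
		return
	case <-ready:
	}
	// Both select cases can be ready at once; re-check before committing.
	if ctx.Err() != nil {
		return
	}
	log.commit(name)
}

// demo starts one detached job (jobqueue-style) and one caller-bound job,
// then cancels the caller before the backend lets either of them commit.
func demo() (detached, bound bool) {
	log := &commitLog{commits: map[string]bool{}}
	callerCtx, cancel := context.WithCancel(context.Background())
	ready := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); runJob(context.Background(), "detached", ready, log) }()
	go func() { defer wg.Done(); runJob(callerCtx, "bound", ready, log) }()

	cancel()     // the caller gives up, e.g. Write returns an error
	close(ready) // the backend only becomes ready afterwards
	wg.Wait()
	return log.has("detached"), log.has("bound")
}

func main() {
	detached, bound := demo()
	fmt.Println("detached job committed:", detached)  // true
	fmt.Println("caller-bound job committed:", bound) // false
}
```

Only the caller-bound job observes the cancellation; the detached one commits after the caller has already walked away.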
3. Shutdown can silently drop in-flight work
No WithShutdownTimeout is configured. The only bound on shutdown is the ctx passed to Store.Shutdown. If that ctx expires first, Shutdown returns ctx.Err() — but the workers keep running on jobqueue's background ctx. When fx then tears down the pool / HTTP client, those in-flight writes fail silently.
4. Error reporting is lossy
Both handlers do job.callback(err); return nil. The jobqueue treats every job as successful regardless of outcome, so there is no retry path and WithErrorHandler is unset. The only error signal is the sync.WaitGroup callback — ephemeral, and already orphaned by the time any of the above failure modes kick in.
5. Complexity with no unique benefit
All the jobqueue gives here is a bounded-concurrency counter. Everything else (handler registration, shutdown channels, quit-or-job type switching, buffer config) is overhead for a use case that doesn't need async decoupling — Write already blocks on wg.Wait() in the happy path.
Proposal
Remove jobqueue from both agent store backends. Replace with in-band, caller-context-bound writes and bounded concurrency via a semaphore.
Concrete changes
- Delete jobqueue.JobQueue, writeJob/awsWriteJob, and the shutdown plumbing from both stores.
- Bound concurrency with a semaphore (chan struct{} of size 25) acquired inside Write. This keeps writes on the caller's context, so cancellation actually cancels work and shutdown becomes trivial.
- Write order: payload first, then index. Commit the index only after the S3 payload is durable. Orphans in S3 are tolerable / GC-able; dangling metadata pointers are not.
- Batch index writes in a single operation per call:
  - Postgres: one INSERT ... VALUES (...), (...) (or COPY for large batches) inside a pgx.Tx.
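  - DynamoDB: TransactWriteItems, which is all-or-nothing (up to 100 items per call). BatchWriteItem's 25-item limit would fit the typical fan-out, but it is not atomic: each put succeeds or fails on its own, and throttled items come back as UnprocessedItems.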
- Drop the Shutdown implementation on agent.Store (or keep it as a no-op), since there's no background queue to drain.
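For the Postgres leg, a sketch of building the single multi-row INSERT ... VALUES (...), (...) statement with $n placeholders. The function name and the table and column names (agent_index, task, s3_key) are hypothetical. The resulting SQL and args would be passed to tx.Exec(ctx, sql, args...) on the pgx.Tx, so the whole batch commits or rolls back together. Table and column names are interpolated directly and must be trusted constants; values always go through placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// maxParams is Postgres's per-statement bind-parameter limit in the
// extended query protocol; larger batches should use COPY instead.
const maxParams = 65535

// buildBatchInsert renders one multi-row INSERT with $1..$n placeholders and
// returns the flattened argument list. table and cols are interpolated as-is,
// so they must be trusted constants, never user input.
func buildBatchInsert(table string, cols []string, rows [][]any) (string, []any, error) {
	if len(rows) == 0 || len(cols) == 0 {
		return "", nil, fmt.Errorf("empty batch")
	}
	if n := len(rows) * len(cols); n > maxParams {
		return "", nil, fmt.Errorf("%d parameters exceeds %d; use COPY", n, maxParams)
	}
	var b strings.Builder
	fmt.Fprintf(&b, "INSERT INTO %s (%s) VALUES ", table, strings.Join(cols, ", "))
	args := make([]any, 0, len(rows)*len(cols))
	for i, row := range rows {
		if len(row) != len(cols) {
			return "", nil, fmt.Errorf("row %d has %d values, want %d", i, len(row), len(cols))
		}
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteByte('(')
		for j, v := range row {
			if j > 0 {
				b.WriteString(", ")
			}
			args = append(args, v)
			fmt.Fprintf(&b, "$%d", len(args)) // placeholder numbers are 1-based
		}
		b.WriteByte(')')
	}
	return b.String(), args, nil
}

func main() {
	sql, args, err := buildBatchInsert("agent_index", []string{"task", "s3_key"}, [][]any{
		{"inv-1", "car-1"},
		{"rcpt-1", "car-1"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(sql)       // INSERT INTO agent_index (task, s3_key) VALUES ($1, $2), ($3, $4)
	fmt.Println(len(args)) // 4
}
```

The 65535-parameter cap gives one concrete threshold for switching to COPY on large batches; pgx exposes CopyFrom on the transaction, so that path can stay inside the same pgx.Tx.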
Affected files
- pkg/store/agent/aws/store.go
- pkg/store/agent/postgres/store.go
- No other sprue code imports jobqueue (grep -rn jobqueue pkg/ shows only these two files).
- go-libstoracha dep stays; only the jobqueue sub-import goes away.
Acceptance criteria
- agent.Store.Write runs with caller-bound context; cancelling the caller's context stops pending work before it commits.
- Index writes are atomic per Write call (single transaction or transactional batch).
- Write ordering guarantees: payload durable before index.
- No background goroutines owned by agent.Store; Shutdown is a no-op (or removed if the interface allows).
- Existing agent.Store tests pass; add a test exercising partial-failure semantics (payload OK, index tx fails, and vice versa).
- jobqueue import removed from pkg/store/agent/{aws,postgres}/store.go.
Out of scope
- go-libstoracha/jobqueue itself: it has legitimate users outside sprue. This issue is about sprue's usage, not the package.
- Other stores — none of them use jobqueue today.