feat: add methods to list and manage failed jobs in Redis #69
Conversation
Summary by CodeRabbit

Walkthrough: Adds Redis-backed failed-job management to Queue: scanning failed-job keys, listing all failed jobs, fetching a failed job by ID, and clearing failed-job entries. New tests validate listing, lookup, clearing, and Redis error handling.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Producer as Producer
    participant Queue as Queue
    participant Worker as Worker
    participant Redis as Redis
    participant Client as Client
    rect rgba(200,200,255,0.5)
        Producer->>Queue: AddJob(job)
        Queue->>Redis: RPUSH queue:list job
    end
    rect rgba(200,255,200,0.5)
        Worker->>Redis: BRPOP queue:list
        Worker->>Worker: Process job (fails)
        Worker->>Redis: SET failed:<jobId> = failureReason
    end
    rect rgba(255,200,200,0.5)
        Client->>Queue: GetFailedJobs()
        Queue->>Redis: SCAN failed:* -> keys
        Queue->>Redis: MGET keys -> failureReasons
        Queue-->>Client: []Job{Id, FailureReason, Status}
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
bdcbc62 to 0664f40
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@failed_jobs_test.go`:
- Around line 49-50: Replace the fixed time.Sleep(500 * time.Millisecond) calls
in failed_jobs_test.go with an eventual/polling assertion: repeatedly check the
expected condition (e.g., call the same helper that inspects job state such as
listFailedJobs/getJobStatus or the DB/query used elsewhere in this test) at
short intervals until it becomes true or a reasonable timeout elapses, failing
the test on timeout; do this for the three sleep sites (the Sleep at the shown
diff and the other occurrences around lines noted 101-103 and 149-150) so tests
wait deterministically for the expected state instead of sleeping a fixed
duration.
- Around line 14-23: Add a Redis connectivity gate before creating the
failedQueue in failed_jobs_test.go: attempt a quick ping/connect to the same
Redis address used in queue.New (localhost:6379) and if it fails, call t.Skipf
(or skip the test) unless an explicit env override (e.g., RUN_REDIS_TESTS=true)
is set; apply the same pattern to the other places noted (lines ~75-83 and
~120-128) so any code that constructs queues via queue.New is guarded by the
connectivity check and optional env override.
In `@queue.go`:
- Around line 507-531: In GetFailedJobs, don't swallow all q.client.Get(...)
errors; only ignore redis.Nil (missing key) and propagate any other errors
(network/timeouts) up to the caller. Update the loop in GetFailedJobs to check
if err == redis.Nil and continue, otherwise return nil, err; keep using
scanFailedJobKeys and the same job construction (Id, FailedReason, Status,
queue) when a value is present. Ensure the error handling mirrors the pattern
used in GetFailedJob so transient Redis failures are not hidden.
🧹 Nitpick comments (1)

queue.go (1)

479-501: Consider adding an explicit `:failed:` namespace prefix for better key isolation. Currently, `scanFailedJobKeys()` uses the pattern `cachedKey + ":*"`, which works correctly because rate limiter keys use only `cachedKey` (without a colon). However, this relies on an implementation detail. Adding an explicit `:failed:` namespace would make the separation clearer and prevent accidental collisions if new key types are added later.

The silent error handling in `GetFailedJobs` (lines 515–518) is intentional: it retrieves available job records despite transient Redis errors, which is reasonable behavior.

🔧 Suggested refactoring (optional)

```diff
+func (q *Queue) failedKeyPrefix() string {
+	return q.cachedKey + ":failed:"
+}
+
 func (q *Queue) scanFailedJobKeys() ([]string, error) {
-	pattern := q.cachedKey + ":*"
+	pattern := q.failedKeyPrefix() + "*"
@@
-	jobId := strings.TrimPrefix(key, q.cachedKey+":")
+	jobId := strings.TrimPrefix(key, q.failedKeyPrefix())
@@
-	key := q.cachedKey + ":" + jobId
+	key := q.failedKeyPrefix() + jobId
```

Also applies to `job.go` line 129 if desired.
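The suggested namespacing can be illustrated standalone. The helpers below (`failedKeyPrefix`, `failedKey`, `jobIdFromKey`, and the `cachedKey` constant) are hypothetical stand-ins mirroring the diff, not the package's actual API:

```go
package main

import (
	"fmt"
	"strings"
)

// cachedKey is a stand-in for q.cachedKey in this sketch.
const cachedKey = "failed_jobs_test"

// failedKeyPrefix namespaces failed-job keys under ":failed:".
func failedKeyPrefix() string { return cachedKey + ":failed:" }

// failedKey builds the Redis key for one failed job.
func failedKey(jobId string) string { return failedKeyPrefix() + jobId }

// jobIdFromKey recovers the job ID from a scanned key.
func jobIdFromKey(key string) string { return strings.TrimPrefix(key, failedKeyPrefix()) }

func main() {
	k := failedKey("job-42")
	fmt.Println(k)               // failed_jobs_test:failed:job-42
	fmt.Println(jobIdFromKey(k)) // job-42
	// A bare rate-limiter key equal to cachedKey no longer matches the
	// SCAN pattern failedKeyPrefix()+"*", so key types cannot collide.
	fmt.Println(strings.HasPrefix(cachedKey, failedKeyPrefix())) // false
}
```

The round trip (`failedKey` then `jobIdFromKey`) is what keeps SCAN results unambiguous once other key types share the `cachedKey` prefix.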
```go
failedQueue := queue.New("failed_jobs_test", &queue.Options{
	Connect: &redis.Options{
		Addr:     "localhost:6379",
		Password: "",
		DB:       0,
	},
	Workers:       3,
	RetryFailures: 0, // No retries, so jobs fail immediately
})
```
Gate Redis-backed tests when Redis isn’t available.
These tests hard-depend on a local Redis at localhost:6379, which will fail in many dev/CI environments. Add a connectivity check and skip if Redis is unreachable (or allow override via env var).
💡 Suggested pattern (apply to each test)

```diff
 import (
+	"context"
 	"errors"
 	"testing"
 	"time"
@@
 	"github.com/redis/go-redis/v9"
 	"github.com/stretchr/testify/require"
 	"github.com/tinh-tinh/queue/v2"
 )
+
+func requireRedis(t *testing.T, opt *redis.Options) {
+	t.Helper()
+	client := redis.NewClient(opt)
+	if err := client.Ping(context.Background()).Err(); err != nil {
+		t.Skipf("redis not available: %v", err)
+	}
+}
+
 func Test_GetFailedJobs(t *testing.T) {
-	failedQueue := queue.New("failed_jobs_test", &queue.Options{
-		Connect: &redis.Options{
-			Addr:     "localhost:6379",
-			Password: "",
-			DB:       0,
-		},
+	redisOpt := &redis.Options{
+		Addr:     "localhost:6379",
+		Password: "",
+		DB:       0,
+	}
+	requireRedis(t, redisOpt)
+	failedQueue := queue.New("failed_jobs_test", &queue.Options{
+		Connect: redisOpt,
 		Workers:       3,
 		RetryFailures: 0, // No retries, so jobs fail immediately
 	})
```

Also applies to: 75-83, 120-128
```go
// Wait a bit for jobs to be processed
time.Sleep(500 * time.Millisecond)
```
Avoid fixed sleeps; they’re prone to flakes.
Replace time.Sleep with an eventual assertion (poll until the expected state appears). This avoids timing sensitivity under load/slow CI.
♻️ Example replacement (apply similarly to other sleeps)

```diff
-	// Wait a bit for jobs to be processed
-	time.Sleep(500 * time.Millisecond)
-
-	// Retrieve failed jobs
-	failedJobs, err := failedQueue.GetFailedJobs()
-	require.Nil(t, err)
-	require.Equal(t, 3, len(failedJobs))
+	// Wait until failed jobs are visible
+	var failedJobs []queue.Job
+	var err error
+	require.Eventually(t, func() bool {
+		failedJobs, err = failedQueue.GetFailedJobs()
+		return err == nil && len(failedJobs) == 3
+	}, 5*time.Second, 50*time.Millisecond)
+	require.NoError(t, err)
```

Also applies to: 101-103, 149-150
- Add GetFailedJobs() to retrieve all failed jobs from Redis
- Add GetFailedJob(jobId) to get a specific failed job's error
- Add ClearFailedJobs() to remove all failed job records
- Add scanFailedJobKeys() helper to eliminate code duplication
- Reuse existing Job struct instead of creating new FailedJobInfo
- Add comprehensive unit tests in failed_jobs_test.go

Failed jobs are stored in Redis after exhausting all retries. These methods allow listing, inspecting, and clearing failed jobs.

Closes #68
0664f40 to 435cfa9