Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 207 additions & 0 deletions failed_jobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package queue_test

import (
"errors"
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
"github.com/tinh-tinh/queue/v2"
)

func Test_GetFailedJobs(t *testing.T) {
failedQueue := queue.New("failed_jobs_test", &queue.Options{
Connect: &redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
},
Workers: 3,
RetryFailures: 0, // No retries, so jobs fail immediately
})

Comment on lines +14 to +23
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Gate Redis-backed tests when Redis isn’t available.

These tests hard-depend on a local Redis at localhost:6379, which will fail in many dev/CI environments. Add a connectivity check and skip if Redis is unreachable (or allow override via env var).

💡 Suggested pattern (apply to each test)
 import (
+	"context"
 	"errors"
 	"testing"
 	"time"
@@
 	"github.com/redis/go-redis/v9"
 	"github.com/stretchr/testify/require"
 	"github.com/tinh-tinh/queue/v2"
 )
 
+func requireRedis(t *testing.T, opt *redis.Options) {
+	t.Helper()
+	client := redis.NewClient(opt)
+	if err := client.Ping(context.Background()).Err(); err != nil {
+		t.Skipf("redis not available: %v", err)
+	}
+}
+
 func Test_GetFailedJobs(t *testing.T) {
-	failedQueue := queue.New("failed_jobs_test", &queue.Options{
-		Connect: &redis.Options{
-			Addr:     "localhost:6379",
-			Password: "",
-			DB:       0,
-		},
+	redisOpt := &redis.Options{
+		Addr:     "localhost:6379",
+		Password: "",
+		DB:       0,
+	}
+	requireRedis(t, redisOpt)
+	failedQueue := queue.New("failed_jobs_test", &queue.Options{
+		Connect: redisOpt,
 		Workers:       3,
 		RetryFailures: 0, // No retries, so jobs fail immediately
 	})

Also applies to: 75-83, 120-128

🤖 Prompt for AI Agents
In `@failed_jobs_test.go` around lines 14 - 23, Add a Redis connectivity gate
before creating the failedQueue in failed_jobs_test.go: attempt a quick
ping/connect to the same Redis address used in queue.New (localhost:6379) and if
it fails, call t.Skipf (or skip the test) unless an explicit env override (e.g.,
RUN_REDIS_TESTS=true) is set; apply the same pattern to the other places noted
(lines ~75-83 and ~120-128) so any code that constructs queues via queue.New is
guarded by the connectivity check and optional env override.

// Clear any existing failed jobs first
err := failedQueue.ClearFailedJobs()
require.Nil(t, err)

failedQueue.Process(func(job *queue.Job) {
job.Process(func() error {
// All jobs will fail
return errors.New("intentional failure for test")
})
})

// Add multiple jobs that will fail
failedQueue.AddJob(queue.AddJobOptions{
Id: "fail1",
Data: "value 1",
})
failedQueue.AddJob(queue.AddJobOptions{
Id: "fail2",
Data: "value 2",
})
failedQueue.AddJob(queue.AddJobOptions{
Id: "fail3",
Data: "value 3",
})

// Wait a bit for jobs to be processed
time.Sleep(500 * time.Millisecond)
Comment on lines +49 to +50
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Avoid fixed sleeps; they’re prone to flakes.

Replace time.Sleep with an eventual assertion (poll until the expected state appears). This avoids timing sensitivity under load/slow CI.

♻️ Example replacement (apply similarly to other sleeps)
-	// Wait a bit for jobs to be processed
-	time.Sleep(500 * time.Millisecond)
-
-	// Retrieve failed jobs
-	failedJobs, err := failedQueue.GetFailedJobs()
-	require.Nil(t, err)
-	require.Equal(t, 3, len(failedJobs))
+	// Wait until failed jobs are visible
+	var failedJobs []queue.Job
+	var err error
+	require.Eventually(t, func() bool {
+		failedJobs, err = failedQueue.GetFailedJobs()
+		return err == nil && len(failedJobs) == 3
+	}, 5*time.Second, 50*time.Millisecond)
+	require.NoError(t, err)

Also applies to: 101-103, 149-150

🤖 Prompt for AI Agents
In `@failed_jobs_test.go` around lines 49 - 50, Replace the fixed time.Sleep(500 *
time.Millisecond) calls in failed_jobs_test.go with an eventual/polling
assertion: repeatedly check the expected condition (e.g., call the same helper
that inspects job state such as listFailedJobs/getJobStatus or the DB/query used
elsewhere in this test) at short intervals until it becomes true or a reasonable
timeout elapses, failing the test on timeout; do this for the three sleep sites
(the Sleep at the shown diff and the other occurrences around lines noted
101-103 and 149-150) so tests wait deterministically for the expected state
instead of sleeping a fixed duration.


// Retrieve failed jobs
failedJobs, err := failedQueue.GetFailedJobs()
require.Nil(t, err)
require.Equal(t, 3, len(failedJobs))

// Verify job IDs are present
jobIds := make(map[string]bool)
for _, job := range failedJobs {
jobIds[job.Id] = true
require.NotEmpty(t, job.FailedReason)
require.Contains(t, job.FailedReason, "intentional failure for test")
require.Equal(t, queue.FailedStatus, job.Status)
}
require.True(t, jobIds["fail1"])
require.True(t, jobIds["fail2"])
require.True(t, jobIds["fail3"])

// Clean up
err = failedQueue.ClearFailedJobs()
require.Nil(t, err)
}

func Test_GetFailedJob(t *testing.T) {
singleFailQueue := queue.New("single_fail_test", &queue.Options{
Connect: &redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
},
Workers: 3,
RetryFailures: 0,
})

// Clear any existing failed jobs first
err := singleFailQueue.ClearFailedJobs()
require.Nil(t, err)

singleFailQueue.Process(func(job *queue.Job) {
job.Process(func() error {
return errors.New("specific error for job " + job.Id)
})
})

// Add a job that will fail
singleFailQueue.AddJob(queue.AddJobOptions{
Id: "specific_fail",
Data: "test data",
})

// Wait for job to be processed
time.Sleep(500 * time.Millisecond)

// Retrieve the specific failed job
reason, err := singleFailQueue.GetFailedJob("specific_fail")
require.Nil(t, err)
require.Contains(t, reason, "specific error for job specific_fail")

// Try to get a non-existent failed job
_, err = singleFailQueue.GetFailedJob("non_existent")
require.NotNil(t, err)
require.Contains(t, err.Error(), "not found")

// Clean up
err = singleFailQueue.ClearFailedJobs()
require.Nil(t, err)
}

func Test_ClearFailedJobs(t *testing.T) {
clearQueue := queue.New("clear_test", &queue.Options{
Connect: &redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
},
Workers: 3,
RetryFailures: 0,
})

// Clear any existing failed jobs first
err := clearQueue.ClearFailedJobs()
require.Nil(t, err)

clearQueue.Process(func(job *queue.Job) {
job.Process(func() error {
return errors.New("error for clearing test")
})
})

// Add multiple jobs that will fail
clearQueue.BulkAddJob([]queue.AddJobOptions{
{Id: "clear1", Data: "value 1"},
{Id: "clear2", Data: "value 2"},
{Id: "clear3", Data: "value 3"},
{Id: "clear4", Data: "value 4"},
{Id: "clear5", Data: "value 5"},
})

// Wait for jobs to be processed
time.Sleep(500 * time.Millisecond)

// Verify failed jobs exist
failedJobs, err := clearQueue.GetFailedJobs()
require.Nil(t, err)
require.Equal(t, 5, len(failedJobs))

// Clear all failed jobs
err = clearQueue.ClearFailedJobs()
require.Nil(t, err)

// Verify all failed jobs are cleared
failedJobs, err = clearQueue.GetFailedJobs()
require.Nil(t, err)
require.Equal(t, 0, len(failedJobs))

// Clearing again should not cause an error
err = clearQueue.ClearFailedJobs()
require.Nil(t, err)
}

func Test_GetFailedJobs_RedisError(t *testing.T) {
// Create a queue with invalid Redis connection
invalidQueue := queue.New("redis_error_test", &queue.Options{
Connect: &redis.Options{
Addr: "localhost:9999", // Invalid port
Password: "",
DB: 0,
},
Workers: 3,
RetryFailures: 0,
})

// Attempt to get failed jobs should return an error
// This tests that SCAN errors are propagated
_, err := invalidQueue.GetFailedJobs()
require.NotNil(t, err)
require.Contains(t, err.Error(), "failed to scan Redis keys")
}

func Test_ClearFailedJobs_RedisError(t *testing.T) {
// Create a queue with invalid Redis connection
invalidQueue := queue.New("redis_clear_error_test", &queue.Options{
Connect: &redis.Options{
Addr: "localhost:9999", // Invalid port
Password: "",
DB: 0,
},
Workers: 3,
RetryFailures: 0,
})

// Attempt to clear failed jobs should return an error
// This tests that SCAN errors are propagated
err := invalidQueue.ClearFailedJobs()
require.NotNil(t, err)
require.Contains(t, err.Error(), "failed to scan Redis keys")
}
93 changes: 92 additions & 1 deletion queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type RateLimiter struct {
Max int
Duration time.Duration
}

type Options struct {
Connect *redis.Options
Workers int
Expand Down Expand Up @@ -476,3 +475,95 @@ func (q *Queue) log(logType LoggerType, format string, v ...any) {
func (q *Queue) getKey() string {
return q.cachedKey
}

// scanFailedJobKeys scans Redis for all keys matching the failed job pattern.
// Returns a slice of all matching keys or an error if the scan fails.
func (q *Queue) scanFailedJobKeys() ([]string, error) {
pattern := q.cachedKey + ":*"
var allKeys []string

var cursor uint64
for {
keys, nextCursor, err := q.client.Scan(q.ctx, cursor, pattern, 100).Result()
if err != nil {
return nil, fmt.Errorf("failed to scan Redis keys: %w", err)
}

allKeys = append(allKeys, keys...)

cursor = nextCursor
if cursor == 0 {
break
}
}

return allKeys, nil
}

// GetFailedJobs retrieves all failed jobs stored in Redis for this queue.
// It returns a slice of Job with Id, FailedReason, and Status populated.
// Other fields (Data, Priority, etc.) are not available as only the failure
// reason is stored in Redis. Returns an error if the Redis operation fails.
func (q *Queue) GetFailedJobs() ([]Job, error) {
keys, err := q.scanFailedJobKeys()
if err != nil {
return nil, err
}

var failedJobs []Job
for _, key := range keys {
reason, err := q.client.Get(q.ctx, key).Result()
if err != nil {
// Only skip missing keys; propagate all other errors (network, timeout, etc.)
if err == redis.Nil {
continue
}
return nil, fmt.Errorf("failed to retrieve job data: %w", err)
}

// Extract job ID from key (format: {prefix}{queueName}:{jobId})
jobId := strings.TrimPrefix(key, q.cachedKey+":")
failedJobs = append(failedJobs, Job{
Id: jobId,
FailedReason: reason,
Status: FailedStatus,
queue: q,
})
}

return failedJobs, nil
}

// GetFailedJob retrieves the failure reason for a specific job by its ID.
// Returns the failure reason string or an error if the job is not found
// or if the Redis operation fails.
func (q *Queue) GetFailedJob(jobId string) (string, error) {
key := q.cachedKey + ":" + jobId
reason, err := q.client.Get(q.ctx, key).Result()
if err != nil {
if err == redis.Nil {
return "", fmt.Errorf("failed job with ID '%s' not found", jobId)
}
return "", fmt.Errorf("failed to retrieve job: %w", err)
}
return reason, nil
}

// ClearFailedJobs removes all failed job records from Redis for this queue.
// Returns an error if the Redis operation fails.
func (q *Queue) ClearFailedJobs() error {
keysToDelete, err := q.scanFailedJobKeys()
if err != nil {
return err
}

if len(keysToDelete) > 0 {
_, err := q.client.Del(q.ctx, keysToDelete...).Result()
if err != nil {
return fmt.Errorf("failed to delete keys: %w", err)
}
q.formatLog(LoggerInfo, "Cleared %d failed job(s)", len(keysToDelete))
}

return nil
}