Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ toolchain go1.24.1
require (
github.com/go-redsync/redsync/v4 v4.15.0
github.com/redis/go-redis/v9 v9.17.2
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.9.0
github.com/tinh-tinh/tinhtinh/v2 v2.5.0
golang.org/x/crypto v0.45.0
Expand Down
24 changes: 8 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,33 @@ github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOr
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redsync/redsync/v4 v4.14.0 h1:zyxzFJsmQHIPBl8iBT7KFKohWsjsghgGLiP8TnFMLNc=
github.com/go-redsync/redsync/v4 v4.14.0/go.mod h1:twMlVd19upZ/juvJyJGlQOSQxor1oeHtjs62l4pRFzo=
github.com/go-redsync/redsync/v4 v4.15.0 h1:KH/XymuxSV7vyKs6z1Cxxj+N+N18JlPxgXeP6x4JY54=
github.com/go-redsync/redsync/v4 v4.15.0/go.mod h1:qNp+lLs3vkfZbtA/aM/OjlZHfEr5YTAYhRktFPKHC7s=
github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s=
github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
github.com/gomodule/redigo v1.9.3 h1:dNPSXeXv6HCq2jdyWfjgmhBdqnR6PRO3m/G05nvpPC8=
github.com/gomodule/redigo v1.9.3/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI=
github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/redis/rueidis v1.0.64 h1:XqgbueDuNV3qFdVdQwAHJl1uNt90zUuAJuzqjH4cw6Y=
github.com/redis/rueidis v1.0.64/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA=
github.com/redis/rueidis/rueidiscompat v1.0.64 h1:M8JbLP4LyHQhBLBRsUQIzui8/LyTtdESNIMVveqm4RY=
github.com/redis/rueidis/rueidiscompat v1.0.64/go.mod h1:8pJVPhEjpw0izZFSxYwDziUiEYEkEklTSw/nZzga61M=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/redis/rueidis v1.0.69 h1:WlUefRhuDekji5LsD387ys3UCJtSFeBVf0e5yI0B8b4=
github.com/redis/rueidis v1.0.69/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA=
github.com/redis/rueidis/rueidiscompat v1.0.69 h1:IWVYY9lXdjNO3do2VpJT7aDFi8zbCUuQxZB6E2Grahs=
github.com/redis/rueidis/rueidiscompat v1.0.69/go.mod h1:iC4Y8DoN0Uth0Uezg9e2trvNRC7QAgGeuP2OPLb5ccI=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/tinh-tinh/tinhtinh/v2 v2.3.4 h1:vxhaoPnp3pGNcdXKDG7nVai+V+lYoJHWtm7pzTNapJY=
github.com/tinh-tinh/tinhtinh/v2 v2.3.4/go.mod h1:4nppE7KAIswZKutI9ElMqAD9kyash7aea0Ewowsqj5g=
github.com/tinh-tinh/tinhtinh/v2 v2.5.0 h1:SqCanZJKKgbVsDwoaPe136fZGYoXSKZ6fLciGO0KsoY=
github.com/tinh-tinh/tinhtinh/v2 v2.5.0/go.mod h1:4nppE7KAIswZKutI9ElMqAD9kyash7aea0Ewowsqj5g=
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
10 changes: 3 additions & 7 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,10 @@ func (job *Job) HandlerError(reasonError string) {
job.queue.formatLog(LoggerWarn, "Add job %s for retry (%d remains) ", job.Id, job.RetryFailures)
}

// IsReady returns true if the job is ready to be processed. If the job uses a
// scheduler, it will always be ready. Otherwise, the job is ready if it is
// waiting or active.
// IsReady returns true if the job is ready to be processed.
// Jobs are ready if they are waiting or active.
func (job *Job) IsReady() bool {
if job.queue.scheduler == nil {
return job.Status == WaitStatus || job.Status == ActiveStatus
}
return true
return job.Status == WaitStatus || job.Status == ActiveStatus
}
Comment on lines +112 to 116
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find how IsReady is used in the codebase
rg -n -A5 -B2 'IsReady\(\)' job.go

# Also check the status constants to understand their meaning
echo "=== Job Status Constants ==="
rg -n 'Status.*=.*iota|WaitStatus|ActiveStatus|CompletedStatus|FailedStatus' job.go | head -20

Repository: tinh-tinh/queue

Length of output: 975


🏁 Script executed:

#!/bin/bash
# Find all calls to IsReady() across the codebase
rg -n 'IsReady\(\)' --type go -A3 -B3

# Also find any filtering or selection logic related to job status
echo "=== Job selection/filtering logic ==="
rg -n 'Status.*==' --type go -B2 -A2 | head -40

Repository: tinh-tinh/queue

Length of output: 2113


🏁 Script executed:

#!/bin/bash
# Get context around IsReady() usage in queue.go
sed -n '210,240p' queue.go

# Also check if there's any special handling for ActiveStatus
echo "=== Context around ActiveStatus handling ==="
sed -n '460,470p' queue.go

Repository: tinh-tinh/queue

Length of output: 1108


🏁 Script executed:

#!/bin/bash
# Get the full Run() function to understand execution flow
sed -n '210,280p' queue.go

Repository: tinh-tinh/queue

Length of output: 1551


🏁 Script executed:

#!/bin/bash
# Check the entire job.go file to understand state transitions
wc -l job.go
cat -n job.go

Repository: tinh-tinh/queue

Length of output: 4272


🏁 Script executed:

#!/bin/bash
# Check if there are any other places where job status is modified
rg -n '\.Status\s*=' --type go -B2 -A1

Repository: tinh-tinh/queue

Length of output: 2115


ActiveStatus should not be included in IsReady().

A job in ActiveStatus represents one currently being or previously being processed. Including it in the ready check violates job state semantics and could cause duplicate processing.

The job lifecycle should be: WaitStatusActiveStatus (during Process()) → CompletedStatus/FailedStatus/DelayedStatus (final states). Only WaitStatus jobs should be returned from IsReady(). Delayed jobs are handled separately in the Retry() method and should not be included here.

The comment "Jobs are ready if they are waiting or active" is misleading—only waiting jobs are ready for initial processing.

Suggested fix
 // IsReady returns true if the job is ready to be processed.
-// Jobs are ready if they are waiting or active.
+// Jobs are ready if they are waiting.
 func (job *Job) IsReady() bool {
-	return job.Status == WaitStatus || job.Status == ActiveStatus
+	return job.Status == WaitStatus
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// IsReady returns true if the job is ready to be processed.
// Jobs are ready if they are waiting or active.
func (job *Job) IsReady() bool {
if job.queue.scheduler == nil {
return job.Status == WaitStatus || job.Status == ActiveStatus
}
return true
return job.Status == WaitStatus || job.Status == ActiveStatus
}
// IsReady returns true if the job is ready to be processed.
// Jobs are ready if they are waiting.
func (job *Job) IsReady() bool {
return job.Status == WaitStatus
}
🤖 Prompt for AI Agents
In `@job.go` around lines 112 - 116, The IsReady method currently returns true for
both WaitStatus and ActiveStatus; remove ActiveStatus from the readiness check
so IsReady only returns true when job.Status == WaitStatus, and update the
comment to state that only waiting jobs are ready (delayed/retries handled
elsewhere); locate the IsReady function and adjust the boolean comparison to
reference only WaitStatus (keep references to WaitStatus and ActiveStatus in the
codebase for clarity but do not include ActiveStatus in IsReady).


// IsFinished returns true if the job has finished, either successfully or with an error.
Expand Down
116 changes: 116 additions & 0 deletions pattern_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package queue

import (
"fmt"
"strconv"
"strings"
"time"
)

// parsePattern parses a cron-like pattern and returns the polling interval.
// Supports two formats:
// 1. @every <duration> format (e.g., "@every 1s", "@every 5m")
// 2. Cron expressions (e.g., "*/5 * * * *", "0 */2 * * *")
// Returns an error if the pattern is invalid or unsupported.
func parsePattern(pattern string) (time.Duration, error) {
if pattern == "" {
return 0, fmt.Errorf("pattern cannot be empty")
}

// Trim whitespace
pattern = strings.TrimSpace(pattern)

// Check for @every prefix
if strings.HasPrefix(pattern, "@every ") {
return parseEveryPattern(pattern)
}

// Try to parse as cron expression
return parseCronPattern(pattern)
}

// parseEveryPattern parses @every <duration> format patterns.
func parseEveryPattern(pattern string) (time.Duration, error) {
// Extract the duration part after "@every "
durationStr := strings.TrimSpace(strings.TrimPrefix(pattern, "@every "))
if durationStr == "" {
return 0, fmt.Errorf("missing duration in pattern: %s", pattern)
}

// Parse the duration using time.ParseDuration
duration, err := time.ParseDuration(durationStr)
if err != nil {
return 0, fmt.Errorf("invalid duration '%s': %w", durationStr, err)
}

// Validate that duration is positive
if duration <= 0 {
return 0, fmt.Errorf("duration must be positive, got: %s", duration)
}

return duration, nil
}

// parseCronPattern parses cron expressions and calculates the polling interval.
// Supports standard 5-field cron format: minute hour day month weekday
// Examples:
// - "*/5 * * * *" → every 5 minutes
// - "0 * * * *" → every hour
// - "0 0 * * *" → every day (24 hours)
// - "0 0 * * 0" → every week (7 days)
func parseCronPattern(pattern string) (time.Duration, error) {
fields := strings.Fields(pattern)
if len(fields) != 5 {
return 0, fmt.Errorf("invalid cron expression: expected 5 fields, got %d in '%s'", len(fields), pattern)
}

minute, hour, day, month, weekday := fields[0], fields[1], fields[2], fields[3], fields[4]

// Parse minute field for */N patterns
if strings.HasPrefix(minute, "*/") {
intervalStr := strings.TrimPrefix(minute, "*/")
interval, err := strconv.Atoi(intervalStr)
if err != nil {
return 0, fmt.Errorf("invalid minute interval '%s': %w", intervalStr, err)
}
if interval <= 0 || interval > 59 {
return 0, fmt.Errorf("minute interval must be between 1 and 59, got %d", interval)
}
return time.Duration(interval) * time.Minute, nil
}

// Parse hour field for */N patterns
if strings.HasPrefix(hour, "*/") {
intervalStr := strings.TrimPrefix(hour, "*/")
interval, err := strconv.Atoi(intervalStr)
if err != nil {
return 0, fmt.Errorf("invalid hour interval '%s': %w", intervalStr, err)
}
if interval <= 0 || interval > 23 {
return 0, fmt.Errorf("hour interval must be between 1 and 23, got %d", interval)
}
return time.Duration(interval) * time.Hour, nil
}

// Hourly: "0 * * * *" or "N * * * *"
if hour == "*" && day == "*" && month == "*" && weekday == "*" {
return 1 * time.Hour, nil
}

// Daily: "0 0 * * *" or "N N * * *"
if day == "*" && month == "*" && weekday == "*" {
return 24 * time.Hour, nil
}

// Weekly: "0 0 * * N" (specific weekday)
if day == "*" && month == "*" && weekday != "*" {
return 7 * 24 * time.Hour, nil
}

// Monthly: "0 0 N * *" (specific day of month)
if month == "*" && weekday == "*" && day != "*" {
return 30 * 24 * time.Hour, nil // Approximate as 30 days
}

return 0, fmt.Errorf("unsupported cron pattern: %s (consider using @every <duration> format)", pattern)
}
185 changes: 185 additions & 0 deletions pattern_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package queue_test

import (
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
"github.com/tinh-tinh/queue/v2"
)

func Test_ParsePattern_Valid(t *testing.T) {
tests := []struct {
name string
pattern string
expected time.Duration
}{
// @every patterns
{
name: "1 second",
pattern: "@every 1s",
expected: 1 * time.Second,
},
{
name: "5 seconds",
pattern: "@every 5s",
expected: 5 * time.Second,
},
{
name: "1 minute",
pattern: "@every 1m",
expected: 1 * time.Minute,
},
{
name: "5 minutes",
pattern: "@every 5m",
expected: 5 * time.Minute,
},
{
name: "1 hour",
pattern: "@every 1h",
expected: 1 * time.Hour,
},
{
name: "complex duration",
pattern: "@every 1h30m45s",
expected: 1*time.Hour + 30*time.Minute + 45*time.Second,
},
{
name: "with extra spaces",
pattern: "@every 5s ",
expected: 5 * time.Second,
},
{
name: "milliseconds",
pattern: "@every 500ms",
expected: 500 * time.Millisecond,
},
// Cron patterns
{
name: "every 5 minutes (cron)",
pattern: "*/5 * * * *",
expected: 5 * time.Minute,
},
{
name: "every 15 minutes (cron)",
pattern: "*/15 * * * *",
expected: 15 * time.Minute,
},
{
name: "every 2 hours (cron)",
pattern: "0 */2 * * *",
expected: 2 * time.Hour,
},
{
name: "every 6 hours (cron)",
pattern: "0 */6 * * *",
expected: 6 * time.Hour,
},
{
name: "hourly (cron)",
pattern: "0 * * * *",
expected: 1 * time.Hour,
},
{
name: "daily (cron)",
pattern: "0 0 * * *",
expected: 24 * time.Hour,
},
{
name: "weekly (cron)",
pattern: "0 0 * * 0",
expected: 7 * 24 * time.Hour,
},
{
name: "monthly (cron)",
pattern: "0 0 1 * *",
expected: 30 * 24 * time.Hour,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test indirectly through Queue creation
q := queue.New("test_pattern_"+tt.name, &queue.Options{
Connect: &redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
},
Workers: 1,
RetryFailures: 0,
Pattern: tt.pattern,
// Don't set ScheduleInterval to force pattern parsing
})
require.NotNil(t, q)
})
}
}

func Test_ParsePattern_Invalid(t *testing.T) {
tests := []struct {
name string
pattern string
}{
{
name: "empty pattern",
pattern: "",
},
Comment on lines +126 to +129
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Empty pattern test doesn't exercise the fallback behavior.

Based on queue.go line 105, when Pattern is empty (""), the scheduler initialization is skipped entirely (if opt.Pattern != "" { ... }). This test case won't trigger the pattern parsing fallback—it simply won't start a scheduler at all.

Consider removing this case or documenting that empty pattern means "no scheduler" rather than "fallback to default."

🤖 Prompt for AI Agents
In `@pattern_parser_test.go` around lines 126 - 129, The "empty pattern" test case
is misleading because opt.Pattern == "" causes scheduler initialization to be
skipped (see queue.go check if opt.Pattern != ""), so either remove this test
case or change it to assert the "no scheduler started" behavior; locate the test
case named "empty pattern" in pattern_parser_test.go and either delete the table
entry or update its expected outcome to reflect that an empty Pattern means "no
scheduler" (or add a comment clarifying that behavior), and ensure any
assertions reference the scheduler initialization logic (e.g., opt.Pattern and
the scheduler start path) rather than expecting fallback parsing.

{
name: "missing duration",
pattern: "@every ",
},
{
name: "invalid duration",
pattern: "@every abc",
},
{
name: "negative duration",
pattern: "@every -5s",
},
{
name: "invalid cron - too few fields",
pattern: "*/5 * *",
},
{
name: "invalid cron - too many fields",
pattern: "*/5 * * * * *",
},
{
name: "invalid cron - bad minute interval",
pattern: "*/abc * * * *",
},
{
name: "invalid cron - minute out of range",
pattern: "*/60 * * * *",
},
{
name: "invalid cron - hour out of range",
pattern: "0 */24 * * *",
},
{
name: "unsupported cron pattern",
pattern: "5,10,15 * * * *",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// These should fall back to default 5s interval with a warning log
q := queue.New("test_invalid_"+tt.name, &queue.Options{
Connect: &redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
},
Workers: 1,
RetryFailures: 0,
Pattern: tt.pattern,
DisableLog: true, // Disable logs to avoid clutter in tests
})
require.NotNil(t, q)
})
}
}
Loading