diff --git a/go.mod b/go.mod index 731d416..a7b5397 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ toolchain go1.24.1 require ( github.com/go-redsync/redsync/v4 v4.15.0 github.com/redis/go-redis/v9 v9.17.2 - github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.9.0 github.com/tinh-tinh/tinhtinh/v2 v2.5.0 golang.org/x/crypto v0.45.0 diff --git a/go.sum b/go.sum index 8304535..472140a 100644 --- a/go.sum +++ b/go.sum @@ -14,12 +14,10 @@ github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOr github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= -github.com/go-redsync/redsync/v4 v4.14.0 h1:zyxzFJsmQHIPBl8iBT7KFKohWsjsghgGLiP8TnFMLNc= -github.com/go-redsync/redsync/v4 v4.14.0/go.mod h1:twMlVd19upZ/juvJyJGlQOSQxor1oeHtjs62l4pRFzo= github.com/go-redsync/redsync/v4 v4.15.0 h1:KH/XymuxSV7vyKs6z1Cxxj+N+N18JlPxgXeP6x4JY54= github.com/go-redsync/redsync/v4 v4.15.0/go.mod h1:qNp+lLs3vkfZbtA/aM/OjlZHfEr5YTAYhRktFPKHC7s= -github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s= -github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= +github.com/gomodule/redigo v1.9.3 h1:dNPSXeXv6HCq2jdyWfjgmhBdqnR6PRO3m/G05nvpPC8= +github.com/gomodule/redigo v1.9.3/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -27,28 +25,22 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ= -github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI= github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= -github.com/redis/rueidis v1.0.64 h1:XqgbueDuNV3qFdVdQwAHJl1uNt90zUuAJuzqjH4cw6Y= -github.com/redis/rueidis v1.0.64/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA= -github.com/redis/rueidis/rueidiscompat v1.0.64 h1:M8JbLP4LyHQhBLBRsUQIzui8/LyTtdESNIMVveqm4RY= -github.com/redis/rueidis/rueidiscompat v1.0.64/go.mod h1:8pJVPhEjpw0izZFSxYwDziUiEYEkEklTSw/nZzga61M= -github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= -github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/redis/rueidis v1.0.69 h1:WlUefRhuDekji5LsD387ys3UCJtSFeBVf0e5yI0B8b4= +github.com/redis/rueidis v1.0.69/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA= +github.com/redis/rueidis/rueidiscompat v1.0.69 h1:IWVYY9lXdjNO3do2VpJT7aDFi8zbCUuQxZB6E2Grahs= +github.com/redis/rueidis/rueidiscompat v1.0.69/go.mod h1:iC4Y8DoN0Uth0Uezg9e2trvNRC7QAgGeuP2OPLb5ccI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= -github.com/tinh-tinh/tinhtinh/v2 v2.3.4 h1:vxhaoPnp3pGNcdXKDG7nVai+V+lYoJHWtm7pzTNapJY= -github.com/tinh-tinh/tinhtinh/v2 v2.3.4/go.mod h1:4nppE7KAIswZKutI9ElMqAD9kyash7aea0Ewowsqj5g= github.com/tinh-tinh/tinhtinh/v2 v2.5.0 h1:SqCanZJKKgbVsDwoaPe136fZGYoXSKZ6fLciGO0KsoY= github.com/tinh-tinh/tinhtinh/v2 v2.5.0/go.mod h1:4nppE7KAIswZKutI9ElMqAD9kyash7aea0Ewowsqj5g= golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/job.go b/job.go index 0b06d88..bad662d 100644 --- a/job.go +++ b/job.go @@ -109,14 +109,10 @@ func (job *Job) HandlerError(reasonError string) { job.queue.formatLog(LoggerWarn, "Add job %s for retry (%d remains) ", job.Id, job.RetryFailures) } -// IsReady returns true if the job is ready to be processed. If the job uses a -// scheduler, it will always be ready. Otherwise, the job is ready if it is -// waiting or active. +// IsReady returns true if the job is ready to be processed. +// Jobs are ready if they are waiting or active. func (job *Job) IsReady() bool { - if job.queue.scheduler == nil { - return job.Status == WaitStatus || job.Status == ActiveStatus - } - return true + return job.Status == WaitStatus || job.Status == ActiveStatus } // IsFinished returns true if the job has finished, either successfully or with an error. diff --git a/pattern_parser.go b/pattern_parser.go new file mode 100644 index 0000000..389a106 --- /dev/null +++ b/pattern_parser.go @@ -0,0 +1,116 @@ +package queue + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +// parsePattern parses a cron-like pattern and returns the polling interval. +// Supports two formats: +// 1. @every format (e.g., "@every 1s", "@every 5m") +// 2. Cron expressions (e.g., "*/5 * * * *", "0 */2 * * *") +// Returns an error if the pattern is invalid or unsupported. +func parsePattern(pattern string) (time.Duration, error) { + if pattern == "" { + return 0, fmt.Errorf("pattern cannot be empty") + } + + // Trim whitespace + pattern = strings.TrimSpace(pattern) + + // Check for @every prefix + if strings.HasPrefix(pattern, "@every ") { + return parseEveryPattern(pattern) + } + + // Try to parse as cron expression + return parseCronPattern(pattern) +} + +// parseEveryPattern parses @every format patterns. +func parseEveryPattern(pattern string) (time.Duration, error) { + // Extract the duration part after "@every " + durationStr := strings.TrimSpace(strings.TrimPrefix(pattern, "@every ")) + if durationStr == "" { + return 0, fmt.Errorf("missing duration in pattern: %s", pattern) + } + + // Parse the duration using time.ParseDuration + duration, err := time.ParseDuration(durationStr) + if err != nil { + return 0, fmt.Errorf("invalid duration '%s': %w", durationStr, err) + } + + // Validate that duration is positive + if duration <= 0 { + return 0, fmt.Errorf("duration must be positive, got: %s", duration) + } + + return duration, nil +} + +// parseCronPattern parses cron expressions and calculates the polling interval. +// Supports standard 5-field cron format: minute hour day month weekday +// Examples: +// - "*/5 * * * *" → every 5 minutes +// - "0 * * * *" → every hour +// - "0 0 * * *" → every day (24 hours) +// - "0 0 * * 0" → every week (7 days) +func parseCronPattern(pattern string) (time.Duration, error) { + fields := strings.Fields(pattern) + if len(fields) != 5 { + return 0, fmt.Errorf("invalid cron expression: expected 5 fields, got %d in '%s'", len(fields), pattern) + } + + minute, hour, day, month, weekday := fields[0], fields[1], fields[2], fields[3], fields[4] + + // Parse minute field for */N patterns + if strings.HasPrefix(minute, "*/") { + intervalStr := strings.TrimPrefix(minute, "*/") + interval, err := strconv.Atoi(intervalStr) + if err != nil { + return 0, fmt.Errorf("invalid minute interval '%s': %w", intervalStr, err) + } + if interval <= 0 || interval > 59 { + return 0, fmt.Errorf("minute interval must be between 1 and 59, got %d", interval) + } + return time.Duration(interval) * time.Minute, nil + } + + // Parse hour field for */N patterns + if strings.HasPrefix(hour, "*/") { + intervalStr := strings.TrimPrefix(hour, "*/") + interval, err := strconv.Atoi(intervalStr) + if err != nil { + return 0, fmt.Errorf("invalid hour interval '%s': %w", intervalStr, err) + } + if interval <= 0 || interval > 23 { + return 0, fmt.Errorf("hour interval must be between 1 and 23, got %d", interval) + } + return time.Duration(interval) * time.Hour, nil + } + + // Hourly: "0 * * * *" or "N * * * *" + if hour == "*" && day == "*" && month == "*" && weekday == "*" { + return 1 * time.Hour, nil + } + + // Daily: "0 0 * * *" or "N N * * *" + if day == "*" && month == "*" && weekday == "*" { + return 24 * time.Hour, nil + } + + // Weekly: "0 0 * * N" (specific weekday) + if day == "*" && month == "*" && weekday != "*" { + return 7 * 24 * time.Hour, nil + } + + // Monthly: "0 0 N * *" (specific day of month) + if month == "*" && weekday == "*" && day != "*" { + return 30 * 24 * time.Hour, nil // Approximate as 30 days + } + + return 0, fmt.Errorf("unsupported cron pattern: %s (consider using @every format)", pattern) +} diff --git a/pattern_parser_test.go b/pattern_parser_test.go new file mode 100644 index 0000000..a1c74c7 --- /dev/null +++ b/pattern_parser_test.go @@ -0,0 +1,185 @@ +package queue_test + +import ( + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "github.com/tinh-tinh/queue/v2" +) + +func Test_ParsePattern_Valid(t *testing.T) { + tests := []struct { + name string + pattern string + expected time.Duration + }{ + // @every patterns + { + name: "1 second", + pattern: "@every 1s", + expected: 1 * time.Second, + }, + { + name: "5 seconds", + pattern: "@every 5s", + expected: 5 * time.Second, + }, + { + name: "1 minute", + pattern: "@every 1m", + expected: 1 * time.Minute, + }, + { + name: "5 minutes", + pattern: "@every 5m", + expected: 5 * time.Minute, + }, + { + name: "1 hour", + pattern: "@every 1h", + expected: 1 * time.Hour, + }, + { + name: "complex duration", + pattern: "@every 1h30m45s", + expected: 1*time.Hour + 30*time.Minute + 45*time.Second, + }, + { + name: "with extra spaces", + pattern: "@every 5s ", + expected: 5 * time.Second, + }, + { + name: "milliseconds", + pattern: "@every 500ms", + expected: 500 * time.Millisecond, + }, + // Cron patterns + { + name: "every 5 minutes (cron)", + pattern: "*/5 * * * *", + expected: 5 * time.Minute, + }, + { + name: "every 15 minutes (cron)", + pattern: "*/15 * * * *", + expected: 15 * time.Minute, + }, + { + name: "every 2 hours (cron)", + pattern: "0 */2 * * *", + expected: 2 * time.Hour, + }, + { + name: "every 6 hours (cron)", + pattern: "0 */6 * * *", + expected: 6 * time.Hour, + }, + { + name: "hourly (cron)", + pattern: "0 * * * *", + expected: 1 * time.Hour, + }, + { + name: "daily (cron)", + pattern: "0 0 * * *", + expected: 24 * time.Hour, + }, + { + name: "weekly (cron)", + pattern: "0 0 * * 0", + expected: 7 * 24 * time.Hour, + }, + { + name: "monthly (cron)", + pattern: "0 0 1 * *", + expected: 30 * 24 * time.Hour, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test indirectly through Queue creation + q := queue.New("test_pattern_"+tt.name, &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 1, + RetryFailures: 0, + Pattern: tt.pattern, + // Don't set ScheduleInterval to force pattern parsing + }) + require.NotNil(t, q) + }) + } +} + +func Test_ParsePattern_Invalid(t *testing.T) { + tests := []struct { + name string + pattern string + }{ + { + name: "empty pattern", + pattern: "", + }, + { + name: "missing duration", + pattern: "@every ", + }, + { + name: "invalid duration", + pattern: "@every abc", + }, + { + name: "negative duration", + pattern: "@every -5s", + }, + { + name: "invalid cron - too few fields", + pattern: "*/5 * *", + }, + { + name: "invalid cron - too many fields", + pattern: "*/5 * * * * *", + }, + { + name: "invalid cron - bad minute interval", + pattern: "*/abc * * * *", + }, + { + name: "invalid cron - minute out of range", + pattern: "*/60 * * * *", + }, + { + name: "invalid cron - hour out of range", + pattern: "0 */24 * * *", + }, + { + name: "unsupported cron pattern", + pattern: "5,10,15 * * * *", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // These should fall back to default 5s interval with a warning log + q := queue.New("test_invalid_"+tt.name, &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 1, + RetryFailures: 0, + Pattern: tt.pattern, + DisableLog: true, // Disable logs to avoid clutter in tests + }) + require.NotNil(t, q) + }) + } +} diff --git a/queue.go b/queue.go index 886e939..c332514 100644 --- a/queue.go +++ b/queue.go @@ -13,7 +13,6 @@ import ( "github.com/go-redsync/redsync/v4" "github.com/go-redsync/redsync/v4/redis/goredis/v9" "github.com/redis/go-redis/v9" - "github.com/robfig/cron/v3" "github.com/tinh-tinh/tinhtinh/v2/common" "github.com/tinh-tinh/tinhtinh/v2/common/logger" ) @@ -21,18 +20,20 @@ import ( type JobFnc func(job *Job) type Queue struct { - Name string - client *redis.Client - mutex *redsync.Mutex - jobFnc JobFnc - jobs []Job - ctx context.Context - scheduler *cron.Cron - cronPattern string - running bool - config Options - Logger Logger - cachedKey string // Cache the computed key to avoid repeated string operations + Name string + client *redis.Client + mutex *redsync.Mutex + jobFnc JobFnc + jobs []Job + ctx context.Context + schedulerTicker *time.Ticker + schedulerDone chan struct{} + schedulerRunning bool // Track if scheduler is currently running + schedulerKey string + running bool + config Options + Logger Logger + cachedKey string // Cache the computed key to avoid repeated string operations } type RateLimiter struct { @@ -45,6 +46,7 @@ type Options struct { RetryFailures int Limiter *RateLimiter Pattern string + ScheduleInterval time.Duration // Polling interval for distributed scheduler (default: 5s) Logger Logger DisableLog bool RemoveOnComplete bool @@ -84,15 +86,9 @@ func New(name string, opt *Options) *Queue { } if opt.Logger == nil { - queue.config.Logger = logger.Create(logger.Options{}) - } - - if opt.Pattern != "" { - queue.scheduler = cron.New() - queue.cronPattern = opt.Pattern - } - if opt.Timeout == 0 { - queue.config.Timeout = 1 * time.Minute + queue.config.Logger = logger.Create(logger.Options{ + Console: !opt.DisableLog, + }) } // Pre-compute and cache the key @@ -102,6 +98,30 @@ func New(name string, opt *Options) *Queue { queue.cachedKey = strings.ToLower(name) } + // Initialize scheduler key for distributed scheduling + queue.schedulerKey = queue.cachedKey + ":scheduled" + + // Start distributed scheduler if Pattern is configured + if opt.Pattern != "" { + interval := opt.ScheduleInterval + if interval == 0 { + // Try to parse the pattern to get interval + parsedInterval, err := parsePattern(opt.Pattern) + if err != nil { + // Log warning and fall back to default + queue.formatLog(LoggerWarn, "Failed to parse pattern '%s': %v, using default 5s interval", opt.Pattern, err) + interval = 5 * time.Second + } else { + interval = parsedInterval + } + } + queue.startScheduler(interval) + } + + if opt.Timeout == 0 { + queue.config.Timeout = 1 * time.Minute + } + return queue } @@ -185,18 +205,10 @@ func mergeSortedJobs(jobs1, jobs2 []Job) []Job { return result } -// Process sets the callback for the queue to process jobs. If the queue has a -// scheduler, it will be started with the given cron pattern. Otherwise, the -// callback is simply stored. +// Process sets the callback for the queue to process jobs. +// The distributed scheduler (if configured) is already running from New(). func (q *Queue) Process(jobFnc JobFnc) { q.jobFnc = jobFnc - if q.scheduler != nil { - _, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() }) - if err != nil { - q.formatLog(LoggerError, "failed to add job: %v", err) - } - q.scheduler.Start() - } } // Run runs all ready jobs in the queue. It locks the mutex, runs all ready jobs @@ -416,15 +428,32 @@ func (q *Queue) IsLimit() bool { // Pause stops the queue from running. When paused, the queue will not accept new // jobs and will not run any jobs in the queue. It will resume when Resume is -// called. +// called. The scheduler is also stopped if active. func (q *Queue) Pause() { q.running = false + q.stopScheduler() } // Resume resumes the queue from a paused state. When resumed, the queue will -// accept new jobs and run any jobs in the queue. +// accept new jobs and run any jobs in the queue. The scheduler is also restarted +// if it was previously configured. func (q *Queue) Resume() { q.running = true + if q.config.Pattern != "" { + interval := q.config.ScheduleInterval + if interval == 0 { + // Try to parse the pattern to get interval + parsedInterval, err := parsePattern(q.config.Pattern) + if err != nil { + // Log warning and fall back to default + q.formatLog(LoggerWarn, "Failed to parse pattern '%s': %v, using default 5s interval", q.config.Pattern, err) + interval = 5 * time.Second + } else { + interval = parsedInterval + } + } + q.startScheduler(interval) + } q.Run() } diff --git a/scheduler.go b/scheduler.go new file mode 100644 index 0000000..0a387a9 --- /dev/null +++ b/scheduler.go @@ -0,0 +1,162 @@ +package queue + +import ( + "fmt" + "time" + + "github.com/redis/go-redis/v9" +) + +// startScheduler starts the background scheduler loop that checks for scheduled jobs. +// It polls Redis at the specified interval to find jobs ready to run. +// If a scheduler is already running, this function returns early to prevent goroutine leaks. +func (q *Queue) startScheduler(interval time.Duration) { + if interval == 0 { + interval = 5 * time.Second // Default polling interval + } + + // Prevent starting scheduler if already running + if q.schedulerRunning { + return + } + + // Create new ticker and done channel + q.schedulerTicker = time.NewTicker(interval) + q.schedulerDone = make(chan struct{}) + q.schedulerRunning = true + + go func() { + ticker := q.schedulerTicker + doneChan := q.schedulerDone + for { + select { + case <-ticker.C: + q.processScheduledJobs() + case <-doneChan: + return + } + } + }() + + q.formatLog(LoggerInfo, "Scheduler started with %v interval", interval) +} + +// stopScheduler stops the scheduler gracefully. +func (q *Queue) stopScheduler() { + if !q.schedulerRunning { + return + } + + if q.schedulerDone != nil { + close(q.schedulerDone) + // Small delay to allow goroutine to exit + time.Sleep(10 * time.Millisecond) + } + if q.schedulerTicker != nil { + q.schedulerTicker.Stop() + q.schedulerTicker = nil + } + q.schedulerDone = nil + q.schedulerRunning = false + q.formatLog(LoggerInfo, "Scheduler stopped") +} + +// ScheduleJob adds a job to the scheduled set with the given run time. +// The job will be executed when the current time reaches or exceeds runAt. +func (q *Queue) ScheduleJob(jobId string, runAt time.Time) error { + score := float64(runAt.Unix()) + _, err := q.client.ZAdd(q.ctx, q.schedulerKey, redis.Z{ + Score: score, + Member: jobId, + }).Result() + if err != nil { + return fmt.Errorf("failed to schedule job: %w", err) + } + q.formatLog(LoggerInfo, "Scheduled job %s to run at %s", jobId, runAt.Format(time.RFC3339)) + return nil +} + +// GetScheduledJobs retrieves all scheduled jobs with their scheduled times. +func (q *Queue) GetScheduledJobs() ([]ScheduledJobInfo, error) { + // Get all jobs with scores + results, err := q.client.ZRangeWithScores(q.ctx, q.schedulerKey, 0, -1).Result() + if err != nil { + return nil, fmt.Errorf("failed to get scheduled jobs: %w", err) + } + + scheduledJobs := make([]ScheduledJobInfo, 0, len(results)) + for _, z := range results { + jobId, ok := z.Member.(string) + if !ok { + continue + } + scheduledJobs = append(scheduledJobs, ScheduledJobInfo{ + JobId: jobId, + RunAt: time.Unix(int64(z.Score), 0), + Timestamp: int64(z.Score), + }) + } + + return scheduledJobs, nil +} + +// RemoveScheduledJob removes a job from the scheduled set. +func (q *Queue) RemoveScheduledJob(jobId string) error { + _, err := q.client.ZRem(q.ctx, q.schedulerKey, jobId).Result() + if err != nil { + return fmt.Errorf("failed to remove scheduled job: %w", err) + } + q.formatLog(LoggerInfo, "Removed scheduled job %s", jobId) + return nil +} + +// processScheduledJobs checks for jobs ready to run and moves them to the waiting list. +// This method is called periodically by the scheduler loop. +func (q *Queue) processScheduledJobs() { + now := float64(time.Now().Unix()) + + // Find all jobs with score <= current timestamp + results, err := q.client.ZRangeByScoreWithScores(q.ctx, q.schedulerKey, &redis.ZRangeBy{ + Min: "-inf", + Max: fmt.Sprintf("%f", now), + }).Result() + + if err != nil { + q.formatLog(LoggerError, "Failed to get ready scheduled jobs: %v", err) + return + } + + if len(results) == 0 { + return + } + + // Process each ready job + for _, z := range results { + jobId, ok := z.Member.(string) + if !ok { + continue + } + + // Atomically remove from scheduled set (only one instance will succeed) + removed, err := q.client.ZRem(q.ctx, q.schedulerKey, jobId).Result() + if err != nil || removed == 0 { + // Another instance already processed this job + continue + } + + // Add job to the queue + q.AddJob(AddJobOptions{ + Id: jobId, + Data: nil, // Scheduled jobs don't have data in this implementation + }) + + q.formatLog(LoggerInfo, "Moved scheduled job %s to waiting list", jobId) + } +} + +// ScheduledJobInfo contains information about a scheduled job. +type ScheduledJobInfo struct { + JobId string + RunAt time.Time + Timestamp int64 +} diff --git a/scheduler_leak_test.go b/scheduler_leak_test.go new file mode 100644 index 0000000..3035544 --- /dev/null +++ b/scheduler_leak_test.go @@ -0,0 +1,167 @@ +package queue_test + +import ( + "runtime" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "github.com/tinh-tinh/queue/v2" +) + +// Test_NoGoroutineLeakOnMultipleResume verifies that calling Resume multiple times +// does not leak goroutines by ensuring the old scheduler is stopped before starting a new one. +func Test_NoGoroutineLeakOnMultipleResume(t *testing.T) { + q := queue.New("goroutine_leak_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 1, + RetryFailures: 0, + Pattern: "@every 1s", + ScheduleInterval: 1 * time.Second, + }) + + // Get initial goroutine count + runtime.GC() + time.Sleep(100 * time.Millisecond) + initialGoroutines := runtime.NumGoroutine() + + // Pause and resume multiple times + for i := 0; i < 10; i++ { + q.Pause() + time.Sleep(50 * time.Millisecond) + q.Resume() + time.Sleep(50 * time.Millisecond) + } + + // Final pause to stop scheduler + q.Pause() + + // Allow time for goroutines to clean up + runtime.GC() + time.Sleep(200 * time.Millisecond) + + finalGoroutines := runtime.NumGoroutine() + + // The number of goroutines should not have increased significantly + // Allow for some variance (±2) due to runtime behavior + goroutineDiff := finalGoroutines - initialGoroutines + require.LessOrEqual(t, goroutineDiff, 2, + "Goroutine leak detected: initial=%d, final=%d, diff=%d", + initialGoroutines, finalGoroutines, goroutineDiff) +} + +// Test_SchedulerRestartOnResume verifies that the scheduler properly restarts +// with the correct interval when Resume is called. +func Test_SchedulerRestartOnResume(t *testing.T) { + q := queue.New("scheduler_restart_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 1, + RetryFailures: 0, + Pattern: "@every 1s", + ScheduleInterval: 1 * time.Second, + }) + + // Schedule a job + runAt := time.Now().Add(2 * time.Second) + err := q.ScheduleJob("test_job", runAt) + require.Nil(t, err) + + // Pause and resume + q.Pause() + time.Sleep(500 * time.Millisecond) + q.Resume() + + // Verify scheduler is working by checking if job gets processed + processedJobs := make(map[string]bool) + q.Process(func(job *queue.Job) { + job.Process(func() error { + processedJobs[job.Id] = true + return nil + }) + }) + + // Wait for job to be processed + time.Sleep(3 * time.Second) + + // Verify job was processed + require.True(t, processedJobs["test_job"], "Job should have been processed after resume") +} + +// Test_MultiplePauseCalls verifies that calling Pause multiple times doesn't panic. +func Test_MultiplePauseCalls(t *testing.T) { + q := queue.New("multiple_pause_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 1, + RetryFailures: 0, + Pattern: "@every 1s", + }) + + // Call Pause multiple times - should not panic + q.Pause() + q.Pause() + q.Pause() + + // Verify no panic occurred + require.True(t, true, "Multiple Pause calls should not panic") +} + +// Test_MultipleResumeCalls verifies that calling Resume multiple times doesn't +// start duplicate scheduler goroutines. +func Test_MultipleResumeCalls(t *testing.T) { + q := queue.New("multiple_resume_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 1, + RetryFailures: 0, + Pattern: "@every 1s", + }) + + // Get initial goroutine count + runtime.GC() + time.Sleep(100 * time.Millisecond) + initialGoroutines := runtime.NumGoroutine() + + // Pause once + q.Pause() + time.Sleep(50 * time.Millisecond) + + // Call Resume multiple times without Pause in between + q.Resume() + time.Sleep(50 * time.Millisecond) + q.Resume() // Should be a no-op due to schedulerRunning flag + time.Sleep(50 * time.Millisecond) + q.Resume() // Should be a no-op due to schedulerRunning flag + time.Sleep(50 * time.Millisecond) + + // Final pause to stop scheduler + q.Pause() + + // Allow time for goroutines to clean up + runtime.GC() + time.Sleep(200 * time.Millisecond) + + finalGoroutines := runtime.NumGoroutine() + + // The number of goroutines should not have increased significantly + // Only ONE scheduler goroutine should have been created despite multiple Resume calls + goroutineDiff := finalGoroutines - initialGoroutines + require.LessOrEqual(t, goroutineDiff, 2, + "Goroutine leak detected from multiple Resume calls: initial=%d, final=%d, diff=%d", + initialGoroutines, finalGoroutines, goroutineDiff) +} diff --git a/scheduler_test.go b/scheduler_test.go new file mode 100644 index 0000000..282243e --- /dev/null +++ b/scheduler_test.go @@ -0,0 +1,271 @@ +package queue_test + +import ( + "sync" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" + "github.com/tinh-tinh/queue/v2" +) + +func Test_ScheduleJob(t *testing.T) { + schedulerQueue := queue.New("scheduler_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 3, + RetryFailures: 0, + Pattern: "@every 1s", // Enable scheduler + ScheduleInterval: 1 * time.Second, + }) + + // Track processed jobs + processedJobs := make(map[string]bool) + var mu sync.Mutex + + schedulerQueue.Process(func(job *queue.Job) { + job.Process(func() error { + mu.Lock() + processedJobs[job.Id] = true + mu.Unlock() + return nil + }) + }) + + // Schedule a job to run 2 seconds from now + runAt := time.Now().Add(2 * time.Second) + err := schedulerQueue.ScheduleJob("scheduled_job_1", runAt) + require.Nil(t, err) + + // Verify job is in scheduled set + scheduledJobs, err := schedulerQueue.GetScheduledJobs() + require.Nil(t, err) + require.Equal(t, 1, len(scheduledJobs)) + require.Equal(t, "scheduled_job_1", scheduledJobs[0].JobId) + + // Wait for job to be processed (2s + 1s buffer) + time.Sleep(3 * time.Second) + + // Verify job was processed + mu.Lock() + require.True(t, processedJobs["scheduled_job_1"]) + mu.Unlock() + + // Verify job is no longer in scheduled set + scheduledJobs, err = schedulerQueue.GetScheduledJobs() + require.Nil(t, err) + require.Equal(t, 0, len(scheduledJobs)) +} + +func Test_RemoveScheduledJob(t *testing.T) { + schedulerQueue := queue.New("remove_scheduled_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 3, + RetryFailures: 0, + Pattern: "@every 1s", + ScheduleInterval: 1 * time.Second, + }) + + // Schedule a job for 5 seconds from now + runAt := time.Now().Add(5 * time.Second) + err := schedulerQueue.ScheduleJob("job_to_remove", runAt) + require.Nil(t, err) + + // Verify job is scheduled + scheduledJobs, err := schedulerQueue.GetScheduledJobs() + require.Nil(t, err) + require.Equal(t, 1, len(scheduledJobs)) + + // Remove the scheduled job + err = schedulerQueue.RemoveScheduledJob("job_to_remove") + require.Nil(t, err) + + // Verify job is no longer scheduled + scheduledJobs, err = schedulerQueue.GetScheduledJobs() + require.Nil(t, err) + require.Equal(t, 0, len(scheduledJobs)) +} + +func Test_PauseScheduler(t *testing.T) { + pauseQueue := queue.New("pause_scheduler_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 3, + RetryFailures: 0, + Pattern: "@every 1s", + ScheduleInterval: 1 * time.Second, + }) + + // Track processed jobs + processedJobs := make(map[string]bool) + var mu sync.Mutex + + pauseQueue.Process(func(job *queue.Job) { + job.Process(func() error { + mu.Lock() + processedJobs[job.Id] = true + mu.Unlock() + return nil + }) + }) + + // Schedule a job to run 2 seconds from now + runAt := time.Now().Add(2 * time.Second) + err := pauseQueue.ScheduleJob("paused_job", runAt) + require.Nil(t, err) + + // Pause the queue immediately + pauseQueue.Pause() + + // Wait for when the job should have been processed + time.Sleep(3 * time.Second) + + // Verify job was NOT processed (scheduler stopped) + mu.Lock() + require.False(t, processedJobs["paused_job"]) + mu.Unlock() + + // Verify job is still in scheduled set + scheduledJobs, err := pauseQueue.GetScheduledJobs() + require.Nil(t, err) + require.Equal(t, 1, len(scheduledJobs)) + require.Equal(t, "paused_job", scheduledJobs[0].JobId) +} + +func Test_ResumeScheduler(t *testing.T) { + resumeQueue := queue.New("resume_scheduler_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 3, + RetryFailures: 0, + Pattern: "@every 1s", + ScheduleInterval: 1 * time.Second, + }) + + // Track processed jobs + processedJobs := make(map[string]bool) + var mu sync.Mutex + + resumeQueue.Process(func(job *queue.Job) { + job.Process(func() error { + mu.Lock() + processedJobs[job.Id] = true + mu.Unlock() + return nil + }) + }) + + // Pause the queue + resumeQueue.Pause() + + // Schedule a job to run 3 seconds from now + runAt := time.Now().Add(3 * time.Second) + err := resumeQueue.ScheduleJob("resume_job", runAt) + require.Nil(t, err) + + // Resume the queue + resumeQueue.Resume() + + // Wait for job to be processed (3s + 2s buffer) + time.Sleep(5 * time.Second) + + // Verify job was processed (scheduler restarted) + mu.Lock() + require.True(t, processedJobs["resume_job"]) + mu.Unlock() + + // Verify job is no longer in scheduled set + scheduledJobs, err := resumeQueue.GetScheduledJobs() + require.Nil(t, err) + require.Equal(t, 0, len(scheduledJobs)) +} + +func Test_PauseResumeMultipleScheduledJobs(t *testing.T) { + multiQueue := queue.New("multi_pause_resume_test", &queue.Options{ + Connect: &redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }, + Workers: 3, + RetryFailures: 0, + Pattern: "@every 1s", + ScheduleInterval: 1 * time.Second, + }) + + // Track processed jobs + processedJobs := make(map[string]bool) + var mu sync.Mutex + + multiQueue.Process(func(job *queue.Job) { + job.Process(func() error { + mu.Lock() + processedJobs[job.Id] = true + mu.Unlock() + return nil + }) + }) + + // Schedule multiple jobs + runAt1 := time.Now().Add(2 * time.Second) + runAt2 := time.Now().Add(3 * time.Second) + runAt3 := time.Now().Add(4 * time.Second) + + err := multiQueue.ScheduleJob("job1", runAt1) + require.Nil(t, err) + err = multiQueue.ScheduleJob("job2", runAt2) + require.Nil(t, err) + err = multiQueue.ScheduleJob("job3", runAt3) + require.Nil(t, err) + + // Verify all jobs are scheduled + scheduledJobs, err := multiQueue.GetScheduledJobs() + require.Nil(t, err) + require.Equal(t, 3, len(scheduledJobs)) + + // Pause after 2.5 seconds (job1 should have been processed) + time.Sleep(2500 * time.Millisecond) + multiQueue.Pause() + + // Wait a bit more + time.Sleep(2 * time.Second) + + // Verify only job1 was processed + mu.Lock() + require.True(t, processedJobs["job1"]) + require.False(t, processedJobs["job2"]) + require.False(t, processedJobs["job3"]) + mu.Unlock() + + // Resume the queue + multiQueue.Resume() + + // Wait for remaining jobs to be processed + time.Sleep(2 * time.Second) + + // Verify all jobs were eventually processed + mu.Lock() + require.True(t, processedJobs["job1"]) + require.True(t, processedJobs["job2"]) + require.True(t, processedJobs["job3"]) + mu.Unlock() + + // Verify all jobs are removed from scheduled set + scheduledJobs, err = multiQueue.GetScheduledJobs() + require.Nil(t, err) + require.Equal(t, 0, len(scheduledJobs)) +}