diff --git a/job.go b/job.go index 435edc9..42aa051 100644 --- a/job.go +++ b/job.go @@ -3,7 +3,6 @@ package queue import ( "context" "fmt" - "strings" "time" ) @@ -127,9 +126,6 @@ func (job *Job) IsFinished() bool { } func (job *Job) getKey() string { - if job.queue.config.Prefix != "" { - prefix := job.queue.config.Prefix - return fmt.Sprintf("%s:%s", strings.ToLower(prefix+job.queue.Name), job.Id) - } - return fmt.Sprintf("%s:%s", strings.ToLower(job.queue.Name), job.Id) + // Use cached queue key to avoid repeated string operations + return job.queue.cachedKey + ":" + job.Id } diff --git a/queue.go b/queue.go index e744192..af82696 100644 --- a/queue.go +++ b/queue.go @@ -32,6 +32,7 @@ type Queue struct { running bool config Options Logger Logger + cachedKey string // Cache the computed key to avoid repeated string operations } type RateLimiter struct { @@ -95,6 +96,13 @@ func New(name string, opt *Options) *Queue { queue.config.Timeout = 1 * time.Minute } + // Pre-compute and cache the key + if opt.Prefix != "" { + queue.cachedKey = strings.ToLower(opt.Prefix + name) + } else { + queue.cachedKey = strings.ToLower(name) + } + return queue } @@ -110,8 +118,12 @@ func (q *Queue) AddJob(opt AddJobOptions) { q.formatLog(LoggerInfo, "Add job %s to waiting", opt.Id) job = q.newJob(opt) } - q.jobs = append(q.jobs, *job) - sort.SliceStable(q.jobs, func(i, j int) bool { return q.jobs[i].Priority > q.jobs[j].Priority }) + // Use binary search to find insertion point for O(log n) instead of O(n log n) sort + insertIdx := sort.Search(len(q.jobs), func(i int) bool { + return q.jobs[i].Priority < job.Priority + }) + // Efficient insertion: grow slice and insert at correct position + q.jobs = append(q.jobs[:insertIdx], append([]Job{*job}, q.jobs[insertIdx:]...)...) q.Run() } @@ -125,7 +137,16 @@ type AddJobOptions struct { // rate limited, the jobs are delayed. Otherwise, the jobs are added to the // waiting list and the queue is run. func (q *Queue) BulkAddJob(options []AddJobOptions) { + if len(options) == 0 { + q.Run() + return + } + + // Sort input options by priority once sort.SliceStable(options, func(i, j int) bool { return options[i].Priority > options[j].Priority }) + + // Pre-allocate space for new jobs + newJobs := make([]Job, 0, len(options)) for _, option := range options { var job *Job if q.IsLimit() { @@ -135,12 +156,34 @@ func (q *Queue) BulkAddJob(options []AddJobOptions) { q.formatLog(LoggerInfo, "Add job %s to waiting", option.Id) job = q.newJob(option) } - q.jobs = append(q.jobs, *job) + newJobs = append(newJobs, *job) } - sort.SliceStable(q.jobs, func(i, j int) bool { return q.jobs[i].Priority > q.jobs[j].Priority }) + + // Merge sorted slices efficiently + q.jobs = mergeSortedJobs(q.jobs, newJobs) q.Run() } +// mergeSortedJobs merges two sorted job slices by priority (descending) +func mergeSortedJobs(jobs1, jobs2 []Job) []Job { + result := make([]Job, 0, len(jobs1)+len(jobs2)) + i, j := 0, 0 + + for i < len(jobs1) && j < len(jobs2) { + if jobs1[i].Priority >= jobs2[j].Priority { + result = append(result, jobs1[i]) + i++ + } else { + result = append(result, jobs2[j]) + j++ + } + } + + result = append(result, jobs1[i:]...) + result = append(result, jobs2[j:]...) + return result +} + // Process sets the callback for the queue to process jobs. If the queue has a // scheduler, it will be started with the given cron pattern. Otherwise, the // callback is simply stored. @@ -187,7 +230,6 @@ func (q *Queue) Run() { numJobs := execJobs[:min] ctx, cancel := context.WithTimeout(context.Background(), q.config.Timeout) - defer cancel() var wg sync.WaitGroup done := make(chan struct{}) @@ -218,6 +260,7 @@ func (q *Queue) Run() { case <-ctx.Done(): q.MarkJobFailedTimeout(numJobs) } + cancel() // Handle remove job execJobs = execJobs[min:] @@ -256,12 +299,12 @@ func (q *Queue) Retry() { numJobs := execJobs[:min] ctx, cancel := context.WithTimeout(context.Background(), q.config.Timeout) - defer cancel() var wg sync.WaitGroup done := make(chan struct{}) var finishedJob []string + var finishedMu sync.Mutex for i := range numJobs { job := numJobs[i] wg.Add(1) @@ -269,7 +312,9 @@ func (q *Queue) Retry() { defer wg.Done() q.jobFnc(job) if job.IsFinished() { + finishedMu.Lock() finishedJob = append(finishedJob, job.Id) + finishedMu.Unlock() } }(job) } @@ -285,6 +330,7 @@ func (q *Queue) Retry() { case <-ctx.Done(): q.MarkJobFailedTimeout(numJobs) } + cancel() if len(finishedJob) > 0 { for _, id := range finishedJob { @@ -423,9 +469,5 @@ func (q *Queue) log(logType LoggerType, format string, v ...any) { } func (q *Queue) getKey() string { - if q.config.Prefix != "" { - prefix := q.config.Prefix - return strings.ToLower(prefix + q.Name) - } - return strings.ToLower(q.Name) + return q.cachedKey }