Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package queue
import (
"context"
"fmt"
"strings"
"time"
)

Expand Down Expand Up @@ -127,9 +126,6 @@ func (job *Job) IsFinished() bool {
}

func (job *Job) getKey() string {
if job.queue.config.Prefix != "" {
prefix := job.queue.config.Prefix
return fmt.Sprintf("%s:%s", strings.ToLower(prefix+job.queue.Name), job.Id)
}
return fmt.Sprintf("%s:%s", strings.ToLower(job.queue.Name), job.Id)
// Use cached queue key to avoid repeated string operations
return job.queue.cachedKey + ":" + job.Id
}
64 changes: 53 additions & 11 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Queue struct {
running bool
config Options
Logger Logger
cachedKey string // Cache the computed key to avoid repeated string operations
}

type RateLimiter struct {
Expand Down Expand Up @@ -95,6 +96,13 @@ func New(name string, opt *Options) *Queue {
queue.config.Timeout = 1 * time.Minute
}

// Pre-compute and cache the key
if opt.Prefix != "" {
queue.cachedKey = strings.ToLower(opt.Prefix + name)
} else {
queue.cachedKey = strings.ToLower(name)
}

return queue
}

Expand All @@ -110,8 +118,12 @@ func (q *Queue) AddJob(opt AddJobOptions) {
q.formatLog(LoggerInfo, "Add job %s to waiting", opt.Id)
job = q.newJob(opt)
}
q.jobs = append(q.jobs, *job)
sort.SliceStable(q.jobs, func(i, j int) bool { return q.jobs[i].Priority > q.jobs[j].Priority })
// Use binary search to find insertion point for O(log n) instead of O(n log n) sort
insertIdx := sort.Search(len(q.jobs), func(i int) bool {
return q.jobs[i].Priority < job.Priority
})
// Efficient insertion: grow slice and insert at correct position
q.jobs = append(q.jobs[:insertIdx], append([]Job{*job}, q.jobs[insertIdx:]...)...)
q.Run()
}

Expand All @@ -125,7 +137,16 @@ type AddJobOptions struct {
// rate limited, the jobs are delayed. Otherwise, the jobs are added to the
// waiting list and the queue is run.
func (q *Queue) BulkAddJob(options []AddJobOptions) {
if len(options) == 0 {
q.Run()
return
}

// Sort input options by priority once
sort.SliceStable(options, func(i, j int) bool { return options[i].Priority > options[j].Priority })

// Pre-allocate space for new jobs
newJobs := make([]Job, 0, len(options))
for _, option := range options {
var job *Job
if q.IsLimit() {
Expand All @@ -135,12 +156,34 @@ func (q *Queue) BulkAddJob(options []AddJobOptions) {
q.formatLog(LoggerInfo, "Add job %s to waiting", option.Id)
job = q.newJob(option)
}
q.jobs = append(q.jobs, *job)
newJobs = append(newJobs, *job)
}
sort.SliceStable(q.jobs, func(i, j int) bool { return q.jobs[i].Priority > q.jobs[j].Priority })

// Merge sorted slices efficiently
q.jobs = mergeSortedJobs(q.jobs, newJobs)
q.Run()
}

// mergeSortedJobs merges two sorted job slices by priority (descending)
func mergeSortedJobs(jobs1, jobs2 []Job) []Job {
result := make([]Job, 0, len(jobs1)+len(jobs2))
i, j := 0, 0

for i < len(jobs1) && j < len(jobs2) {
if jobs1[i].Priority >= jobs2[j].Priority {
result = append(result, jobs1[i])
i++
} else {
result = append(result, jobs2[j])
j++
}
}

result = append(result, jobs1[i:]...)
result = append(result, jobs2[j:]...)
return result
}

// Process sets the callback for the queue to process jobs. If the queue has a
// scheduler, it will be started with the given cron pattern. Otherwise, the
// callback is simply stored.
Expand Down Expand Up @@ -187,7 +230,6 @@ func (q *Queue) Run() {
numJobs := execJobs[:min]

ctx, cancel := context.WithTimeout(context.Background(), q.config.Timeout)
defer cancel()

var wg sync.WaitGroup
done := make(chan struct{})
Expand Down Expand Up @@ -218,6 +260,7 @@ func (q *Queue) Run() {
case <-ctx.Done():
q.MarkJobFailedTimeout(numJobs)
}
cancel()

// Handle remove job
execJobs = execJobs[min:]
Expand Down Expand Up @@ -256,20 +299,22 @@ func (q *Queue) Retry() {
numJobs := execJobs[:min]

ctx, cancel := context.WithTimeout(context.Background(), q.config.Timeout)
defer cancel()

var wg sync.WaitGroup
done := make(chan struct{})

var finishedJob []string
var finishedMu sync.Mutex
for i := range numJobs {
job := numJobs[i]
wg.Add(1)
go func(job *Job) {
defer wg.Done()
q.jobFnc(job)
if job.IsFinished() {
finishedMu.Lock()
finishedJob = append(finishedJob, job.Id)
finishedMu.Unlock()
}
}(job)
}
Expand All @@ -285,6 +330,7 @@ func (q *Queue) Retry() {
case <-ctx.Done():
q.MarkJobFailedTimeout(numJobs)
}
cancel()

if len(finishedJob) > 0 {
for _, id := range finishedJob {
Expand Down Expand Up @@ -423,9 +469,5 @@ func (q *Queue) log(logType LoggerType, format string, v ...any) {
}

func (q *Queue) getKey() string {
if q.config.Prefix != "" {
prefix := q.config.Prefix
return strings.ToLower(prefix + q.Name)
}
return strings.ToLower(q.Name)
return q.cachedKey
}