Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package queue

import (
"testing"
)

// BenchmarkMergeSortedJobs measures the cost of merging two pre-sorted
// job slices (descending priority) with mergeSortedJobs.
func BenchmarkMergeSortedJobs(b *testing.B) {
	const size = 100
	left := make([]Job, size)
	right := make([]Job, size)
	for i := range left {
		left[i] = Job{Priority: 200 - i*2}
		right[i] = Job{Priority: 199 - i*2}
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = mergeSortedJobs(left, right)
	}
}

// BenchmarkSliceInsertion compares two ways of inserting a job into the
// middle of a sorted slice: the grow-shift-write pattern used by
// Queue.AddJob versus the old double-append pattern.
//
// The 100-element base slice is built once, outside the timed loops;
// each iteration clones it with a single append (one memmove) so the
// timed region is dominated by the insertion itself rather than by
// element-by-element setup, keeping the two sub-benchmarks comparable.
func BenchmarkSliceInsertion(b *testing.B) {
	base := make([]Job, 100)
	for j := range base {
		base[j] = Job{Priority: 100 - j}
	}
	const insertIdx = 50
	job := Job{Priority: 75}

	b.Run("optimized_copy", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			jobs := append([]Job(nil), base...) // fresh working copy, one memmove
			// Grow by one, shift the tail right, write the new element.
			jobs = append(jobs, Job{})
			copy(jobs[insertIdx+1:], jobs[insertIdx:])
			jobs[insertIdx] = job
		}
	})

	b.Run("double_append", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			jobs := append([]Job(nil), base...) // fresh working copy, one memmove
			// Old pattern: allocates a temporary slice for the tail.
			jobs = append(jobs[:insertIdx], append([]Job{job}, jobs[insertIdx:]...)...)
			_ = jobs
		}
	})
}

// benchPreallocSink keeps the appended slices alive so the compiler
// cannot dead-code-eliminate the work being measured.
var benchPreallocSink []*Job

// BenchmarkPreallocatedSlice compares appending 100 jobs into a slice
// pre-sized with make([]*Job, 0, 100) against one that starts empty and
// must grow repeatedly. Results are stored in a package-level sink so
// the allocations cannot be optimized away.
func BenchmarkPreallocatedSlice(b *testing.B) {
	b.Run("preallocated", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			jobs := make([]*Job, 0, 100)
			for j := 0; j < 100; j++ {
				jobs = append(jobs, &Job{Priority: j})
			}
			benchPreallocSink = jobs
		}
	})

	b.Run("non_preallocated", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			jobs := []*Job{}
			for j := 0; j < 100; j++ {
				jobs = append(jobs, &Job{Priority: j})
			}
			benchPreallocSink = jobs
		}
	})
}

// benchClearSink keeps the filled slices alive so the compiler cannot
// dead-code-eliminate the work being measured.
var benchClearSink []*Job

// BenchmarkSliceClear compares refilling a slice whose capacity is
// reused via jobs = jobs[:0] against allocating a brand-new slice each
// iteration.
//
// The previous version allocated a fresh 100-element slice inside BOTH
// loops, so the "reuse_capacity" case never actually reused anything and
// the two numbers were indistinguishable. Here the reusable slice is
// allocated once, outside the timed loop, and only cleared + refilled
// inside it.
func BenchmarkSliceClear(b *testing.B) {
	b.Run("reuse_capacity", func(b *testing.B) {
		b.ReportAllocs()
		jobs := make([]*Job, 0, 100)
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			jobs = jobs[:0] // keep the backing array, drop the length
			for j := 0; j < 100; j++ {
				jobs = append(jobs, &Job{Priority: j})
			}
			benchClearSink = jobs
		}
	})

	b.Run("new_slice", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			jobs := make([]*Job, 0, 100)
			for j := 0; j < 100; j++ {
				jobs = append(jobs, &Job{Priority: j})
			}
			benchClearSink = jobs
		}
	})
}

// benchMinSink accumulates results so the Min calls — whose arguments
// are compile-time constants — cannot be constant-folded or eliminated.
var benchMinSink int

// BenchmarkMin measures the Min helper with both argument orders.
// NOTE(review): assumes Min(100, 50) yields an int — confirm against the
// project's Min declaration.
func BenchmarkMin(b *testing.B) {
	for i := 0; i < b.N; i++ {
		benchMinSink += Min(100, 50)
		benchMinSink += Min(50, 100)
	}
}

// benchJobSink keeps each constructed job reachable so the compiler can
// neither stack-allocate nor eliminate it — otherwise the nil-vs-empty
// Stacktrace allocation comparison would measure nothing.
var benchJobSink *Job

// BenchmarkJobCreation compares constructing a job with a nil Stacktrace
// (the zero value) against one that eagerly allocates an empty slice,
// mirroring the optimization applied to Queue.newJob and delayJob.
func BenchmarkJobCreation(b *testing.B) {
	b.Run("nil_stacktrace", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			benchJobSink = &Job{
				Id:       "test",
				Priority: 1,
				Status:   WaitStatus,
			}
		}
	})

	b.Run("empty_stacktrace", func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			benchJobSink = &Job{
				Id:         "test",
				Priority:   1,
				Status:     WaitStatus,
				Stacktrace: []string{},
			}
		}
	})
}

3 changes: 1 addition & 2 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func (queue *Queue) newJob(opt AddJobOptions) *Job {
Data: opt.Data,
Priority: opt.Priority,
Status: WaitStatus,
Stacktrace: []string{},
queue: queue,
RetryFailures: queue.config.RetryFailures,
}
Expand All @@ -53,7 +52,6 @@ func (queue *Queue) delayJob(opt AddJobOptions) *Job {
Data: opt.Data,
Priority: opt.Priority,
Status: DelayedStatus,
Stacktrace: []string{},
queue: queue,
RetryFailures: queue.config.RetryFailures,
}
Expand Down Expand Up @@ -92,6 +90,7 @@ func (job *Job) Process(cb Callback) {
func (job *Job) HandlerError(reasonError string) {
job.FailedReason = reasonError
job.Status = FailedStatus
job.Stacktrace = append(job.Stacktrace, reasonError)

// Store error
if job.RetryFailures <= 0 {
Expand Down
17 changes: 11 additions & 6 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ func (q *Queue) AddJob(opt AddJobOptions) {
insertIdx := sort.Search(len(q.jobs), func(i int) bool {
return q.jobs[i].Priority < job.Priority
})
// Efficient insertion: grow slice and insert at correct position
q.jobs = append(q.jobs[:insertIdx], append([]Job{*job}, q.jobs[insertIdx:]...)...)
// Efficient insertion: grow slice by one, shift elements, and insert at correct position
q.jobs = append(q.jobs, Job{})
copy(q.jobs[insertIdx+1:], q.jobs[insertIdx:])
q.jobs[insertIdx] = *job
q.Run()
}

Expand Down Expand Up @@ -212,7 +214,8 @@ func (q *Queue) Run() {
q.formatLog(LoggerError, "Error when lock mutex: %v", err)
return
}
execJobs := []*Job{}
// Pre-allocate with estimated capacity to reduce allocations
execJobs := make([]*Job, 0, len(q.jobs))
for i := range q.jobs {
if q.jobs[i].IsReady() {
execJobs = append(execJobs, &q.jobs[i])
Expand Down Expand Up @@ -281,7 +284,8 @@ func (q *Queue) Run() {
// and removes it from the list of jobs to retry. Finally, it unlocks the mutex.

func (q *Queue) Retry() {
execJobs := []*Job{}
// Pre-allocate with estimated capacity to reduce allocations
execJobs := make([]*Job, 0, len(q.jobs))
// For retry failures
for i := range q.jobs {
if q.jobs[i].Status == DelayedStatus {
Expand All @@ -303,7 +307,8 @@ func (q *Queue) Retry() {
var wg sync.WaitGroup
done := make(chan struct{})

var finishedJob []string
// Pre-allocate finishedJob slice with expected capacity
finishedJob := make([]string, 0, min)
var finishedMu sync.Mutex
for i := range numJobs {
job := numJobs[i]
Expand Down Expand Up @@ -335,7 +340,7 @@ func (q *Queue) Retry() {
if len(finishedJob) > 0 {
for _, id := range finishedJob {
if len(execJobs) == 1 && execJobs[0].Id == id {
execJobs = []*Job{}
execJobs = execJobs[:0]
break
}
idx := slices.IndexFunc(execJobs, func(j *Job) bool { return j.Id == id })
Expand Down