diff --git a/consumer.go b/consumer.go index 8a31c15..1e0de97 100644 --- a/consumer.go +++ b/consumer.go @@ -221,7 +221,7 @@ func (c *Consumer) StopTimeout(timeout time.Duration) error { }() timer := time.NewTimer(timeout) - defer timer.Stop() + defer internal.CleanupTimer(timer) done := make(chan struct{}, 1) go func() { @@ -375,6 +375,7 @@ func (c *Consumer) reserveOne(ctx context.Context) (*Message, error) { func (c *Consumer) fetcher(ctx context.Context, fetcherID int32) { timer := time.NewTimer(time.Minute) timer.Stop() + defer internal.CleanupTimer(timer) fetchTimeout := c.opt.ReservationTimeout fetchTimeout -= fetchTimeout / 10 @@ -428,7 +429,9 @@ func (c *Consumer) fetchMessages( c.voteQueueFull() } + internal.CleanupTimer(timer) timer.Reset(timeout) + defer internal.CleanupTimer(timer) for i := range msgs { msg := &msgs[i] @@ -442,10 +445,6 @@ func (c *Consumer) fetchMessages( } } - if !timer.Stop() { - <-timer.C - } - return false, nil } @@ -459,6 +458,7 @@ func (c *Consumer) worker(ctx context.Context, workerID int32) { timer := time.NewTimer(time.Minute) timer.Stop() + defer internal.CleanupTimer(timer) for { if workerID >= atomic.LoadInt32(&c.numWorker) { @@ -492,12 +492,11 @@ func (c *Consumer) waitMessage(ctx context.Context, timer *time.Timer) *Message c.ensureFetcher(ctx) + internal.CleanupTimer(timer) timer.Reset(workerIdleTimeout) + defer internal.CleanupTimer(timer) select { case msg := <-c.buffer: - if !timer.Stop() { - <-timer.C - } return msg case <-timer.C: c.voteQueueEmpty() @@ -712,6 +711,7 @@ func (c *Consumer) lockWorker( timer := time.NewTimer(time.Minute) timer.Stop() + defer internal.CleanupTimer(timer) for { var err error @@ -733,6 +733,7 @@ func (c *Consumer) lockWorker( lock = nil } + internal.CleanupTimer(timer) timer.Reset(500 * time.Millisecond) select { case <-timer.C: @@ -791,9 +792,10 @@ func (c *Consumer) queueEmpty() bool { func (c *Consumer) autotune(ctx context.Context, cfg *consumerConfig) { timer := time.NewTimer(time.Hour) - defer timer.Stop() + defer internal.CleanupTimer(timer) for c.timing() == 0 { + internal.CleanupTimer(timer) timer.Reset(250 * time.Millisecond) select { case <-timer.C: @@ -804,6 +806,7 @@ func (c *Consumer) autotune(ctx context.Context, cfg *consumerConfig) { } for { + internal.CleanupTimer(timer) timer.Reset(c.autotuneInterval()) select { case <-timer.C: diff --git a/internal/util.go b/internal/util.go index 04aa6a0..09b4810 100644 --- a/internal/util.go +++ b/internal/util.go @@ -3,6 +3,7 @@ package internal import ( "encoding/ascii85" "errors" + "time" ) func MaxEncodedLen(n int) int { @@ -27,3 +28,16 @@ func DecodeString(src string) ([]byte, error) { } return dst[:ndst], nil } + +//------------------------------------------------------------------------------ + +// CleanupTimer will stop a timer and purge the timer's channel to ensure +// all timer related resources are cleaned up. +func CleanupTimer(timer *time.Timer) { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } +} diff --git a/memqueue/queue.go b/memqueue/queue.go index 844dafe..24b3178 100644 --- a/memqueue/queue.go +++ b/memqueue/queue.go @@ -24,9 +24,7 @@ func (q *scheduler) Schedule(msg *taskq.Message, fn func()) { timer := time.AfterFunc(msg.Delay, func() { // Remove our entry from the map - q.timerLock.Lock() - delete(q.timerMap, msg) - q.timerLock.Unlock() + q.Remove(msg) fn() }) @@ -43,7 +41,7 @@ func (q *scheduler) Remove(msg *taskq.Message) { timer, ok := q.timerMap[msg] if ok { - timer.Stop() + internal.CleanupTimer(timer) delete(q.timerMap, msg) } } @@ -54,7 +52,7 @@ func (q *scheduler) Purge() int { // Stop all delayed items for _, timer := range q.timerMap { - timer.Stop() + internal.CleanupTimer(timer) } n := len(q.timerMap)