diff --git a/pool.go b/pool.go index c411d2f..245ad26 100644 --- a/pool.go +++ b/pool.go @@ -114,6 +114,18 @@ func (res *Resource[T]) IdleDuration() time.Duration { return time.Duration(nanotime() - res.lastUsedNano) } +type ResourceTraceStats struct { + CreationTime time.Time + IdleDuration time.Duration +} + +func (res *Resource[T]) traceStats() ResourceTraceStats { + return ResourceTraceStats{ + CreationTime: res.CreationTime(), + IdleDuration: res.IdleDuration(), + } +} + // Pool is a concurrency-safe resource pool. type Pool[T any] struct { // mux is the pool internal lock. Any modification of shared state of @@ -135,6 +147,7 @@ type Pool[T any] struct { constructor Constructor[T] destructor Destructor[T] maxSize int32 + tracer Tracer acquireCount int64 acquireDuration time.Duration @@ -153,6 +166,7 @@ type Config[T any] struct { Constructor Constructor[T] Destructor Destructor[T] MaxSize int32 + Tracer Tracer } // NewPool creates a new pool. Returns an error iff MaxSize is less than 1. @@ -167,6 +181,7 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) { acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), idleResources: genstack.NewGenStack[*Resource[T]](), maxSize: config.MaxSize, + tracer: config.Tracer, constructor: config.Constructor, destructor: config.Destructor, baseAcquireCtx: baseAcquireCtx, @@ -352,13 +367,25 @@ func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) { // // This function exists solely only for benchmarking purposes. func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { + var trace tracerSpan startNano := nanotime() + if p.tracer != nil { + ctx = p.tracer.AcquireStart(ctx, AcquireStartData{ + StartNano: startNano, + }) + trace = tracerSpan{ + t: p.tracer, + start: startNano, + } + } var waitedForLock bool + var waitTime time.Duration if !p.acquireSem.TryAcquire(1) { waitedForLock = true err := p.acquireSem.Acquire(ctx, 1) if err != nil { + defer trace.acquireEndErr(ctx, err) p.canceledAcquireCount.Add(1) return nil, err } @@ -366,6 +393,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { p.mux.Lock() if p.closed { + defer trace.acquireEndErr(ctx, ErrClosedPool) p.acquireSem.Release(1) p.mux.Unlock() return nil, ErrClosedPool @@ -373,11 +401,12 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { // If a resource is available in the pool. if res := p.tryAcquireIdleResource(); res != nil { - waitTime := time.Duration(nanotime() - startNano) + waitTime = time.Duration(nanotime() - startNano) if waitedForLock { p.emptyAcquireCount += 1 p.emptyAcquireWaitTime += waitTime } + defer trace.acquireEnd(ctx, waitTime, res.traceStats, false) p.acquireCount += 1 p.acquireDuration += waitTime p.mux.Unlock() @@ -395,6 +424,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { res, err := p.initResourceValue(ctx, res) if err != nil { + defer trace.acquireEndErr(ctx, err) return nil, err } @@ -403,7 +433,8 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { p.emptyAcquireCount += 1 p.acquireCount += 1 - waitTime := time.Duration(nanotime() - startNano) + waitTime = time.Duration(nanotime() - startNano) + defer trace.acquireEnd(ctx, waitTime, res.traceStats, true) p.acquireDuration += waitTime p.emptyAcquireWaitTime += waitTime diff --git a/pool_test.go b/pool_test.go index a313568..54543b2 100644 --- a/pool_test.go +++ b/pool_test.go @@ -20,6 +20,31 @@ import ( "golang.org/x/sync/semaphore" ) +// TODO(draft) This is just to demo. +// Will do testable example for final PR. +type testLogTracer struct { + puddle.BaseTracer + t *testing.T +} + +func (t *testLogTracer) Logf(format string, a ...any) { + t.t.Helper() + t.t.Logf(format, a...) +} + +func (t *testLogTracer) AcquireEnd(ctx context.Context, data puddle.AcquireEndData) { + t.t.Helper() + if data.Err != nil { + t.Logf("Acquire error after %v: %v", data.AcquireDuration, data.Err) + return + } + if data.InitDuration > 0 { + t.Logf("got new resource after %v. blocked for %v, init took: %v", data.AcquireDuration, data.WaitDuration, data.InitDuration) + return + } + t.Logf("got pooled resource after %v. blocked for %v, resource is %v old and has been idle for %v", data.AcquireDuration, data.WaitDuration, time.Since(data.ResourceStats.CreationTime), data.ResourceStats.IdleDuration) +} + type Counter struct { mutex sync.Mutex n int @@ -218,7 +243,14 @@ func TestPoolAcquireCreatesResourceRespectingContext(t *testing.T) { func TestPoolAcquireReusesResources(t *testing.T) { constructor, createCounter := createConstructor() - pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10}) + pool, err := puddle.NewPool(&puddle.Config[int]{ + Constructor: constructor, + Destructor: stubDestructor, + MaxSize: 10, + Tracer: &testLogTracer{ + t: t, + }, + }) require.NoError(t, err) res, err := pool.Acquire(context.Background()) @@ -328,7 +360,14 @@ func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) { } return constructorCalls.Next(), nil } - pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10}) + pool, err := puddle.NewPool(&puddle.Config[int]{ + Constructor: constructor, + Destructor: stubDestructor, + MaxSize: 10, + Tracer: &testLogTracer{ + t: t, + }, + }) require.NoError(t, err) res, err := pool.Acquire(ctx) @@ -1173,7 +1212,6 @@ func startAcceptOnceDummyServer(laddr string) { } } }() - } func ExamplePool() { diff --git a/tracing.go b/tracing.go new file mode 100644 index 0000000..84b2e4a --- /dev/null +++ b/tracing.go @@ -0,0 +1,101 @@ +package puddle + +import ( + "context" + "time" +) + +// Tracer traces pool actions. +type Tracer interface { + // AcquireStart is called at the beginning of Acquire calls. The return + // context is used for the rest of the call and will be passed to AcquireEnd + AcquireStart(ctx context.Context, data AcquireStartData) context.Context + AcquireEnd(ctx context.Context, data AcquireEndData) + // ReleaseStart is called at the beginning of Release calls. The return + // context is used for the rest of the call and will be passed to ReleaseEnd + ReleaseStart(ctx context.Context, data ReleaseStartData) context.Context + ReleaseEnd(ctx context.Context, data ReleaseEndData) +} + +type AcquireStartData struct { + StartNano int64 +} + +type AcquireEndData struct { + WaitDuration time.Duration + AcquireDuration time.Duration + InitDuration time.Duration + ResourceStats ResourceTraceStats + Err error +} + +type ReleaseStartData struct { + HeldDuration time.Duration +} + +type ReleaseEndData struct { + BlockDuration time.Duration + Err error +} + +type tracerSpan struct { + t Tracer + start int64 +} + +func (t tracerSpan) acquireEndErr(ctx context.Context, err error) { + if t.t == nil { + return + } + t.t.AcquireEnd(ctx, AcquireEndData{ + AcquireDuration: time.Duration(nanotime() - t.start), + Err: err, + }) +} + +func (t tracerSpan) acquireEnd(ctx context.Context, waitTime time.Duration, stats statFn, isNew bool) { + if t.t == nil { + return + } + var initDuration time.Duration + resourceStats := stats() + if isNew { + initDuration = time.Since(resourceStats.CreationTime) + } + t.t.AcquireEnd(ctx, AcquireEndData{ + WaitDuration: waitTime, + AcquireDuration: time.Duration(nanotime() - t.start), + InitDuration: initDuration, + ResourceStats: resourceStats, + }) +} + +type statFn func() ResourceTraceStats + +// BaseTracer implements [Tracer] methods as no-ops. +// +// It is intended to be composed with types that only need to implement a subset +// of Tracer methods. +// +// Example usage: +// +// // MyTracer only hooks AcquireEnd +// type MyTracer struct { +// pool.BaseTracer +// } +// +// func AcquireEnd(ctx context.Context, d AcquireEndData) { +// /* do something with d */ +// } +type BaseTracer struct{} + +func (BaseTracer) AcquireStart(ctx context.Context, _ AcquireStartData) context.Context { + return ctx +} +func (BaseTracer) AcquireEnd(context.Context, AcquireEndData) {} +func (BaseTracer) ReleaseStart(ctx context.Context, _ ReleaseStartData) context.Context { + return ctx +} +func (BaseTracer) ReleaseEnd(context.Context, ReleaseEndData) {} + +var _ Tracer = BaseTracer{}