diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..4cf7353 --- /dev/null +++ b/metrics.go @@ -0,0 +1,55 @@ +package puddle + +import ( + "sync/atomic" + "time" +) + +// AcquireObserver is called after successful resource acquisition. +// When using AcquireObserver, make sure it executes as possible in +// order not to downgrade overall pool performance. +type AcquireObserver func(d time.Duration, isEmptyAcquire bool) + +// Metrics hold puddle metrics. +type Metrics struct { + // externalAcquireObserver is an optional callback func that user + // may pass to enable extended monitoring, e.g. prom histogram measurement. + externalAcquireObserver AcquireObserver + + acquireCount int64 + acquireDuration time.Duration + emptyAcquireCount int64 + canceledAcquireCount atomic.Int64 +} + +func NewMetrics(opts ...MetricsOption) *Metrics { + var m Metrics + for _, o := range opts { + o(&m) + } + return &m +} + +func (m *Metrics) observeAcquireCancel() { + m.canceledAcquireCount.Add(1) +} + +// observeAcquireDuration is not thread safe. +func (m *Metrics) observeAcquireDuration(d time.Duration, isEmptyAcquire bool) { + m.acquireCount++ + m.acquireDuration += d + if isEmptyAcquire { + m.emptyAcquireCount++ + } + + if m.externalAcquireObserver != nil { + m.externalAcquireObserver(d, isEmptyAcquire) + } +} + +type MetricsOption func(m *Metrics) + +// WithExternalAcquireObserver sets optional callback function for duration observation. +func WithExternalAcquireObserver(observer AcquireObserver) MetricsOption { + return func(m *Metrics) { m.externalAcquireObserver = observer } +} diff --git a/pool.go b/pool.go index c8edc0f..50c97d4 100644 --- a/pool.go +++ b/pool.go @@ -4,11 +4,11 @@ import ( "context" "errors" "sync" - "sync/atomic" "time" - "github.com/jackc/puddle/v2/internal/genstack" "golang.org/x/sync/semaphore" + + "github.com/jackc/puddle/v2/internal/genstack" ) const ( @@ -132,15 +132,12 @@ type Pool[T any] struct { allResources resList[T] idleResources *genstack.GenStack[*Resource[T]] + metrics *Metrics + constructor Constructor[T] destructor Destructor[T] maxSize int32 - acquireCount int64 - acquireDuration time.Duration - emptyAcquireCount int64 - canceledAcquireCount atomic.Int64 - resetCount int baseAcquireCtx context.Context @@ -149,6 +146,7 @@ type Pool[T any] struct { } type Config[T any] struct { + Metrics *Metrics Constructor Constructor[T] Destructor Destructor[T] MaxSize int32 @@ -160,17 +158,22 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) { return nil, errors.New("MaxSize must be >= 1") } - baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background()) + p := &Pool[T]{ + acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), + idleResources: genstack.NewGenStack[*Resource[T]](), + maxSize: config.MaxSize, + constructor: config.Constructor, + destructor: config.Destructor, + metrics: config.Metrics, + } + + p.baseAcquireCtx, p.cancelBaseAcquireCtx = context.WithCancel(context.Background()) - return &Pool[T]{ - acquireSem: semaphore.NewWeighted(int64(config.MaxSize)), - idleResources: genstack.NewGenStack[*Resource[T]](), - maxSize: config.MaxSize, - constructor: config.Constructor, - destructor: config.Destructor, - baseAcquireCtx: baseAcquireCtx, - cancelBaseAcquireCtx: cancelBaseAcquireCtx, - }, nil + if p.metrics == nil { + p.metrics = NewMetrics() + } + + return p, nil } // Close destroys all resources in the pool and rejects future Acquire calls. @@ -264,10 +267,10 @@ func (p *Pool[T]) Stat() *Stat { s := &Stat{ maxResources: p.maxSize, - acquireCount: p.acquireCount, - emptyAcquireCount: p.emptyAcquireCount, - canceledAcquireCount: p.canceledAcquireCount.Load(), - acquireDuration: p.acquireDuration, + acquireCount: p.metrics.acquireCount, + emptyAcquireCount: p.metrics.emptyAcquireCount, + canceledAcquireCount: p.metrics.canceledAcquireCount.Load(), + acquireDuration: p.metrics.acquireDuration, } for _, res := range p.allResources { @@ -329,7 +332,7 @@ func (p *Pool[T]) createNewResource() *Resource[T] { func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) { select { case <-ctx.Done(): - p.canceledAcquireCount.Add(1) + p.metrics.observeAcquireCancel() return nil, ctx.Err() default: } @@ -349,7 +352,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { waitedForLock = true err := p.acquireSem.Acquire(ctx, 1) if err != nil { - p.canceledAcquireCount.Add(1) + p.metrics.observeAcquireCancel() return nil, err } } @@ -363,11 +366,8 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { // If a resource is available in the pool. if res := p.tryAcquireIdleResource(); res != nil { - if waitedForLock { - p.emptyAcquireCount += 1 - } - p.acquireCount += 1 - p.acquireDuration += time.Duration(nanotime() - startNano) + d := time.Duration(nanotime() - startNano) + p.metrics.observeAcquireDuration(d, waitedForLock) p.mux.Unlock() return res, nil } @@ -389,9 +389,8 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) { p.mux.Lock() defer p.mux.Unlock() - p.emptyAcquireCount += 1 - p.acquireCount += 1 - p.acquireDuration += time.Duration(nanotime() - startNano) + d := time.Duration(nanotime() - startNano) + p.metrics.observeAcquireDuration(d, true) return res, nil } @@ -414,7 +413,7 @@ func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Res // The resource won't be acquired because its // construction failed. We have to allow someone else to - // take that resouce. + // take that resource. p.acquireSem.Release(1) p.mux.Unlock() @@ -444,7 +443,7 @@ func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Res select { case <-ctx.Done(): - p.canceledAcquireCount.Add(1) + p.metrics.observeAcquireCancel() return nil, ctx.Err() case err := <-constructErrChan: if err != nil { @@ -472,7 +471,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) { // If a resource is available now if res := p.tryAcquireIdleResource(); res != nil { - p.acquireCount += 1 + p.metrics.observeAcquireDuration(0, false) return res, nil } @@ -652,7 +651,7 @@ func (p *Pool[T]) Reset() { } } -// releaseAcquiredResource returns res to the the pool. +// releaseAcquiredResource returns res to the pool. func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) { p.mux.Lock() defer p.mux.Unlock()