Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package puddle

import (
"sync/atomic"
"time"
)

// AcquireObserver is called after successful resource acquisition.
// When using AcquireObserver, make sure it executes as possible in
// order not to downgrade overall pool performance.
type AcquireObserver func(d time.Duration, isEmptyAcquire bool)

// Metrics hold puddle metrics.
type Metrics struct {
// externalAcquireObserver is an optional callback func that user
// may pass to enable extended monitoring, e.g. prom histogram measurement.
externalAcquireObserver AcquireObserver

acquireCount int64
acquireDuration time.Duration
emptyAcquireCount int64
canceledAcquireCount atomic.Int64
}

func NewMetrics(opts ...MetricsOption) *Metrics {
var m Metrics
for _, o := range opts {
o(&m)
}
return &m
}

func (m *Metrics) observeAcquireCancel() {
m.canceledAcquireCount.Add(1)
}

// observeAcquireDuration is not thread safe.
func (m *Metrics) observeAcquireDuration(d time.Duration, isEmptyAcquire bool) {
m.acquireCount++
m.acquireDuration += d
if isEmptyAcquire {
m.emptyAcquireCount++
}

if m.externalAcquireObserver != nil {
m.externalAcquireObserver(d, isEmptyAcquire)
}
}

type MetricsOption func(m *Metrics)

// WithExternalAcquireObserver sets optional callback function for duration observation.
func WithExternalAcquireObserver(observer AcquireObserver) MetricsOption {
return func(m *Metrics) { m.externalAcquireObserver = observer }
}
69 changes: 34 additions & 35 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/jackc/puddle/v2/internal/genstack"
"golang.org/x/sync/semaphore"

"github.com/jackc/puddle/v2/internal/genstack"
)

const (
Expand Down Expand Up @@ -132,15 +132,12 @@ type Pool[T any] struct {
allResources resList[T]
idleResources *genstack.GenStack[*Resource[T]]

metrics *Metrics

constructor Constructor[T]
destructor Destructor[T]
maxSize int32

acquireCount int64
acquireDuration time.Duration
emptyAcquireCount int64
canceledAcquireCount atomic.Int64

resetCount int

baseAcquireCtx context.Context
Expand All @@ -149,6 +146,7 @@ type Pool[T any] struct {
}

type Config[T any] struct {
Metrics *Metrics
Constructor Constructor[T]
Destructor Destructor[T]
MaxSize int32
Expand All @@ -160,17 +158,22 @@ func NewPool[T any](config *Config[T]) (*Pool[T], error) {
return nil, errors.New("MaxSize must be >= 1")
}

baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background())
p := &Pool[T]{
acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
idleResources: genstack.NewGenStack[*Resource[T]](),
maxSize: config.MaxSize,
constructor: config.Constructor,
destructor: config.Destructor,
metrics: config.Metrics,
}

p.baseAcquireCtx, p.cancelBaseAcquireCtx = context.WithCancel(context.Background())

return &Pool[T]{
acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
idleResources: genstack.NewGenStack[*Resource[T]](),
maxSize: config.MaxSize,
constructor: config.Constructor,
destructor: config.Destructor,
baseAcquireCtx: baseAcquireCtx,
cancelBaseAcquireCtx: cancelBaseAcquireCtx,
}, nil
if p.metrics == nil {
p.metrics = NewMetrics()
}

return p, nil
}

// Close destroys all resources in the pool and rejects future Acquire calls.
Expand Down Expand Up @@ -264,10 +267,10 @@ func (p *Pool[T]) Stat() *Stat {

s := &Stat{
maxResources: p.maxSize,
acquireCount: p.acquireCount,
emptyAcquireCount: p.emptyAcquireCount,
canceledAcquireCount: p.canceledAcquireCount.Load(),
acquireDuration: p.acquireDuration,
acquireCount: p.metrics.acquireCount,
emptyAcquireCount: p.metrics.emptyAcquireCount,
canceledAcquireCount: p.metrics.canceledAcquireCount.Load(),
acquireDuration: p.metrics.acquireDuration,
}

for _, res := range p.allResources {
Expand Down Expand Up @@ -329,7 +332,7 @@ func (p *Pool[T]) createNewResource() *Resource[T] {
func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
select {
case <-ctx.Done():
p.canceledAcquireCount.Add(1)
p.metrics.observeAcquireCancel()
return nil, ctx.Err()
default:
}
Expand All @@ -349,7 +352,7 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
waitedForLock = true
err := p.acquireSem.Acquire(ctx, 1)
if err != nil {
p.canceledAcquireCount.Add(1)
p.metrics.observeAcquireCancel()
return nil, err
}
}
Expand All @@ -363,11 +366,8 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {

// If a resource is available in the pool.
if res := p.tryAcquireIdleResource(); res != nil {
if waitedForLock {
p.emptyAcquireCount += 1
}
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
d := time.Duration(nanotime() - startNano)
p.metrics.observeAcquireDuration(d, waitedForLock)
p.mux.Unlock()
return res, nil
}
Expand All @@ -389,9 +389,8 @@ func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
p.mux.Lock()
defer p.mux.Unlock()

p.emptyAcquireCount += 1
p.acquireCount += 1
p.acquireDuration += time.Duration(nanotime() - startNano)
d := time.Duration(nanotime() - startNano)
p.metrics.observeAcquireDuration(d, true)

return res, nil
}
Expand All @@ -414,7 +413,7 @@ func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Res

// The resource won't be acquired because its
// construction failed. We have to allow someone else to
// take that resouce.
// take that resource.
p.acquireSem.Release(1)
p.mux.Unlock()

Expand Down Expand Up @@ -444,7 +443,7 @@ func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Res

select {
case <-ctx.Done():
p.canceledAcquireCount.Add(1)
p.metrics.observeAcquireCancel()
return nil, ctx.Err()
case err := <-constructErrChan:
if err != nil {
Expand Down Expand Up @@ -472,7 +471,7 @@ func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {

// If a resource is available now
if res := p.tryAcquireIdleResource(); res != nil {
p.acquireCount += 1
p.metrics.observeAcquireDuration(0, false)
return res, nil
}

Expand Down Expand Up @@ -652,7 +651,7 @@ func (p *Pool[T]) Reset() {
}
}

// releaseAcquiredResource returns res to the the pool.
// releaseAcquiredResource returns res to the pool.
func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
p.mux.Lock()
defer p.mux.Unlock()
Expand Down