From f3d3a2eb944726d9a944777bcdf2ae32c99040f1 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Mon, 9 Jun 2025 13:36:50 -0600 Subject: [PATCH 1/7] GODRIVER-3419 Sketch out pattern --- x/mongo/driver/topology/pool.go | 148 ++-------------------------- x/mongo/driver/topology/pool.tmp.go | 81 +++++++++++++++ 2 files changed, 87 insertions(+), 142 deletions(-) create mode 100644 x/mongo/driver/topology/pool.tmp.go diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 162bb9c1af..64a3a9b85b 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -128,6 +128,8 @@ type pool struct { idleConns []*connection // idleConns holds all idle connections. idleConnWait wantConnQueue // idleConnWait holds all wantConn requests for idle connections. connectTimeout time.Duration + + connectionSem chan struct{} } // getState returns the current state of the pool. Callers must not hold the stateMu lock. @@ -226,6 +228,7 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool { conns: make(map[int64]*connection, config.MaxPoolSize), idleConns: make([]*connection, 0, config.MaxPoolSize), connectTimeout: config.ConnectTimeout, + connectionSem: make(chan struct{}, maxConnecting), } // minSize must not exceed maxSize if maxSize is not 0 if pool.maxSize != 0 && pool.minSize > pool.maxSize { @@ -241,11 +244,6 @@ func newPool(config poolConfig, connOpts ...ConnectionOption) *pool { var ctx context.Context ctx, pool.cancelBackgroundCtx = context.WithCancel(context.Background()) - for i := 0; i < int(pool.maxConnecting); i++ { - pool.backgroundDone.Add(1) - go pool.createConnections(ctx, pool.backgroundDone) - } - // If maintainInterval is not positive, don't start the maintain() goroutine. Expect that // negative values are only used in testing; this config value is not user-configurable. if maintainInterval > 0 { @@ -1125,6 +1123,9 @@ func (p *pool) queueForNewConn(w *wantConn) { p.newConnWait.cleanFront() p.newConnWait.pushBack(w) p.createConnectionsCond.Signal() + + // Try to spawn without blocking the caller. + go p.spawnConnectionIfNeeded() } func (p *pool) totalConnectionCount() int { @@ -1141,143 +1142,6 @@ func (p *pool) availableConnectionCount() int { return len(p.idleConns) } -// createConnections creates connections for wantConn requests on the newConnWait queue. -func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) { - defer wg.Done() - - // condition returns true if the createConnections() loop should continue and false if it should - // wait. Note that the condition also listens for Context cancellation, which also causes the - // loop to continue, allowing for a subsequent check to return from createConnections(). - condition := func() bool { - checkOutWaiting := p.newConnWait.len() > 0 - poolHasSpace := p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize - cancelled := ctx.Err() != nil - return (checkOutWaiting && poolHasSpace) || cancelled - } - - // wait waits for there to be an available wantConn and for the pool to have space for a new - // connection. When the condition becomes true, it creates a new connection and returns the - // waiting wantConn and new connection. If the Context is cancelled or there are any - // errors, wait returns with "ok = false". 
- wait := func() (*wantConn, *connection, bool) { - p.createConnectionsCond.L.Lock() - defer p.createConnectionsCond.L.Unlock() - - for !condition() { - p.createConnectionsCond.Wait() - } - - if ctx.Err() != nil { - return nil, nil, false - } - - p.newConnWait.cleanFront() - w := p.newConnWait.popFront() - if w == nil { - return nil, nil, false - } - - conn := newConnection(p.address, p.connOpts...) - conn.pool = p - conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1) - p.conns[conn.driverConnectionID] = conn - - return w, conn, true - } - - for ctx.Err() == nil { - w, conn, ok := wait() - if !ok { - continue - } - - if mustLogPoolMessage(p) { - keysAndValues := logger.KeyValues{ - logger.KeyDriverConnectionID, conn.driverConnectionID, - } - - logPoolMessage(p, logger.ConnectionCreated, keysAndValues...) - } - - if p.monitor != nil { - p.monitor.Event(&event.PoolEvent{ - Type: event.ConnectionCreated, - Address: p.address.String(), - ConnectionID: conn.driverConnectionID, - }) - } - - start := time.Now() - // Pass the createConnections context to connect to allow pool close to - // cancel connection establishment so shutdown doesn't block indefinitely if - // connectTimeout=0. - // - // Per the specifications, an explicit value of connectTimeout=0 means the - // timeout is "infinite". - - var cancel context.CancelFunc - - connctx := context.Background() - if p.connectTimeout != 0 { - connctx, cancel = context.WithTimeout(ctx, p.connectTimeout) - } - - err := conn.connect(connctx) - - if cancel != nil { - cancel() - } - - if err != nil { - w.tryDeliver(nil, err) - - // If there's an error connecting the new connection, call the handshake error handler - // that implements the SDAM handshake error handling logic. This must be called after - // delivering the connection error to the waiting wantConn. If it's called before, the - // handshake error handler may clear the connection pool, leading to a different error - // message being delivered to the same waiting wantConn in idleConnWait when the wait - // queues are cleared. - if p.handshakeErrFn != nil { - p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID) - } - - _ = p.removeConnection(conn, reason{ - loggerConn: logger.ReasonConnClosedError, - event: event.ReasonError, - }, err) - - _ = p.closeConnection(conn) - - continue - } - - duration := time.Since(start) - if mustLogPoolMessage(p) { - keysAndValues := logger.KeyValues{ - logger.KeyDriverConnectionID, conn.driverConnectionID, - logger.KeyDurationMS, duration.Milliseconds(), - } - - logPoolMessage(p, logger.ConnectionReady, keysAndValues...) - } - - if p.monitor != nil { - p.monitor.Event(&event.PoolEvent{ - Type: event.ConnectionReady, - Address: p.address.String(), - ConnectionID: conn.driverConnectionID, - Duration: duration, - }) - } - - if w.tryDeliver(conn, nil) { - continue - } - - _ = p.checkInNoEvent(conn) - } -} - func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() diff --git a/x/mongo/driver/topology/pool.tmp.go b/x/mongo/driver/topology/pool.tmp.go new file mode 100644 index 0000000000..1d3d5f46d1 --- /dev/null +++ b/x/mongo/driver/topology/pool.tmp.go @@ -0,0 +1,81 @@ +package topology + +import ( + "context" + "sync/atomic" +) + +func (p *pool) spawnConnection(ctx context.Context, w *wantConn, conn *connection) { + //// Relase slot when done. 
+ //defer func() { <-p.sem }() + + //// Simulate connection setup delay + //time.Sleep(1000 * time.Millisecond) + + //// Register new conenction + //p.mu.Lock() + //p.nextID++ + //conn := &conn{id: p.nextID} + //p.conns[conn.id] = conn + //p.mu.Unlock() + + //w.ready <- conn // Notify the waiting goroutine that the connection is ready +} + +// hasSpace checks if the pool has space for a new connection. +func (p *pool) hasSpace() bool { + return p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize +} + +// checkOutWaiting checks if there are any waiting connections that need to be +// checked out. +func (p *pool) checkOutWaiting() bool { + return p.newConnWait.len() > 0 +} + +// waitForNewConn blocks until there's both work and room in the pool (or the +// context is canceled) then pops exactly one wantconn and creates+registes its +// connection. +func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool) { + p.createConnectionsCond.L.Lock() + defer p.createConnectionsCond.L.Unlock() + + for !(p.checkOutWaiting() && p.hasSpace()) && ctx.Err() == nil { + p.createConnectionsCond.Wait() + } + + if ctx.Err() != nil { + return nil, nil, false + } + + p.newConnWait.cleanFront() + w := p.newConnWait.popFront() + if w == nil { + return nil, nil, false + } + + conn := newConnection(p.address, p.connOpts...) + conn.pool = p + conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1) + p.conns[conn.driverConnectionID] = conn + + return w, conn, true +} + +// spawnConnectionIfNeeded takes on waiting waitConn (if any) and starts its +// connection creation subject to the semaphore limit. +func (p *pool) spawnConnectionIfNeeded(ctx context.Context) { + // Block until we're allowed to start another connection. + p.connectionSem <- struct{}{} + + // Wait on pool space & context. + w, conn, ok := p.waitForNewConn(ctx) + if !ok { + <-p.connectionSem // Release slot on failure. + + return + } + + // Check out connection in background as non-blocking. + go p.spawnConnection(ctx, w, conn) +} From 58f5fa5b8f359f6afc22e4af5b14eebe8e02b0b0 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Wed, 11 Jun 2025 13:12:40 -0600 Subject: [PATCH 2/7] GODRIVER-3419 Mark spawnConnection as TODO --- x/mongo/driver/topology/pool.tmp.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/x/mongo/driver/topology/pool.tmp.go b/x/mongo/driver/topology/pool.tmp.go index 1d3d5f46d1..84df857ab3 100644 --- a/x/mongo/driver/topology/pool.tmp.go +++ b/x/mongo/driver/topology/pool.tmp.go @@ -6,20 +6,7 @@ import ( ) func (p *pool) spawnConnection(ctx context.Context, w *wantConn, conn *connection) { - //// Relase slot when done. - //defer func() { <-p.sem }() - - //// Simulate connection setup delay - //time.Sleep(1000 * time.Millisecond) - - //// Register new conenction - //p.mu.Lock() - //p.nextID++ - //conn := &conn{id: p.nextID} - //p.conns[conn.id] = conn - //p.mu.Unlock() - - //w.ready <- conn // Notify the waiting goroutine that the connection is ready + // TODO } // hasSpace checks if the pool has space for a new connection. 
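Aside: patches 1 and 2 sketch the core pattern. connectionSem is a buffered channel used as a counting semaphore, so at most maxConnecting connection attempts are in flight at once. The standalone Go sketch below shows the same shape in isolation; it is illustrative only, and the names maxConnecting, sem, and dial plus the simulated delay are assumptions for the example, not the driver's API.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxConnecting = 2 // at most two "dials" in flight at once

	// Buffered channel used as a counting semaphore.
	sem := make(chan struct{}, maxConnecting)
	var wg sync.WaitGroup

	dial := func(id int) {
		defer wg.Done()
		defer func() { <-sem }() // release the slot when this dial finishes

		time.Sleep(100 * time.Millisecond) // stand-in for connection establishment
		fmt.Printf("connection %d ready\n", id)
	}

	for i := 0; i < 10; i++ {
		sem <- struct{}{} // blocks while maxConnecting dials are already running
		wg.Add(1)
		go dial(i)
	}

	wg.Wait()
}

Acquiring a slot with sem <- struct{}{} before spawning the goroutine is what bounds the number of concurrent dials, and each worker releases its slot with <-sem when it finishes, which is the role connectionSem plays in spawnConnectionIfNeeded and spawnConnection above.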
From 7561f454bcd0131ceb9630bdff791c24e2d31d4c Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Wed, 11 Jun 2025 19:39:44 -0600 Subject: [PATCH 3/7] GODRIVER-3419 Remove tmp file --- x/mongo/driver/topology/pool.go | 154 +++++++++++++++++++++++++++- x/mongo/driver/topology/pool.tmp.go | 68 ------------ 2 files changed, 150 insertions(+), 72 deletions(-) delete mode 100644 x/mongo/driver/topology/pool.tmp.go diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 64a3a9b85b..fbfd614f9c 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -596,7 +596,7 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) { // If we didn't get an immediately available idle connection, also get in the queue for a new // connection while we're waiting for an idle connection. - p.queueForNewConn(w) + p.queueForNewConn(ctx, w) p.stateMu.RUnlock() // Wait for either the wantConn to be ready or for the Context to time out. @@ -1116,7 +1116,7 @@ func (p *pool) getOrQueueForIdleConn(w *wantConn) bool { return false } -func (p *pool) queueForNewConn(w *wantConn) { +func (p *pool) queueForNewConn(ctx context.Context, w *wantConn) { p.createConnectionsCond.L.Lock() defer p.createConnectionsCond.L.Unlock() @@ -1125,7 +1125,7 @@ func (p *pool) queueForNewConn(w *wantConn) { p.createConnectionsCond.Signal() // Try to spawn without blocking the caller. - go p.spawnConnectionIfNeeded() + go p.spawnConnectionIfNeeded(ctx) } func (p *pool) totalConnectionCount() int { @@ -1216,7 +1216,7 @@ func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) { for i := 0; i < n; i++ { w := newWantConn() - p.queueForNewConn(w) + p.queueForNewConn(ctx, w) wantConns = append(wantConns, w) // Start a goroutine for each new wantConn, waiting for it to be ready. @@ -1403,3 +1403,149 @@ func (q *wantConnQueue) cleanFront() { q.popFront() } } + +func (p *pool) spawnConnection(w *wantConn, conn *connection) { + defer func() { <-p.connectionSem }() // Release slot when done, see maxConnecting. + + // Perform dial/handshake with optional timeout. + start := time.Now() + + // Pass the createConnections context to connect to allow pool close to + // cancel connection establishment so shutdown doesn't block indefinitely if + // connectTimeout=0. + // + // Per the specifications, an explicit value of connectTimeout=0 means the + // timeout is "infinite". + dialCtx := context.Background() + var cancel context.CancelFunc + if p.connectTimeout > 0 { + dialCtx, cancel = context.WithTimeout(dialCtx, p.connectTimeout) + defer cancel() + } + + err := conn.connect(dialCtx) + + if err != nil { + // Deliver error and run SDAM handshake error logic + w.tryDeliver(nil, err) + + // If there's an error connecting the new connection, call the handshake error handler + // that implements the SDAM handshake error handling logic. This must be called after + // delivering the connection error to the waiting wantConn. If it's called before, the + // handshake error handler may clear the connection pool, leading to a different error + // message being delivered to the same waiting wantConn in idleConnWait when the wait + // queues are cleared. 
+ if p.handshakeErrFn != nil { + p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID) + } + + _ = p.removeConnection(conn, reason{ + loggerConn: logger.ReasonConnClosedError, + event: event.ReasonError, + }, err) + + _ = p.closeConnection(conn) + + return + } + + // emit "ConnectionReady" + duration := time.Since(start) + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.driverConnectionID, + logger.KeyDurationMS, duration.Milliseconds(), + } + + logPoolMessage(p, logger.ConnectionReady, keysAndValues...) + } + + if p.monitor != nil { + p.monitor.Event(&event.PoolEvent{ + Type: event.ConnectionReady, + Address: p.address.String(), + ConnectionID: conn.driverConnectionID, + Duration: duration, + }) + } + + // deliver the connection or check it back in on spurious wakeup + if !w.tryDeliver(conn, nil) { + _ = p.checkInNoEvent(conn) + } +} + +// hasSpace checks if the pool has space for a new connection. +func (p *pool) hasSpace() bool { + return p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize +} + +// checkOutWaiting checks if there are any waiting connections that need to be +// checked out. +func (p *pool) checkOutWaiting() bool { + return p.newConnWait.len() > 0 +} + +// waitForNewConn blocks until there's both work and room in the pool (or the +// context is canceled) then pops exactly one wantconn and creates+registers its +// connection. +func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool) { + p.createConnectionsCond.L.Lock() + defer p.createConnectionsCond.L.Unlock() + + for !(p.checkOutWaiting() && p.hasSpace()) && ctx.Err() == nil { + p.createConnectionsCond.Wait() + } + + if ctx.Err() != nil { + return nil, nil, false + } + + p.newConnWait.cleanFront() + w := p.newConnWait.popFront() + if w == nil { + return nil, nil, false + } + + conn := newConnection(p.address, p.connOpts...) + conn.pool = p + conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1) + p.conns[conn.driverConnectionID] = conn + + return w, conn, true +} + +// spawnConnectionIfNeeded takes on waiting waitConn (if any) and starts its +// connection creation subject to the semaphore limit. +func (p *pool) spawnConnectionIfNeeded(ctx context.Context) { + // Block until we're allowed to start another connection. + p.connectionSem <- struct{}{} + + // Wait on pool space & context. + w, conn, ok := p.waitForNewConn(ctx) + if !ok { + <-p.connectionSem // Release slot on failure. + + return + } + + // Emit "ConnectionCreated" + if mustLogPoolMessage(p) { + keysAndValues := logger.KeyValues{ + logger.KeyDriverConnectionID, conn.driverConnectionID, + } + + logPoolMessage(p, logger.ConnectionCreated, keysAndValues...) + } + + if p.monitor != nil { + p.monitor.Event(&event.PoolEvent{ + Type: event.ConnectionCreated, + Address: p.address.String(), + ConnectionID: conn.driverConnectionID, + }) + } + + // Dial the connection and spawn it in the background. + go p.spawnConnection(w, conn) +} diff --git a/x/mongo/driver/topology/pool.tmp.go b/x/mongo/driver/topology/pool.tmp.go deleted file mode 100644 index 84df857ab3..0000000000 --- a/x/mongo/driver/topology/pool.tmp.go +++ /dev/null @@ -1,68 +0,0 @@ -package topology - -import ( - "context" - "sync/atomic" -) - -func (p *pool) spawnConnection(ctx context.Context, w *wantConn, conn *connection) { - // TODO -} - -// hasSpace checks if the pool has space for a new connection. 
-func (p *pool) hasSpace() bool { - return p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize -} - -// checkOutWaiting checks if there are any waiting connections that need to be -// checked out. -func (p *pool) checkOutWaiting() bool { - return p.newConnWait.len() > 0 -} - -// waitForNewConn blocks until there's both work and room in the pool (or the -// context is canceled) then pops exactly one wantconn and creates+registes its -// connection. -func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool) { - p.createConnectionsCond.L.Lock() - defer p.createConnectionsCond.L.Unlock() - - for !(p.checkOutWaiting() && p.hasSpace()) && ctx.Err() == nil { - p.createConnectionsCond.Wait() - } - - if ctx.Err() != nil { - return nil, nil, false - } - - p.newConnWait.cleanFront() - w := p.newConnWait.popFront() - if w == nil { - return nil, nil, false - } - - conn := newConnection(p.address, p.connOpts...) - conn.pool = p - conn.driverConnectionID = atomic.AddInt64(&p.nextID, 1) - p.conns[conn.driverConnectionID] = conn - - return w, conn, true -} - -// spawnConnectionIfNeeded takes on waiting waitConn (if any) and starts its -// connection creation subject to the semaphore limit. -func (p *pool) spawnConnectionIfNeeded(ctx context.Context) { - // Block until we're allowed to start another connection. - p.connectionSem <- struct{}{} - - // Wait on pool space & context. - w, conn, ok := p.waitForNewConn(ctx) - if !ok { - <-p.connectionSem // Release slot on failure. - - return - } - - // Check out connection in background as non-blocking. - go p.spawnConnection(ctx, w, conn) -} From 36f1afff50c92707656b24b4ed448936c0c446d4 Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 13 Jun 2025 11:25:46 -0600 Subject: [PATCH 4/7] GODRIVER-3419 Clean up comments --- x/mongo/driver/topology/pool.go | 37 ++++++++++++++++----------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index fbfd614f9c..9c7e6f468c 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -1404,18 +1404,19 @@ func (q *wantConnQueue) cleanFront() { } } +// spawnConnection establishes a new connection and delivers it to a waiting +// request. It handles dialing, handshaking, and error handling. This function +// is intended to be run in its own goroutine. func (p *pool) spawnConnection(w *wantConn, conn *connection) { - defer func() { <-p.connectionSem }() // Release slot when done, see maxConnecting. + // Release a slot from the connection semaphore when this function returns. + // This ensures that another connection can be spawned. + defer func() { <-p.connectionSem }() - // Perform dial/handshake with optional timeout. + // Record the start time to calculate the total connection setup duration. start := time.Now() - // Pass the createConnections context to connect to allow pool close to - // cancel connection establishment so shutdown doesn't block indefinitely if - // connectTimeout=0. - // - // Per the specifications, an explicit value of connectTimeout=0 means the - // timeout is "infinite". + // Create a context for the dial operation. If a connection timeout is + // configured, the context will be set to time out after that duration. 
dialCtx := context.Background() var cancel context.CancelFunc if p.connectTimeout > 0 { @@ -1423,18 +1424,14 @@ func (p *pool) spawnConnection(w *wantConn, conn *connection) { defer cancel() } - err := conn.connect(dialCtx) - - if err != nil { - // Deliver error and run SDAM handshake error logic + // Attempt to connect + if err := conn.connect(dialCtx); err != nil { + // If connection fails, deliver the error to the waiting requester. w.tryDeliver(nil, err) - // If there's an error connecting the new connection, call the handshake error handler - // that implements the SDAM handshake error handling logic. This must be called after - // delivering the connection error to the waiting wantConn. If it's called before, the - // handshake error handler may clear the connection pool, leading to a different error - // message being delivered to the same waiting wantConn in idleConnWait when the wait - // queues are cleared. + // If a handshake error handler is defined, invoke it to handle SDAM state + // changes. This is done after delivering the error to prevent race + // conditions where the pool might be cleared before the error is delivered. if p.handshakeErrFn != nil { p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID) } @@ -1475,7 +1472,9 @@ func (p *pool) spawnConnection(w *wantConn, conn *connection) { } } -// hasSpace checks if the pool has space for a new connection. +// hasSpace checks if the pool has space for a new connection. It returns +// "true" if the maximum size is unlimited (0) or if the current number of +// connections is less than the maximum size. func (p *pool) hasSpace() bool { return p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize } From 76bf805e61555189f6d1ac04c02ac2804ae4ec1b Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 19:08:15 -0600 Subject: [PATCH 5/7] queue new connection when removing one to prevent stalled checkOut --- x/mongo/driver/topology/pool.go | 33 ++++++++++++++++++- x/mongo/driver/topology/pool_test.go | 48 ++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index dcf66d1ff7..65a7d3a88b 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -934,6 +934,18 @@ func (p *pool) checkInNoEvent(conn *connection) error { go func() { _ = p.closeConnection(conn) }() + + // Since we are removing the connection, we should try to queue another + // in good faith in case the current idle wait queue is being awaited + // in a checkOut() call. + p.createConnectionsCond.L.Lock() + w := p.newConnWait.popFront() + p.createConnectionsCond.L.Unlock() + + if w != nil { + p.queueForNewConn(context.Background(), w) + } + return nil } @@ -1128,15 +1140,29 @@ func (p *pool) getOrQueueForIdleConn(w *wantConn) bool { return false } +// queueForNewConn enqueues a checkout request and signals the +// connection-creation state machine. It does NOT initiate dialing directly, +// but places the wantConn into the pending queue and wakes a background worker +// using sync.Cond. That worker will then dequeue in FIFO order and perform the +// actual dial under it's own synchronization, preserving order. func (p *pool) queueForNewConn(ctx context.Context, w *wantConn) { p.createConnectionsCond.L.Lock() defer p.createConnectionsCond.L.Unlock() + // Remove any wantConn entries at the front that are no longer waiting. This + // keeps the queue clean and avoids delivering to canceled requests. 
 	p.newConnWait.cleanFront()
+
+	// Enqueue this wantConn for allocation of a new connection.
 	p.newConnWait.pushBack(w)
+
+	// Signal one goroutine waiting in waitForNewConn that the pool state changed
+	// and a new wantConn is available. That goroutine will then dequeue under lock.
 	p.createConnectionsCond.Signal()
 
-	// Try to spawn without blocking the caller.
+	// Spawn a background worker to service the queue without blocking callers. We
+	// do NOT pass "w" here because the worker must re-acquire the queue lock and
+	// pick the next available wantConn in FIFO order via waitForNewConn.
 	go p.spawnConnectionIfNeeded(ctx)
 }
 
@@ -1529,6 +1555,11 @@ func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool
 // spawnConnectionIfNeeded takes on waiting waitConn (if any) and starts its
 // connection creation subject to the semaphore limit.
 func (p *pool) spawnConnectionIfNeeded(ctx context.Context) {
+	if !p.hasSpace() {
+		// If the pool is full, we can't spawn a new connection.
+		return
+	}
+
 	// Block until we're allowed to start another connection.
 	p.connectionSem <- struct{}{}
 
diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go
index 17e803ea49..52b5ef7d01 100644
--- a/x/mongo/driver/topology/pool_test.go
+++ b/x/mongo/driver/topology/pool_test.go
@@ -11,6 +11,7 @@ import (
 	"errors"
 	"net"
 	"regexp"
+	"runtime"
 	"sync"
 	"testing"
 	"time"
@@ -1608,3 +1609,50 @@ func TestPool_Error(t *testing.T) {
 		p.close(context.Background())
 	})
 }
+
+// Test that if the pool is already at MaxPoolSize, a flood of checkOuts with
+// a background context does not spin up unbounded goroutines.
+func TestPool_unboundedGoroutines(t *testing.T) {
+	// Start a server that never responds so no connection ever frees up.
+	addr := bootstrapConnections(t, 1, func(net.Conn) {
+		<-make(chan struct{})
+	})
+
+	// Create a pool with exactly 1 connection slot and 1 dial slot.
+	p := newPool(poolConfig{
+		Address:        address.Address(addr.String()),
+		MaxPoolSize:    1,
+		MaxConnecting:  1,
+		ConnectTimeout: defaultConnectionTimeout,
+	})
+	require.NoError(t, p.ready(), "pool ready error")
+
+	// Drain the only connection so the pool is full.
+	c, err := p.checkOut(context.Background())
+	require.NoError(t, err, "checkOut error")
+
+	defer func() {
+		_ = p.checkIn(c)
+		p.close(context.Background())
+	}()
+
+	// Snapshot the base goroutine count.
+	before := runtime.NumGoroutine()
+
+	// Flood the pool with N background checkOuts.
+	const N = 100
+	for i := 0; i < N; i++ {
+		go func() {
+			_, _ = p.checkOut(context.Background())
+		}()
+	}
+
+	// Give them a moment to spin up.
+	time.Sleep(1000 * time.Millisecond)
+
+	after := runtime.NumGoroutine()
+	delta := after - before - N
+
+	assert.LessOrEqual(t, delta, int(p.maxConnecting))
+
+}

From 15b5a476c88fff0a603edc8c9d02b7b17bb279f7 Mon Sep 17 00:00:00 2001
From: Preston Vasquez
Date: Fri, 11 Jul 2025 19:11:53 -0600
Subject: [PATCH 6/7] clarify why we are guarding conn spawn

---
 x/mongo/driver/topology/pool.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index 65a7d3a88b..4164aefa15 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -1556,7 +1556,8 @@ func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool
 // connection creation subject to the semaphore limit.
 func (p *pool) spawnConnectionIfNeeded(ctx context.Context) {
 	if !p.hasSpace() {
-		// If the pool is full, we can't spawn a new connection.
+ // If the pool is full, we can't spawn a new connection. This guard prevents + // spawning an unbound number of goroutines. return } From e5918e5cba7269e0743b163d5e227bc7aa29a17d Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 11 Jul 2025 19:44:12 -0600 Subject: [PATCH 7/7] Resolve race conflict --- x/mongo/driver/topology/pool.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go index 4164aefa15..34c3c604d6 100644 --- a/x/mongo/driver/topology/pool.go +++ b/x/mongo/driver/topology/pool.go @@ -1555,7 +1555,11 @@ func (p *pool) waitForNewConn(ctx context.Context) (*wantConn, *connection, bool // spawnConnectionIfNeeded takes on waiting waitConn (if any) and starts its // connection creation subject to the semaphore limit. func (p *pool) spawnConnectionIfNeeded(ctx context.Context) { - if !p.hasSpace() { + p.createConnectionsCond.L.Lock() + openSlot := p.hasSpace() + p.createConnectionsCond.L.Unlock() + + if !openSlot { // If the pool is full, we can't spawn a new connection. This guard prevents // spawning an unbound number of goroutines. return