From e60c54d531daaf41058c8ba294011b82b597d2b1 Mon Sep 17 00:00:00 2001 From: Victor Prins <32959052+VictorPrins@users.noreply.github.com> Date: Sun, 14 Sep 2025 01:52:52 +0200 Subject: [PATCH 1/8] Add test exposing keepalive bug --- tests/_async/test_connection_pool.py | 62 ++++++++++++++++++++++++++++ tests/_sync/test_connection_pool.py | 62 ++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 2fc272049..2b95e575a 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -830,3 +830,65 @@ async def trace(name, kwargs): "http11.response_closed.started", "http11.response_closed.complete", ] + + +@pytest.mark.anyio +async def test_keepalive_idle_connections(): + """ + With max_keepalive_connections=1 and max_connections=5, after making 2 requests + and reading 1 response, we should have 1 IDLE and 1 ACTIVE connection. + The IDLE connection should NOT be closed because we're under the keepalive limit. + """ + network_backend = httpcore.AsyncMockBackend( + [ + # First request/response + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + # Second request/response + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + ] + ) + + async with httpcore.AsyncConnectionPool( + network_backend=network_backend, + max_connections=5, # Allow multiple connections + max_keepalive_connections=1, # But only keep 1 idle + keepalive_expiry=10.0, # Long timeout to avoid expiry issues + http1=True, + http2=False, + ) as pool: + async with pool.stream("GET", "https://example.com/") as response1: + async with pool.stream("GET", "https://example.com/") as response2: + # At this point, both connections are ACTIVE and both requests are ACTIVE + assert ( + repr(pool) + == "" + ) + # CRITICAL: Must read response to allow h11 state machine to progress to DONE + await response2.aread() + + # After finishing one request, we should have: + # - 1 ACTIVE connection + # - 1 IDLE connection + # The IDLE connection should NOT be closed because idle_count (1) <= max_keepalive_connections (1) + assert ( + repr(pool) + == "" + ) + + # Read response to allow state machine to progress to DONE + await response1.aread() + + # After both responses are complete, we have 2 idle connections + # but the cleanup logic should close 1 to respect max_keepalive_connections=1 + assert ( + repr(pool) + == "" + ) diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index ee303e5cf..495c5e19b 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -830,3 +830,65 @@ def trace(name, kwargs): "http11.response_closed.started", "http11.response_closed.complete", ] + + + +def test_keepalive_idle_connections(): + """ + With max_keepalive_connections=1 and max_connections=5, after making 2 requests + and reading 1 response, we should have 1 IDLE and 1 ACTIVE connection. + The IDLE connection should NOT be closed because we're under the keepalive limit. + """ + network_backend = httpcore.MockBackend( + [ + # First request/response + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + # Second request/response + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + ] + ) + + with httpcore.ConnectionPool( + network_backend=network_backend, + max_connections=5, # Allow multiple connections + max_keepalive_connections=1, # But only keep 1 idle + keepalive_expiry=10.0, # Long timeout to avoid expiry issues + http1=True, + http2=False, + ) as pool: + with pool.stream("GET", "https://example.com/") as response1: + with pool.stream("GET", "https://example.com/") as response2: + # At this point, both connections are ACTIVE and both requests are ACTIVE + assert ( + repr(pool) + == "" + ) + # CRITICAL: Must read response to allow h11 state machine to progress to DONE + response2.read() + + # After finishing one request, we should have: + # - 1 ACTIVE connection + # - 1 IDLE connection + # The IDLE connection should NOT be closed because idle_count (1) <= max_keepalive_connections (1) + assert ( + repr(pool) + == "" + ) + + # Read response to allow state machine to progress to DONE + response1.read() + + # After both responses are complete, we have 2 idle connections + # but the cleanup logic should close 1 to respect max_keepalive_connections=1 + assert ( + repr(pool) + == "" + ) From 18271de8d9e907f34cd594efbe1b064f6402a7f5 Mon Sep 17 00:00:00 2001 From: Victor Prins <32959052+VictorPrins@users.noreply.github.com> Date: Sun, 14 Sep 2025 03:18:35 +0200 Subject: [PATCH 2/8] Add test demonstrating double assignment bug --- tests/_async_only/test_connection_pool.py | 102 ++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 tests/_async_only/test_connection_pool.py diff --git a/tests/_async_only/test_connection_pool.py b/tests/_async_only/test_connection_pool.py new file mode 100644 index 000000000..d25b3ed71 --- /dev/null +++ b/tests/_async_only/test_connection_pool.py @@ -0,0 +1,102 @@ +import anyio +import pytest + +import httpcore +from httpcore._models import ( + enforce_bytes, + enforce_headers, + enforce_url, + include_request_headers, +) + + +@pytest.mark.anyio +async def test_available_connections_reassigned(): + """ + Setup: max_connections=1, start 3 requests + Expected: 2 should be queued, 1 should be active + + After reading/closing first request: + Expected: 1 active, 1 queued, 1 connection + """ + network_backend = httpcore.AsyncMockBackend( + [ + # First response + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + # Second response + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + # Third response + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + ] + ) + + async with httpcore.AsyncConnectionPool( + network_backend=network_backend, + max_connections=1, # Allow a single concurrent request + max_keepalive_connections=1, + keepalive_expiry=10.0, # Long timeout to avoid expiry issues + http1=True, + http2=False, + ) as pool: + method_str = "GET" + url_str = "https://example.com/" + headers = None + + method = enforce_bytes(method_str, name="method") + url = enforce_url(url_str, name="url") + headers = enforce_headers(headers, name="headers") + headers = include_request_headers(headers, url=url, content=None) + + request1 = httpcore.Request(method, url, headers=headers) + request2 = httpcore.Request(method, url, headers=headers) + request3 = httpcore.Request(method, url, headers=headers) + + # Do the first request + response1 = await pool.handle_async_request(request1) + + # Start requests 2 and 3 as tasks so they get queued but don't block + async with anyio.create_task_group() as tg: + tg.start_soon(pool.handle_async_request, request2) + tg.start_soon(pool.handle_async_request, request3) + + # Give a short time for the tasks to start and for the requests to get added to the queue + await anyio.sleep(0.01) + + # With max_connections=1, we should have: + # - 1 active request (request1) + # - 2 queued requests + # - 1 connection + assert ( + repr(pool) + == "" + ) + + # Read and close the first response + await response1.aread() + await response1.aclose() + + # After finishing the first request, the pool automatically assigns requests + # from the request queue to available connections. + # At this point, we should have: + # - 1 active request (request2 should become active) + # - 1 queued request (request3 should remain queued) + # - 1 connection (same connection, now available for request2) + assert ( + repr(pool) + == "" + ) + + # cancel taskgroup to avoid a hanging test + tg.cancel_scope.cancel() From b7ce985c971389365f2304f8b2a0a6a6a9fe9c5b Mon Sep 17 00:00:00 2001 From: Victor Prins <32959052+VictorPrins@users.noreply.github.com> Date: Sun, 14 Sep 2025 17:46:21 +0200 Subject: [PATCH 3/8] get_available_stream_capacity for http2 connections --- httpcore/_async/http2.py | 19 +++++++++++++++---- httpcore/_sync/http2.py | 19 +++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index dbd0beeb4..f8c43dc5a 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -64,6 +64,11 @@ def __init__( self._used_all_stream_ids = False self._connection_error = False + # self._max_streams contains the maximum number of concurrent requests that this connection can handle + # Initially start with just 1 until the remote server provides its max_concurrent_streams value + self._max_streams = 1 + self._concurrent_streams = 0 # Tracks currently active requests. + # Mapping from stream ID to response stream events. self._events: dict[ int, @@ -116,10 +121,6 @@ async def handle_async_request(self, request: Request) -> Response: self._sent_connection_init = True - # Initially start with just 1 until the remote server provides - # its max_concurrent_streams value - self._max_streams = 1 - local_settings_max_streams = ( self._h2_state.local_settings.max_concurrent_streams ) @@ -129,6 +130,7 @@ async def handle_async_request(self, request: Request) -> Response: await self._max_streams_semaphore.acquire() await self._max_streams_semaphore.acquire() + self._concurrent_streams += 1 try: stream_id = self._h2_state.get_next_available_stream_id() @@ -408,6 +410,7 @@ async def _receive_remote_settings_change( async def _response_closed(self, stream_id: int) -> None: await self._max_streams_semaphore.release() + self._concurrent_streams -= 1 del self._events[stream_id] async with self._state_lock: if self._connection_terminated and not self._events: @@ -529,6 +532,14 @@ def is_idle(self) -> bool: def is_closed(self) -> bool: return self._state == HTTPConnectionState.CLOSED + def get_available_stream_capacity(self) -> int: + """ + Return the number of additional streams that can be handled by this connection. + This is useful for determining how many more requests can be sent on this HTTP/2 connection. + Uses the actual SETTINGS_MAX_CONCURRENT_STREAMS negotiated with the server. + """ + return self._max_streams - self._concurrent_streams + def info(self) -> str: origin = str(self._origin) return ( diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index ddcc18900..891ff1d1d 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -64,6 +64,11 @@ def __init__( self._used_all_stream_ids = False self._connection_error = False + # self._max_streams contains the maximum number of concurrent requests that this connection can handle + # Initially start with just 1 until the remote server provides its max_concurrent_streams value + self._max_streams = 1 + self._concurrent_streams = 0 # Tracks currently active requests. + # Mapping from stream ID to response stream events. self._events: dict[ int, @@ -116,10 +121,6 @@ def handle_request(self, request: Request) -> Response: self._sent_connection_init = True - # Initially start with just 1 until the remote server provides - # its max_concurrent_streams value - self._max_streams = 1 - local_settings_max_streams = ( self._h2_state.local_settings.max_concurrent_streams ) @@ -129,6 +130,7 @@ def handle_request(self, request: Request) -> Response: self._max_streams_semaphore.acquire() self._max_streams_semaphore.acquire() + self._concurrent_streams += 1 try: stream_id = self._h2_state.get_next_available_stream_id() @@ -408,6 +410,7 @@ def _receive_remote_settings_change( def _response_closed(self, stream_id: int) -> None: self._max_streams_semaphore.release() + self._concurrent_streams -= 1 del self._events[stream_id] with self._state_lock: if self._connection_terminated and not self._events: @@ -529,6 +532,14 @@ def is_idle(self) -> bool: def is_closed(self) -> bool: return self._state == HTTPConnectionState.CLOSED + def get_available_stream_capacity(self) -> int: + """ + Return the number of additional streams that can be handled by this connection. + This is useful for determining how many more requests can be sent on this HTTP/2 connection. + Uses the actual SETTINGS_MAX_CONCURRENT_STREAMS negotiated with the server. + """ + return self._max_streams - self._concurrent_streams + def info(self) -> str: origin = str(self._origin) return ( From 72d2eb63fe44e322df50c12ad4c7932719930055 Mon Sep 17 00:00:00 2001 From: Victor Prins <32959052+VictorPrins@users.noreply.github.com> Date: Sun, 14 Sep 2025 17:55:08 +0200 Subject: [PATCH 4/8] Rewrite _assign_requests_to_connections to fix bugs and improve perf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Redesign of the connection pool's core request-connection assignment algorithm (_assign_requests_to_connections) fixing bugs and performance issues identified in the original implementation. Fixes: - Fixed incorrect idle connection counting, causing premature connection evictions - Fixed bug where multiple requests could be assigned to the same HTTP/1.1 connection, leading to ConnectionNotAvailable errors Performance improvements: - Reduced time complexity from O(N²+NxM) to O(N+M) where N=connections, M=requests --- httpcore/_async/connection_pool.py | 200 ++++++++++++++++------ httpcore/_sync/connection_pool.py | 200 ++++++++++++++++------ tests/_async_only/test_connection_pool.py | 3 + 3 files changed, 293 insertions(+), 110 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 96e973d0c..0eeddc26b 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -13,6 +13,12 @@ from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface +try: + from .http2 import AsyncHTTP2Connection +except ImportError: + # ImportError happens when the user installed httpcore without the optional http2 dependency + AsyncHTTP2Connection = None # type: ignore[assignment, misc] + class AsyncPoolRequest: def __init__(self, request: Request) -> None: @@ -277,66 +283,150 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]: Any closing connections are returned, allowing the I/O for closing those connections to be handled seperately. """ - closing_connections = [] - - # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. - for connection in list(self._connections): - if connection.is_closed(): - # log: "removing closed connection" - self._connections.remove(connection) - elif connection.has_expired(): - # log: "closing expired connection" - self._connections.remove(connection) - closing_connections.append(connection) - elif ( - connection.is_idle() - and len([connection.is_idle() for connection in self._connections]) - > self._max_keepalive_connections - ): - # log: "closing idle connection" - self._connections.remove(connection) - closing_connections.append(connection) - - # Assign queued requests to connections. - queued_requests = [request for request in self._requests if request.is_queued()] - for pool_request in queued_requests: + # Initialize connection buckets + closing_conns: list[AsyncConnectionInterface] = [] + available_conns: list[AsyncConnectionInterface] = [] + occupied_conns: list[AsyncConnectionInterface] = [] + + # Track HTTP/2 connection capacity + http2_conn_stream_capacity: dict[AsyncConnectionInterface, int] = {} + + # Phase 1: Categorize all connections in a single pass + for conn in self._connections: + if conn.is_closed(): + # Closed connections are simply skipped (not added to any bucket) + continue + elif conn.has_expired(): + # Expired connections need to be closed + closing_conns.append(conn) + elif conn.is_available(): + # Available connections + available_conns.append(conn) + # Track HTTP/2 connection capacity + if self._http2 and isinstance(conn, AsyncHTTP2Connection): + # Get the actual available stream count from the connection + http2_conn_stream_capacity[conn] = ( + conn.get_available_stream_capacity() + ) + elif conn.is_idle(): + # Idle but not available (this shouldn't happen, but handle it by closing the connection) + closing_conns.append(conn) + else: + # Occupied connections + occupied_conns.append(conn) + + # Calculate how many new connections we can create + total_existing_connections = ( + len(available_conns) + len(occupied_conns) + len(closing_conns) + ) + new_conns_remaining_count = self._max_connections - total_existing_connections + + # Phase 2: Assign queued requests to connections + for pool_request in self._requests: + if not pool_request.is_queued(): + continue + origin = pool_request.request.url.origin - available_connections = [ - connection - for connection in self._connections - if connection.can_handle_request(origin) and connection.is_available() - ] - idle_connections = [ - connection for connection in self._connections if connection.is_idle() - ] + # Try to find an available connection that can handle this request # There are three cases for how we may be able to handle the request: # - # 1. There is an existing connection that can handle the request. + # 1. There is an existing available connection that can handle the request. # 2. We can create a new connection to handle the request. - # 3. We can close an idle connection and then create a new connection - # to handle the request. - if available_connections: - # log: "reusing existing connection" - connection = available_connections[0] - pool_request.assign_to_connection(connection) - elif len(self._connections) < self._max_connections: - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) - elif idle_connections: - # log: "closing idle connection" - connection = idle_connections[0] - self._connections.remove(connection) - closing_connections.append(connection) - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) - - return closing_connections + # 3. We can close an idle connection and then create a new connection to handle the request. + + assigned = False + + # Case 1: try to use an available connection + for i in range(len(available_conns) - 1, -1, -1): + # Loop in reverse order since popping an element from the end of the list is O(1), + # whereas popping from the beginning of the list is O(n) + + conn = available_conns[i] + if conn.can_handle_request(origin): + # Assign the request to this connection + pool_request.assign_to_connection(conn) + + # Handle HTTP/1.1 vs HTTP/2 differently + if self._http2 and conn in http2_conn_stream_capacity: + # HTTP/2: Decrement available capacity + http2_conn_stream_capacity[conn] -= 1 + if http2_conn_stream_capacity[conn] <= 0: + # Move to occupied if no more capacity + available_conns.pop(i) + occupied_conns.append(conn) + del http2_conn_stream_capacity[conn] + else: + # HTTP/1.1: Move to occupied immediately + available_conns.pop(i) + occupied_conns.append(conn) + + assigned = True + break + + if assigned: + continue + + # Case 2: Try to create a new connection + if new_conns_remaining_count > 0: + conn = self.create_connection(origin) + pool_request.assign_to_connection(conn) + # New connections go to occupied (we don't know if HTTP/1.1 or HTTP/2 yet, so assume no multiplexing) + occupied_conns.append(conn) + new_conns_remaining_count -= 1 + continue + + # Case 3, last resort: evict an idle connection and create a new connection + assigned = False + for i in range(len(available_conns) - 1, -1, -1): + # Loop in reverse order since popping an element from the end of the list is O(1), + # whereas popping from the beginning of the list is O(n) + conn = available_conns[i] + if conn.is_idle(): + evicted_conn = available_conns.pop(i) + closing_conns.append(evicted_conn) + # Create new connection for the required origin + conn = self.create_connection(origin) + pool_request.assign_to_connection(conn) + occupied_conns.append(conn) + assigned = True + break + + # All attempts failed: all connections are occupied and we can't create a new one + if not assigned: + # Break out of the loop since no more queued requests can be serviced at this time + break + + # Phase 3: Enforce self._max_keepalive_connections by closing excess idle connections + # + # Only run keepalive enforcement if len(available_conns) > max_keepalive. + # Since idle connections are a subset of available connections, if there are + # fewer available connections than the limit, we cannot possibly violate it. + if len(available_conns) > self._max_keepalive_connections: + keepalive_available_conns: list[AsyncConnectionInterface] = [] + n_idle_conns_kept = 0 + + for conn in available_conns: + if conn.is_idle(): + if n_idle_conns_kept >= self._max_keepalive_connections: + # We've already kept the maximum allowed idle connections, close this one + closing_conns.append(conn) + else: + # Keep this idle connection as we're still under the limit + keepalive_available_conns.append(conn) + n_idle_conns_kept += 1 + else: + # This is an available but not idle connection (active HTTP/2 with capacity) + # Always keep these as they don't count against keepalive limits + keepalive_available_conns.append(conn) + + # Replace available_conns with the filtered list (excess idle connections removed) + available_conns = keepalive_available_conns + + # Rebuild self._connections from all buckets + self._connections = available_conns + occupied_conns + + return closing_conns async def _close_connections(self, closing: list[AsyncConnectionInterface]) -> None: # Close connections which have been removed from the pool. diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 9ccfa53e5..1031e23cb 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -13,6 +13,12 @@ from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface +try: + from .http2 import HTTP2Connection +except ImportError: + # ImportError happens when the user installed httpcore without the optional http2 dependency + HTTP2Connection = None # type: ignore[assignment, misc] + class PoolRequest: def __init__(self, request: Request) -> None: @@ -277,66 +283,150 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]: Any closing connections are returned, allowing the I/O for closing those connections to be handled seperately. """ - closing_connections = [] - - # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. - for connection in list(self._connections): - if connection.is_closed(): - # log: "removing closed connection" - self._connections.remove(connection) - elif connection.has_expired(): - # log: "closing expired connection" - self._connections.remove(connection) - closing_connections.append(connection) - elif ( - connection.is_idle() - and len([connection.is_idle() for connection in self._connections]) - > self._max_keepalive_connections - ): - # log: "closing idle connection" - self._connections.remove(connection) - closing_connections.append(connection) - - # Assign queued requests to connections. - queued_requests = [request for request in self._requests if request.is_queued()] - for pool_request in queued_requests: + # Initialize connection buckets + closing_conns: list[ConnectionInterface] = [] + available_conns: list[ConnectionInterface] = [] + occupied_conns: list[ConnectionInterface] = [] + + # Track HTTP/2 connection capacity + http2_conn_stream_capacity: dict[ConnectionInterface, int] = {} + + # Phase 1: Categorize all connections in a single pass + for conn in self._connections: + if conn.is_closed(): + # Closed connections are simply skipped (not added to any bucket) + continue + elif conn.has_expired(): + # Expired connections need to be closed + closing_conns.append(conn) + elif conn.is_available(): + # Available connections + available_conns.append(conn) + # Track HTTP/2 connection capacity + if self._http2 and isinstance(conn, HTTP2Connection): + # Get the actual available stream count from the connection + http2_conn_stream_capacity[conn] = ( + conn.get_available_stream_capacity() + ) + elif conn.is_idle(): + # Idle but not available (this shouldn't happen, but handle it by closing the connection) + closing_conns.append(conn) + else: + # Occupied connections + occupied_conns.append(conn) + + # Calculate how many new connections we can create + total_existing_connections = ( + len(available_conns) + len(occupied_conns) + len(closing_conns) + ) + new_conns_remaining_count = self._max_connections - total_existing_connections + + # Phase 2: Assign queued requests to connections + for pool_request in self._requests: + if not pool_request.is_queued(): + continue + origin = pool_request.request.url.origin - available_connections = [ - connection - for connection in self._connections - if connection.can_handle_request(origin) and connection.is_available() - ] - idle_connections = [ - connection for connection in self._connections if connection.is_idle() - ] + # Try to find an available connection that can handle this request # There are three cases for how we may be able to handle the request: # - # 1. There is an existing connection that can handle the request. + # 1. There is an existing available connection that can handle the request. # 2. We can create a new connection to handle the request. - # 3. We can close an idle connection and then create a new connection - # to handle the request. - if available_connections: - # log: "reusing existing connection" - connection = available_connections[0] - pool_request.assign_to_connection(connection) - elif len(self._connections) < self._max_connections: - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) - elif idle_connections: - # log: "closing idle connection" - connection = idle_connections[0] - self._connections.remove(connection) - closing_connections.append(connection) - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) - - return closing_connections + # 3. We can close an idle connection and then create a new connection to handle the request. + + assigned = False + + # Case 1: try to use an available connection + for i in range(len(available_conns) - 1, -1, -1): + # Loop in reverse order since popping an element from the end of the list is O(1), + # whereas popping from the beginning of the list is O(n) + + conn = available_conns[i] + if conn.can_handle_request(origin): + # Assign the request to this connection + pool_request.assign_to_connection(conn) + + # Handle HTTP/1.1 vs HTTP/2 differently + if self._http2 and conn in http2_conn_stream_capacity: + # HTTP/2: Decrement available capacity + http2_conn_stream_capacity[conn] -= 1 + if http2_conn_stream_capacity[conn] <= 0: + # Move to occupied if no more capacity + available_conns.pop(i) + occupied_conns.append(conn) + del http2_conn_stream_capacity[conn] + else: + # HTTP/1.1: Move to occupied immediately + available_conns.pop(i) + occupied_conns.append(conn) + + assigned = True + break + + if assigned: + continue + + # Case 2: Try to create a new connection + if new_conns_remaining_count > 0: + conn = self.create_connection(origin) + pool_request.assign_to_connection(conn) + # New connections go to occupied (we don't know if HTTP/1.1 or HTTP/2 yet, so assume no multiplexing) + occupied_conns.append(conn) + new_conns_remaining_count -= 1 + continue + + # Case 3, last resort: evict an idle connection and create a new connection + assigned = False + for i in range(len(available_conns) - 1, -1, -1): + # Loop in reverse order since popping an element from the end of the list is O(1), + # whereas popping from the beginning of the list is O(n) + conn = available_conns[i] + if conn.is_idle(): + evicted_conn = available_conns.pop(i) + closing_conns.append(evicted_conn) + # Create new connection for the required origin + conn = self.create_connection(origin) + pool_request.assign_to_connection(conn) + occupied_conns.append(conn) + assigned = True + break + + # All attempts failed: all connections are occupied and we can't create a new one + if not assigned: + # Break out of the loop since no more queued requests can be serviced at this time + break + + # Phase 3: Enforce self._max_keepalive_connections by closing excess idle connections + # + # Only run keepalive enforcement if len(available_conns) > max_keepalive. + # Since idle connections are a subset of available connections, if there are + # fewer available connections than the limit, we cannot possibly violate it. + if len(available_conns) > self._max_keepalive_connections: + keepalive_available_conns: list[ConnectionInterface] = [] + n_idle_conns_kept = 0 + + for conn in available_conns: + if conn.is_idle(): + if n_idle_conns_kept >= self._max_keepalive_connections: + # We've already kept the maximum allowed idle connections, close this one + closing_conns.append(conn) + else: + # Keep this idle connection as we're still under the limit + keepalive_available_conns.append(conn) + n_idle_conns_kept += 1 + else: + # This is an available but not idle connection (active HTTP/2 with capacity) + # Always keep these as they don't count against keepalive limits + keepalive_available_conns.append(conn) + + # Replace available_conns with the filtered list (excess idle connections removed) + available_conns = keepalive_available_conns + + # Rebuild self._connections from all buckets + self._connections = available_conns + occupied_conns + + return closing_conns def _close_connections(self, closing: list[ConnectionInterface]) -> None: # Close connections which have been removed from the pool. diff --git a/tests/_async_only/test_connection_pool.py b/tests/_async_only/test_connection_pool.py index d25b3ed71..1409d2f92 100644 --- a/tests/_async_only/test_connection_pool.py +++ b/tests/_async_only/test_connection_pool.py @@ -87,6 +87,9 @@ async def test_available_connections_reassigned(): await response1.aread() await response1.aclose() + # Give a short time for the pool to assign the freed-up connection to the queued request + await anyio.sleep(0.01) + # After finishing the first request, the pool automatically assigns requests # from the request queue to available connections. # At this point, we should have: From c9106c731fd2d03a02e0b2c5d10769f7b850a002 Mon Sep 17 00:00:00 2001 From: Victor Prins <32959052+VictorPrins@users.noreply.github.com> Date: Sun, 14 Sep 2025 18:55:44 +0200 Subject: [PATCH 5/8] Add get_available_stream_capacity to conn interface --- httpcore/_async/connection.py | 5 +++++ httpcore/_async/connection_pool.py | 4 ++-- httpcore/_async/http11.py | 7 +++++++ httpcore/_async/http_proxy.py | 3 +++ httpcore/_async/interfaces.py | 9 +++++++++ httpcore/_sync/connection.py | 5 +++++ httpcore/_sync/connection_pool.py | 4 ++-- httpcore/_sync/http11.py | 7 +++++++ httpcore/_sync/http_proxy.py | 3 +++ httpcore/_sync/interfaces.py | 9 +++++++++ 10 files changed, 52 insertions(+), 4 deletions(-) diff --git a/httpcore/_async/connection.py b/httpcore/_async/connection.py index b42581dff..d311db61d 100644 --- a/httpcore/_async/connection.py +++ b/httpcore/_async/connection.py @@ -199,6 +199,11 @@ def is_closed(self) -> bool: return self._connect_failed return self._connection.is_closed() + def get_available_stream_capacity(self) -> int: + if self._connection is None: + return 1 + return self._connection.get_available_stream_capacity() + def info(self) -> str: if self._connection is None: return "CONNECTION FAILED" if self._connect_failed else "CONNECTING" diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 0eeddc26b..1a2080f98 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -15,7 +15,7 @@ try: from .http2 import AsyncHTTP2Connection -except ImportError: +except ImportError: # pragma: nocover # ImportError happens when the user installed httpcore without the optional http2 dependency AsyncHTTP2Connection = None # type: ignore[assignment, misc] @@ -303,7 +303,7 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]: # Available connections available_conns.append(conn) # Track HTTP/2 connection capacity - if self._http2 and isinstance(conn, AsyncHTTP2Connection): + if self._http2: # Get the actual available stream count from the connection http2_conn_stream_capacity[conn] = ( conn.get_available_stream_capacity() diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index e6d6d7098..0552d498c 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -291,6 +291,13 @@ def is_idle(self) -> bool: def is_closed(self) -> bool: return self._state == HTTPConnectionState.CLOSED + def get_available_stream_capacity(self) -> int: + """ + For HTTP/1.1, return 1 if the connection is idle (can accept a request), + 0 otherwise (connection is busy). + """ + return 1 if self._state == HTTPConnectionState.IDLE else 0 + def info(self) -> str: origin = str(self._origin) return ( diff --git a/httpcore/_async/http_proxy.py b/httpcore/_async/http_proxy.py index cc9d92066..1eaae9b0b 100644 --- a/httpcore/_async/http_proxy.py +++ b/httpcore/_async/http_proxy.py @@ -363,5 +363,8 @@ def is_idle(self) -> bool: def is_closed(self) -> bool: return self._connection.is_closed() + def get_available_stream_capacity(self) -> int: + return self._connection.get_available_stream_capacity() + def __repr__(self) -> str: return f"<{self.__class__.__name__} [{self.info()}]>" diff --git a/httpcore/_async/interfaces.py b/httpcore/_async/interfaces.py index 361583bed..40a43af59 100644 --- a/httpcore/_async/interfaces.py +++ b/httpcore/_async/interfaces.py @@ -135,3 +135,12 @@ def is_closed(self) -> bool: returned to the connection pool or not. """ raise NotImplementedError() # pragma: nocover + + def get_available_stream_capacity(self) -> int: + """ + Return the number of additional streams that can be handled by this connection. + + For HTTP/1.1 connections, this is 1 if the connection is idle, 0 otherwise. + For HTTP/2 connections, this is the number of available concurrent streams. + """ + raise NotImplementedError() # pragma: nocover diff --git a/httpcore/_sync/connection.py b/httpcore/_sync/connection.py index 363f8be81..e86252a09 100644 --- a/httpcore/_sync/connection.py +++ b/httpcore/_sync/connection.py @@ -199,6 +199,11 @@ def is_closed(self) -> bool: return self._connect_failed return self._connection.is_closed() + def get_available_stream_capacity(self) -> int: + if self._connection is None: + return 1 + return self._connection.get_available_stream_capacity() + def info(self) -> str: if self._connection is None: return "CONNECTION FAILED" if self._connect_failed else "CONNECTING" diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 1031e23cb..c78f65b4d 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -15,7 +15,7 @@ try: from .http2 import HTTP2Connection -except ImportError: +except ImportError: # pragma: nocover # ImportError happens when the user installed httpcore without the optional http2 dependency HTTP2Connection = None # type: ignore[assignment, misc] @@ -303,7 +303,7 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]: # Available connections available_conns.append(conn) # Track HTTP/2 connection capacity - if self._http2 and isinstance(conn, HTTP2Connection): + if self._http2: # Get the actual available stream count from the connection http2_conn_stream_capacity[conn] = ( conn.get_available_stream_capacity() diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index ebd3a9748..7e53dc8a0 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -291,6 +291,13 @@ def is_idle(self) -> bool: def is_closed(self) -> bool: return self._state == HTTPConnectionState.CLOSED + def get_available_stream_capacity(self) -> int: + """ + For HTTP/1.1, return 1 if the connection is idle (can accept a request), + 0 otherwise (connection is busy). + """ + return 1 if self._state == HTTPConnectionState.IDLE else 0 + def info(self) -> str: origin = str(self._origin) return ( diff --git a/httpcore/_sync/http_proxy.py b/httpcore/_sync/http_proxy.py index ecca88f7d..958b2f2c8 100644 --- a/httpcore/_sync/http_proxy.py +++ b/httpcore/_sync/http_proxy.py @@ -363,5 +363,8 @@ def is_idle(self) -> bool: def is_closed(self) -> bool: return self._connection.is_closed() + def get_available_stream_capacity(self) -> int: + return self._connection.get_available_stream_capacity() + def __repr__(self) -> str: return f"<{self.__class__.__name__} [{self.info()}]>" diff --git a/httpcore/_sync/interfaces.py b/httpcore/_sync/interfaces.py index e673d4cc1..3e35ffb0c 100644 --- a/httpcore/_sync/interfaces.py +++ b/httpcore/_sync/interfaces.py @@ -135,3 +135,12 @@ def is_closed(self) -> bool: returned to the connection pool or not. """ raise NotImplementedError() # pragma: nocover + + def get_available_stream_capacity(self) -> int: + """ + Return the number of additional streams that can be handled by this connection. + + For HTTP/1.1 connections, this is 1 if the connection is idle, 0 otherwise. + For HTTP/2 connections, this is the number of available concurrent streams. + """ + raise NotImplementedError() # pragma: nocover From 85296a47dd0aaf572449c19936625861edbf7df2 Mon Sep 17 00:00:00 2001 From: Victor Prins <32959052+VictorPrins@users.noreply.github.com> Date: Sun, 14 Sep 2025 19:13:39 +0200 Subject: [PATCH 6/8] add tests for _assign_requests_to_connections --- tests/_async/test_connection_pool.py | 119 +++++++++++++++++++++++++++ tests/_sync/test_connection_pool.py | 119 +++++++++++++++++++++++++++ 2 files changed, 238 insertions(+) diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 2b95e575a..48099b391 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -892,3 +892,122 @@ async def test_keepalive_idle_connections(): repr(pool) == "" ) + + +@pytest.mark.anyio +async def test_idle_but_not_available_connection_closing(): + """ + Test that a connection that is idle but not available gets closed. + This is a pathological edge case that shouldn't occur in reality, but the connection pool + handles it anyway. + """ + + class MockIdleNotAvailableConnection(httpcore.AsyncConnectionInterface): + def __init__(self) -> None: + self._origin = httpcore.Origin(b"https", b"example.com", 443) + + def is_available(self) -> bool: + return False # Not available + + def is_idle(self) -> bool: + return True # But is idle - this is the edge case + + def is_closed(self) -> bool: + return False + + def has_expired(self) -> bool: + return False + + network_backend = httpcore.AsyncMockBackend([]) + + async with httpcore.AsyncConnectionPool( + network_backend=network_backend, + ) as pool: + # Replace the connection list with our pathological mock connection + mock_conn = MockIdleNotAvailableConnection() + + pool._connections = [mock_conn] + + # This should move the idle-but-not-available connection to closing_conns + closing_conns = pool._assign_requests_to_connections() + + # Verify the connection is marked for closing + assert len(closing_conns) == 1 + assert closing_conns[0] is mock_conn + + # Verify it's no longer in the connection pool + assert len(pool._connections) == 0 + + +@pytest.mark.anyio +async def test_active_http2_connection_keepalive_preservation(): + """ + Test that active HTTP/2 connections with capacity are preserved during keepalive enforcement. + This tests line 421 where active HTTP/2 connections are kept during keepalive enforcement. + """ + + class MockActiveHTTP2Connection(httpcore.AsyncConnectionInterface): + def __init__(self) -> None: + self._origin = httpcore.Origin(b"https", b"example.com", 443) + + def is_available(self) -> bool: + return True # Available with capacity + + def is_idle(self) -> bool: + return False # Not idle (active HTTP/2 connection with streams) + + def is_closed(self) -> bool: + return False + + def has_expired(self) -> bool: + return False + + def get_available_stream_capacity(self) -> int: + return 5 # HTTP/2 connection with available stream capacity + + async def aclose(self): + pass + + class MockIdleConnection(httpcore.AsyncConnectionInterface): + def __init__(self) -> None: + self._origin = httpcore.Origin(b"https", b"example.com", 443) + + def is_available(self) -> bool: + return True + + def is_idle(self) -> bool: + return True # This is an idle connection + + def is_closed(self) -> bool: + return False + + def has_expired(self) -> bool: + return False + + def get_available_stream_capacity(self) -> int: + return 1 + + network_backend = httpcore.AsyncMockBackend([]) + + async with httpcore.AsyncConnectionPool( + max_keepalive_connections=0, # Force keepalive limit to 0 + http2=True, + network_backend=network_backend, + ) as pool: + # Create one idle connection and one active HTTP/2 connection + idle_conn = MockIdleConnection() + active_http2_conn = MockActiveHTTP2Connection() + + with pool._optional_thread_lock: + pool._connections = [idle_conn, active_http2_conn] + + # This should close idle connections but preserve the active HTTP/2 connection + closing_conns = pool._assign_requests_to_connections() + + # Verify idle connection is marked for closing + assert len(closing_conns) == 1 + assert idle_conn in closing_conns + + # Verify the active HTTP/2 connection is preserved (line 421) + assert len(pool._connections) == 1 + assert active_http2_conn in pool._connections diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index 495c5e19b..13f318cb3 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -892,3 +892,122 @@ def test_keepalive_idle_connections(): repr(pool) == "" ) + + + +def test_idle_but_not_available_connection_closing(): + """ + Test that a connection that is idle but not available gets closed. + This is a pathological edge case that shouldn't occur in reality, but the connection pool + handles it anyway. + """ + + class MockIdleNotAvailableConnection(httpcore.ConnectionInterface): + def __init__(self) -> None: + self._origin = httpcore.Origin(b"https", b"example.com", 443) + + def is_available(self) -> bool: + return False # Not available + + def is_idle(self) -> bool: + return True # But is idle - this is the edge case + + def is_closed(self) -> bool: + return False + + def has_expired(self) -> bool: + return False + + network_backend = httpcore.MockBackend([]) + + with httpcore.ConnectionPool( + network_backend=network_backend, + ) as pool: + # Replace the connection list with our pathological mock connection + mock_conn = MockIdleNotAvailableConnection() + + pool._connections = [mock_conn] + + # This should move the idle-but-not-available connection to closing_conns + closing_conns = pool._assign_requests_to_connections() + + # Verify the connection is marked for closing + assert len(closing_conns) == 1 + assert closing_conns[0] is mock_conn + + # Verify it's no longer in the connection pool + assert len(pool._connections) == 0 + + + +def test_active_http2_connection_keepalive_preservation(): + """ + Test that active HTTP/2 connections with capacity are preserved during keepalive enforcement. + This tests line 421 where active HTTP/2 connections are kept during keepalive enforcement. + """ + + class MockActiveHTTP2Connection(httpcore.ConnectionInterface): + def __init__(self) -> None: + self._origin = httpcore.Origin(b"https", b"example.com", 443) + + def is_available(self) -> bool: + return True # Available with capacity + + def is_idle(self) -> bool: + return False # Not idle (active HTTP/2 connection with streams) + + def is_closed(self) -> bool: + return False + + def has_expired(self) -> bool: + return False + + def get_available_stream_capacity(self) -> int: + return 5 # HTTP/2 connection with available stream capacity + + def close(self): + pass + + class MockIdleConnection(httpcore.ConnectionInterface): + def __init__(self) -> None: + self._origin = httpcore.Origin(b"https", b"example.com", 443) + + def is_available(self) -> bool: + return True + + def is_idle(self) -> bool: + return True # This is an idle connection + + def is_closed(self) -> bool: + return False + + def has_expired(self) -> bool: + return False + + def get_available_stream_capacity(self) -> int: + return 1 + + network_backend = httpcore.MockBackend([]) + + with httpcore.ConnectionPool( + max_keepalive_connections=0, # Force keepalive limit to 0 + http2=True, + network_backend=network_backend, + ) as pool: + # Create one idle connection and one active HTTP/2 connection + idle_conn = MockIdleConnection() + active_http2_conn = MockActiveHTTP2Connection() + + with pool._optional_thread_lock: + pool._connections = [idle_conn, active_http2_conn] + + # This should close idle connections but preserve the active HTTP/2 connection + closing_conns = pool._assign_requests_to_connections() + + # Verify idle connection is marked for closing + assert len(closing_conns) == 1 + assert idle_conn in closing_conns + + # Verify the active HTTP/2 connection is preserved (line 421) + assert len(pool._connections) == 1 + assert active_http2_conn in pool._connections From ce0d1b29754c30785c607e9af2e23729ba8af2d5 Mon Sep 17 00:00:00 2001 From: Victor Prins <32959052+VictorPrins@users.noreply.github.com> Date: Sun, 14 Sep 2025 19:59:42 +0200 Subject: [PATCH 7/8] Improve test to demonstrate double assignment bug --- tests/_async_only/test_connection_pool.py | 33 ++++++++++++++--------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/tests/_async_only/test_connection_pool.py b/tests/_async_only/test_connection_pool.py index 1409d2f92..fd3302e37 100644 --- a/tests/_async_only/test_connection_pool.py +++ b/tests/_async_only/test_connection_pool.py @@ -18,6 +18,7 @@ async def test_available_connections_reassigned(): After reading/closing first request: Expected: 1 active, 1 queued, 1 connection + Only 1 request should be assigned to the freed-up connection """ network_backend = httpcore.AsyncMockBackend( [ @@ -83,23 +84,29 @@ async def test_available_connections_reassigned(): == "" ) + # Monkey patch connection's handle_async_request to intercept ConnectionNotAvailable + for connection in pool._connections: + original_handle = connection.handle_async_request + + async def monitored_handle_async_request(request): + try: + return await original_handle(request) + except httpcore.ConnectionNotAvailable: # pragma: nocover + pytest.fail( # pragma: nocover + "ConnectionNotAvailable was raised on connection, " + "indicating that multiple requests were assigned to a single HTTP/1.1 connection" + ) + + connection.handle_async_request = monitored_handle_async_request # type: ignore[method-assign] + # Read and close the first response await response1.aread() await response1.aclose() # Give a short time for the pool to assign the freed-up connection to the queued request - await anyio.sleep(0.01) - - # After finishing the first request, the pool automatically assigns requests - # from the request queue to available connections. - # At this point, we should have: - # - 1 active request (request2 should become active) - # - 1 queued request (request3 should remain queued) - # - 1 connection (same connection, now available for request2) - assert ( - repr(pool) - == "" - ) + # This will trigger the ConnectionNotAvailable if multiple requests are assigned to the same connection + await anyio.sleep(0.05) - # cancel taskgroup to avoid a hanging test + # Cancel taskgroup to avoid a hanging test + # (since request2 is never closed, and hence the task for request3 cannot start) tg.cancel_scope.cancel() From 47b9126866c00e28a979dcfeef793a8e53e85620 Mon Sep 17 00:00:00 2001 From: Victor Prins <32959052+VictorPrins@users.noreply.github.com> Date: Sun, 14 Sep 2025 23:49:00 +0200 Subject: [PATCH 8/8] Remove unused import --- httpcore/_async/connection_pool.py | 6 ------ httpcore/_sync/connection_pool.py | 6 ------ 2 files changed, 12 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 1a2080f98..410e2583a 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -13,12 +13,6 @@ from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface -try: - from .http2 import AsyncHTTP2Connection -except ImportError: # pragma: nocover - # ImportError happens when the user installed httpcore without the optional http2 dependency - AsyncHTTP2Connection = None # type: ignore[assignment, misc] - class AsyncPoolRequest: def __init__(self, request: Request) -> None: diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index c78f65b4d..ba9b73d03 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -13,12 +13,6 @@ from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface -try: - from .http2 import HTTP2Connection -except ImportError: # pragma: nocover - # ImportError happens when the user installed httpcore without the optional http2 dependency - HTTP2Connection = None # type: ignore[assignment, misc] - class PoolRequest: def __init__(self, request: Request) -> None: