diff --git a/tests/core/transport/quic/test_integration.py b/tests/core/transport/quic/test_integration.py index 980a94140..cb3ee8dc5 100644 --- a/tests/core/transport/quic/test_integration.py +++ b/tests/core/transport/quic/test_integration.py @@ -338,15 +338,11 @@ async def timeout_test_handler(connection: QUICConnection) -> None: @pytest.mark.trio -@pytest.mark.flaky(reruns=3, reruns_delay=2) async def test_yamux_stress_ping(): STREAM_COUNT = 100 listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic") latencies = [] failures = [] - completion_event = trio.Event() - completed_count: list[int] = [0] # Use list to make it mutable for closures - completed_lock = trio.Lock() # === Server Setup === server_host = new_host(listen_addrs=[listen_addr]) @@ -364,9 +360,8 @@ async def handle_ping(stream: INetStream) -> None: server_host.set_stream_handler(PING_PROTOCOL_ID, handle_ping) async with server_host.run(listen_addrs=[listen_addr]): - # Wait for server to actually be listening - while not server_host.get_addrs(): - await trio.sleep(0.01) + # Give server time to start + await trio.sleep(0.1) # === Client Setup === destination = str(server_host.get_addrs()[0]) @@ -379,40 +374,17 @@ async def handle_ping(stream: INetStream) -> None: async with client_host.run(listen_addrs=[client_listen_addr]): await client_host.connect(info) - # Wait for connection to be established and ready - # (check actual connection state) - network = client_host.get_network() - connections_map = network.get_connections_map() - while ( - info.peer_id not in connections_map or not connections_map[info.peer_id] - ): - await trio.sleep(0.01) - connections_map = network.get_connections_map() - - # Wait for connection's event_started to ensure it's ready for streams - # This ensures the muxer is fully initialized and can accept streams - connections = connections_map[info.peer_id] - if connections: - swarm_conn = connections[0] - # Wait for the connection to be fully started (muxer ready) - if hasattr(swarm_conn, "event_started"): - await swarm_conn.event_started.wait() - # Additional small wait to ensure multiselect is ready - await trio.sleep(0.05) - async def ping_stream(i: int): stream = None try: start = trio.current_time() - stream = await client_host.new_stream( info.peer_id, [PING_PROTOCOL_ID] ) await stream.write(b"\x01" * PING_LENGTH) - # Wait for response with timeout as safety net - with trio.fail_after(30): + with trio.fail_after(5): response = await stream.read(PING_LENGTH) if response == b"\x01" * PING_LENGTH: @@ -424,35 +396,11 @@ async def ping_stream(i: int): print(f"[Ping #{i}] Failed: {e}") failures.append(i) if stream: - try: - await stream.reset() - except Exception: - pass - finally: - # Signal completion - async with completed_lock: - completed_count[0] += 1 - if completed_count[0] == STREAM_COUNT: - completion_event.set() - - # Throttle concurrent stream openings to prevent multiselect negotiation - # contention. QUICConnection limits concurrent negotiations to 5, so we - # use 8 here to allow some streams to queue while others negotiate. - # This is test-only - real apps don't need throttling. - # Note: Test may still be flaky; @pytest.mark.flaky handles retries. - semaphore = trio.Semaphore(8) - - async def ping_stream_with_semaphore(i: int): - async with semaphore: - await ping_stream(i) + await stream.reset() async with trio.open_nursery() as nursery: for i in range(STREAM_COUNT): - nursery.start_soon(ping_stream_with_semaphore, i) - - # Wait for all streams to complete (event-driven, not polling) - with trio.fail_after(120): # Safety timeout - await completion_event.wait() + nursery.start_soon(ping_stream, i) # === Result Summary === print("\nšŸ“Š Ping Stress Test Summary")