Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 5 additions & 57 deletions tests/core/transport/quic/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,15 +338,11 @@ async def timeout_test_handler(connection: QUICConnection) -> None:


@pytest.mark.trio
@pytest.mark.flaky(reruns=3, reruns_delay=2)
async def test_yamux_stress_ping():
STREAM_COUNT = 100
listen_addr = create_quic_multiaddr("127.0.0.1", 0, "/quic")
latencies = []
failures = []
completion_event = trio.Event()
completed_count: list[int] = [0] # Use list to make it mutable for closures
completed_lock = trio.Lock()

# === Server Setup ===
server_host = new_host(listen_addrs=[listen_addr])
Expand All @@ -364,9 +360,8 @@ async def handle_ping(stream: INetStream) -> None:
server_host.set_stream_handler(PING_PROTOCOL_ID, handle_ping)

async with server_host.run(listen_addrs=[listen_addr]):
# Wait for server to actually be listening
while not server_host.get_addrs():
await trio.sleep(0.01)
# Give server time to start
await trio.sleep(0.1)

# === Client Setup ===
destination = str(server_host.get_addrs()[0])
Expand All @@ -379,40 +374,17 @@ async def handle_ping(stream: INetStream) -> None:
async with client_host.run(listen_addrs=[client_listen_addr]):
await client_host.connect(info)

# Wait for connection to be established and ready
# (check actual connection state)
network = client_host.get_network()
connections_map = network.get_connections_map()
while (
info.peer_id not in connections_map or not connections_map[info.peer_id]
):
await trio.sleep(0.01)
connections_map = network.get_connections_map()

# Wait for connection's event_started to ensure it's ready for streams
# This ensures the muxer is fully initialized and can accept streams
connections = connections_map[info.peer_id]
if connections:
swarm_conn = connections[0]
# Wait for the connection to be fully started (muxer ready)
if hasattr(swarm_conn, "event_started"):
await swarm_conn.event_started.wait()
# Additional small wait to ensure multiselect is ready
await trio.sleep(0.05)

async def ping_stream(i: int):
stream = None
try:
start = trio.current_time()

stream = await client_host.new_stream(
info.peer_id, [PING_PROTOCOL_ID]
)

await stream.write(b"\x01" * PING_LENGTH)

# Wait for response with timeout as safety net
with trio.fail_after(30):
with trio.fail_after(5):
response = await stream.read(PING_LENGTH)

if response == b"\x01" * PING_LENGTH:
Expand All @@ -424,35 +396,11 @@ async def ping_stream(i: int):
print(f"[Ping #{i}] Failed: {e}")
failures.append(i)
if stream:
try:
await stream.reset()
except Exception:
pass
finally:
# Signal completion
async with completed_lock:
completed_count[0] += 1
if completed_count[0] == STREAM_COUNT:
completion_event.set()

# Throttle concurrent stream openings to prevent multiselect negotiation
# contention. QUICConnection limits concurrent negotiations to 5, so we
# use 8 here to allow some streams to queue while others negotiate.
# This is test-only - real apps don't need throttling.
# Note: Test may still be flaky; @pytest.mark.flaky handles retries.
semaphore = trio.Semaphore(8)

async def ping_stream_with_semaphore(i: int):
async with semaphore:
await ping_stream(i)
await stream.reset()

async with trio.open_nursery() as nursery:
for i in range(STREAM_COUNT):
nursery.start_soon(ping_stream_with_semaphore, i)

# Wait for all streams to complete (event-driven, not polling)
with trio.fail_after(120): # Safety timeout
await completion_event.wait()
nursery.start_soon(ping_stream, i)

# === Result Summary ===
print("\n📊 Ping Stress Test Summary")
Expand Down
Loading