Olegt0rr
Collaborator

What do these changes do?

Provides a way to work around issue sysid/sse-starlette#89

Are there changes in behavior for the user?

Added the ability to set a timeout for interaction with a connection.

For example, if a hung connection does not read ping messages, then after a while we can automatically disconnect it to free up resources.
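
As a rough illustration of the idea (not the code in this PR), a single write can be bounded with `asyncio.wait_for`. The names `send_with_timeout`, `frozen_write` and `quick_write` are hypothetical stand-ins; `write` represents whatever coroutine actually pushes bytes to the client.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def send_with_timeout(
    write: Callable[[bytes], Awaitable[None]],
    data: bytes,
    timeout: Optional[float],
) -> bool:
    """Return True if the write finished, False if it timed out.

    Hypothetical helper: `write` stands in for the coroutine that
    sends bytes to the client.
    """
    if timeout is None:
        # No limit: a hung client can block this call indefinitely.
        await write(data)
        return True
    try:
        await asyncio.wait_for(write(data), timeout=timeout)
    except asyncio.TimeoutError:
        # The caller can now close the connection and free resources.
        return False
    return True


async def frozen_write(_data: bytes) -> None:
    # Simulates a connection whose peer has stopped reading.
    await asyncio.sleep(42)


async def quick_write(_data: bytes) -> None:
    # Simulates a healthy connection that accepts data immediately.
    await asyncio.sleep(0)
```

With a finite timeout the frozen write is cancelled and reported; with `timeout=None` the behavior is the same as an unbounded write.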

Related issue number

sysid/sse-starlette#89

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes

codecov bot commented Feb 12, 2024

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 100.00%. Comparing base (b6883cb) to head (e3dc176).
⚠️ Report is 44 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff            @@
##            master      #469   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files            4         4           
  Lines          475       501   +26     
  Branches        17        17           
=========================================
+ Hits           475       501   +26     

Olegt0rr marked this pull request as ready for review February 12, 2024 21:13
Olegt0rr self-assigned this Feb 12, 2024

timeout_raised = False

async def frozen_write(_data: bytes) -> None:
    await asyncio.sleep(42)

Member

The problem here is that we're tampering with the server side of the connection. Is it possible to do something with the client to simulate the hanging connection? Then we can be sure this works correctly.

Collaborator Author

I don't know how to reproduce a hung connection, but the test covers any time-based issues.

I also prepared this example to confirm that the solution directly addresses the issue:

import asyncio
from datetime import datetime

from aiohttp import web

from aiohttp_sse import sse_response

TIMEOUT = 5


async def hello(request: web.Request) -> web.StreamResponse:
    """Timeout example.

    How to reproduce the issue:
    1. Run this example
    2. Open console
    3. Execute the command below and then press Ctrl+Z (cmd+Z):
        curl -s -N localhost:8000/events > /dev/null
        
    4. Try to change TIMEOUT to None and repeat the steps above.
    """
    async with sse_response(request, timeout=TIMEOUT) as resp:
        i = 0
        try:
            while resp.is_connected():
                spaces = " " * 4096
                data = f"Server Time : {datetime.now()} {spaces}"

                i += 1
                if i % 100 == 0:
                    print(i, data)

                await resp.send(data)
                await asyncio.sleep(0.01)
        except Exception as exc:
            print(f"Exception: {type(exc).__name__} {exc}")
        finally:
            print("Disconnected")

    return resp


if __name__ == "__main__":
    app = web.Application()
    app.router.add_route("GET", "/events", hello)
    web.run_app(app, host="127.0.0.1", port=8000)

Member

So, my thinking is that we should be able to do something like resp.connection.transport.pause_reading() in the test to stop the client from reading the connection. But then the test does not pass.

I'm not yet convinced this fixes the reported issue.
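
To make the `pause_reading()` idea concrete outside aiohttp, here is a minimal sketch using plain asyncio streams on the loopback interface. It is an assumption-laden stand-in, not the project's test: `run_demo`, `CHUNK` and the 1 MiB chunk size are hypothetical choices, and how many chunks are accepted before the write stalls depends on the operating system's socket buffer sizes.

```python
import asyncio

CHUNK = b"x" * (1 << 20)  # 1 MiB per write, so buffers fill quickly


async def run_demo(write_timeout: float = 0.5) -> int:
    """Return how many chunks were flushed before a write timed out."""
    written = 0
    done = asyncio.Event()

    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        nonlocal written
        try:
            while True:
                writer.write(CHUNK)
                # drain() blocks once the transport and kernel buffers are full.
                await asyncio.wait_for(writer.drain(), timeout=write_timeout)
                written += 1
        except asyncio.TimeoutError:
            pass  # the client is considered hung
        finally:
            writer.transport.abort()  # drop the connection without flushing
            done.set()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    _reader, client_writer = await asyncio.open_connection("127.0.0.1", port)
    # Simulate a hung client: stop pulling data off the socket.
    client_writer.transport.pause_reading()
    try:
        await asyncio.wait_for(done.wait(), timeout=30)
    finally:
        client_writer.close()
        server.close()
    return written
```

Note that small payloads would be absorbed by the buffers for a long time, which is why the sketch writes large chunks.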

Collaborator Author

With timeout=None this test does not pass (as expected):

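import asyncio
from datetime import datetime
from typing import Optional

import pytest
from aiohttp import web

from aiohttp_sse import EventSourceResponse

# ClientFixture is assumed to be the aiohttp_client fixture type alias
# defined by the project's test suite.


@pytest.mark.parametrize("timeout", (None, 1.0))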
async def test_with_timeout(
    aiohttp_client: ClientFixture,
    monkeypatch: pytest.MonkeyPatch,
    timeout: Optional[float],
) -> None:
    """Test write timeout.

    Relates to this issue:
    https://github.com/sysid/sse-starlette/issues/89
    """
    sse_closed = asyncio.Event()

    async def handler(request: web.Request) -> EventSourceResponse:
        sse = EventSourceResponse(timeout=timeout)
        sse.ping_interval = 1
        await sse.prepare(request)

        try:
            async with sse:
                i = 0
                while sse.is_connected():
                    spaces = " " * 4096
                    data = f"Server Time : {datetime.now()} {spaces}"

                    i += 1
                    if i % 100 == 0:
                        print(i, data)

                    await sse.send(data)
                    await asyncio.sleep(0.01)
        finally:
            sse_closed.set()
        return sse  # pragma: no cover

    app = web.Application()
    app.router.add_route("GET", "/", handler)

    client = await aiohttp_client(app)
    async with client.get("/") as resp:
        assert resp.status == 200
        resp.connection.transport.pause_reading()
        print(
            f"Transport paused reading with "
            f"{resp.connection.transport.pause_reading}"
        )
        
        async with asyncio.timeout(10):
            await sse_closed.wait()

Member

The only thing it tests is that the status was 200?

Member

So, if I add prints:

            print("A", time.time())
            await self.write(buffer.getvalue().encode("utf-8"))
            print("B", time.time())

I then need to add an await asyncio.sleep(0) to the original test:

            await asyncio.sleep(0)
            try:
                await sse.send("foo")

The send() call doesn't seem to yield, so without the sleep, the client code never runs and never gets the chance to pause reading.

But, then my output looks like:

A 1714071179.1714509
B 1714071179.1715052

So, even after we pause reading, it's not waiting for the client...
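
The scheduling behavior described here can be reproduced without any networking. The sketch below uses hypothetical names (`send_without_suspending`, `client`) and only illustrates general asyncio behavior: awaiting a coroutine that never suspends does not give other tasks a chance to run, while `await asyncio.sleep(0)` yields to the event loop once.

```python
import asyncio
from typing import List


async def send_without_suspending(log: List[str]) -> None:
    # Stand-in for a write that is fully absorbed by a buffer: the coroutine
    # finishes without ever handing control back to the event loop.
    log.append("send")


async def main() -> List[str]:
    log: List[str] = []

    async def client() -> None:
        log.append("client ran")

    task = asyncio.create_task(client())  # scheduled, but not started yet
    await send_without_suspending(log)
    log.append("after send")   # the client task still has not run
    await asyncio.sleep(0)     # yield once so scheduled tasks can run
    log.append("after sleep")
    await task
    return log


print(asyncio.run(main()))
# ['send', 'after send', 'client ran', 'after sleep']
```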

Member

Never mind, I increased the amount of data sent in each message, as you did above. Now I can see it working correctly!

Member

I've pushed some changes to the test. I think that's probably good now. The assert for the connection being closed was failing, so I removed that. Feel free to play with it if you think it should work though.

I'd note from the original issue:

"continued generating chunks to send on this connection, slowly saturating TCP buffers before finally simply hanging in the send call."

We are only detecting that final hang and cancelling then. As far as I can tell, the buffers must be around 10MB, so if you were sending a 100 byte message once per second, then it'd take ~28 hours to detect the hung client and disconnect it...
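
A back-of-the-envelope check of this estimate, assuming (as in the comment) roughly 10 MB of total buffering and 100-byte messages, and ignoring protocol overhead. The function name `seconds_to_fill` is hypothetical. The detection delay scales linearly with the interval between messages.

```python
def seconds_to_fill(buffer_bytes: int, message_bytes: int, interval_seconds: float) -> float:
    """Time until the buffers are full if nothing is ever read by the client."""
    messages_needed = buffer_bytes / message_bytes
    return messages_needed * interval_seconds


BUFFER_BYTES = 10 * 1000 * 1000  # assumed ~10 MB of buffering
MESSAGE_BYTES = 100

per_second = seconds_to_fill(BUFFER_BYTES, MESSAGE_BYTES, 1)
per_minute = seconds_to_fill(BUFFER_BYTES, MESSAGE_BYTES, 60)

print(f"one message per second: {per_second / 3600:.1f} hours")  # 27.8 hours
print(f"one message per minute: {per_minute / 86400:.1f} days")  # 69.4 days
```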

Member

Sorry, did we not want to address that issue? 28 hours to disconnect a hung client doesn't seem very reasonable.

Member

Perhaps it should be done in a separate PR, as a separate feature. But, I guess we can create another version of this test that sends much smaller amounts of data in order to reproduce the issue. Then maybe we can log the time in the send() method, and if it's exceeded a couple of seconds, then it should call writer.drain() or something to flush the buffer to the socket.

gbtami commented Aug 14, 2025

@Olegt0rr is there any chance you can fix the conflicts to let this PR be merged?

# Conflicts:
#	aiohttp_sse/__init__.py
#	tests/test_sse.py

@Olegt0rr
Collaborator Author

"@Olegt0rr is there any chance you can fix the conflicts to let this PR be merged?"

Fixed
