diff --git a/aiozmq/stream.py b/aiozmq/stream.py index 8548299..f52f246 100644 --- a/aiozmq/stream.py +++ b/aiozmq/stream.py @@ -235,6 +235,9 @@ def read(self): if self._exception is not None: raise self._exception + if self._closing: + raise ZmqStreamClosed() + if not self._queue_len: if self._waiter is not None: raise RuntimeError('read called while another coroutine is ' diff --git a/docs/stream.rst b/docs/stream.rst index b1be1e2..f29c2cd 100644 --- a/docs/stream.rst +++ b/docs/stream.rst @@ -160,6 +160,8 @@ ZmqStream Read one :term:`ZeroMQ` message from the wire and return it. + Raise :exc:`ZmqStreamClosed` if the stream was closed. + .. method:: write(msg) Writes message *msg* into :term:`ZeroMQ` socket. @@ -193,3 +195,11 @@ ZmqStream operations waiting for the data will be resumed. *The private method*. + + +Exceptions +---------- + +.. exception:: ZmqStreamClosed + + Raised by read operations on closed stream. diff --git a/tests/zmq_stream_test.py b/tests/zmq_stream_test.py index c11cee9..f7ebb48 100644 --- a/tests/zmq_stream_test.py +++ b/tests/zmq_stream_test.py @@ -540,3 +540,25 @@ def f(): self.assertIs(cm.exception, exc) self.loop.run_until_complete(go()) + + def test_double_read_of_closed_stream(self): + port = find_unused_port() + + @asyncio.coroutine + def go(): + s2 = yield from aiozmq.create_zmq_stream( + zmq.ROUTER, + connect='tcp://127.0.0.1:{}'.format(port), + loop=self.loop) + + self.assertFalse(s2.at_closing()) + s2.close() + with self.assertRaises(aiozmq.ZmqStreamClosed): + yield from s2.read() + self.assertTrue(s2.at_closing()) + + with self.assertRaises(aiozmq.ZmqStreamClosed): + yield from s2.read() + self.assertTrue(s2.at_closing()) + + self.loop.run_until_complete(go())