From e0a6852df32bd1f93d62a696168ce3978054ba62 Mon Sep 17 00:00:00 2001 From: Benedikt Burger <67148916+BenediktBurger@users.noreply.github.com> Date: Wed, 13 Mar 2024 12:41:06 +0100 Subject: [PATCH 1/2] Stopping listener closes handler which closes communicators. Fixes #59 --- pyleco/utils/listener.py | 2 +- pyleco/utils/pipe_handler.py | 11 ++++++++++- tests/utils/test_listener.py | 30 +++++++++++++++++++++++++++++- tests/utils/test_pipe_handler.py | 12 ++++++++++++ 4 files changed, 52 insertions(+), 3 deletions(-) diff --git a/pyleco/utils/listener.py b/pyleco/utils/listener.py index 741579fd..0d93fe4b 100644 --- a/pyleco/utils/listener.py +++ b/pyleco/utils/listener.py @@ -146,7 +146,7 @@ def stop_listen(self) -> None: log.debug("Stopping listener thread.") self.stop_event.set() self.thread.join() - self.communicator.close() + self.message_handler.close() log.removeHandler(self.message_handler.log_handler) if self.logger is not None: self.logger.removeHandler(self.message_handler.log_handler) diff --git a/pyleco/utils/pipe_handler.py b/pyleco/utils/pipe_handler.py index 09f955ed..af52ba13 100644 --- a/pyleco/utils/pipe_handler.py +++ b/pyleco/utils/pipe_handler.py @@ -170,7 +170,11 @@ def full_name(self) -> str: return self.handler.full_name def _send_pipe_message(self, typ: PipeCommands, *content: bytes) -> None: - self.socket.send_multipart((typ, *content)) + try: + self.socket.send_multipart((typ, *content)) + except zmq.ZMQError as exc: + raise ConnectionRefusedError(f"Connection to the handler refused with '{exc}', " + "probably the handler stopped.") def send_message(self, message: Message) -> None: if not message.sender: @@ -262,6 +266,7 @@ def setup_message_buffer(self) -> None: def close(self) -> None: self.internal_pipe.close(1) + self.close_all_communicators() super().close() def set_full_name(self, full_name: str) -> None: @@ -355,3 +360,7 @@ def get_communicator(self, **kwargs) -> CommunicatorPipe: return self.create_communicator(**kwargs) else: return com + + def close_all_communicators(self) -> None: + for communicator in self._communicators.values(): + communicator.close() diff --git a/tests/utils/test_listener.py b/tests/utils/test_listener.py index 9b14f6b1..68eaae6a 100644 --- a/tests/utils/test_listener.py +++ b/tests/utils/test_listener.py @@ -25,8 +25,9 @@ import pytest from pyleco.test import FakeCommunicator +from pyleco.core.message import Message -from pyleco.utils.listener import Listener +from pyleco.utils.listener import Listener, CommunicatorPipe @pytest.fixture @@ -38,3 +39,30 @@ def listener() -> Listener: def test_communicator_name_is_returned(listener: Listener): assert listener.name == "N.Pipe" + + +class Test_communicator_closed_at_stopped_listener(): + @pytest.fixture(scope="class") + def communicator(self) -> CommunicatorPipe: + # scope is class as starting the listener takes some time + listener = Listener(name="test") + listener.start_listen() + communicator = listener.communicator + listener.stop_listen() + return communicator + + def test_socket_closed(self, communicator: CommunicatorPipe): + assert communicator.socket.closed is True + + def test_internal_method(self, communicator: CommunicatorPipe): + """A method which is handled in the handler and not sent from the handler via LECO.""" + with pytest.raises(ConnectionRefusedError): + communicator.ask_handler("pong") + + def test_sending_messages(self, communicator: CommunicatorPipe): + with pytest.raises(ConnectionRefusedError): + communicator.send_message(Message("rec", "send")) + + def test_changing_name(self, communicator: CommunicatorPipe): + with pytest.raises(ConnectionRefusedError): + communicator.name = "abc" diff --git a/tests/utils/test_pipe_handler.py b/tests/utils/test_pipe_handler.py index e7ab34d9..960ed05e 100644 --- a/tests/utils/test_pipe_handler.py +++ b/tests/utils/test_pipe_handler.py @@ -149,6 +149,13 @@ def communicator(pipe_handler_pipe: PipeHandler): return pipe_handler_pipe.get_communicator() +def test_close_closes_all_communicators( + pipe_handler_pipe: PipeHandler, communicator: CommunicatorPipe +): + pipe_handler_pipe.close() + assert communicator.socket.closed is True + + class Test_PipeHandler_read_message: def test_handle_response(self, pipe_handler: PipeHandler): message = Message("rec", "send") @@ -195,6 +202,11 @@ def test_second_call_returns_same_communicator(self, pipe_handler_setup: PipeHan assert com2 == pipe_handler_setup.external_pipe # type: ignore +def test_close_all_communicators(pipe_handler_pipe: PipeHandler, communicator: CommunicatorPipe): + pipe_handler_pipe.close_all_communicators() + assert communicator.socket.closed is True + + def test_communicator_send_message(pipe_handler_pipe: PipeHandler, communicator: CommunicatorPipe): message = Message("rec", "send") pipe_handler_pipe._send_frames = MagicMock() # type: ignore[method-assign] From c0c29ad9d2352b1b1a49a054bbdc06821843d158 Mon Sep 17 00:00:00 2001 From: Benedikt Burger <67148916+BenediktBurger@users.noreply.github.com> Date: Wed, 13 Mar 2024 13:21:14 +0100 Subject: [PATCH 2/2] Improve test framework. --- tests/utils/test_pipe_handler.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/tests/utils/test_pipe_handler.py b/tests/utils/test_pipe_handler.py index 960ed05e..685487a0 100644 --- a/tests/utils/test_pipe_handler.py +++ b/tests/utils/test_pipe_handler.py @@ -30,7 +30,8 @@ from pyleco.core.message import Message from pyleco.test import FakeContext -from pyleco.utils.pipe_handler import LockedMessageBuffer, PipeHandler, CommunicatorPipe +from pyleco.utils.pipe_handler import LockedMessageBuffer, PipeHandler, CommunicatorPipe,\ + PipeCommands cid = b"conversation_id;" # conversation_id header = b"".join((cid, b"mid", b"\x00")) @@ -124,9 +125,28 @@ def test_length_of_buffer(message_buffer: LockedMessageBuffer, length: int): assert len(message_buffer) == length +# Test CommunicatorPipe +class Test_CommunicatorPipe_send_pipe: + def test_send_pipe_message(self, communicator: CommunicatorPipe): + communicator.socket.send_multipart = MagicMock() # type: ignore[method-assign] + communicator._send_pipe_message(PipeCommands.LOCAL_COMMAND, b"abc") + # assert + communicator.socket.send_multipart.assert_called_once_with( + (PipeCommands.LOCAL_COMMAND, b"abc") + ) + + def test_raise_ConnectionError_on_zmq_error(self, communicator: CommunicatorPipe): + communicator.socket.send_multipart = MagicMock( # type: ignore[method-assign] + side_effect=zmq.ZMQError(128, "not a socket") + ) + # act + with pytest.raises(ConnectionRefusedError): + communicator._send_pipe_message(PipeCommands.LOCAL_COMMAND, b"c") + + # Test PipeHandler @pytest.fixture -def pipe_handler(): +def pipe_handler() -> PipeHandler: """With fake contexts, that is with a broken pipe.""" pipe_handler = PipeHandler(name="handler", context=FakeContext()) # type: ignore return pipe_handler @@ -144,7 +164,7 @@ def pipe_handler_pipe(): @pytest.fixture -def communicator(pipe_handler_pipe: PipeHandler): +def communicator(pipe_handler_pipe: PipeHandler) -> CommunicatorPipe: """Communicator of `pipe_handler_pipe`.""" return pipe_handler_pipe.get_communicator()