Skip to content

Commit

Permalink
Merge pull request #66 from pymeasure/rework-pipe-handler
Browse files Browse the repository at this point in the history
Rework message buffer
  • Loading branch information
BenediktBurger authored Mar 12, 2024
2 parents 20d57c0 + 7850331 commit 6f4e1d5
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 172 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ _Use self defined objects instead of jsonrpc2-objects and jsonrpc2-pyclient._
- Move `pyleco.errors.CommunicationError` to `pyleco.json_utils.errors`.
- Deprecate `generate_error_with_data` in favor of `DataError.from_error` class method.
- Python requirement lowered to Python 3.8
- Rework the message buffer in the base communicator and harmonize with pipe handler's buffer.

### Added

Expand All @@ -20,6 +21,7 @@ _Use self defined objects instead of jsonrpc2-objects and jsonrpc2-pyclient._

### Deprecated
- Deprecate `pyleco.errors` in favor of `json_utils.errors` and `json_utils.json_objects`.
- Deprecate to use `CommunicatorPipe.buffer`, use `message_buffer` instead.


## [0.2.2] - 2024-02-14
Expand Down
73 changes: 51 additions & 22 deletions pyleco/utils/base_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,59 @@
NOT_SIGNED_IN_ERROR_CODE = str(NOT_SIGNED_IN.code).encode()


class MessageBuffer:
_messages: list[Message]
_requested_ids: set[bytes]

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._messages = []
self._requested_ids = set()

def add_conversation_id(self, conversation_id: bytes) -> None:
"""Add a conversation_id such that its response has to be recalled by name."""
self._requested_ids.add(conversation_id)

def remove_conversation_id(self, conversation_id: bytes) -> None:
"""Remove a conversation_id from the requested ids."""
self._requested_ids.discard(conversation_id)

def is_conversation_id_requested(self, conversation_id: bytes) -> bool:
"""Check whether this conversation_id is requested by someone."""
return conversation_id in self._requested_ids

def add_message(self, message: Message):
"""Add a message to the buffer."""
self._messages.append(message)

def retrieve_message(self, conversation_id: Optional[bytes] = None) -> Optional[Message]:
"""Retrieve the requested message or the next free one for `conversation_id=None`."""
for i, msg in enumerate(self._messages):
cid = msg.conversation_id
if conversation_id == cid:
self._requested_ids.discard(cid)
return self._messages.pop(i)
elif cid not in self._requested_ids and conversation_id is None:
return self._messages.pop(i)
return None

def __len__(self):
return len(self._messages)


class BaseCommunicator(CommunicatorProtocol, Protocol):
"""Abstract class of a Communicator with some logic.
"""

socket: zmq.Socket
_message_buffer: list[Message]
_requested_ids: set[bytes]
log: logging.Logger
namespace: Optional[str]
message_buffer: MessageBuffer

# Setup methods for call in init
def setup_message_buffer(self) -> None:
"""Create the message buffer variables."""
self._message_buffer = []
self._requested_ids = set()
self.message_buffer = MessageBuffer()

def close(self) -> None:
"""Close the connection."""
Expand Down Expand Up @@ -115,20 +153,9 @@ def finish_sign_out(self) -> None:
self.namespace = None

# Reading messages with buffer
def _find_buffer_message(self, conversation_id: Optional[bytes] = None) -> Optional[Message]:
"""Find a message in the buffer."""
for i, msg in enumerate(self._message_buffer):
cid = msg.conversation_id
if conversation_id == cid:
self._requested_ids.discard(cid)
return self._message_buffer.pop(i)
elif cid not in self._requested_ids and conversation_id is None:
return self._message_buffer.pop(i)
return None

def _read_socket_message(self, timeout: Optional[float] = None) -> Message:
"""Read the next message from the socket, without further processing."""
if self.socket.poll(int((timeout or self.timeout) * 1000)):
if self.socket.poll(int((timeout if timeout is not None else self.timeout) * 1000)):
return Message.from_frames(*self.socket.recv_multipart())
raise TimeoutError("Reading timed out")

Expand All @@ -139,18 +166,20 @@ def _find_socket_message(self, conversation_id: Optional[bytes] = None,
:param conversation_id: Conversation ID to filter for, or next free message if None.
"""
stop = perf_counter() + (timeout or self.timeout)
stop = perf_counter() + (timeout if timeout is not None else self.timeout)
while True:
msg = self._read_socket_message(timeout)
self.check_for_not_signed_in_error(message=msg)
cid = msg.conversation_id
if conversation_id == cid:
self._requested_ids.discard(cid)
self.message_buffer.remove_conversation_id(conversation_id=cid)
return msg
elif conversation_id is not None or cid in self._requested_ids:
self._message_buffer.append(msg)
else:
elif self.message_buffer.is_conversation_id_requested(conversation_id=cid):
self.message_buffer.add_message(msg)
elif conversation_id is None:
return msg
else:
self.message_buffer.add_message(msg)
if perf_counter() > stop:
# inside the loop to do it at least once, even if timeout is 0
break
Expand All @@ -165,7 +194,7 @@ def check_for_not_signed_in_error(self, message: Message) -> None:

def read_message(self, conversation_id: Optional[bytes] = None,
timeout: Optional[float] = None) -> Message:
message = self._find_buffer_message(conversation_id=conversation_id)
message = self.message_buffer.retrieve_message(conversation_id=conversation_id)
if message is None:
message = self._find_socket_message(conversation_id=conversation_id, timeout=timeout)
return message
Expand Down
3 changes: 1 addition & 2 deletions pyleco/utils/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ def __init__(
self.open()
self.name = name
self.namespace = None
self._message_buffer: list[Message] = []
self._requested_ids: set[bytes] = set()
self._last_beat: float = 0
self.rpc_generator = RPCGenerator()
super().__init__(**kwargs)
self.setup_message_buffer()

def open(self, context: Optional[zmq.Context] = None) -> None:
"""Open the connection."""
Expand Down
3 changes: 1 addition & 2 deletions pyleco/utils/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ def __init__(
self.name = name
self._namespace: Union[str, None] = None
self._full_name: str = name
self._message_buffer: list[Message] = []
self._requested_ids: set[bytes] = set()
self.rpc = RPCServer(title=name)
self.rpc_generator = RPCGenerator()
self.register_rpc_methods()
Expand All @@ -91,6 +89,7 @@ def __init__(
)

super().__init__(**kwargs)
self.setup_message_buffer()

@property
def namespace(self) -> Union[str, None]:
Expand Down
Loading

0 comments on commit 6f4e1d5

Please sign in to comment.