diff --git a/pyproject.toml b/pyproject.toml index 1544f76..fba512b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ readme = "README.md" license = {text = "GPL-3.0"} requires-python = ">=3.8" dependencies = [ - "zigpy>=0.60.0", + "zigpy>=0.60.2", ] [tool.setuptools.packages.find] diff --git a/zigpy_xbee/api.py b/zigpy_xbee/api.py index b4a73da..ffad765 100644 --- a/zigpy_xbee/api.py +++ b/zigpy_xbee/api.py @@ -8,6 +8,7 @@ import serial from zigpy.config import CONF_DEVICE_PATH, SCHEMA_DEVICE +from zigpy.datastructures import PriorityLock from zigpy.exceptions import APIException, DeliveryError import zigpy.types as t @@ -289,6 +290,7 @@ def __init__(self, device_config: Dict[str, Any]) -> None: self._cmd_mode_future: Optional[asyncio.Future] = None self._reset: asyncio.Event = asyncio.Event() self._running: asyncio.Event = asyncio.Event() + self._send_lock = PriorityLock() @property def reset_event(self): @@ -333,20 +335,31 @@ def close(self): self._uart.close() self._uart = None - def _command(self, name, *args, mask_frame_id=False): + def _get_command_priority(self, name: str, *args) -> int: + return { + "tx_explicit": -1, + "remote_at": -1, + }.get(name, 0) + + async def _command(self, name, *args, mask_frame_id=False): """Send API frame to the device.""" - LOGGER.debug("Command %s %s", name, args) if self._uart is None: raise APIException("API is not running") - frame_id = 0 if mask_frame_id else self._seq - data, needs_response = self._api_frame(name, frame_id, *args) - self._uart.send(data) - future = None - if needs_response and frame_id: + + async with self._send_lock(priority=self._get_command_priority(name)): + LOGGER.debug("Command %s %s", name, args) + frame_id = 0 if mask_frame_id else self._seq + data, needs_response = self._api_frame(name, frame_id, *args) + self._uart.send(data) + + if not needs_response or not frame_id: + return + future = asyncio.Future() self._awaiting[frame_id] = (future,) - self._seq = (self._seq % 255) + 1 - return future + self._seq = (self._seq % 255) + 1 + + return await future async def _remote_at_command(self, ieee, nwk, options, name, *args): """Execute AT command on a different XBee module in the network.""" @@ -597,9 +610,3 @@ async def _probe(self) -> None: raise APIException("Failed to configure XBee for API mode") finally: self.close() - - def __getattr__(self, item): - """Handle supported command requests.""" - if item in COMMAND_REQUESTS: - return functools.partial(self._command, item) - raise AttributeError(f"Unknown command {item}") diff --git a/zigpy_xbee/zigbee/application.py b/zigpy_xbee/zigbee/application.py index 2158b95..55f0ffe 100644 --- a/zigpy_xbee/zigbee/application.py +++ b/zigpy_xbee/zigbee/application.py @@ -302,29 +302,31 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: "Cannot send a packet to a device without a known IEEE address" ) - send_req = self._api.tx_explicit( - long_addr, - short_addr, - packet.src_ep or 0, - packet.dst_ep or 0, - packet.cluster_id, - packet.profile_id, - packet.radius, - tx_opts, - packet.data.serialize(), - ) - - try: - v = await asyncio.wait_for(send_req, timeout=TIMEOUT_TX_STATUS) - except asyncio.TimeoutError: - raise zigpy.exceptions.DeliveryError( - "Timeout waiting for ACK", status=TXStatus.NETWORK_ACK_FAILURE + async with self._limit_concurrency(): + send_req = self._api._command( + "tx_explicit", + long_addr, + short_addr, + packet.src_ep or 0, + packet.dst_ep or 0, + packet.cluster_id, + packet.profile_id, + packet.radius, + tx_opts, + packet.data.serialize(), ) - if v != TXStatus.SUCCESS: - raise zigpy.exceptions.DeliveryError( - f"Failed to deliver packet: {v!r}", status=v - ) + try: + v = await asyncio.wait_for(send_req, timeout=TIMEOUT_TX_STATUS) + except asyncio.TimeoutError: + raise zigpy.exceptions.DeliveryError( + "Timeout waiting for ACK", status=TXStatus.NETWORK_ACK_FAILURE + ) + + if v != TXStatus.SUCCESS: + raise zigpy.exceptions.DeliveryError( + f"Failed to deliver packet: {v!r}", status=v + ) @zigpy.util.retryable_request() def remote_at_command( @@ -356,7 +358,9 @@ async def permit_with_link_key( # Key type: # 0 = Pre-configured Link Key (KY command of the joining device) # 1 = Install Code With CRC (I? command of the joining device) - await self._api.register_joining_device(node, reserved, key_type, link_key) + await self._api._command( + "register_joining_device", node, reserved, key_type, link_key + ) def handle_modem_status(self, status): """Handle changed Modem Status of the device."""