Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit request concurrency #173

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme = "README.md"
license = {text = "GPL-3.0"}
requires-python = ">=3.8"
dependencies = [
"zigpy>=0.60.0",
"zigpy>=0.60.2",
]

[tool.setuptools.packages.find]
Expand Down
37 changes: 22 additions & 15 deletions zigpy_xbee/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import serial
from zigpy.config import CONF_DEVICE_PATH, SCHEMA_DEVICE
from zigpy.datastructures import PriorityLock
from zigpy.exceptions import APIException, DeliveryError
import zigpy.types as t

Expand Down Expand Up @@ -289,6 +290,7 @@ def __init__(self, device_config: Dict[str, Any]) -> None:
self._cmd_mode_future: Optional[asyncio.Future] = None
self._reset: asyncio.Event = asyncio.Event()
self._running: asyncio.Event = asyncio.Event()
self._send_lock = PriorityLock()

@property
def reset_event(self):
Expand Down Expand Up @@ -333,20 +335,31 @@ def close(self):
self._uart.close()
self._uart = None

def _command(self, name, *args, mask_frame_id=False):
def _get_command_priority(self, name: str, *args) -> int:
return {
"tx_explicit": -1,
"remote_at": -1,
}.get(name, 0)

async def _command(self, name, *args, mask_frame_id=False):
"""Send API frame to the device."""
LOGGER.debug("Command %s %s", name, args)
if self._uart is None:
raise APIException("API is not running")
frame_id = 0 if mask_frame_id else self._seq
data, needs_response = self._api_frame(name, frame_id, *args)
self._uart.send(data)
future = None
if needs_response and frame_id:

async with self._send_lock(priority=self._get_command_priority(name)):
LOGGER.debug("Command %s %s", name, args)
frame_id = 0 if mask_frame_id else self._seq
data, needs_response = self._api_frame(name, frame_id, *args)
self._uart.send(data)

if not needs_response or not frame_id:
return

future = asyncio.Future()
self._awaiting[frame_id] = (future,)
self._seq = (self._seq % 255) + 1
return future
self._seq = (self._seq % 255) + 1

return await future

async def _remote_at_command(self, ieee, nwk, options, name, *args):
"""Execute AT command on a different XBee module in the network."""
Expand Down Expand Up @@ -597,9 +610,3 @@ async def _probe(self) -> None:
raise APIException("Failed to configure XBee for API mode")
finally:
self.close()

def __getattr__(self, item):
"""Handle supported command requests."""
if item in COMMAND_REQUESTS:
return functools.partial(self._command, item)
raise AttributeError(f"Unknown command {item}")
48 changes: 26 additions & 22 deletions zigpy_xbee/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,29 +302,31 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
"Cannot send a packet to a device without a known IEEE address"
)

send_req = self._api.tx_explicit(
long_addr,
short_addr,
packet.src_ep or 0,
packet.dst_ep or 0,
packet.cluster_id,
packet.profile_id,
packet.radius,
tx_opts,
packet.data.serialize(),
)

try:
v = await asyncio.wait_for(send_req, timeout=TIMEOUT_TX_STATUS)
except asyncio.TimeoutError:
raise zigpy.exceptions.DeliveryError(
"Timeout waiting for ACK", status=TXStatus.NETWORK_ACK_FAILURE
async with self._limit_concurrency():
puddly marked this conversation as resolved.
Show resolved Hide resolved
send_req = self._api._command(
"tx_explicit",
long_addr,
short_addr,
packet.src_ep or 0,
packet.dst_ep or 0,
packet.cluster_id,
packet.profile_id,
packet.radius,
tx_opts,
packet.data.serialize(),
)

if v != TXStatus.SUCCESS:
raise zigpy.exceptions.DeliveryError(
f"Failed to deliver packet: {v!r}", status=v
)
try:
v = await asyncio.wait_for(send_req, timeout=TIMEOUT_TX_STATUS)
except asyncio.TimeoutError:
raise zigpy.exceptions.DeliveryError(
"Timeout waiting for ACK", status=TXStatus.NETWORK_ACK_FAILURE
)

if v != TXStatus.SUCCESS:
raise zigpy.exceptions.DeliveryError(
f"Failed to deliver packet: {v!r}", status=v
)

@zigpy.util.retryable_request()
def remote_at_command(
Expand Down Expand Up @@ -356,7 +358,9 @@ async def permit_with_link_key(
# Key type:
# 0 = Pre-configured Link Key (KY command of the joining device)
# 1 = Install Code With CRC (I? command of the joining device)
await self._api.register_joining_device(node, reserved, key_type, link_key)
await self._api._command(
"register_joining_device", node, reserved, key_type, link_key
)

def handle_modem_status(self, status):
"""Handle changed Modem Status of the device."""
Expand Down
Loading