Skip to content

Commit

Permalink
Use zigpy PriorityLock and deprioritize TX commands
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Dec 15, 2023
1 parent 49def51 commit 6059c1e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme = "README.md"
license = {text = "GPL-3.0"}
requires-python = ">=3.8"
dependencies = [
"zigpy>=0.60.0",
"zigpy>=0.60.2",
]

[tool.setuptools.packages.find]
Expand Down
54 changes: 32 additions & 22 deletions zigpy_xbee/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import serial
from zigpy.config import CONF_DEVICE_PATH, SCHEMA_DEVICE
from zigpy.datastructures import PriorityLock
from zigpy.exceptions import APIException, DeliveryError
import zigpy.types as t

Expand Down Expand Up @@ -289,7 +290,7 @@ def __init__(self, device_config: Dict[str, Any]) -> None:
self._cmd_mode_future: Optional[asyncio.Future] = None
self._reset: asyncio.Event = asyncio.Event()
self._running: asyncio.Event = asyncio.Event()
self._send_lock = asyncio.Lock()
self._send_lock = PriorityLock()

@property
def reset_event(self):
Expand Down Expand Up @@ -334,33 +335,43 @@ def close(self):
self._uart.close()
self._uart = None

def _command(self, name, *args, mask_frame_id=False):
def _get_command_priority(self, name: str, *args) -> int:
return {
"tx_explicit": -1,
"remote_at": -1,
}.get(name, 0)

async def _command(self, name, *args, mask_frame_id=False):
"""Send API frame to the device."""
LOGGER.debug("Command %s %s", name, args)
if self._uart is None:
raise APIException("API is not running")
frame_id = 0 if mask_frame_id else self._seq
data, needs_response = self._api_frame(name, frame_id, *args)
self._uart.send(data)
future = None
if needs_response and frame_id:

async with self._send_lock(priority=self._get_command_priority(name)):
LOGGER.debug("Command %s %s", name, args)
frame_id = 0 if mask_frame_id else self._seq
data, needs_response = self._api_frame(name, frame_id, *args)
self._uart.send(data)

if not needs_response or not frame_id:
return

future = asyncio.Future()
self._awaiting[frame_id] = (future,)
self._seq = (self._seq % 255) + 1
return future
self._seq = (self._seq % 255) + 1

return await future

async def _remote_at_command(self, ieee, nwk, options, name, *args):
"""Execute AT command on a different XBee module in the network."""
LOGGER.debug("Remote AT command: %s %s", name, args)
data = t.serialize(args, (AT_COMMANDS[name],))
try:
async with self._send_lock:
return await asyncio.wait_for(
self._command(
"remote_at", ieee, nwk, options, name.encode("ascii"), data
),
timeout=REMOTE_AT_COMMAND_TIMEOUT,
)
return await asyncio.wait_for(
self._command(
"remote_at", ieee, nwk, options, name.encode("ascii"), data
),
timeout=REMOTE_AT_COMMAND_TIMEOUT,
)
except asyncio.TimeoutError:
LOGGER.warning("No response to %s command", name)
raise
Expand All @@ -369,11 +380,10 @@ async def _at_partial(self, cmd_type, name, *args):
LOGGER.debug("%s command: %s %s", cmd_type, name, args)
data = t.serialize(args, (AT_COMMANDS[name],))
try:
async with self._send_lock:
return await asyncio.wait_for(
self._command(cmd_type, name.encode("ascii"), data),
timeout=AT_COMMAND_TIMEOUT,
)
return await asyncio.wait_for(
self._command(cmd_type, name.encode("ascii"), data),
timeout=AT_COMMAND_TIMEOUT,
)
except asyncio.TimeoutError:
LOGGER.warning("%s: No response to %s command", cmd_type, name)
raise
Expand Down

0 comments on commit 6059c1e

Please sign in to comment.