Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support firmware extensions #611

Draft
wants to merge 16 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions bellows/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
cv_boolean,
)

CONF_BELLOWS_CONFIG = "bellows_config"
CONF_MANUAL_SOURCE_ROUTING = "manual_source_routing"

CONF_USE_THREAD = "use_thread"
CONF_EZSP_CONFIG = "ezsp_config"
CONF_EZSP_POLICIES = "ezsp_policies"
Expand All @@ -31,6 +34,12 @@
{vol.Optional(str): int}
),
vol.Optional(CONF_USE_THREAD, default=True): cv_boolean,
# The above config really should belong in here
vol.Optional(CONF_BELLOWS_CONFIG, default={}): vol.Schema(
{
vol.Optional(CONF_MANUAL_SOURCE_ROUTING, default=False): bool,
}
),
}
)

Expand Down
102 changes: 92 additions & 10 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import bellows.config as conf
from bellows.exception import EzspError, InvalidCommandError
from bellows.ezsp import xncp
from bellows.ezsp.config import DEFAULT_CONFIG, RuntimeConfig, ValueConfig
from bellows.ezsp.xncp import FirmwareFeatures, FlowControlType
import bellows.types as t
import bellows.uart

Expand Down Expand Up @@ -60,6 +62,7 @@ def __init__(self, device_config: dict):
self._callbacks = {}
self._ezsp_event = asyncio.Event()
self._ezsp_version = v4.EZSPv4.VERSION
self._xncp_features = FirmwareFeatures.NONE
self._gw = None
self._protocol = None

Expand Down Expand Up @@ -121,6 +124,7 @@ async def startup_reset(self) -> None:
await self.reset()

await self.version()
await self.get_xncp_features()

@classmethod
async def initialize(cls, zigpy_config: dict) -> EZSP:
Expand Down Expand Up @@ -172,13 +176,22 @@ async def version(self):
if ver != self.ezsp_version:
self._switch_protocol_version(ver)
await self._command("version", desiredProtocolVersion=ver)

LOGGER.debug(
"EZSP Stack Type: %s, Stack Version: %04x, Protocol version: %s",
("EZSP Stack Type: %s" ", Stack Version: %04x" ", Protocol version: %s"),
stack_type,
stack_version,
ver,
)

async def get_xncp_features(self) -> None:
try:
self._xncp_features = await self.xncp_get_supported_firmware_features()
except InvalidCommandError:
self._xncp_features = xncp.FirmwareFeatures.NONE

LOGGER.debug("XNCP features: %s", self._xncp_features)

def close(self):
self.stop_ezsp()
if self._gw:
Expand Down Expand Up @@ -324,11 +337,10 @@ async def get_board_info(
) -> tuple[str, str, str | None] | tuple[None, None, str | None]:
"""Return board info."""

tokens = {}
tokens: dict[t.EzspMfgTokenId, str | None] = {}

for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME):
(value,) = await self.getMfgToken(tokenId=token)
LOGGER.debug("Read %s token: %s", token.name, value)
for token_id in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME):
value = await self.get_mfg_token(token_id)

# Tokens are fixed-length and initially filled with \xFF but also can end
# with \x00
Expand All @@ -340,10 +352,7 @@ async def get_board_info(
except UnicodeDecodeError:
result = "0x" + value.hex().upper()

if not result:
result = None

tokens[token] = result
tokens[token_id] = result or None

(status, ver_info_bytes) = await self.getValue(
valueId=t.EzspValueId.VALUE_VERSION_INFO
Expand All @@ -358,6 +367,15 @@ async def get_board_info(
special, ver_info_bytes = t.uint8_t.deserialize(ver_info_bytes)
version = f"{major}.{minor}.{patch}.{special} build {build}"

if xncp.FirmwareFeatures.BUILD_STRING in self._xncp_features:
try:
build_string = await self.xncp_get_build_string()
except InvalidCommandError:
build_string = None

if build_string:
version = f"{version} ({build_string})"

return (
tokens[t.EzspMfgTokenId.MFG_STRING],
tokens[t.EzspMfgTokenId.MFG_BOARD_NAME],
Expand Down Expand Up @@ -385,9 +403,23 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None:

return None

async def get_mfg_token(self, token: t.EzspMfgTokenId) -> bytes:
(value,) = await self.getMfgToken(tokenId=token)
LOGGER.debug("Read manufacturing token %s: %s", token.name, value)

override_value = None

if FirmwareFeatures.MFG_TOKEN_OVERRIDES in self._xncp_features:
with contextlib.suppress(InvalidCommandError):
override_value = await self.xncp_get_mfg_token_override(token)

LOGGER.debug("XNCP override token %s: %s", token.name, override_value)

return override_value or value

async def _get_mfg_custom_eui_64(self) -> t.EUI64 | None:
"""Get the custom EUI 64 manufacturing token, if it has a valid value."""
(data,) = await self.getMfgToken(tokenId=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64)
data = await self.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64)

# Manufacturing tokens do not exist in RCP firmware: all reads are empty
if not data:
Expand Down Expand Up @@ -632,3 +664,53 @@ async def write_config(self, config: dict) -> None:
status,
)
continue

async def send_xncp_frame(
self, payload: xncp.XncpCommandPayload
) -> xncp.XncpCommandPayload:
"""Send an XNCP frame."""
req_frame = xncp.XncpCommand.from_payload(payload)
LOGGER.debug("Sending XNCP frame: %s", req_frame)
status, data = await self.customFrame(req_frame.serialize())

if status != t.EmberStatus.SUCCESS:
raise InvalidCommandError("XNCP is not supported")

rsp_frame = xncp.XncpCommand.from_bytes(data)
LOGGER.debug("Received XNCP frame: %s", rsp_frame)

if rsp_frame.status != t.EmberStatus.SUCCESS:
raise InvalidCommandError(f"XNCP response error: {rsp_frame.status}")

return rsp_frame.payload

async def xncp_get_supported_firmware_features(self) -> xncp.FirmwareFeatures:
"""Get supported firmware extensions."""
rsp = await self.send_xncp_frame(xncp.GetSupportedFeaturesReq())
return rsp.features

async def xncp_set_manual_source_route(
self, destination: t.NWK, route: list[t.NWK]
) -> None:
"""Set a manual source route."""
await self.send_xncp_frame(
xncp.SetSourceRouteReq(
destination=destination,
source_route=route,
)
)

async def xncp_get_mfg_token_override(self, token: t.EzspMfgTokenId) -> bytes:
"""Get manufacturing token override."""
rsp = await self.send_xncp_frame(xncp.GetMfgTokenOverrideReq(token=token))
return rsp.value

async def xncp_get_build_string(self) -> bytes:
"""Get build string."""
rsp = await self.send_xncp_frame(xncp.GetBuildStringReq())
return rsp.build_string.decode("utf-8")

async def xncp_get_flow_control_type(self) -> FlowControlType:
"""Get flow control type."""
rsp = await self.send_xncp_frame(xncp.GetFlowControlTypeReq())
return rsp.flow_control_type
170 changes: 170 additions & 0 deletions bellows/ezsp/xncp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""Custom EZSP commands."""
from __future__ import annotations

import dataclasses
import logging
from typing import Callable

import zigpy.types as t

from bellows.types import EmberStatus, EzspMfgTokenId

_LOGGER = logging.getLogger(__name__)

COMMANDS: dict[XncpCommandId, type[XncpCommandPayload]] = {}
REV_COMMANDS: dict[type[XncpCommandPayload], XncpCommandId] = {}


def register_command(command_id: XncpCommandId) -> Callable[[type], type]:
def decorator(cls: type) -> type:
COMMANDS[command_id] = cls
REV_COMMANDS[cls] = command_id
return cls

return decorator


class Bytes(bytes):
def serialize(self) -> Bytes:
return self

@classmethod
def deserialize(cls, data: bytes) -> tuple[Bytes, bytes]:
return cls(data), b""

def __repr__(self) -> str:
# Reading byte sequences like \x200\x21 is extremely annoying
# compared to \x20\x30\x21
escaped = "".join(f"\\x{b:02X}" for b in self)

return f"b'{escaped}'"

__str__ = __repr__


class XncpCommandId(t.enum16):
GET_SUPPORTED_FEATURES_REQ = 0x0000
SET_SOURCE_ROUTE_REQ = 0x0001
GET_MFG_TOKEN_OVERRIDE_REQ = 0x0002
GET_BUILD_STRING_REQ = 0x0003
GET_FLOW_CONTROL_TYPE_REQ = 0x0004

GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000
SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000
GET_MFG_TOKEN_OVERRIDE_RSP = GET_MFG_TOKEN_OVERRIDE_REQ | 0x8000
GET_BUILD_STRING_RSP = GET_BUILD_STRING_REQ | 0x8000
GET_FLOW_CONTROL_TYPE_RSP = GET_FLOW_CONTROL_TYPE_REQ | 0x8000

UNKNOWN = 0xFFFF


@dataclasses.dataclass
class XncpCommand:
command_id: XncpCommandId
status: EmberStatus
payload: XncpCommandPayload

@classmethod
def from_payload(cls, payload: XncpCommandPayload) -> XncpCommand:
return cls(
command_id=REV_COMMANDS[type(payload)],
status=EmberStatus.SUCCESS,
payload=payload,
)

@classmethod
def from_bytes(cls, data: bytes) -> XncpCommand:
command_id, data = XncpCommandId.deserialize(data)
status, data = EmberStatus.deserialize(data)
payload, rest = COMMANDS[command_id].deserialize(data)

if rest:
_LOGGER.debug("Unparsed data remains after %s frame: %s", payload, rest)

return cls(command_id=command_id, status=status, payload=payload)

def serialize(self) -> Bytes:
return (
self.command_id.serialize()
+ self.status.serialize()
+ self.payload.serialize()
)


class FirmwareFeatures(t.bitmap32):
NONE = 0

# The firmware passes through all group traffic, regardless of group membership
MEMBER_OF_ALL_GROUPS = 1 << 0

# Source routes can be overridden by the application
MANUAL_SOURCE_ROUTE = 1 << 1

# The firmware supports overriding some manufacturing tokens
MFG_TOKEN_OVERRIDES = 1 << 2

# The firmware contains a free-form build string
BUILD_STRING = 1 << 3

# The flow control type (software or hardware) can be queried
FLOW_CONTROL_TYPE = 1 << 4


class XncpCommandPayload(t.Struct):
pass


class FlowControlType(t.enum8):
Software = 0x00
Hardware = 0x01


@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_REQ)
class GetSupportedFeaturesReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_RSP)
class GetSupportedFeaturesRsp(XncpCommandPayload):
features: FirmwareFeatures


@register_command(XncpCommandId.SET_SOURCE_ROUTE_REQ)
class SetSourceRouteReq(XncpCommandPayload):
destination: t.NWK
source_route: t.List[t.NWK]


@register_command(XncpCommandId.SET_SOURCE_ROUTE_RSP)
class SetSourceRouteRsp(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_REQ)
class GetMfgTokenOverrideReq(XncpCommandPayload):
token: EzspMfgTokenId


@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_RSP)
class GetMfgTokenOverrideRsp(XncpCommandPayload):
value: Bytes


@register_command(XncpCommandId.GET_BUILD_STRING_REQ)
class GetBuildStringReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_BUILD_STRING_RSP)
class GetBuildStringRsp(XncpCommandPayload):
build_string: Bytes


@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_REQ)
class GetFlowControlTypeReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_RSP)
class GetFlowControlTypeRsp(XncpCommandPayload):
flow_control_type: FlowControlType
Loading
Loading