Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conbee III support #232

Merged
merged 22 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ license = {text = "GPL-3.0"}
requires-python = ">=3.8"
dependencies = [
"voluptuous",
"zigpy>=0.54.1",
"zigpy>=0.60.0",
'async-timeout; python_version<"3.11"',
]

Expand Down
44 changes: 44 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -949,3 +949,47 @@ async def test_add_neighbour(api, mock_command_rsp):
mac_capability_flags=0x12,
)
]


async def test_cb3_device_state_callback_bug(api, mock_command_rsp):
mock_command_rsp(
command_id=deconz_api.CommandId.version,
params={"reserved": t.uint8_t(0)},
rsp={
"status": deconz_api.Status.SUCCESS,
"frame_length": t.uint16_t(9),
"version": deconz_api.FirmwareVersion(0x26450900),
},
replace=True,
)

await api.connect()

device_state = deconz_api.DeviceState(
network_state=deconz_api.NetworkState2.CONNECTED,
device_state=deconz_api.DeviceStateFlags.APSDE_DATA_CONFIRM,
)

assert api._device_state != device_state

_, rx_schema = deconz_api.COMMAND_SCHEMAS[deconz_api.CommandId.device_state]
api.data_received(
deconz_api.Command(
command_id=deconz_api.CommandId.device_state,
seq=api._seq,
payload=t.serialize_dict(
{
"status": deconz_api.Status.SUCCESS,
"frame_length": t.uint16_t(8),
"device_state": device_state,
"reserved1": t.uint8_t(0),
"reserved2": t.uint8_t(0),
},
rx_schema,
),
).serialize()
)

await asyncio.sleep(0.01)

assert api._device_state == device_state
185 changes: 81 additions & 104 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import zigpy.application
import zigpy.config
import zigpy.device
from zigpy.types import EUI64, Channels
from zigpy.types import EUI64, Channels, KeyData
import zigpy.zdo.types as zdo_t

from zigpy_deconz import types as t
Expand Down Expand Up @@ -41,6 +41,7 @@ def api():
return_value=deconz_api.DeviceState(deconz_api.NetworkState.CONNECTED)
)
api.write_parameter = AsyncMock()
api.firmware_version = deconz_api.FirmwareVersion(0x26580700)

# So the protocol version is effectively infinite
api._protocol_version.__ge__.return_value = True
Expand Down Expand Up @@ -112,7 +113,7 @@ def addr_nwk_and_ieee(nwk, ieee):
return addr


@patch("zigpy_deconz.zigbee.application.CHANGE_NETWORK_WAIT", 0.001)
@patch("zigpy_deconz.zigbee.application.CHANGE_NETWORK_POLL_TIME", 0.001)
@pytest.mark.parametrize(
"proto_ver, target_state, returned_state",
[
Expand Down Expand Up @@ -200,16 +201,12 @@ async def test_connect_failure(app):


async def test_disconnect(app):
reset_watchdog_task = app._reset_watchdog_task = MagicMock()
api_close = app._api.close = MagicMock()

await app.disconnect()

assert app._api is None
assert app._reset_watchdog_task is None

assert api_close.call_count == 1
assert reset_watchdog_task.cancel.call_count == 1


async def test_disconnect_no_api(app):
Expand All @@ -224,19 +221,34 @@ async def test_disconnect_close_error(app):
await app.disconnect()


async def test_permit_with_key_not_implemented(app):
with pytest.raises(NotImplementedError):
await app.permit_with_key(node=MagicMock(), code=b"abcdef")
async def test_permit_with_link_key(app):
app._api.write_parameter = AsyncMock()
app.permit = AsyncMock()

await app.permit_with_link_key(
node=t.EUI64.convert("00:11:22:33:44:55:66:77"),
link_key=KeyData.convert("aa:bb:cc:dd:aa:bb:cc:dd:aa:bb:cc:dd:aa:bb:cc:dd"),
)

assert app._api.write_parameter.mock_calls == [
mock.call(
deconz_api.NetworkParameter.link_key,
deconz_api.LinkKey(
ieee=t.EUI64.convert("00:11:22:33:44:55:66:77"),
key=KeyData.convert("aa:bb:cc:dd:aa:bb:cc:dd:aa:bb:cc:dd:aa:bb:cc:dd"),
),
)
]

assert app.permit.mock_calls == [mock.call(mock.ANY)]


async def test_deconz_dev_add_to_group(app, nwk, device_path):
group = MagicMock()
app._groups = MagicMock()
app._groups.add_group.return_value = group

deconz = application.DeconzDevice(
deconz_api.FirmwareVersion(0), device_path, app, sentinel.ieee, nwk
)
deconz = application.DeconzDevice("Conbee II", app, sentinel.ieee, nwk)
deconz.endpoints = {
0: sentinel.zdo,
1: sentinel.ep1,
Expand All @@ -254,9 +266,7 @@ async def test_deconz_dev_add_to_group(app, nwk, device_path):
async def test_deconz_dev_remove_from_group(app, nwk, device_path):
group = MagicMock()
app.groups[sentinel.grp_id] = group
deconz = application.DeconzDevice(
deconz_api.FirmwareVersion(0), device_path, app, sentinel.ieee, nwk
)
deconz = application.DeconzDevice("Conbee II", app, sentinel.ieee, nwk)
deconz.endpoints = {
0: sentinel.zdo,
1: sentinel.ep1,
Expand All @@ -268,38 +278,16 @@ async def test_deconz_dev_remove_from_group(app, nwk, device_path):


def test_deconz_props(nwk, device_path):
deconz = application.DeconzDevice(
deconz_api.FirmwareVersion(0), device_path, app, sentinel.ieee, nwk
)
deconz = application.DeconzDevice("Conbee II", app, sentinel.ieee, nwk)
assert deconz.manufacturer is not None
assert deconz.model is not None


@pytest.mark.parametrize(
"name, firmware_version, device_path",
[
("ConBee", deconz_api.FirmwareVersion(0x00000500), "/dev/ttyUSB0"),
("ConBee II", deconz_api.FirmwareVersion(0x00000700), "/dev/ttyUSB0"),
("RaspBee", deconz_api.FirmwareVersion(0x00000500), "/dev/ttyS0"),
("RaspBee II", deconz_api.FirmwareVersion(0x00000700), "/dev/ttyS0"),
("RaspBee", deconz_api.FirmwareVersion(0x00000500), "/dev/ttyAMA0"),
("RaspBee II", deconz_api.FirmwareVersion(0x00000700), "/dev/ttyAMA0"),
],
)
def test_deconz_name(nwk, name, firmware_version, device_path):
deconz = application.DeconzDevice(
firmware_version, device_path, app, sentinel.ieee, nwk
)
assert deconz.model == name


async def test_deconz_new(app, nwk, device_path, monkeypatch):
mock_init = AsyncMock()
monkeypatch.setattr(zigpy.device.Device, "_initialize", mock_init)

deconz = await application.DeconzDevice.new(
app, sentinel.ieee, nwk, deconz_api.FirmwareVersion(0), device_path
)
deconz = await application.DeconzDevice.new(app, sentinel.ieee, nwk, "Conbee II")
assert isinstance(deconz, application.DeconzDevice)
assert mock_init.call_count == 1
mock_init.reset_mock()
Expand All @@ -311,9 +299,7 @@ async def test_deconz_new(app, nwk, device_path, monkeypatch):
22: MagicMock(),
}
app.devices[sentinel.ieee] = mock_dev
deconz = await application.DeconzDevice.new(
app, sentinel.ieee, nwk, deconz_api.FirmwareVersion(0), device_path
)
deconz = await application.DeconzDevice.new(app, sentinel.ieee, nwk, "Conbee II")
assert isinstance(deconz, application.DeconzDevice)
assert mock_init.call_count == 0

Expand Down Expand Up @@ -346,18 +332,21 @@ def test_tx_confirm_unexpcted(app, caplog):

async def test_reset_watchdog(app):
"""Test watchdog."""
with patch.object(app._api, "write_parameter") as mock_api:
dog = asyncio.create_task(app._reset_watchdog())
await asyncio.sleep(0.3)
dog.cancel()
assert mock_api.call_count == 1
app._api.protocol_version = application.PROTO_VER_WATCHDOG
app._api.get_device_state = AsyncMock()
app._api.write_parameter = AsyncMock()

with patch.object(app._api, "write_parameter") as mock_api:
mock_api.side_effect = zigpy_deconz.exception.CommandError
dog = asyncio.create_task(app._reset_watchdog())
await asyncio.sleep(0.3)
dog.cancel()
assert mock_api.call_count == 1
await app._watchdog_feed()
assert len(app._api.get_device_state.mock_calls) == 1
assert len(app._api.write_parameter.mock_calls) == 1

app._api.protocol_version = application.PROTO_VER_WATCHDOG - 1
app._api.get_device_state.reset_mock()
app._api.write_parameter.reset_mock()

await app._watchdog_feed()
assert len(app._api.get_device_state.mock_calls) == 1
assert len(app._api.write_parameter.mock_calls) == 0


async def test_force_remove(app):
Expand Down Expand Up @@ -426,11 +415,8 @@ async def test_delayed_scan():
app.topology.scan.assert_called_once_with(devices=[coord])


@patch("zigpy_deconz.zigbee.application.CHANGE_NETWORK_WAIT", 0.001)
@pytest.mark.parametrize("support_watchdog", [False, True])
async def test_change_network_state(app, support_watchdog):
app._reset_watchdog_task = MagicMock()

@patch("zigpy_deconz.zigbee.application.CHANGE_NETWORK_POLL_TIME", 0.001)
async def test_change_network_state(app):
app._api.get_device_state = AsyncMock(
side_effect=[
deconz_api.DeviceState(deconz_api.NetworkState.OFFLINE),
Expand All @@ -439,25 +425,11 @@ async def test_change_network_state(app, support_watchdog):
]
)

if support_watchdog:
app._api._protocol_version = application.PROTO_VER_WATCHDOG
app._api.protocol_version = application.PROTO_VER_WATCHDOG
else:
app._api._protocol_version = application.PROTO_VER_WATCHDOG - 1
app._api.protocol_version = application.PROTO_VER_WATCHDOG - 1

old_watchdog_task = app._reset_watchdog_task
cancel_mock = app._reset_watchdog_task.cancel = MagicMock()
app._api._protocol_version = application.PROTO_VER_WATCHDOG
app._api.protocol_version = application.PROTO_VER_WATCHDOG

await app._change_network_state(deconz_api.NetworkState.CONNECTED, timeout=0.01)

if support_watchdog:
assert cancel_mock.call_count == 1
assert app._reset_watchdog_task is not old_watchdog_task
else:
assert cancel_mock.call_count == 0
assert app._reset_watchdog_task is old_watchdog_task


ENDPOINT = zdo_t.SimpleDescriptor(
endpoint=None,
Expand Down Expand Up @@ -552,43 +524,14 @@ async def read_param(param_id, index):
)


@patch("zigpy_deconz.zigbee.application.asyncio.sleep", new_callable=AsyncMock)
@patch(
"zigpy_deconz.zigbee.application.ControllerApplication.initialize",
side_effect=[RuntimeError(), None],
)
@patch(
"zigpy_deconz.zigbee.application.ControllerApplication.connect",
side_effect=[RuntimeError(), None, None],
)
async def test_reconnect(mock_connect, mock_initialize, mock_sleep, app):
assert app._reconnect_task is None
app.connection_lost(RuntimeError())

assert app._reconnect_task is not None
await app._reconnect_task

assert mock_connect.call_count == 3
assert mock_initialize.call_count == 2


async def test_disconnect_during_reconnect(app):
assert app._reconnect_task is None
app.connection_lost(RuntimeError())
await asyncio.sleep(0)
await app.disconnect()

assert app._reconnect_task is None


async def test_reset_network_info(app):
app.form_network = AsyncMock()
await app.reset_network_info()

app.form_network.assert_called_once()


async def test_energy_scan(app):
async def test_energy_scan_conbee_2(app):
with mock.patch.object(
zigpy.application.ControllerApplication,
"energy_scan",
Expand All @@ -601,6 +544,40 @@ async def test_energy_scan(app):
assert results == {c: c * 3 for c in Channels.ALL_CHANNELS}


async def test_energy_scan_conbee_3(app):
app._api.firmware_version = deconz_api.FirmwareVersion(0x26580900)

type(app)._device = AsyncMock()

app._device.zdo.Mgmt_NWK_Update_req = AsyncMock(
side_effect=zigpy.exceptions.DeliveryError("error")
)

with pytest.raises(zigpy.exceptions.DeliveryError):
await app.energy_scan(channels=Channels.ALL_CHANNELS, duration_exp=0, count=1)

app._device.zdo.Mgmt_NWK_Update_req = AsyncMock(
side_effect=[
asyncio.TimeoutError(),
list(
{
"Status": zdo_t.Status.SUCCESS,
"ScannedChannels": Channels.ALL_CHANNELS,
"TotalTransmissions": 0,
"TransmissionFailures": 0,
"EnergyValues": [i for i in range(11, 26 + 1)],
}.values()
),
]
)

results = await app.energy_scan(
channels=Channels.ALL_CHANNELS, duration_exp=0, count=1
)

assert results == {c: c for c in Channels.ALL_CHANNELS}


async def test_channel_migration(app):
app._api.write_parameter = AsyncMock()
app._change_network_state = AsyncMock()
Expand Down
Loading
Loading