Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

device/sequences: add Commissioning #138

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 128 additions & 5 deletions dali/device/sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,40 @@
import types
from typing import Generator, Optional, Type

from dali.address import DeviceShort, InstanceNumber
from dali.address import DeviceShort, InstanceNumber, DeviceBroadcast, DeviceBroadcastUnaddressed
from dali.command import Command, Response
from dali.device.general import (
Compare,
DTR0,
DTR1,
DTR2,
EventScheme,
Initialise,
InstanceEventFilter,
QueryResolution,
QueryInputValue,
QueryInputValueLatch,
ProgramShortAddress,
QueryDeviceStatus,
QueryEventFilterH,
QueryEventFilterL,
QueryEventFilterM,
QueryEventScheme,
QueryEventSchemeResponse,
QueryInputValue,
QueryInputValueLatch,
QueryResolution,
Randomise,
SearchAddrH,
SearchAddrL,
SearchAddrM,
SetEventFilter,
SetEventScheme,
SetShortAddress,
Terminate,
VerifyShortAddress,
Withdraw,
)
from dali.device.helpers import check_bad_rsp
from dali.exceptions import DALISequenceError
from dali.exceptions import DALISequenceError, ProgramShortAddressFailure
from dali.sequences import progress, sleep


def SetEventSchemes(
Expand Down Expand Up @@ -275,3 +288,113 @@ def query_input_value(
value >>= 8 - resolution

return value

def _find_next(low, high):
yield SearchAddrH((high >> 16) & 0xff)
yield SearchAddrM((high >> 8) & 0xff)
yield SearchAddrL(high & 0xff)

r = yield Compare()

if low == high:
if r.value is True:
return "clash" if r.raw_value.error else low
return

if r.value is True:
midpoint = (low + high) // 2
res = yield from _find_next(low, midpoint)
if res is not None:
return res
return (yield from _find_next(midpoint + 1, high))


def Commissioning(available_addresses=None, readdress=False,
dry_run=False):
"""Assign short addresses to control gear

If available_addresses is passed, only the specified addresses
will be assigned; otherwise all short addresses are considered to
be available.

if "readdress" is set, all existing short addresses will be
cleared; otherwise, only control gear that is currently
unaddressed will have short addresses assigned.

If "dry_run" is set then no short addresses will actually be set.
This can be useful for testing.
"""
if available_addresses is None:
available_addresses = list(range(64))
else:
available_addresses = list(available_addresses)

if readdress:
if dry_run:
yield progress(message="dry_run is set: not deleting existing "
"short addresses")
else:
yield DTR0(255)
yield SetShortAddress(DeviceBroadcast())
else:
# We need to know which short addresses are already in use
for a in range(0, 64):
if a in available_addresses:
in_use = yield QueryDeviceStatus(DeviceShort(a))
if in_use.raw_value is not None:
available_addresses.remove(a)
yield progress(
message=f"Available addresses: {available_addresses}")

yield Terminate()
yield Initialise(0xff if readdress else 0x7f)

finished = False
# We loop here to cope with multiple devices picking the same
# random search address; when we discover that, we
# re-randomise and begin again. Devices that have already
# received addresses are unaffected.
while not finished:
yield Randomise()
# Randomise can take up to 100ms
yield sleep(0.1)

low = 0
high = 0xffffff

while low is not None:
yield progress(completed=low, size=high)
low = yield from _find_next(low, high)
if low == "clash":
yield progress(message="Multiple ballasts picked the same "
"random address; restarting")
break
if low is None:
finished = True
break
yield progress(
message=f"Ballast found at address {low:#x}")
if available_addresses:
new_addr = available_addresses.pop(0)
if dry_run:
yield progress(
message="Not programming short address "
f"{new_addr} because dry_run is set")
else:
yield progress(
message=f"Programming short address {new_addr}")
yield ProgramShortAddress(new_addr)
r = yield VerifyShortAddress(new_addr)
if r.value is not True:
raise ProgramShortAddressFailure(new_addr)
else:
yield progress(
message="Device found but no short addresses left")
yield Withdraw()
if low < high:
low = low + 1
else:
low = None
finished = True
yield Terminate()
yield progress(message="Addressing complete")
71 changes: 71 additions & 0 deletions dali/tests/fakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,17 @@ def __init__(
shortaddr: Optional[address.DeviceShort] = None,
groups: Optional[Iterable[address.DeviceGroup]] = None,
memory_banks: Optional[Iterable[Type[FakeMemoryBank]]] = (FakeDeviceBank0,),
random_preload: list[int] = [],
):
# Store parameters
self.shortaddr = shortaddr
self.groups = set(groups) if groups else set()
# Configure internal variables
self.randomaddr = frame.Frame(24)
self.searchaddr = frame.Frame(24)
self.random_preload = random_preload
self.initialising = False
self.withdrawn = False
self.dtr0: int = 0
self.dtr1: int = 0
self.dtr2: int = 0
Expand All @@ -504,6 +510,12 @@ def __init__(
raise ValueError(f"Duplicate memory bank {bank_number}")
self.memory_banks[bank_number] = fake_bank()

def _next_random_address(self):
if self.random_preload:
return self.random_preload.pop(0)
else:
return random.randrange(0, 0x1000000)

def valid_address(self, cmd: Command) -> bool:
"""Should we respond to this command?"""
if len(cmd.frame) != 24:
Expand All @@ -521,6 +533,13 @@ def valid_address(self, cmd: Command) -> bool:
if isinstance(cmd.destination, address.DeviceGroup):
return cmd.destination in self.groups

@property
def shortaddr_int(self):
if self.shortaddr is None:
return 0xff
else:
return self.shortaddr.address

def send(self, cmd: Command) -> Optional[int]:
# Reset enable_write_memory if command is not one of the memory
# writing commands, even if the command is not addressed to us
Expand Down Expand Up @@ -627,6 +646,58 @@ def send(self, cmd: Command) -> Optional[int]:
finally:
if not bank.nobble_dtr0_update:
self.dtr0 = min(self.dtr0 + 1, 255)
elif isinstance(cmd, device.general.SetShortAddress):
if self.dtr0 == 0xff:
self.shortaddr = None
elif (self.dtr0 & 1) == 1:
self.shortaddr = address.DeviceShort((self.dtr0 & 0x7e) >> 1)
elif isinstance(cmd, device.general.QueryMissingShortAddress):
if self.shortaddr is None:
return _yes
elif isinstance(cmd, device.general.QueryRandomAddressH):
return self.randomaddr[23:16]
elif isinstance(cmd, device.general.QueryRandomAddressM):
return self.randomaddr[15:8]
elif isinstance(cmd, device.general.QueryRandomAddressL):
return self.randomaddr[7:0]
elif isinstance(cmd, device.general.Terminate):
self.initialising = False
self.withdrawn = False
elif isinstance(cmd, device.general.Initialise):
if cmd.param == 0xff \
or (cmd.param == 0x7f and self.shortaddr is None) \
or (cmd.param == self.shortaddr):
self.initialising = True
self.withdrawn = False
# We don't implement the 15 minute timer
elif isinstance(cmd, device.general.Randomise):
self.randomaddr = frame.Frame(24, self._next_random_address())
elif isinstance(cmd, device.general.Compare):
if self.initialising \
and not self.withdrawn \
and self.randomaddr.as_integer <= self.searchaddr.as_integer:
return _yes
elif isinstance(cmd, device.general.Withdraw):
if self.initialising \
and self.randomaddr == self.searchaddr:
self.withdrawn = True
elif isinstance(cmd, device.general.SearchAddrH):
self.searchaddr[23:16] = cmd.param
elif isinstance(cmd, device.general.SearchAddrM):
self.searchaddr[15:8] = cmd.param
elif isinstance(cmd, device.general.SearchAddrL):
self.searchaddr[7:0] = cmd.param
elif isinstance(cmd, device.general.ProgramShortAddress):
if self.initialising \
and self.randomaddr == self.searchaddr:
if cmd.param == 255:
self.shortaddr = None
else:
self.shortaddr = address.DeviceShort(cmd.param)
elif isinstance(cmd, device.general.VerifyShortAddress):
if self.initialising \
and self.shortaddr_int == cmd.param:
return _yes

return None

Expand Down
37 changes: 37 additions & 0 deletions dali/tests/test_device_sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
SetEventFilters,
SetEventSchemes,
query_input_value,
Commissioning,
)
from dali.frame import BackwardFrame, BackwardFrameError
from dali.tests import fakes
Expand Down Expand Up @@ -363,3 +364,39 @@ def test_query_input_values_10bit():
except StopIteration as r:
ret = r.value
assert ret == 434

def _check_addresses(devices, expected=None):
if expected is None:
expected = list(range(len(devices)))
addresses = [g.shortaddr_int for g in devices]
addresses.sort()
assert addresses == expected

def test_commissioning():
devices = [fakes.Device() for _ in range(10)]
bus = fakes.Bus(devices)
bus.run_sequence(Commissioning())
_check_addresses(devices)

def test_commissioning_readdress():
devices = [fakes.Device(DeviceShort(x + 5)) for x in range(10)]
bus = fakes.Bus(devices)
bus.run_sequence(Commissioning(readdress=True))
_check_addresses(devices)

def test_commissioning_partial():
addresses = [DeviceShort(x) if x & 1 else None for x in range(10)]
devices = [fakes.Device(address) for address in addresses]
bus = fakes.Bus(devices)
bus.run_sequence(Commissioning())
_check_addresses(devices)

def test_commissioning_clash():
# (At least) one of the devices is going to pick the same
# "random" number as another the first time around!
randoms = list(range(0, 0xffffff, 0x82000))
randoms[8] = randoms[4]
gear = [fakes.Device(random_preload=[x]) for x in randoms]
bus = fakes.Bus(gear)
bus.run_sequence(Commissioning())
_check_addresses(gear)
Loading