Skip to content

Commit

Permalink
Control Devices: respect "resolution" when reading "inputValue"
Browse files Browse the repository at this point in the history
The standard (62386-103:2014, chapter 9.7) states that the "resolution"
of an instance is not always 8-bits. If it's any longer, a proper read
involves a transaction and a sequence of commands. If the number is not
divisible by 8, extra trailing bits have to be truncated.

Tested on Lunatone DALI-2 MC (86459532-2) and Lunatone CS-2 (86458670).
The push button coupler is a part-301 device where the standard says
that the on-the-wire values are either 0x00 or 0xff, but the actual
resolution (and therefore the correct numeric value) is either 0 or 1.
The other device, a combined movement sensor with a light meter,
supports 11 bits of resolution for the lux meter and therefore it
requires a "QUERY INPUT VALUE" followed by a "QUERY INPUT VALUE LATCH"
and dropping the extra 5 bits.

The test case comes from Lunatone's doc "DALI-2 Devices for
Integration, DALI-2 Instances, DALI-2 Instance Mode".
  • Loading branch information
jktjkt authored and sde1000 committed May 3, 2024
1 parent 15bcee3 commit 73ba41f
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 3 deletions.
50 changes: 50 additions & 0 deletions dali/device/sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
DTR2,
EventScheme,
InstanceEventFilter,
QueryResolution,
QueryInputValue,
QueryInputValueLatch,
QueryEventFilterH,
QueryEventFilterL,
QueryEventFilterM,
Expand All @@ -23,6 +26,7 @@
SetEventScheme,
)
from dali.device.helpers import check_bad_rsp
from dali.exceptions import DALISequenceError


def SetEventSchemes(
Expand Down Expand Up @@ -225,3 +229,49 @@ def QueryEventFilters(
hi = rsp.value

return filter_type(int.from_bytes((lo, md, hi), "little"))


def query_input_value(
device: DeviceShort,
instance: InstanceNumber,
resolution: Optional[int] = None
) -> Generator[Command, Optional[Response], Optional[int]]:
"""
A generator sequence to retrieve full sensor value from a part-103 control device.
Use with an appropriate DALI driver instance, through the `run_sequence()`
method.
:param device: A DeviceShort address to target
:param instance: An InstanceNumber to target
:param resolution: Number of valid bits that the device provides
"""
# Although the proper types are expected, ints are common enough for
# addresses and their meaning is unambiguous in this context
if isinstance(device, int):
device = DeviceShort(device)
if isinstance(instance, int):
instance = InstanceNumber(instance)

if resolution is None:
resolution = yield QueryResolution(device, instance)
if check_bad_rsp(resolution):
raise DALISequenceError("query_input_value: QueryResolution failed")
resolution = resolution.value

value = yield QueryInputValue(device, instance)
if check_bad_rsp(value):
raise DALISequenceError("query_input_value: QueryInputValue failed")
value = value.value
while resolution > 8:
resolution -= 8
value <<= 8
chunk = yield QueryInputValueLatch(device, instance)
if check_bad_rsp(chunk):
raise DALISequenceError("query_input_value: QueryInputValueLatch failed")
value += chunk.value

if resolution > 0:
# Strip the repeated trailing bytes as per IEC 62386-103:2014, part 9.7.2
value >>= 8 - resolution

return value
94 changes: 94 additions & 0 deletions dali/tests/test_device_sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
QueryEventFilterZeroToSeven,
QueryEventScheme,
QueryEventSchemeResponse,
QueryInputValue,
QueryInputValueLatch,
QueryResolution,
SetEventFilter,
)
from dali.device.helpers import DeviceInstanceTypeMapper, check_bad_rsp
Expand All @@ -19,6 +22,7 @@
QueryEventFilters,
SetEventFilters,
SetEventSchemes,
query_input_value,
)
from dali.frame import BackwardFrame, BackwardFrameError
from dali.tests import fakes
Expand Down Expand Up @@ -269,3 +273,93 @@ def test_event_filter_sequence_bad_type():
# filter_type needs to be a type, not an instance
with pytest.raises(TypeError):
sequence.send(None)


def test_query_input_values_unknown_1bit():
"""
Reading a device's input register which has a one-bit resolution, with
no prior information. Device sends 0xff, which is converted to 1.
"""
sequence = query_input_value(
device=DeviceShort(0),
instance=InstanceNumber(0),
resolution=None,
)
rsp = None
# No resolution was given, so the first message sends out a query for that
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryResolution)
assert cmd.frame.as_byte_sequence[2] == 0x81 # QUERY_RESOLUTION
rsp = NumericResponse(BackwardFrame(1))

# Ask for the first byte
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValue)
assert cmd.frame.as_byte_sequence[2] == 0x8c # QUERY_INPUT_VALUE
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)

rsp = NumericResponse(BackwardFrame(0xff))
ret = None
try:
cmd = sequence.send(rsp)
raise RuntimeError()
except StopIteration as r:
ret = r.value

assert ret == 1


def test_query_input_values_10bit():
"""
Reading a device's input register. The resolution is known upfront to be
10bit. The first byte is:
0x6c = 0110 1100
The second byte is:
0x9b = 1001 1011
And since only the first two bits are needed (10-8), the rest is just
repeated first byte, so the result is 0b0110110010 = 434.
"""
sequence = query_input_value(
device=DeviceShort(0),
instance=InstanceNumber(0),
resolution=10,
)
rsp = None
# resolution is known upfront, so there will be no querying
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValue)
assert cmd.frame.as_byte_sequence[2] == 0x8c # QUERY_INPUT_VALUE
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)
rsp = NumericResponse(BackwardFrame(0x6c))
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValueLatch)
assert cmd.frame.as_byte_sequence[2] == 0x8d # QUERY_INPUT_VALUE_LATCH
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)
rsp = NumericResponse(BackwardFrame(0x9b))

ret = None
try:
cmd = sequence.send(rsp)
raise RuntimeError()
except StopIteration as r:
ret = r.value
assert ret == 434
7 changes: 4 additions & 3 deletions examples/async-dalitest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
DeviceShort, DeviceBroadcast, InstanceNumber
import dali.gear.general as gg
import dali.device.general as dg
import dali.device.sequences as ds
from dali.gear import emergency
from dali.gear import led
from dali.sequences import QueryDeviceTypes, DALISequenceError
Expand Down Expand Up @@ -85,9 +86,9 @@ async def scan_control_devices(d):
for instance in (InstanceNumber(x) for x in range(instances.value)):
print(f" -{instance}- enabled: {await d.send(dg.QueryInstanceEnabled(addr, instance))}")
print(f" -{instance}- type: {await d.send(dg.QueryInstanceType(addr, instance))}")
print(f" -{instance}- resolution: {await d.send(dg.QueryResolution(addr, instance))}")
print(f" -{instance}- value: {await d.send(dg.QueryInputValue(addr, instance))}")
# XXX read remaining bytes of value
resolution = await d.send(dg.QueryResolution(addr, instance))
print(f" -{instance}- resolution: {resolution}")
print(f" -{instance}- value: {await d.run_sequence(ds.query_input_value(addr, instance, resolution.value))}")
print(f" -{instance}- feature type: {await d.send(dg.QueryFeatureType(addr, instance))}")
#for b in (info.BANK_0, oem.BANK_1):
# try:
Expand Down

0 comments on commit 73ba41f

Please sign in to comment.