Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control Devices: respect "resolution" when reading "inputValue" #128

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions dali/device/sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
DTR2,
EventScheme,
InstanceEventFilter,
QueryResolution,
QueryInputValue,
QueryInputValueLatch,
QueryEventFilterH,
QueryEventFilterL,
QueryEventFilterM,
Expand All @@ -23,6 +26,7 @@
SetEventScheme,
)
from dali.device.helpers import check_bad_rsp
from dali.exceptions import DALISequenceError


def SetEventSchemes(
Expand Down Expand Up @@ -225,3 +229,49 @@ def QueryEventFilters(
hi = rsp.value

return filter_type(int.from_bytes((lo, md, hi), "little"))


def query_input_value(
device: DeviceShort,
instance: InstanceNumber,
resolution: Optional[int] = None
) -> Generator[Command, Optional[Response], Optional[int]]:
"""
A generator sequence to retrieve full sensor value from a part-103 control device.
Use with an appropriate DALI driver instance, through the `run_sequence()`
method.

:param device: A DeviceShort address to target
:param instance: An InstanceNumber to target
:param resolution: Number of valid bits that the device provides
"""
# Although the proper types are expected, ints are common enough for
# addresses and their meaning is unambiguous in this context
if isinstance(device, int):
device = DeviceShort(device)
if isinstance(instance, int):
instance = InstanceNumber(instance)

if resolution is None:
resolution = yield QueryResolution(device, instance)
if check_bad_rsp(resolution):
raise DALISequenceError("query_input_value: QueryResolution failed")
resolution = resolution.value

value = yield QueryInputValue(device, instance)
if check_bad_rsp(value):
raise DALISequenceError("query_input_value: QueryInputValue failed")
value = value.value
while resolution > 8:
resolution -= 8
value <<= 8
chunk = yield QueryInputValueLatch(device, instance)
if check_bad_rsp(chunk):
raise DALISequenceError("query_input_value: QueryInputValueLatch failed")
value += chunk.value

if resolution > 0:
# Strip the repeated trailing bytes as per IEC 62386-103:2014, part 9.7.2
value >>= 8 - resolution

return value
94 changes: 94 additions & 0 deletions dali/tests/test_device_sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
QueryEventFilterZeroToSeven,
QueryEventScheme,
QueryEventSchemeResponse,
QueryInputValue,
QueryInputValueLatch,
QueryResolution,
SetEventFilter,
)
from dali.device.helpers import DeviceInstanceTypeMapper, check_bad_rsp
Expand All @@ -19,6 +22,7 @@
QueryEventFilters,
SetEventFilters,
SetEventSchemes,
query_input_value,
)
from dali.frame import BackwardFrame, BackwardFrameError
from dali.tests import fakes
Expand Down Expand Up @@ -269,3 +273,93 @@ def test_event_filter_sequence_bad_type():
# filter_type needs to be a type, not an instance
with pytest.raises(TypeError):
sequence.send(None)


def test_query_input_values_unknown_1bit():
"""
Reading a device's input register which has a one-bit resolution, with
no prior information. Device sends 0xff, which is converted to 1.
"""
sequence = query_input_value(
device=DeviceShort(0),
instance=InstanceNumber(0),
resolution=None,
)
rsp = None
# No resolution was given, so the first message sends out a query for that
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryResolution)
assert cmd.frame.as_byte_sequence[2] == 0x81 # QUERY_RESOLUTION
rsp = NumericResponse(BackwardFrame(1))

# Ask for the first byte
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValue)
assert cmd.frame.as_byte_sequence[2] == 0x8c # QUERY_INPUT_VALUE
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)

rsp = NumericResponse(BackwardFrame(0xff))
ret = None
try:
cmd = sequence.send(rsp)
raise RuntimeError()
except StopIteration as r:
ret = r.value

assert ret == 1


def test_query_input_values_10bit():
"""
Reading a device's input register. The resolution is known upfront to be
10bit. The first byte is:

0x6c = 0110 1100

The second byte is:

0x9b = 1001 1011

And since only the first two bits are needed (10-8), the rest is just
repeated first byte, so the result is 0b0110110010 = 434.
"""
sequence = query_input_value(
device=DeviceShort(0),
instance=InstanceNumber(0),
resolution=10,
)
rsp = None
# resolution is known upfront, so there will be no querying
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValue)
assert cmd.frame.as_byte_sequence[2] == 0x8c # QUERY_INPUT_VALUE
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)
rsp = NumericResponse(BackwardFrame(0x6c))
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValueLatch)
assert cmd.frame.as_byte_sequence[2] == 0x8d # QUERY_INPUT_VALUE_LATCH
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)
rsp = NumericResponse(BackwardFrame(0x9b))

ret = None
try:
cmd = sequence.send(rsp)
raise RuntimeError()
except StopIteration as r:
ret = r.value
assert ret == 434
7 changes: 4 additions & 3 deletions examples/async-dalitest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
DeviceShort, DeviceBroadcast, InstanceNumber
import dali.gear.general as gg
import dali.device.general as dg
import dali.device.sequences as ds
from dali.gear import emergency
from dali.gear import led
from dali.sequences import QueryDeviceTypes, DALISequenceError
Expand Down Expand Up @@ -85,9 +86,9 @@ async def scan_control_devices(d):
for instance in (InstanceNumber(x) for x in range(instances.value)):
print(f" -{instance}- enabled: {await d.send(dg.QueryInstanceEnabled(addr, instance))}")
print(f" -{instance}- type: {await d.send(dg.QueryInstanceType(addr, instance))}")
print(f" -{instance}- resolution: {await d.send(dg.QueryResolution(addr, instance))}")
print(f" -{instance}- value: {await d.send(dg.QueryInputValue(addr, instance))}")
# XXX read remaining bytes of value
resolution = await d.send(dg.QueryResolution(addr, instance))
print(f" -{instance}- resolution: {resolution}")
print(f" -{instance}- value: {await d.run_sequence(ds.query_input_value(addr, instance, resolution.value))}")
print(f" -{instance}- feature type: {await d.send(dg.QueryFeatureType(addr, instance))}")
#for b in (info.BANK_0, oem.BANK_1):
# try:
Expand Down
Loading