diff --git a/dali/device/sequences.py b/dali/device/sequences.py index 3d3f0ef..acf0c0c 100644 --- a/dali/device/sequences.py +++ b/dali/device/sequences.py @@ -14,6 +14,9 @@ DTR2, EventScheme, InstanceEventFilter, + QueryResolution, + QueryInputValue, + QueryInputValueLatch, QueryEventFilterH, QueryEventFilterL, QueryEventFilterM, @@ -23,6 +26,7 @@ SetEventScheme, ) from dali.device.helpers import check_bad_rsp +from dali.exceptions import DALISequenceError def SetEventSchemes( @@ -225,3 +229,49 @@ def QueryEventFilters( hi = rsp.value return filter_type(int.from_bytes((lo, md, hi), "little")) + + +def query_input_value( + device: DeviceShort, + instance: InstanceNumber, + resolution: Optional[int] = None +) -> Generator[Command, Optional[Response], Optional[int]]: + """ + A generator sequence to retrieve full sensor value from a part-103 control device. + Use with an appropriate DALI driver instance, through the `run_sequence()` + method. + + :param device: A DeviceShort address to target + :param instance: An InstanceNumber to target + :param resolution: Number of valid bits that the device provides + """ + # Although the proper types are expected, ints are common enough for + # addresses and their meaning is unambiguous in this context + if isinstance(device, int): + device = DeviceShort(device) + if isinstance(instance, int): + instance = InstanceNumber(instance) + + if resolution is None: + resolution = yield QueryResolution(device, instance) + if check_bad_rsp(resolution): + raise DALISequenceError("query_input_value: QueryResolution failed") + resolution = resolution.value + + value = yield QueryInputValue(device, instance) + if check_bad_rsp(value): + raise DALISequenceError("query_input_value: QueryInputValue failed") + value = value.value + while resolution > 8: + resolution -= 8 + value <<= 8 + chunk = yield QueryInputValueLatch(device, instance) + if check_bad_rsp(chunk): + raise DALISequenceError("query_input_value: QueryInputValueLatch failed") + value += chunk.value + + if resolution > 0: + # Strip the repeated trailing bytes as per IEC 62386-103:2014, part 9.7.2 + value >>= 8 - resolution + + return value diff --git a/dali/tests/test_device_sequences.py b/dali/tests/test_device_sequences.py index 6b57628..d5456ca 100644 --- a/dali/tests/test_device_sequences.py +++ b/dali/tests/test_device_sequences.py @@ -11,6 +11,9 @@ QueryEventFilterZeroToSeven, QueryEventScheme, QueryEventSchemeResponse, + QueryInputValue, + QueryInputValueLatch, + QueryResolution, SetEventFilter, ) from dali.device.helpers import DeviceInstanceTypeMapper, check_bad_rsp @@ -19,6 +22,7 @@ QueryEventFilters, SetEventFilters, SetEventSchemes, + query_input_value, ) from dali.frame import BackwardFrame, BackwardFrameError from dali.tests import fakes @@ -269,3 +273,93 @@ def test_event_filter_sequence_bad_type(): # filter_type needs to be a type, not an instance with pytest.raises(TypeError): sequence.send(None) + + +def test_query_input_values_unknown_1bit(): + """ + Reading a device's input register which has a one-bit resolution, with + no prior information. Device sends 0xff, which is converted to 1. + """ + sequence = query_input_value( + device=DeviceShort(0), + instance=InstanceNumber(0), + resolution=None, + ) + rsp = None + # No resolution was given, so the first message sends out a query for that + try: + cmd = sequence.send(rsp) + except StopIteration: + raise RuntimeError() + assert isinstance(cmd, QueryResolution) + assert cmd.frame.as_byte_sequence[2] == 0x81 # QUERY_RESOLUTION + rsp = NumericResponse(BackwardFrame(1)) + + # Ask for the first byte + try: + cmd = sequence.send(rsp) + except StopIteration: + raise RuntimeError() + assert isinstance(cmd, QueryInputValue) + assert cmd.frame.as_byte_sequence[2] == 0x8c # QUERY_INPUT_VALUE + assert cmd.destination == DeviceShort(0) + assert cmd.instance == InstanceNumber(0) + + rsp = NumericResponse(BackwardFrame(0xff)) + ret = None + try: + cmd = sequence.send(rsp) + raise RuntimeError() + except StopIteration as r: + ret = r.value + + assert ret == 1 + + +def test_query_input_values_10bit(): + """ + Reading a device's input register. The resolution is known upfront to be + 10bit. The first byte is: + + 0x6c = 0110 1100 + + The second byte is: + + 0x9b = 1001 1011 + + And since only the first two bits are needed (10-8), the rest is just + repeated first byte, so the result is 0b0110110010 = 434. + """ + sequence = query_input_value( + device=DeviceShort(0), + instance=InstanceNumber(0), + resolution=10, + ) + rsp = None + # resolution is known upfront, so there will be no querying + try: + cmd = sequence.send(rsp) + except StopIteration: + raise RuntimeError() + assert isinstance(cmd, QueryInputValue) + assert cmd.frame.as_byte_sequence[2] == 0x8c # QUERY_INPUT_VALUE + assert cmd.destination == DeviceShort(0) + assert cmd.instance == InstanceNumber(0) + rsp = NumericResponse(BackwardFrame(0x6c)) + try: + cmd = sequence.send(rsp) + except StopIteration: + raise RuntimeError() + assert isinstance(cmd, QueryInputValueLatch) + assert cmd.frame.as_byte_sequence[2] == 0x8d # QUERY_INPUT_VALUE_LATCH + assert cmd.destination == DeviceShort(0) + assert cmd.instance == InstanceNumber(0) + rsp = NumericResponse(BackwardFrame(0x9b)) + + ret = None + try: + cmd = sequence.send(rsp) + raise RuntimeError() + except StopIteration as r: + ret = r.value + assert ret == 434 diff --git a/examples/async-dalitest.py b/examples/async-dalitest.py index 750c402..8e53177 100755 --- a/examples/async-dalitest.py +++ b/examples/async-dalitest.py @@ -6,6 +6,7 @@ DeviceShort, DeviceBroadcast, InstanceNumber import dali.gear.general as gg import dali.device.general as dg +import dali.device.sequences as ds from dali.gear import emergency from dali.gear import led from dali.sequences import QueryDeviceTypes, DALISequenceError @@ -85,9 +86,9 @@ async def scan_control_devices(d): for instance in (InstanceNumber(x) for x in range(instances.value)): print(f" -{instance}- enabled: {await d.send(dg.QueryInstanceEnabled(addr, instance))}") print(f" -{instance}- type: {await d.send(dg.QueryInstanceType(addr, instance))}") - print(f" -{instance}- resolution: {await d.send(dg.QueryResolution(addr, instance))}") - print(f" -{instance}- value: {await d.send(dg.QueryInputValue(addr, instance))}") - # XXX read remaining bytes of value + resolution = await d.send(dg.QueryResolution(addr, instance)) + print(f" -{instance}- resolution: {resolution}") + print(f" -{instance}- value: {await d.run_sequence(ds.query_input_value(addr, instance, resolution.value))}") print(f" -{instance}- feature type: {await d.send(dg.QueryFeatureType(addr, instance))}") #for b in (info.BANK_0, oem.BANK_1): # try: