From 0ebccd7ed46f2a6bb581f1f4c9c8937181f43ea2 Mon Sep 17 00:00:00 2001 From: Olivier Pieters Date: Sun, 12 Nov 2023 12:06:10 +0100 Subject: [PATCH] Add driver for Lunatune SCI RS232. This is a serial DALI interface, and follows the already existing implementation for the Lunatone LUBA serial driver. --- dali/driver/serial.py | 599 ++++++++++++++++++++++++++++++++++++++- dali/tests/test_dummy.py | 3 +- 2 files changed, 598 insertions(+), 4 deletions(-) diff --git a/dali/driver/serial.py b/dali/driver/serial.py index 12ae267..ac17a02 100644 --- a/dali/driver/serial.py +++ b/dali/driver/serial.py @@ -1,6 +1,7 @@ + """ serial.py - Driver for serial-based DALI interfaces, including the -Lunatone RS232 LUBA device +Lunatone RS232 LUBA and Lunatone SCI RS232 devices. This file is part of python-dali. @@ -441,7 +442,7 @@ async def wait_dali_raw_response(self) -> int: """ return await self._queue_rx_raw_dali.get() - def reset_dali_raw_response(self) -> None: + def reset_dali_response(self) -> None: """ Forces the queue of received DALI responses to be cleared, logging any responses that are dropped if the queue is not empty @@ -1039,7 +1040,7 @@ async def send( try: # Make sure the received command buffer is empty, so that an # unexpected response can't accidentally be used - self._protocol.reset_dali_raw_response() + self._protocol.reset_dali_response() await self._protocol.send_dali_command(msg) if msg.is_query: response = command.Response(None) @@ -1073,3 +1074,595 @@ async def send( def new_dali_rx_queue(self) -> DistributorQueue: return DistributorQueue(self._protocol.queue_rx_dali) + + + +class DriverSCIRS232(DriverSerialBase): + uri_scheme = "scirs232" + timeout_rx = 0.03 + timeout_tx_confirm = 0.1 + timeout_connect = 1.0 + + class SCIRS232Code(Enum): + """ + All supported SCI mode codes and status codes. Refer Lunatone's + documentation: + hhttps://www.lunatone.com/wp-content/uploads/2018/03/22176438-HS_DALI_SCI_RS232_EN_D0045.pdf + """ + STATUS_OK = 0x0 + STATUS_DALI_NO = 0x1 + SEND_DALI_8 = 0x2 + SEND_DALI_16 = 0x3 + SEND_EDALI = 0x4 + SEND_DSI = 0x5 + SEND_DALI_17 = 0x6 + ERROR = 0x7 + SEND_DALI2_24 = 0x8 + + class SCIRS232DeviceReply(NamedTuple): + """ + Named tuple for storing a set of information about the SCI RS232 device. + + This information is sent in every frame, but updates are ignored after + initialisation of the software. + """ + + id: int + code: int + + class SCIRS232DeviceSettings(NamedTuple): + """ + Named tuple for storing a set of information about the SCI RS232 device. + + These are sent on a per-frame basis, so the device is state-less with + respect to the DALI receive and transmit parameters. + """ + + monitor_enable: bool + identify : bool + echo:bool + + class SCIRS232Protocol(asyncio.Protocol): + """ + This class is internally used by DriverSCIRS232 to implement a state + machine for decoding the incoming serial bytes into SCI RS232 messages, + which in turn wrap DALI frames. The class also handles encoding DALI + frames into SCI RS232 messages, setting the appropriate flags etc. + """ + + MAX_LEN = 5 + CONTROL_ME_MASK = 0b10000000 + CONTROL_IDENTIFY_MASK = 0b01000000 + CONTROL_ECHO_MASK = 0b00100000 + CONTROL_SEND_TWICE_MASK = 0b00010000 + CONTROL_MODE_MASK = 0b00001111 + + STATUS_ID_MASK = 0b11110000 + STATUS_CODE_MASK = 0b00001111 + + class ReadState(Enum): + """ + Enum of states used in the receiver state machine + """ + + WAIT_STATUS = 1 + WAIT_DATA_HI = 2 + WAIT_DATA_MI = 3 + WAIT_DATA_LO = 4 + WAIT_CHECKSUM = 5 + + class ErrorType(Enum): + CHECKSUM = 1 + DALI_BUS_SHORT_CIRCUIT = 2 + DALI_RX_ERROR = 3 + UNKNOWN_COMMAND = 4 + COLLISION_DETECTED = 5 + + class SCIRS232MsgTxConf(NamedTuple): + """ + Named tuple used to enqueue messages + """ + + message: Optional[command.Command] = None + + def __init__(self) -> None: + super().__init__() + self.transport = None + + self._queue_rx_dali = DistributorQueue() + self._queue_rx_raw_dali = asyncio.Queue() + self._queue_rx_info = asyncio.Queue() + self._prev_rx_enable_dt = 0 + self._prev_tx_enable_dt = 0 + self._tx_lock = asyncio.Lock() + self._rx_state = None + self.rx_idle = asyncio.Event() + self._buffer = None + self._rx_expected_len = None + self._rx_received_len = None + self._connected = asyncio.Event() + self._dev_info: Optional[DriverSCIRS232.SCIRS232DeviceReply] = None + self._dev_inst_map: Optional[DeviceInstanceTypeMapper] = None + self._device_settings = DriverSCIRS232.SCIRS232DeviceSettings( + monitor_enable=True, + identify=False, + echo=True) + + self.reset() + + @property + def rx_state(self) -> ReadState: + return self._rx_state + + @rx_state.setter + def rx_state(self, state: ReadState): + if not isinstance(state, self.ReadState): + raise TypeError( + f"rx_state must be a ReadState enum, not {type(state)}" + ) + + self._rx_state = state + if state == self.ReadState.WAIT_STATUS: + self.rx_idle.set() + else: + self.rx_idle.clear() + + @property + def dev_inst_map(self) -> Optional[DeviceInstanceTypeMapper]: + return self._dev_inst_map + + @dev_inst_map.setter + def dev_inst_map(self, value: DeviceInstanceTypeMapper): + self._dev_inst_map = value + + @property + def queue_rx_dali(self) -> DistributorQueue: + return self._queue_rx_dali + + def reset(self): + """ + Returns the state machine to "WAIT_STATUS" + """ + self.rx_state = self.ReadState.WAIT_STATUS + self._buffer = [None] * self.MAX_LEN + self._rx_expected_len = None + self._rx_received_len = 0 + + async def wait_dali_raw_response(self) -> int: + """ + Async method which waits for a raw (i.e. un-decoded) DALI frame + to be received from the SCI RS232 device. + + :return: A received DALI frame, as an int + """ + return await self._queue_rx_raw_dali.get() + + def reset_dali_response(self) -> None: + """ + Forces the queue of received DALI responses to be cleared, logging + any responses that are dropped if the queue is not empty + """ + + # remove backward frames + qlen = self._queue_rx_raw_dali.qsize() + if qlen: + _LOG.critical( + f"SCI RS232 RX DALI queue not empty! {qlen} items in queue!" + ) + try: + item = self._queue_rx_raw_dali.get_nowait() + _LOG.critical(f"SCI RS232 RX DALI queue discarding: {item}") + except asyncio.QueueEmpty: + pass + + # remove information frames (includes errors and sent confirmations) + qlen = self._queue_rx_info.qsize() + if qlen: + _LOG.critical( + f"SCI RS232 RX info DALI queue not empty! {qlen} items in queue!" + ) + try: + item = self._queue_rx_raw_dali.get_nowait() + _LOG.critical(f"SCI RS232 RX info DALI queue discarding: {item}") + except asyncio.QueueEmpty: + pass + + @staticmethod + def _insert_checksum(in_ints: list[int]) -> None: + in_ints[-1] = reduce(xor, in_ints[0:-1]) + + async def send_dali_command(self, tx: command.Command) -> None: + """ + Sends a variable length DALI command (16 or 24 bits), waiting + until the SCI RS232 device confirms it has sent the message before + returning the frame ID. + + :param tx: A single DALI command to send + """ + # Make sure the serial interface is not in the process of reading + # data before we send + await self.rx_idle.wait() + + dali_ints = tx.frame.as_byte_sequence + if not len(dali_ints) in (1, 2, 3): + raise ValueError( + f"Only works with 8, 16 or 24 bit messages, not {8*len(dali_ints)}" + ) + + control_byte = (self._device_settings.monitor_enable << 7) | (self._device_settings.identify << 6) | (self._device_settings.echo << 5) | (tx.sendtwice << 4) + if len(dali_ints) == 1: + control_byte |= 2 + elif len(dali_ints) == 2: + control_byte |= 3 + elif len(dali_ints) == 3: + control_byte |= 8 + + tx_ints = [ + control_byte, + dali_ints[0], + 0 if len(dali_ints) < 2 else dali_ints[1], + 0 if len(dali_ints) < 3 else dali_ints[2], + None, # Checksum + ] + # Fill in the checksum + self._insert_checksum(tx_ints) + + # Use a mutex to ensure only one message is sent at a time, + # waiting for the SCI RS232 device to confirm before sending another + async with self._tx_lock: + _LOG.debug(f"DALI sending message: {tx}") + _LOG.trace( + f"SCI RS232 frame to send: {[f'0x{data:02x}' for data in tx_ints]}" + ) + self.transport.write(bytearray(tx_ints)) + + confirm = await asyncio.wait_for( + self._queue_rx_info.get(), + timeout=DriverSCIRS232.timeout_tx_confirm, + ) + if isinstance(confirm, DriverSCIRS232.SCIRS232DeviceReply): + _LOG.trace(f"SCI RS232 confirmed data with code {confirm.code}") + else: + _LOG.error(f"Received unexpected confirmation object {confirm}") + return + + async def send_device_info_query(self) -> None: + """ + Query some basic information from the SCI RS232 device + """ + # Use a mutex to ensure only one message is sent at a time + async with self._tx_lock: + _LOG.debug("Querying SCI RS232 device info") + tx_ints = [ + 0b11000010, # enable monitoring and identify + 0, + 0, + 0, + None, # Checksum + ] + # Fill in the checksum + self._insert_checksum(tx_ints) + + # empty queue (just in case) + while not self._queue_rx_info.empty(): + dev_info = self._queue_rx_info.get_nowait() + _LOG.warning(f"SCI RS232 info queue not empty, discarting: {dev_info}") + + _LOG.trace( + f"SCI RS232 frame to send: {[f'0x{data:02x}' for data in tx_ints]}" + ) + self.transport.write(bytearray(tx_ints)) + + # Wait for the SCI RS232 device to respond + dev_info = await asyncio.wait_for( + self._queue_rx_info.get(), + timeout=DriverSCIRS232.timeout_tx_confirm, + ) + # Release transmit mutex + + if not isinstance(dev_info, DriverSCIRS232.SCIRS232DeviceReply): + _LOG.error(f"Expected a SCI RS232, but got: {dev_info}") + return + + self._device_info = dev_info + + def _process_byte(self, rx_int: int) -> None: + if not isinstance(rx_int, int): + raise TypeError( + f"Got an item of type: {type(rx_int)}, expected an integer" + ) + + # Handle each state of the state machine + if self._rx_state == self.ReadState.WAIT_STATUS: + self._buffer[0] = rx_int + self._rx_state = self.ReadState.WAIT_DATA_HI + + return + + elif self._rx_state == self.ReadState.WAIT_DATA_HI: + # In the 'WAIT_DATA_HI' state the next byte will be data high + self._buffer[1] = rx_int + self._rx_state = self.ReadState.WAIT_DATA_MI + return + + elif self._rx_state == self.ReadState.WAIT_DATA_MI: + # In the 'WAIT_DATA_MI' state the next byte will be data mid + self._buffer[2] = rx_int + self._rx_state = self.ReadState.WAIT_DATA_LO + return + + elif self._rx_state == self.ReadState.WAIT_DATA_LO: + # Read bytes, up to the maximum expected length + self._buffer[3] = rx_int + self._rx_state = self.ReadState.WAIT_CHECKSUM + return + + elif self._rx_state == self.ReadState.WAIT_CHECKSUM: + # In the 'WAIT_CHECKSUM' state, the next byte will be the + # checksum + self._buffer[4] = rx_int + + # We now have a full frame + _LOG.trace( + f"Raw data: {[f'0x{data:02x}' for data in self._buffer]}" + ) + + # Validate the checksum: XOR all values, excluding the + # synchronisation and checksum + check = reduce(xor, self._buffer[0:4]) + + if check != rx_int: + _LOG.warning( + f"SCI RS232 checksum failure! Calculated: {check}, " + f"Expected: {rx_int}" + ) + self.reset() + return + else: + _LOG.trace("SCI RS232 checksum passed, full frame received") + try: + status = DriverSCIRS232.SCIRS232Code(self._buffer[0] & DriverSCIRS232.SCIRS232Protocol.STATUS_CODE_MASK) + except ValueError: + _LOG.exception( + f"SCI RS232 unknown status code: 0x{self._buffer[0]:02x}" + ) + self.reset() + return + + #TODO: handle status codes / options here + if status == DriverSCIRS232.SCIRS232Code.ERROR: + self._process_error(tuple(self._buffer[:4])) + elif status == DriverSCIRS232.SCIRS232Code.STATUS_OK: + self._process_system_message(self._buffer[0]) + elif status == DriverSCIRS232.SCIRS232Code.STATUS_DALI_NO: + self._process_system_message(self._buffer[0]) + elif status == DriverSCIRS232.SCIRS232Code.SEND_DALI_8: + self._process_dali_frame((self._buffer[3],)) + elif status == DriverSCIRS232.SCIRS232Code.SEND_DALI_16: + self._process_dali_frame(self._buffer[2:4]) + elif status == DriverSCIRS232.SCIRS232Code.SEND_DALI2_24: + self._process_dali_frame(self._buffer[1:4]) + elif (status == DriverSCIRS232.SCIRS232Code.SEND_EDALI) or \ + (status == DriverSCIRS232.SCIRS232Code.SEND_DSI) or \ + (status == DriverSCIRS232.SCIRS232Code.SEND_DALI_17): + _LOG.error( + f"SCI RS232 eDALI, DSI or 17-bit DALI message received. These are not supported." + f" data: {self._buffer[1:4]}" + ) + else: + _LOG.error( + f"SCI RS232 unexpected message, status {self._buffer[0]:02x}," + f" data: {self._buffer[1:4]}" + ) + + self.reset() + return + else: + raise RuntimeError(f"Invalid state: {self._rx_state}") + + def _process_system_message(self, data : int): + device_id_info = DriverSCIRS232.SCIRS232DeviceReply( + id=(data & 0xf0) >> 4,code=data&0xf) + self._queue_rx_info.put_nowait(device_id_info) + + def _process_error(self, data : tuple): + try: + error_type = DriverSCIRS232.SCIRS232Protocol.ErrorType(data[3]) + except ValueError: + _LOG.exception( + f"SCI RS232 unknown error code: 0x{data[3]:02x}" + ) + self.reset() + return + if error_type == DriverSCIRS232.SCIRS232Protocol.ErrorType.CHECKSUM: + error_str = "checksum" + elif error_type == DriverSCIRS232.SCIRS232Protocol.ErrorType.DALI_BUS_SHORT_CIRCUIT: + error_str = "short circuit on the DALI bus" + elif error_type == DriverSCIRS232.SCIRS232Protocol.ErrorType.DALI_RX_ERROR: + error_str = "DALI receive error" + elif error_type == DriverSCIRS232.SCIRS232Protocol.ErrorType.UNKNOWN_COMMAND: + error_str = "unknown command" + else: + error_str = "unknown" + _LOG.error( + f"No string defined for SCI RS232 error code: 0x{data[3]:02x}" + ) + _LOG.error(f"SCI RS232 reports {error_str} error ({error_type})") + self._process_system_message(data[0]) + + def _process_dali_frame(self, received_data: tuple): + """ + Handle a DALI 'event' message, typically these are received when + a DALI frame was observed on the bus by the SCI RS232 device + """ + + _LOG.trace( + f"SCI RS232 DALI frame received: {[f'0x{data:02x}' for data in received_data]}" + ) + + if len(received_data) == 1: + # An 8-bit frame is a response, don't try to decipher it + # here because it depends on context which the 'send()' + # routine will have to handle + self._queue_rx_raw_dali.put_nowait(received_data[0]) + _LOG.trace( + f"Adding raw DALI response to queue: '{received_data[0]}'" + ) + else: + # A 16 or 24-bit frame is an intercepted DALI command, + # it can be deciphered into a Command object + dali_frame = frame.Frame( + bits=8 * len(received_data), data=received_data + ) + try: + dali_command = command.Command.from_frame( + dali_frame, + devicetype=self._prev_rx_enable_dt, + dev_inst_map=self._dev_inst_map, + ) + except TypeError: + _LOG.error( + f"Failed to decode DALI command! Frame: {dali_frame}" + ) + return + if isinstance( + dali_command, dali.gear.general.EnableDeviceType + ): + self._prev_rx_enable_dt = dali_command.param + else: + self._prev_rx_enable_dt = 0 + + _LOG.debug(f"Adding DALI command to queue: {dali_command}") + self._queue_rx_dali.distribute(dali_command) + + def connection_made(self, transport): + self.transport = transport + _LOG.info(f"Serial port opened: {transport}") + self._connected.set() + + def data_received(self, data): + _LOG.trace(f"Serial data received: {data}") + for rx in data: + self._process_byte(rx) + + def connection_lost(self, exc): + _LOG.info("Serial port closed") + self.transport.loop.stop() + + @property + def connected(self) -> asyncio.Event: + return self._connected + + @property + def device_info(self) -> Optional[DriverSCIRS232.SCIRS232DeviceReply]: + return self._dev_info + + + def __init__( + self, + uri: str | ParseResult, + dev_inst_map: Optional[DeviceInstanceTypeMapper] = None, + ): + super().__init__(uri=uri, dev_inst_map=dev_inst_map) + + self.serial_path = self.uri.path + _LOG.info(f"Initialising SCI RS232 driver for '{self.serial_path}'") + self._transport: Optional[serial_asyncio.SerialTransport] = None + self._protocol: Optional[DriverSCIRS232.SCIRS232Protocol] = None + + async def connect(self, *, scan_dev_inst: bool = False) -> None: + if self.is_connected: + _LOG.warning( + f"'connect()' called but SCI RS232 driver already connected" + ) + return + + _LOG.info( + f"Creating serial connection to {self.serial_path}" + ) + + # TODO: Add failure/retry handling + ( + self._transport, + self._protocol, + ) = await serial_asyncio.create_serial_connection( + loop=asyncio.get_event_loop(), + protocol_factory=DriverSCIRS232.SCIRS232Protocol, + url=self.serial_path, + baudrate=38400, + ) + + try: + await asyncio.wait_for( + self._protocol._connected.wait(), + timeout=DriverSCIRS232.timeout_connect, + ) + except asyncio.exceptions.TimeoutError as exc: + _LOG.critical(f"Timeout waiting for driver to connect: {exc}") + raise + + await self._protocol.send_device_info_query() + self._protocol.dev_inst_map = self.dev_inst_map + + self._connected.set() + + # Scan the bus for control devices, and create a mapping of addresses + # to instance types + if scan_dev_inst: + _LOG.info("Scanning DALI bus for control devices") + await self.run_sequence(self.dev_inst_map.autodiscover()) + _LOG.info( + f"Found {len(self.dev_inst_map.mapping)} enabled control " + "device instances" + ) + + async def send( + self, msg: command.Command, in_transaction: bool = False + ) -> Optional[command.Response]: + # Only send if the driver is connected + if not self.is_connected: + _LOG.critical(f"DALI driver cannot send, not connected: {self}") + raise IOError("DALI driver cannot send, not connected") + + response = None + + if not in_transaction: + await self.transaction_lock.acquire() + try: + # Make sure the received command buffer is empty, so that an + # unexpected response can't accidentally be used + self._protocol.reset_dali_response() + await self._protocol.send_dali_command(msg) + if msg.is_query: + response = command.Response(None) + while True: + try: + raw_rsp = await asyncio.wait_for( + self._protocol._queue_rx_raw_dali.get(), + #self._protocol.wait_dali_raw_response(), + timeout=DriverSCIRS232.timeout_rx, + ) + except asyncio.exceptions.TimeoutError: + _LOG.debug( + f"DALI response timeout, from message: {msg}" + ) + break + if isinstance(raw_rsp, int): + response = msg.response(frame.BackwardFrame(raw_rsp)) + _LOG.debug(f"DALI response received: {raw_rsp}") + break + else: + _LOG.warning( + "DALI response expected to be 'int' but got type " + f"'{type(raw_rsp)}': {raw_rsp}" + ) + raw_rsp = None + continue + finally: + if not in_transaction: + self.transaction_lock.release() + + return response + + def new_dali_rx_queue(self) -> DistributorQueue: + return DistributorQueue(self._protocol.queue_rx_dali) diff --git a/dali/tests/test_dummy.py b/dali/tests/test_dummy.py index 7aa345a..bd568b7 100644 --- a/dali/tests/test_dummy.py +++ b/dali/tests/test_dummy.py @@ -23,7 +23,7 @@ import py import pytest -from dali.driver.serial import DriverSerialBase, DriverLubaRs232, drivers_map +from dali.driver.serial import DriverSerialBase, DriverLubaRs232, DriverSCIRS232, drivers_map from dali.tests.fakes_serial import DriverSerialDummy from dali import address, gear from dali.sequences import QueryDeviceTypes @@ -81,6 +81,7 @@ def test_drivers_map(): drivers = drivers_map() assert drivers["dummy"] == DriverSerialDummy assert drivers["luba232"] == DriverLubaRs232 + assert drivers["scirs232"] == DriverSCIRS232 def test_dummy_init_good(tmp_path):