Skip to content

Commit

Permalink
* Add support for queuing lists of commands to ensure they end up in …
Browse files Browse the repository at this point in the history
…sequential order in the command queue. Honeywell uses this to ensure that the regularly used keypress sequences cannot be corrupted.

* Ensure periodic keep-alive and zone timer dumps happen on schedule regardless of how long the EVL takes to respond to the command.
* Backed out experimental changes used to try and prevent EVL from going unresponsive
  • Loading branch information
ufodone committed Jan 21, 2023
1 parent 46134d1 commit 37c1279
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 89 deletions.
12 changes: 1 addition & 11 deletions custom_components/envisalink_new/pyenvisalink/dsc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,7 @@ async def keypresses_to_partition(self, partitionNumber, keypresses):

async def keep_alive(self):
"""Send a keepalive command to reset it's watchdog timer."""
while not self._shutdown:
if self._loggedin:
await self.queue_command(evl_Commands['KeepAlive'], '')
await asyncio.sleep(self._alarmPanel.keepalive_interval)

async def periodic_zone_timer_dump(self):
"""Used to periodically get the zone timers to make sure our zones are updated."""
while not self._shutdown:
if self._loggedin:
await self.dump_zone_timers()
await asyncio.sleep(self._alarmPanel.zone_timer_interval)
await self.queue_command(evl_Commands['KeepAlive'], '')

async def arm_stay_partition(self, code, partitionNumber):
"""Public method to arm/stay a partition."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ def __init__(self, cmd, data, code):
self.expiryTime = 0
self.responseEvent = asyncio.Event()

self._lastReceiveTime = 0
self._nextExpectedReceiveWindow = None


def __init__(self, panel, loop):
self._loggedin = False
Expand Down Expand Up @@ -71,10 +68,18 @@ def start(self):
self._shutdown = False
self._commandTask = self.create_internal_task(self.process_command_queue(), name="command_processor")
self._readLoopTask = self.create_internal_task(self.read_loop(), name="read_loop")
self._keepAliveTask = self.create_internal_task(self.keep_alive(), name="keep_alive")

if self._alarmPanel.keepalive_interval > 0:
self.create_internal_task(
self.periodic_command(self.keep_alive, self._alarmPanel.keepalive_interval),
name="keep_alive"
)

if self._alarmPanel.zone_timer_interval > 0:
self.create_internal_task(self.periodic_zone_timer_dump(), name="zone_timer_dump")
self.create_internal_task(
self.periodic_command(self.dump_zone_timers, self._alarmPanel.zone_timer_interval),
name="zone_timer_dump"
)

if self._ownLoop:
_LOGGER.info("Starting up our own event loop.")
Expand Down Expand Up @@ -119,7 +124,7 @@ async def read_loop(self):
while not self._shutdown and self._reader:
_LOGGER.debug("Waiting for data from EVL")
try:
data = await asyncio.wait_for(self._reader.read(n=256), 5)
data = await asyncio.wait_for(self._reader.read(n=1024), 5)
except asyncio.exceptions.TimeoutError:
continue

Expand All @@ -128,9 +133,6 @@ async def read_loop(self):
await self.disconnect()
break

self._lastReceiveTime = time.time()
self._nextExpectedReceiveWindow = None

data = data.decode('ascii')
_LOGGER.debug('{---------------------------------------')
_LOGGER.debug(str.format('RX < {0}', data))
Expand All @@ -153,14 +155,16 @@ async def read_loop(self):

await self.disconnect()

async def periodic_command(self, action, interval):
"""Used to periodically send a keepalive command to reset the envisalink's watchdog timer."""
while not self._shutdown:
next_send = time.time() + interval

async def keep_alive(self):
"""Used to periodically send a keepalive message to the envisalink."""
raise NotImplementedError()
if self._loggedin:
await action()

async def periodic_zone_timer_dump(self):
"""Used to periodically get the zone timers to make sure our zones are updated."""
raise NotImplementedError()
now = time.time();
await asyncio.sleep(next_send - now)

async def connect(self):
_LOGGER.info(str.format("Started to connect to Envisalink... at {0}:{1}", self._alarmPanel.host, self._alarmPanel.port))
Expand Down Expand Up @@ -204,8 +208,6 @@ async def send_data(self, data):
_LOGGER.debug('TX > %s', logData.encode('ascii'))

try:
await self.delay_write_if_needed()

self._writer.write((data + '\r\n').encode('ascii'))
await self._writer.drain()
except Exception as err:
Expand All @@ -220,6 +222,10 @@ async def dump_zone_timers(self):
"""Public method for dumping zone timers."""
raise NotImplementedError()

async def keep_alive(self):
"""Send a keepalive command to reset it's watchdog timer."""
raise NotImplementedError()

async def change_partition(self, partitionNumber):
"""Public method for changing the default partition."""
raise NotImplementedError()
Expand Down Expand Up @@ -385,16 +391,28 @@ def handle_zone_timer_dump(self, code, data):


async def queue_command(self, cmd, data, code = None):
if _LOGGER.isEnabledFor(logging.DEBUG):
# Scrub the password and alarm code if necessary
logData = self.scrub_sensitive_data(data, code)
_LOGGER.debug("Queueing command '%s' data: '%s' ; calling_task=%s", cmd, logData, asyncio.current_task().get_name())
return await self.queue_commands([ { "cmd": cmd, "data": data, "code": code }])

async def queue_commands(self, command_list : list):
operations = []
for command in command_list:
cmd = command["cmd"]
data = command["data"]
code = command.get("code")

if _LOGGER.isEnabledFor(logging.DEBUG):
# Scrub the password and alarm code if necessary
logData = self.scrub_sensitive_data(data, code)
_LOGGER.debug("Queueing command '%s' data: '%s' ; calling_task=%s", cmd, logData, asyncio.current_task().get_name())

op = self.Operation(cmd, data, code)
op.expiryTime = time.time() + self._alarmPanel.command_timeout
operations.append(op)
self._commandQueue.append(op)

op = self.Operation(cmd, data, code)
op.expiryTime = time.time() + self._alarmPanel.command_timeout
self._commandQueue.append(op)
self._commandEvent.set()
await op.responseEvent.wait()
for op in operations:
await op.responseEvent.wait()
return op.state == op.State.SUCCEEDED

async def process_command_queue(self):
Expand Down Expand Up @@ -513,33 +531,6 @@ def command_failed(self, retry = False):
# Wake up the command processing task to process this result
self._commandEvent.set()

async def delay_write_if_needed(self):
""" Some EVLs can become non-responsive for a period of time if sends and receives happen
within the same millisecond.
If we have received data from the EVL within 1ms then delay the send for a little bit.
Avoiding writes just before we receive data from the EVL is harder and we have to use
heuristics based on expected traffic patterns to try and predict when it's safe to send.
"""
delay = None
now = time.time()
if self._nextExpectedReceiveWindow:
if (self._nextExpectedReceiveWindow[0] - 0.01) < now < (self._nextExpectedReceiveWindow[1] + 0.01):
delay = (self._nextExpectedReceiveWindow[1] - now) + 0.1
elif abs(now - self._lastReceiveTime) < 0.01:
delay = 0.1


if delay is not None:
_LOGGER.error("Delaying send to avoid being too close to a receive by %f seconds.", delay)
await asyncio.sleep(delay)

def set_next_expected_receive_window(self, window : tuple):
# Only update if it's happening sooner than the previous guess
if self._nextExpectedReceiveWindow is None or window[0] < self._nextExpectedReceiveWindow[0]:
self._nextExpectedReceiveWindow = window

def scrub_sensitive_data(self, data, code = None):
if not self._loggedin:
# Remove the password from the log entry
Expand Down
36 changes: 10 additions & 26 deletions custom_components/envisalink_new/pyenvisalink/honeywell_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,7 @@ class HoneywellClient(EnvisalinkClient):
"""Represents a honeywell alarm client."""

async def keep_alive(self):
"""Send a keepalive command to reset it's watchdog timer."""
while not self._shutdown:
if self._loggedin:
await self.queue_command(evl_Commands['KeepAlive'], '')
await asyncio.sleep(self._alarmPanel.keepalive_interval)

async def periodic_zone_timer_dump(self):
"""Used to periodically get the zone timers to make sure our zones are updated."""
while not self._shutdown:
if self._loggedin:
await self.dump_zone_timers()
await asyncio.sleep(self._alarmPanel.zone_timer_interval)
await self.queue_command(evl_Commands['KeepAlive'], '')

async def send_command(self, code, data):
"""Send a command in the proper honeywell format."""
Expand All @@ -37,10 +26,16 @@ async def dump_zone_timers(self):

async def keypresses_to_partition(self, partitionNumber, keypresses):
"""Send keypresses to a particular partition."""
commands = []
for char in keypresses:
result = await self.queue_command(evl_Commands['PartitionKeypress'], str.format("{0},{1}", partitionNumber, char))
if not result:
break
commands.append({
"cmd": evl_Commands['PartitionKeypress'],
"data": str.format("{0},{1}", partitionNumber, char)
})

# Queue up all the keypresses together to ensure an unrelated command cannot
# be inserted in the middle.
await self.queue_commands(commands)

async def arm_stay_partition(self, code, partitionNumber):
"""Public method to arm/stay a partition."""
Expand Down Expand Up @@ -191,17 +186,6 @@ def handle_keypad_update(self, code, data):
})
_LOGGER.debug(json.dumps(self._alarmPanel.alarm_state['partition'][partitionNumber]['status']))

# Try and guess when the next update will come based on the state
now = time.time()
if (bool(flags.armed_stay) or bool(flags.armed_away)) and user_zone_field != 0:
# Exit delay in progress so updates come every second
self.set_next_expected_receive_window((now + 0.9, now + 1.1))
else:
# When in the Ready state we typically see an update every 10 seconds but sometimes it
# shows up at around the 9.5s mark.
# TODO: does the same happen once it's armed?
self.set_next_expected_receive_window((now + 9.5, now + 10))

def handle_zone_state_change(self, code, data):
"""Handle when the envisalink sends us a zone change."""
# Envisalink TPI is inconsistent at generating these
Expand Down

0 comments on commit 37c1279

Please sign in to comment.