Skip to content

Commit

Permalink
fix: better handle ConnectionResetError and log timing
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuagruenstein committed Jul 5, 2024
1 parent 1a68135 commit 1c8fab8
Showing 1 changed file with 56 additions and 30 deletions.
86 changes: 56 additions & 30 deletions tcp_modbus_aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ async def send_modbus_message(

time_budget_remaining = timeout if timeout is not None else float("inf")

with catchtime() as t:
# STEP ONE: WE ACQUIRE THE LOCK TO THE CONNECTION
with catchtime() as lock_t:
try:
await asyncio.wait_for(
self._comms_lock.acquire(), time_budget_remaining
Expand All @@ -420,40 +421,51 @@ async def send_modbus_message(
raise ModbusCommunicationTimeoutError(
f"Failed to acquire lock to send request {msg_str} to modbus device {self.host}"
)
time_budget_remaining -= t()
time_budget_remaining -= lock_t()

try:
if self.logger is not None:
self.logger.debug(
f"[{self}][send_modbus_message] acquired lock to send {msg_str}"
)

with catchtime() as t:
# STEP TWO: CREATE A CONNECTION IF ONE DOES NOT EXIST
with catchtime() as conn_t:
reader, writer = await self._get_tcp_connection(
timeout=time_budget_remaining
)
time_budget_remaining -= t()
time_budget_remaining -= conn_t()

# STEP THREE: WRITE OUR REQUEST
try:
writer.write(request_adu)

with catchtime() as t:
with catchtime() as write_t:
await asyncio.wait_for(writer.drain(), time_budget_remaining)
time_budget_remaining -= t()
time_budget_remaining -= write_t()

if self.logger is not None:
self.logger.debug(f"[{self}][send_modbus_message] wrote {msg_str}")

except (asyncio.TimeoutError, OSError) as e:
except (asyncio.TimeoutError, OSError, ConnectionResetError):
# Clear connection no matter what if we fail on the write
# TODO: consider revisiting this to only do it on OSError and ConnectionResetError
# (but Gru is scared about partial writes)

if self.logger is not None:
self.logger.warning(
f"[{self}][send_modbus_message] Failed to send data to modbus device for "
f"request {msg_str}, clearing connection"
)

await self.clear_tcp_connection()

if retries > 0:
if self.logger is not None:
self.logger.warning(
f"[{self}][send_modbus_message] Failed to send data to modbus device for "
f"request {msg_str}, retrying {retries} more time(s)"
f"[{self}][send_modbus_message] Retrying {retries} more time(s) after failure to write"
)

await self.clear_tcp_connection()

# release the lock before retrying (so we can re-get it)
self._comms_lock.release()

Expand All @@ -463,13 +475,10 @@ async def send_modbus_message(
retries=retries - 1,
)

raise (
ModbusCommunicationTimeoutError
if isinstance(e, asyncio.TimeoutError)
else ModbusCommunicationFailureError
)(f"Failed to write request {msg_str} to modbus device {self.host}")
raise

try:
# STEP FOUR: READ THE MBAP HEADER FROM THE RESPONSE (AND ANY JUNK)
expected_response_mbap_header = struct.pack(
MBAP_HEADER_STRUCT_FORMAT,
request_transaction_id,
Expand All @@ -478,12 +487,12 @@ async def send_modbus_message(
self.slave_id,
)

with catchtime() as t:
with catchtime() as read_mbap_t:
response_up_to_mbap_header = await asyncio.wait_for(
reader.readuntil(expected_response_mbap_header),
timeout=time_budget_remaining,
)
time_budget_remaining -= t()
time_budget_remaining -= read_mbap_t()

if len(response_up_to_mbap_header) > MODBUS_MBAP_SIZE:
# TODO: consider inspecting the discarded traffic here for better diagnostics
Expand All @@ -493,30 +502,47 @@ async def send_modbus_message(
"before mbap header, likely catching up stream after timeouts"
)

with catchtime() as t:
# STEP FIVE: READ THE RESPONSE PDU
with catchtime() as read_pdu_time:
response_pdu = await asyncio.wait_for(
reader.readexactly(request_function.expected_response_pdu_size),
timeout=time_budget_remaining,
)
time_budget_remaining -= t()
time_budget_remaining -= read_pdu_time()

except asyncio.TimeoutError:
if error_on_no_response:
raise ModbusCommunicationTimeoutError(
f"Failed to read response to {msg_str} from modbus device {self.host}"
)
# Sometimes it is ok to not hear back
if not error_on_no_response:
return None

else:
if self.logger is not None:
self.logger.warning(
f"[{self}][send_modbus_message] failed to read response to {msg_str}"
)
raise

except (asyncio.TimeoutError, OSError, ConnectionResetError) as e:
# We clear the connection if the connection was reset by peer or was an OS error
if isinstance(e, (OSError, ConnectionResetError)):
print("CLEARING TCP ON GENERAL FAIL")
await self.clear_tcp_connection()

raise (
ModbusCommunicationTimeoutError
if isinstance(e, asyncio.TimeoutError)
else ModbusCommunicationFailureError
)(
f"Request {msg_str} failed to {self.host}:{self.port} ({type(e).__name__}({e}))"
) from e

return None
finally:
if self._comms_lock.locked():
self._comms_lock.release()

if self.logger is not None:
self.logger.debug(
f"[{self}][send_modbus_message] executed request/response with timing "
f"lock={1000*lock_t():0.2f}ms conn={1000*conn_t():0.2f}ms "
f"write={1000*write_t():0.2f}ms read_mbap={1000*read_mbap_t():0.2f}ms "
f"read_pdu={1000*read_pdu_time():0.2f}ms"
)

response_function = create_function_from_response_pdu(
response_pdu, request_function
)
Expand Down

0 comments on commit 1c8fab8

Please sign in to comment.