Skip to content

Commit

Permalink
TCP Autoreconnect Improvements (#6)
Browse files Browse the repository at this point in the history
* Handle graceful TCP closing: Improved the TCP reconnect behavior to account for ECU closes/resets. Fixed some timeout edge cases.
* Ran code through black again to clean up
* Improved timeout behavior
  • Loading branch information
jacobschaer authored Sep 26, 2021
1 parent d21d10b commit f0bb64d
Show file tree
Hide file tree
Showing 4 changed files with 500 additions and 223 deletions.
218 changes: 172 additions & 46 deletions doipclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self):
self.payload_type = None
self.payload_size = None
self.payload = bytearray()
self._state = Parser.ParserState.READ_PROTOCOL_VERSION
self._state = Parser.ParserState.READ_PROTOCOL_VERSION

def push_bytes(self, data_bytes):
self.rx_buffer += data_bytes
Expand Down Expand Up @@ -79,10 +79,10 @@ def read_message(self, data_bytes):
if len(self.payload) == self.payload_size:
self._state = Parser.ParserState.READ_PROTOCOL_VERSION
logger.debug(
"Received DoIP Message. Type: 0x{:x}, Size: {} bytes, Payload: {}".format(
"Received DoIP Message. Type: 0x{:X}, Payload Size: {} bytes, Payload: {}".format(
self.payload_type,
self.payload_size,
[hex(x) for x in data_bytes],
" ".join(f"{byte:02X}" for byte in self.payload),
)
)
try:
Expand Down Expand Up @@ -166,8 +166,9 @@ def __init__(
self._protocol_version = protocol_version
self._connect()
self._auto_reconnect_tcp = auto_reconnect_tcp
self._tcp_close_detected = False
if self._activation_type is not None:
result = self.request_activation(self._activation_type)
result = self.request_activation(self._activation_type, disable_retry=True)
if result.response_code != RoutingActivationResponse.ResponseCode.Success:
raise ConnectionRefusedError(
f"Activation Request failed with code {result.response_code}"
Expand Down Expand Up @@ -221,9 +222,7 @@ def await_vehicle_announcement(cls, udp_port=UDP_DISCOVERY, timeout=None):
sock.settimeout(remaining)
try:
data, addr = sock.recvfrom(1024)
if data[0] != 0:
print(data)
except socket.timeout as ex:
except socket.timeout:
raise TimeoutError(
"Timed out waiting for Vehicle Announcement broadcast"
)
Expand All @@ -239,7 +238,9 @@ def empty_txqueue(self):
"""Implemented for compatibility with udsoncan library. Nothing useful to be done yet"""
pass

def read_doip(self, timeout=A_PROCESSING_TIME, transport=TransportType.TRANSPORT_TCP):
def read_doip(
self, timeout=A_PROCESSING_TIME, transport=TransportType.TRANSPORT_TCP
):
"""Helper function to read from the DoIP socket.
:param timeout: Maximum time allowed for response from ECU
Expand All @@ -255,8 +256,7 @@ def read_doip(self, timeout=A_PROCESSING_TIME, transport=TransportType.TRANSPORT
if transport == DoIPClient.TransportType.TRANSPORT_TCP:
response = self._tcp_parser.read_message(data)
else:
response = self._udp_parser.read_message(data)
data = bytearray()
response = self._udp_parser.read_message(data)
if type(response) == GenericDoIPNegativeAcknowledge:
raise IOError(
f"DoIP Negative Acknowledge. NACK Code: {response.nack_code}"
Expand All @@ -265,18 +265,74 @@ def read_doip(self, timeout=A_PROCESSING_TIME, transport=TransportType.TRANSPORT
logger.warning("Responding to an alive check")
self.send_doip_message(AliveCheckResponse(self._client_logical_address))
elif response:
# We got a response that might actually be interesting to the caller,
# so return it.
return response
else:
try:
if transport == DoIPClient.TransportType.TRANSPORT_TCP:
data = self._tcp_sock.recv(1024)
else:
data = self._udp_sock.recv(1024)
except socket.timeout:
pass
# There were no responses in the parser, so we need to read off the network
# and feed that to the parser until we find another DoIP message

if (transport == DoIPClient.TransportType.TRANSPORT_TCP) and self._tcp_close_detected:
# The caller is looking for TCP responses, but there were no messages
# returned from the parser and the socket has been closed (so no further
# responses are expected). It's safe to stop looking early and raise
# a TimeoutError
break
else:
try:
if transport == DoIPClient.TransportType.TRANSPORT_TCP:
data = self._tcp_sock.recv(1024)
if len(data) == 0:
logger.debug("Peer has closed the connection.")
self._tcp_close_detected = True
else:
data = self._udp_sock.recv(1024)
except socket.timeout:
pass
raise TimeoutError("ECU failed to respond in time")

def send_doip(self, payload_type, payload_data, transport=TransportType.TRANSPORT_TCP):
def _tcp_socket_check(self, first_timeout=0.010):
    """Drain any pending TCP data and note whether the peer has gone away.

    send_doip() calls this before and after writing to the TCP socket so it
    can decide whether a reconnect is required. Any bytes read are handed to
    the TCP parser. If the socket reports end-of-stream (an empty read), or
    the read fails with a reset/broken-pipe error, ``_tcp_close_detected`` is
    set. The socket's timeout is always put back to its prior value on exit.

    :param first_timeout: Timeout applied to the first recv() only; every
                          later recv() in the same call uses a timeout of 0.
                          It should roughly match how long the ECU takes to
                          answer a write on a dead connection with an RST:
                          too long costs performance, too short risks
                          missing the chance to reconnect. Normally <1ms,
                          10ms by default to be safe.
    :type first_timeout: float
    """
    sock = self._tcp_sock
    saved_timeout = sock.gettimeout()
    try:
        sock.settimeout(first_timeout)
        while True:
            chunk = sock.recv(1024)
            if not chunk:
                # An empty read means the peer closed its side of the stream.
                logger.debug("TCP Connection closed by ECU, attempting to reset")
                self._tcp_close_detected = True
                break
            self._tcp_parser.push_bytes(chunk)
            # Only the first read is allowed to wait; after that just sweep
            # up whatever is already buffered without blocking.
            sock.settimeout(0)
    except (ConnectionResetError, BrokenPipeError):
        logger.debug("TCP Connection broken, attempting to reset")
        self._tcp_close_detected = True
    except (BlockingIOError, socket.timeout):
        # Nothing (more) to read right now - the normal way out of the loop.
        pass
    finally:
        sock.settimeout(saved_timeout)

def send_doip(
self,
payload_type,
payload_data,
transport=TransportType.TRANSPORT_TCP,
disable_retry=False,
):
"""Helper function to send to the DoIP socket.
Adds the correct DoIP header to the payload and sends to the socket.
Expand All @@ -285,7 +341,13 @@ def send_doip(self, payload_type, payload_data, transport=TransportType.TRANSPOR
:type payload_type: int
:param transport: The IP transport layer to send to, either UDP or TCP
:type transport: DoIPClient.TransportType, optional
:param disable_retry: Disables retry regardless of auto_reconnect_tcp flag. This is used by activation
requests during connect/reconnect.
:type disable_retry: bool, optional
"""

retry = self._auto_reconnect_tcp and not disable_retry

data_bytes = struct.pack(
"!BBHL",
self._protocol_version,
Expand All @@ -295,24 +357,61 @@ def send_doip(self, payload_type, payload_data, transport=TransportType.TRANSPOR
)
data_bytes += payload_data
logger.debug(
"Sending DoIP Message: Type: 0x{:x}, Size: {}, Payload: {}".format(
payload_type, len(payload_data), [hex(x) for x in data_bytes]
"Sending DoIP Message: Type: 0x{:X}, Payload Size: {}, Payload: {}".format(
payload_type,
len(payload_data),
" ".join(f"{byte:02X}" for byte in payload_data),
)
)
if transport == DoIPClient.TransportType.TRANSPORT_TCP:
self._tcp_sock.send(data_bytes)

if self._auto_reconnect_tcp:
try:
self._tcp_parser.push_bytes(self._tcp_sock.recv(1024))
except (ConnectionResetError, BrokenPipeError):
logger.debug("TCP Connection broken, attempting to reset")
self.reconnect()
self._tcp_sock.send(data_bytes)
else:
self._udp_sock.sendto(data_bytes, (self._ecu_ip_address, self._udp_port))

def send_doip_message(self, doip_message, transport=TransportType.TRANSPORT_TCP.TRANSPORT_TCP):
# The ECU is well within its rights to have closed the socket since we last sent it data -
# particularly if the tester has been quiet for a while. For TCP there are two possibilities:
# 1) The ECU closed the connection properly, and there's a FIN/RST waiting to be read
# 2) The ECU force closed the connection - we won't find that out until we try to write
# something and the ECU responds with an RST because the session isn't valid anymore.
#
# For (1) we could easily let the state machine go without a special case, but then we'd
# be pushing a packet that the ECU would have to ignore (if they closed they have no way
# to respond). So, we'll handle before the Tx, but we won't allow it to block.

if retry:
self._tcp_socket_check(first_timeout=0)

remaining = len(data_bytes)
attempted_reconnect = False

# In general, the entire DoIP message should fit in one TCP packet, but it's good practice
# to loop until the whole packet has been written, in case the OS write buffers get backed up
while remaining > 0:
if transport == DoIPClient.TransportType.TRANSPORT_TCP:
if retry and self._tcp_close_detected:
if not attempted_reconnect:
logger.warning("TCP reconnecting")
self.reconnect()
attempted_reconnect = True
else:
logger.warning(
"TCP needs reconnection, but we already attempted once. Send will fail."
)

remaining -= self._tcp_sock.send(data_bytes[-remaining:])

if retry and not self._tcp_close_detected:
self._tcp_socket_check()
if self._tcp_close_detected:
remaining = len(data_bytes)

else:
remaining -= self._udp_sock.sendto(
data_bytes[-remaining:], (self._ecu_ip_address, self._udp_port)
)

def send_doip_message(
self,
doip_message,
transport=TransportType.TRANSPORT_TCP.TRANSPORT_TCP,
disable_retry=False,
):
"""Helper function to send an unpacked message to the DoIP socket.
Packs the given message and adds the correct DoIP header before sending to the socket
Expand All @@ -321,12 +420,19 @@ def send_doip_message(self, doip_message, transport=TransportType.TRANSPORT_TCP.
:type doip_message: object
:param transport: The IP transport layer to send to, either UDP or TCP
:type transport: DoIPClient.TransportType, optional
:param disable_retry: Disables retry regardless of auto_reconnect_tcp flag. This is used by activation
requests during connect/reconnect.
:type disable_retry: bool, optional
"""
payload_type = payload_message_to_type[type(doip_message)]
payload_data = doip_message.pack()
self.send_doip(payload_type, payload_data, transport=transport)
self.send_doip(
payload_type, payload_data, transport=transport, disable_retry=disable_retry
)

def request_activation(self, activation_type, vm_specific=None):
def request_activation(
self, activation_type, vm_specific=None, disable_retry=False
):
"""Requests a given activation type from the ECU for this connection using payload type 0x0005
:param activation_type: The type of activation to request - see Table 47 ("Routing
Expand All @@ -335,13 +441,16 @@ def request_activation(self, activation_type, vm_specific=None):
:type activation_type: RoutingActivationRequest.ActivationType
:param vm_specific: Optional 4 byte long int
:type vm_specific: int, optional
:param disable_retry: Disables retry regardless of auto_reconnect_tcp flag. This is used by activation
requests during connect/reconnect.
:type disable_retry: bool, optional
:return: The resulting activation response object
:rtype: RoutingActivationResponse
"""
message = RoutingActivationRequest(
self._client_logical_address, activation_type, vm_specific=vm_specific
)
self.send_doip_message(message)
self.send_doip_message(message, disable_retry=disable_retry)
while True:
result = self.read_doip()
if type(result) == RoutingActivationResponse:
Expand Down Expand Up @@ -370,7 +479,9 @@ def request_vehicle_identification(self, eid=None, vin=None):
message = VehicleIdentificationRequestWithVIN(vin)
else:
message = VehicleIdentificationRequest()
self.send_doip_message(message, transport=DoIPClient.TransportType.TRANSPORT_UDP)
self.send_doip_message(
message, transport=DoIPClient.TransportType.TRANSPORT_UDP
)
while True:
result = self.read_doip(transport=DoIPClient.TransportType.TRANSPORT_UDP)
if type(result) == VehicleIdentificationResponse:
Expand All @@ -389,7 +500,9 @@ def request_alive_check(self):
:rtype: AliveCheckResponse
"""
message = AliveCheckRequest()
self.send_doip_message(message, transport=DoIPClient.TransportType.TRANSPORT_UDP)
self.send_doip_message(
message, transport=DoIPClient.TransportType.TRANSPORT_UDP
)
while True:
result = self.read_doip(transport=DoIPClient.TransportType.TRANSPORT_UDP)
if type(result) == AliveCheckResponse:
Expand All @@ -408,7 +521,9 @@ def request_diagnostic_power_mode(self):
:rtype: DiagnosticPowerModeResponse
"""
message = DiagnosticPowerModeRequest()
self.send_doip_message(message, transport=DoIPClient.TransportType.TRANSPORT_UDP)
self.send_doip_message(
message, transport=DoIPClient.TransportType.TRANSPORT_UDP
)
while True:
result = self.read_doip(transport=DoIPClient.TransportType.TRANSPORT_UDP)
if type(result) == DiagnosticPowerModeResponse:
Expand All @@ -427,7 +542,9 @@ def request_entity_status(self):
:rtype: EntityStatusResponse
"""
message = DoipEntityStatusRequest()
self.send_doip_message(message, transport=DoIPClient.TransportType.TRANSPORT_UDP)
self.send_doip_message(
message, transport=DoIPClient.TransportType.TRANSPORT_UDP
)
while True:
result = self.read_doip(transport=DoIPClient.TransportType.TRANSPORT_UDP)
if type(result) == EntityStatusResponse:
Expand All @@ -439,7 +556,7 @@ def request_entity_status(self):
)
)

def send_diagnostic(self, diagnostic_payload):
def send_diagnostic(self, diagnostic_payload, timeout=A_PROCESSING_TIME):
"""Send a raw diagnostic payload (ie: UDS) to the ECU.
:param diagnostic_payload: UDS payload to transmit to the ECU
Expand All @@ -450,8 +567,15 @@ def send_diagnostic(self, diagnostic_payload):
self._client_logical_address, self._ecu_logical_address, diagnostic_payload
)
self.send_doip_message(message)
start_time = time.time()
while True:
result = self.read_doip()
ellapsed_time = time.time() - start_time
if timeout and ellapsed_time > timeout:
raise TimeoutError("Timed out waiting for diagnostic response")
if timeout:
result = self.read_doip(timeout=(timeout - ellapsed_time))
else:
result = self.read_doip()
if type(result) == DiagnosticMessageNegativeAcknowledgement:
raise IOError(
"Diagnostic request rejected with negative acknowledge code: {}".format(
Expand All @@ -476,10 +600,11 @@ def receive_diagnostic(self, timeout=None):
"""
start_time = time.time()
while True:
if timeout and (time.time() - start_time) > timeout:
ellapsed_time = time.time() - start_time
if timeout and ellapsed_time > timeout:
raise TimeoutError("Timed out waiting for diagnostic response")
if timeout:
result = self.read_doip(timeout=timeout)
result = self.read_doip(timeout=(timeout - ellapsed_time))
else:
result = self.read_doip()
if type(result) == DiagnosticMessage:
Expand All @@ -500,6 +625,7 @@ def _connect(self):
self._tcp_sock.bind((self._client_ip_address, 0))
self._tcp_sock.connect((self._ecu_ip_address, self._tcp_port))
self._tcp_sock.settimeout(A_PROCESSING_TIME)
self._tcp_close_detected = False

self._udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Expand Down Expand Up @@ -530,8 +656,8 @@ def reconnect(self, close_delay=A_PROCESSING_TIME):
time.sleep(close_delay)
self._connect()
if self._activation_type is not None:
result = self.request_activation(self._activation_type)
result = self.request_activation(self._activation_type, disable_retry=True)
if result.response_code != RoutingActivationResponse.ResponseCode.Success:
raise ConnectionRefusedError(
f"Activation Request failed with code {result.response_code}"
)
)
Loading

0 comments on commit f0bb64d

Please sign in to comment.