From 27b11d0405b36b79ae0e6c7d4950002483af7936 Mon Sep 17 00:00:00 2001 From: teinturman <32970797+teinturman@users.noreply.github.com> Date: Tue, 10 Nov 2020 08:13:39 +0100 Subject: [PATCH 1/2] modified proc_msg function to resolve malformed json packets. This new version uses a local buffer when receiving data, which is truncated each time a new packet is taken into account, This should avoid errors regarding malformed json packets. --- gym_donkeycar/core/client.py | 317 +++++++++++++++++------------------ 1 file changed, 151 insertions(+), 166 deletions(-) diff --git a/gym_donkeycar/core/client.py b/gym_donkeycar/core/client.py index da08ae91b..9c136aac0 100755 --- a/gym_donkeycar/core/client.py +++ b/gym_donkeycar/core/client.py @@ -1,166 +1,151 @@ -""" -SDClient - -A base class for interacting with the sdsim simulator as server. -The server will create on vehicle per client connection. The client -will then interact by createing json message to send to the server. -The server will reply with telemetry and other status messages in an -asynchronous manner. - -Author: Tawn Kramer -""" -import os -import time -import socket -import select -from threading import Thread -import json -import logging - -from .util import replace_float_notation - -logger = logging.getLogger(__name__) - -class SDClient: - def __init__(self, host, port, poll_socket_sleep_time=0.05): - self.msg = None - self.host = host - self.port = port - self.poll_socket_sleep_sec = poll_socket_sleep_time - self.th = None - - # the aborted flag will be set when we have detected a problem with the socket - # that we can't recover from. - self.aborted = False - self.connect() - - - def connect(self): - self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - # connecting to the server - logger.info("connecting to %s:%d " % (self.host, self.port)) - try: - self.s.connect((self.host, self.port)) - except ConnectionRefusedError as e: - raise( Exception("Could not connect to server. Is it running? If you specified 'remote', then you must start it manually.")) - - # time.sleep(pause_on_create) - self.do_process_msgs = True - self.th = Thread(target=self.proc_msg, args=(self.s,)) - self.th.start() - - - def send(self, m): - self.msg = m - - def send_now(self, msg): - logger.debug("send_now:" + msg) - self.s.sendall(msg.encode("utf-8")) - - def on_msg_recv(self, j): - logger.debug("got:" + j['msg_type']) - - - def stop(self): - # signal proc_msg loop to stop, then wait for thread to finish - # close socket - self.do_process_msgs = False - if self.th is not None: - self.th.join() - if self.s is not None: - self.s.close() - - - def proc_msg(self, sock): - ''' - This is the thread message loop to process messages. - We will send any message that is queued via the self.msg variable - when our socket is in a writable state. - And we will read any messages when it's in a readable state and then - call self.on_msg_recv with the json object message. - ''' - sock.setblocking(0) - inputs = [ sock ] - outputs = [ sock ] - partial = [] - - while self.do_process_msgs: - # without this sleep, I was getting very consistent socket errors - # on Windows. Perhaps we don't need this sleep on other platforms. - time.sleep(self.poll_socket_sleep_sec) - - try: - # test our socket for readable, writable states. - readable, writable, exceptional = select.select(inputs, outputs, inputs) - - for s in readable: - # print("waiting to recv") - try: - data = s.recv(1024 * 256) - except ConnectionAbortedError: - logger.warn("socket connection aborted") - self.do_process_msgs = False - break - - # we don't technically need to convert from bytes to string - # for json.loads, but we do need a string in order to do - # the split by \n newline char. This seperates each json msg. - data = data.decode("utf-8") - msgs = data.split("\n") - - for m in msgs: - if len(m) < 2: - continue - last_char = m[-1] - first_char = m[0] - # check first and last char for a valid json terminator - # if not, then add to our partial packets list and see - # if we get the rest of the packet on our next go around. - if first_char == "{" and last_char == '}': - # Replace comma with dots for floats - # useful when using unity in a language different from English - m = replace_float_notation(m) - try: - j = json.loads(m) - self.on_msg_recv(j) - except Exception as e: - logger.error("Exception:" + str(e)) - logger.error("json: " + m) - else: - partial.append(m) - # logger.info("partial packet:" + m) - if last_char == '}': - if partial[0][0] == "{": - assembled_packet = "".join(partial) - assembled_packet = replace_float_notation(assembled_packet) - second_open = assembled_packet.find('{"msg', 1) - if second_open != -1: - # hmm what to do? We have a partial packet. Trimming just - # the good part and discarding the rest. - logger.warn("got partial packet:" + assembled_packet[:20]) - assembled_packet = assembled_packet[second_open:] - - try: - j = json.loads(assembled_packet) - self.on_msg_recv(j) - except Exception as e: - logger.error("Exception:" + str(e)) - logger.error("partial json: " + assembled_packet) - else: - logger.error("failed packet.") - partial.clear() - - for s in writable: - if self.msg != None: - logger.debug("sending " + self.msg) - s.sendall(self.msg.encode("utf-8")) - self.msg = None - if len(exceptional) > 0: - logger.error("problems w sockets!") - - except Exception as e: - print("Exception:", e) - self.aborted = True - self.on_msg_recv({"msg_type" : "aborted"}) - break +""" +SDClient + +A base class for interacting with the sdsim simulator as server. +The server will create on vehicle per client connection. The client +will then interact by createing json message to send to the server. +The server will reply with telemetry and other status messages in an +asynchronous manner. + +Author: Tawn Kramer +""" +import os +import time +import socket +import select +from threading import Thread +import json +import logging + +from .util import replace_float_notation + +logger = logging.getLogger(__name__) + +class SDClient: + def __init__(self, host, port, poll_socket_sleep_time=0.05): + self.msg = None + self.host = host + self.port = port + self.poll_socket_sleep_sec = poll_socket_sleep_time + self.th = None + + # the aborted flag will be set when we have detected a problem with the socket + # that we can't recover from. + self.aborted = False + self.connect() + + + def connect(self): + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # connecting to the server + logger.info("connecting to %s:%d " % (self.host, self.port)) + try: + self.s.connect((self.host, self.port)) + except ConnectionRefusedError as e: + raise( Exception("Could not connect to server. Is it running? If you specified 'remote', then you must start it manually.")) + + # time.sleep(pause_on_create) + self.do_process_msgs = True + self.th = Thread(target=self.proc_msg, args=(self.s,)) + self.th.start() + + + def send(self, m): + self.msg = m + + def send_now(self, msg): + logger.debug("send_now:" + msg) + self.s.sendall(msg.encode("utf-8")) + + def on_msg_recv(self, j): + logger.debug("got:" + j['msg_type']) + + + def stop(self): + # signal proc_msg loop to stop, then wait for thread to finish + # close socket + self.do_process_msgs = False + if self.th is not None: + self.th.join() + if self.s is not None: + self.s.close() + + + def proc_msg(self, sock): + ''' + This is the thread message loop to process messages. + We will send any message that is queued via the self.msg variable + when our socket is in a writable state. + And we will read any messages when it's in a readable state and then + call self.on_msg_recv with the json object message. + ''' + sock.setblocking(0) + inputs = [ sock ] + outputs = [ sock ] + localbuffer="" + + while self.do_process_msgs: + # without this sleep, I was getting very consistent socket errors + # on Windows. Perhaps we don't need this sleep on other platforms. + time.sleep(self.poll_socket_sleep_sec) + try: + # test our socket for readable, writable states. + readable, writable, exceptional = select.select(inputs, outputs, inputs) + + for s in readable: + try: + data = s.recv(1024 * 256) + except ConnectionAbortedError: + logger.warn("socket connection aborted") + print("socket connection aborted") + self.do_process_msgs = False + break + + # we don't technically need to convert from bytes to string + # for json.loads, but we do need a string in order to do + # the split by \n newline char. This seperates each json msg. + data = data.decode("utf-8") + + localbuffer += data + + n0=localbuffer.find("{") + n1=localbuffer.rfind("}") + if n1>=0 and n0>=0: # there is at least one message : + msgs=localbuffer[n0:n1+1].split("\n") + localbuffer=localbuffer[n1:] + + for m in msgs: + if len(m) <= 2: + continue + # Replace comma with dots for floats + # useful when using unity in a language different from English + m = replace_float_notation(m) + try: + j = json.loads(m) + except Exception as e: + logger.error("Exception:" + str(e)) + logger.error("json: " + m) + continue + + if 'msg_type' not in j: + logger.error('Warning expected msg_type field') + logger.error("json: " + m) + continue + else : + self.on_msg_recv(j) + + for s in writable: + if self.msg != None: + logger.debug("sending " + self.msg) + s.sendall(self.msg.encode("utf-8")) + self.msg = None + + if len(exceptional) > 0: + logger.error("problems w sockets!") + + except Exception as e: + print("Exception:", e) + self.aborted = True + self.on_msg_recv({"msg_type" : "aborted"}) + break From c00ab6247b3c92aa99f03621d2ed51e205173670 Mon Sep 17 00:00:00 2001 From: teinturman <32970797+teinturman@users.noreply.github.com> Date: Tue, 10 Nov 2020 09:11:08 +0100 Subject: [PATCH 2/2] modified proc_msg function to resolve malformed json packets. Added the test to cover the scenario "..}\n.{.." and by the way, the scenario "...{..." and "...}..." --- gym_donkeycar/core/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gym_donkeycar/core/client.py b/gym_donkeycar/core/client.py index 9c136aac0..fe096a6e6 100755 --- a/gym_donkeycar/core/client.py +++ b/gym_donkeycar/core/client.py @@ -111,7 +111,7 @@ def proc_msg(self, sock): n0=localbuffer.find("{") n1=localbuffer.rfind("}") - if n1>=0 and n0>=0: # there is at least one message : + if n1>=0 and n0>=0 and n0