diff --git a/AvionicsMockData.py b/AvionicsMockData.py index f36d95b..c73051f 100644 --- a/AvionicsMockData.py +++ b/AvionicsMockData.py @@ -34,6 +34,7 @@ print("GPS – Longitude: -16") print("OXIDIZER TANK PRESSURE: 1070928 1000*psi") print("COMBUSTION CHAMBER PRESSURE: -124294 1000*psi") + print("BATTERY VOLTAGE: -17") print("FLIGHTPHASE: 0 - PRELAUNCH") print("VENT_VALVE: 0 - CLOSED") print("INJECTION_VALVE: 0 - CLOSED") diff --git a/EEPROM_DMB_Test_FRAMEWORK_Revision.py b/EEPROM_DMB_Test_FRAMEWORK_Revision.py index d712031..2400301 100644 --- a/EEPROM_DMB_Test_FRAMEWORK_Revision.py +++ b/EEPROM_DMB_Test_FRAMEWORK_Revision.py @@ -1,9 +1,11 @@ +from _typeshed import Self import serial import time class AllDataStruct: - #Define a static variable - size = 73 + # Define a static variable + global size_bytes + size_bytes = 73 # TODO: This should have units (i.e. size_B or size_bytes) --Done def __init__(self): self.AGM_accelX = 0 @@ -26,7 +28,6 @@ def __init__(self): self.HAL_tick = 0 self.SOF_version = 0 - def __init__(self, binArray): self.AGM_accelX = int(binArray[0:4]) self.AGM_accelY = int(binArray[4:8]) @@ -49,214 +50,175 @@ def __init__(self, binArray): self.SOF_version = int(binArray[72:73]) def __repr__(self): - #return "AllDataStruct: _info_" - return F"AllData Object: \n AccelX: {self.AGM_accelX}\n AccelY: {self.AGM_accelY}\n AccelZ: {self.AGM_accelZ}\n GyroX: {self.AGM_gyroX}\n GyroY: {self.AGM_gyroY}\n GyroZ: {self.AGM_gyroZ}\n - MagnetoX: {self.AGM_magnetoX}\n MagnetoY: {self.AGM_magnetoY}\n MagnetoZ: {self.AGM_magnetoZ}\n Bar Pressure: {self.BAR_pressure}\n Bar Temperature: {self.BAR_temperature}\n - CCP Pressure: {self.CCP_pressure}\n GPS Latitude: {self.GPS_latitude}\n GPS Longitude: {self.GPS_longitude}\n GPS Altitude: {self.GPS_altitude}\n - OTP Pressure: {self.OTP_pressure}\n PHS Phase: {self.PHS_phase}\n HAL Tick: {self.HAL_tick}\n SOF Version: {self.SOF_version} ..." - + # TODO: Format into multiple lines (F"...\n" F"...\n" F"...\n") --Done + return (F"AllData Object: \n" + F"AccelX: {self.AGM_accelX}\n" + F"AccelY: {self.AGM_accelY}\n" + F"AccelZ: {self.AGM_accelZ}\n" + F"GyroX: {self.AGM_gyroX}\n" + F"GyroY: {self.AGM_gyroY}\n" + F"GyroZ: {self.AGM_gyroZ}\n" + F"MagnetoX: {self.AGM_magnetoX}\n" + F"MagnetoY: {self.AGM_magnetoY}\n" + F"MagnetoZ: {self.AGM_magnetoZ}\n" + F"Bar Pressure: {self.BAR_pressure}\n" + F"Bar Temperature: {self.BAR_temperature}\n" + F"CCP Pressure: {self.CCP_pressure}\n" + F"GPS Latitude: {self.GPS_latitude}\n" + F"GPS Longitude: {self.GPS_longitude}\n" + F"GPS Altitude: {self.GPS_altitude}\n" + F"OTP Pressure: {self.OTP_pressure}\n" + F"PHS Phase: {self.PHS_phase}\n" + F"HAL Tick: {self.HAL_tick}\n" + F"SOF Version: {self.SOF_version}...") + # TODO: Make "print_diff" method to cycle through __dict__ and check changes + def print_diff(self, actual): + for key in self.__dict__: + if self.__getattribute__(key) != actual.__getattribute__(key): + print(F"ATTR: Expected = {self}, " + F"Actual = {actual}") + """ + print_diff(self, actual): + for (key, _) in self.__dict__: + if self.getAttribute(key) != actual.getAttribute(key): + print("ATTR: Expected (self) = X, Actual (actual) = Y") + """ + + # TODO: Replace with `return self.__dict__ == other.__dict` --Done def __eq__(self, other): # Decide if self == other - # Does not check HAL_tick - result = True - # result = result and (self.AGM_accelX == other.AGM_accelX) - # result = result and (self.AGM_accelY == other.AGM_accelY) - # result = result and (self.AGM_accelZ == other.AGM_accelZ) - # result = result and (self.AGM_gyroX == other.AGM_gyroX) - # result = result and (self.AGM_gyroY == other.AGM_gyroY) - # result = result and (self.AGM_gyroZ == other.AGM_gyroZ) - # result = result and (self.AGM_magnetoX == other.AGM_magnetoX) - # result = result and (self.AGM_magnetoY == other.AGM_magnetoY) - # result = result and (self.AGM_magnetoZ == other.AGM_magnetoZ) - # result = result and (self.BAR_pressure == other.BAR_pressure) - # result = result and (self.BAR_temperature == other.BAR_temperature) - # result = result and (self.CCP_pressure == other.CCP_pressure) - # result = result and (self.GPS_latitude == other.GPS_latitude) - # result = result and (self.GPS_longitude == other.GPS_longitude) - # result = result and (self.GPS_altitude == other.GPS_altitude) - # result = result and (self.SOF_version == other.SOF_version) - # result = result and (self.HAL_tick == other.HAL_tick) - - if(self.AGM_accelX != other.AGM_accelX): - print("This is what the AGM AccelX actual is: " + other.AGM_accelX + "and this what expected is: " + self.AGM_accelX) - result = False - if(self.AGM_accelY != other.AGM_accelY): - print("This is what the AGM AccelY actual is: " + other.AGM_accelY + "and this what expected is: " + self.AGM_accelY) - result = False - if(self.AGM_accelZ != other.AGM_accelZ): - print("This is what the AGM AccelZ actual is: " + other.AGM_accelZ + "and this what expected is: " + self.AGM_accelZ) - result = False - if(self.AGM_gyroX != other.AGM_gyroX): - print("This is what the AGM GyroX actual is: " + other.AGM_gyroX + "and this what expected is: " + self.AGM_gyroX) - result = False - if(self.AGM_gyroY != other.AGM_gyroY): - print("This is what the AGM GyroY actual is: " + other.AGM_gyroY + "and this what expected is: " + self.AGM_gyroY) - result = False - if(self.AGM_gyroZ != other.AGM_gyroZ): - print("This is what the AGM GyroZ actual is: " + other.AGM_gyroZ + "and this what expected is: " + self.AGM_gyroZ) - result = False - if(self.AGM_magnetoX != other.AGM_magnetoX): - print("This is what the AGM MagnetoX actual is: " + other.AGM_magnetoX + "and this what expected is: " + self.AGM_magnetoX) - result = False - if(self.AGM_magnetoY != other.AGM_magnetoY): - print("This is what the AGM MagnetoY actual is: " + other.AGM_magnetoY + "and this what expected is: " + self.AGM_magnetoY) - result = False - if(self.AGM_magnetoZ != other.AGM_magnetoZ): - print("This is what the AGM MagnetoZ actual is: " + other.AGM_magnetoZ + "and this what expected is: " + self.AGM_magnetoZ) - result = False - if(self.BAR_pressure != other.BAR_pressure): - print("This is what the BAR Pressure actual is: " + other.BAR_pressure + "and this what expected is: " + self.BAR_pressure) - result = False - if(self.BAR_temperature != other.BAR_temperature): - print("This is what the BAR Temperature actual is: " + other.BAR_temperature + "and this what expected is: " + self.BAR_temperature) - result = False - if(self.CCP_pressure != other.CCP_pressure): - print("This is what the CCP Pressure actual is: " + other.CCP_pressure + "and this what expected is: " + self.CCP_pressure) - result = False - if(self.GPS_latitude != other.GPS_latitude): - print("This is what the GPS Latitude actual is: " + other.GPS_latitude + "and this what expected is: " + self.GPS_latitude) - result = False - if(self.GPS_longitude != other.GPS_longitude): - print("This is what the GPS Longitude actual is: " + other.GPS_longitude + "and this what expected is: " + self.GPS_longitude) - result = False - if(self.GPS_altitude != other.GPS_altitude): - print("This is what the GPS Altitude actual is: " + other.GPS_altitude + "and this what expected is: " + self.GPS_altitude) - result = False - if(self.OTP_pressure != other.OTP_pressure): - print("This is what the OTP Pressure actual is: " + other.OTP_pressure + "and this what expected is: " + self.OTP_pressure) - result = False - if(self.PHS_phase != other.PHS_phase): - print("This is what the PHS Phase actual is: " + other.PHS_phase + "and this what expected is: " + self.PHS_phase) - result = False - if(self.HAL_tick != other.HAL_tick): - print("This is what the HAL Tick actual is: " + other.HAL_tick + "and this what expected is: " + self.HAL_tick) - result = False - if(self.SOF_version != other.SOF_version): - print("This is what the SOF Version actual is: " + other.SOF_version + "and this what expected is: " + self.SOF_version) - result = False + return self.__dict__ == other.__dict__ # # Start of Functions # def writeAndReadTestData(ser): - ser.open() - #Instantiate the All Data Struct members - adsExpected = AllDataStruct() - adsExpected.AGM_accelX = 1 - adsExpected.AGM_accelY = 2 - adsExpected.AGM_accelZ = 3 - adsExpected.AGM_gyroX = 4 - adsExpected.AGM_gyroY = 5 - adsExpected.AGM_gyroZ = 6 - adsExpected.AGM_magnetoX = 7 - adsExpected.AGM_magnetoY = 8 - adsExpected.AGM_magnetoZ = 9 - adsExpected.BAR_pressure = 10 - adsExpected.BAR_temperature = 11 - adsExpected.CCP_pressure = 12 - adsExpected.GPS_latitude = 13 - adsExpected.GPS_longitude = 14 - adsExpected.GPS_altitude = 15 - adsExpected.OTP_pressure = 16 - adsExpected.PHS_phase = 17 - adsExpected.HAL_tick = 18 - adsExpected.SOF_version = 19 - - #BEGIN DONT NEED - # Send ADS to flight board - #numBytesRead = ser.write(adsExpected.toByteArray() <-- Needs implementing) - # END DONT NEED - - # Command flight board to run test - #The b' is the byte literal in Python - ser.write(b't') + # TODO: Change to check if open (Fail and return if not) --Done + """This function will write an AllData struct to the EEPROM, + sends a 't' character over the UART connection. + Keyword Arguments: + ser - For serial connection to communicate. + """ + if(ser.isOpen() == True): + #Instantiate the All Data Struct members + adsExpected = AllDataStruct() # TODO: Maybe optimize this? Or maybe not + adsExpected.AGM_accelX = 1 + adsExpected.AGM_accelY = 2 + adsExpected.AGM_accelZ = 3 + adsExpected.AGM_gyroX = 4 + adsExpected.AGM_gyroY = 5 + adsExpected.AGM_gyroZ = 6 + adsExpected.AGM_magnetoX = 7 + adsExpected.AGM_magnetoY = 8 + adsExpected.AGM_magnetoZ = 9 + adsExpected.BAR_pressure = 10 + adsExpected.BAR_temperature = 11 + adsExpected.CCP_pressure = 12 + adsExpected.GPS_latitude = 13 + adsExpected.GPS_longitude = 14 + adsExpected.GPS_altitude = 15 + adsExpected.OTP_pressure = 16 + adsExpected.PHS_phase = 17 + adsExpected.HAL_tick = 18 + adsExpected.SOF_version = 19 + + # Command flight board to run test + #The b' is the byte literal in Python + ser.write(b't') + else: + ser.open() - # Read ADS from flight board - adsActual = AllDataStruct(ser.read(size)) + # TODO: Put remaining code in try/except IndexError to handle timeout https://www.w3schools.com/python/python_try_except.asp --Done + try: + # Read ADS from flight board + adsActual = AllDataStruct(ser.read(size_bytes)) - # Compare - if (adsExpected == adsActual): - # Pass - print("All the expected and actual values were the same") - # Fail case has been moved compare earlier in the code - # if statements + # Compare + if (adsExpected == adsActual): + # Pass + print("All the expected and actual values were the same") - print("For closing the serial port press q") - #ser.write(b'q') More for the user talking to the script - #Remove the closePort() + # TODO: Don't need this --Done + else: + # TODO: Else case `print(adsExpects.print_diff(adsActual))` --Done + print(adsExpected.print_diff(adsActual)) + except IndexError: + print("An Index Error!") def dumpCurrentData(ser): -ser.open() -#Instantiate the All Data Struct members -#Needs to be indentend over - adsExpected = AllDataStruct() - adsExpected.AGM_accelX = 1 - adsExpected.AGM_accelY = 2 - adsExpected.AGM_accelZ = 3 - adsExpected.AGM_gyroX = 4 - adsExpected.AGM_gyroY = 5 - adsExpected.AGM_gyroZ = 6 - adsExpected.AGM_magnetoX = 7 - adsExpected.AGM_magnetoY = 8 - adsExpected.AGM_magnetoZ = 9 - adsExpected.BAR_pressure = 10 - adsExpected.BAR_temperature = 11 - adsExpected.CCP_pressure = 12 - adsExpected.GPS_latitude = 13 - adsExpected.GPS_longitude = 14 - adsExpected.GPS_altitude = 15 - adsExpected.OTP_pressure = 16 - adsExpected.PHS_phase = 17 - adsExpected.HAL_tick = 18 - adsExpected.SOF_version = 19 - - -# Command flight board to dump the data on the screen -#The b' is the byte literal in Python -ser.write(b'd') - -# Read ADS from flight board -adsActual = AllDataStruct(ser.read(size)) - print(adsActual) - -# Compare -if (adsExpected == adsActual): - # Pass - print(repr(AllDataStruct())) -else: - # Fail - print("The values were not the same close the serial port by pressing q") - ser.write(b'q') - closePort() - -def printHelp(ser=None): + #TODO: Docstrings! https://www.python.org/dev/peps/pep-0257/ Add these to all functions --Done + """This function will dump the data onto the screen, sends + a 'd' to the flight board to achieve this feaature. + Also ensures that this will dump the most recent + AllData struct from the EEPROM. + Keyword Arguments: + ser - For serial connection to communicate. + """ + # TODO: Change to check if open (Fail and return if not) --Done + if(ser.isOpen() == True): + + # TODO: This should just read and print, no need to compare --Done + + # Command flight board to dump the data on the screen + #The b' is the byte literal in Python + ser.write(b'd') + + # Read ADS from flight board + adsActual = AllDataStruct(ser.read(size_bytes)) + print(adsActual) + else: + ser.open() + + # TODO: Don't need to compare --Done + +# TODO: Doesn't need ser--Done +def printHelp(): + """This function will print the list of commands in a + mannerly form when the user sends a 'h' to ensure this + is displayed. + Keyword Arguments: + None + """ + # TODO: format into one print statement --Done print("Commands:") - print(" t - Write and read a test value") - print(" d - Dump most recent value") - print(" p - Print help") - print(" q - Quit") + print(" t - Write and read a test value\n" + " d - Dump most recent value\n" + " h - Print help\n" + " q - Quit\n") #CLosing serial port function -def closePort(): +# TODO: Does need ser -- Done +def quit(ser): # TODO: Probably name "quit", and handle ALL cleanup, then exit --Done + """This function will close the communication between the serial + port when the user sends a 'q'. + Keyword Arguments: + ser - For serial connection to communicate. + """ ser.close() print("The serial port is now closed") + exit() + # TODO: exit()--Done # # Main # -commandDictionary = {'t': writeAndReadTestData, 'd': dumpCurrentData, 'p': printHelp, 'q': closePort} -ser = NONE -print("SOAR EEPROM Test Shell") +# TODO: Make "if main" check up here, and make sure all remaining "main" code is tabbed into it --Done if __name__ == "__main__": + + # TODO: Better command names? Help should be h --Done + commandDictionary = {'t': writeAndReadTestData, 'd': dumpCurrentData, 'h': printHelp, 'q': quit} + ser = None + + print("SOAR EEPROM Test Shell") port = input('Enter a Serial Port to connect to: ') # Linux: /dev/ttyUSBx, Windows: COMx - ser = serial.Serial(port, 9600, timeout=0) + ser = serial.Serial(port, 9600, timeout=1) # TODO: Set timeout to 1 --Done ser.open() -while True: - command = input("> ") + while True: + command = input("> ") - if command in commandDictionary.keys(): - commandDictionary[command](ser) - else: - print("Command not found!") - printHelp() + if command in commandDictionary.keys(): + commandDictionary[command](ser) + else: + print("Command not found!") + printHelp() \ No newline at end of file diff --git a/parseData.py b/parseData.py index 9bab6d4..da72af9 100644 --- a/parseData.py +++ b/parseData.py @@ -11,6 +11,7 @@ def __init__(self): self.gps = [0] * 4 self.oxi = -1 self.cmb = -1 + self.bv = -1 self.phs = -1 self.upperVnt = -1 self.injValve = -1 @@ -27,6 +28,7 @@ def __str__(self): string+="BAR - TEMP:\t\t"+ str(self.bar[1]/100)+" degrees C"+"\n" string+="OXI - P:\t\t"+ str(self.oxi/1000)+" psi"+"\n" string+="CMB - P:\t\t"+ str(self.cmb/1000)+" psi"+"\n" + string+="BAT - VOLT:\t\t"+ str(self.bv * 4)+" V"+"\n" #Not Quite Sure about this string+="PHS - PHASE:\t\t"+ str(phases[self.phs+1])+"\n" string+="Inj Valve:\t\t" + str(valveStatus[self.injValve+1])+"\n" string+="Lower Vent:\t\t" + str(valveStatus[self.lowerVnt+1])+"\n" @@ -48,7 +50,7 @@ def disconnect(ser): time.sleep(1) return None -def readSerial(ser,data): +def readSerial(ser,data, logFilePath): line = ser.read(256).hex() i = 0 print(line) @@ -109,6 +111,13 @@ def readSerial(ser,data): i+=12 else: i+=1 + #Battery Voltage + elif((line[i:i+8]=='37373737') and (len(line)-i>=15)): + if(line[i+16:i+18]=='00'): + data.bv = twos_complement(line[i+8:i+16], 32) + i+=14 + else: i+=1 + #Injection Valve Status elif((line[i:i+8]=='38383838') and (len(line)-i>=13)): if(line[i+10:i+12]=='00'): @@ -126,7 +135,12 @@ def readSerial(ser,data): #No packet detected else: i+=1 print(data) - + if (logFilePath != None): + with open(logFilePath, 'a') as logFile: + logFile.write(data) + logFile.write(line) #Adding the line variable to file as well writing that into it as well + + if __name__ == "__main__": ser = None @@ -134,7 +148,18 @@ def readSerial(ser,data): while(True): port = input('Enter a Serial Port to connect to:') #Linux: /dev/ttyUSBx, Windows: COMx ser = serial.Serial(port, 9600, timeout=0) + currentDateTime = time.strftime("%Y-%m-%dT%H-%M-%S") + logFilePath = 'Logs/ParseDataLog' + currentDateTime + '.txt' + logToFile = None + while logToFile == None: + answer = input('Do you want to see the current logging of data Yes(Y) or No(N)').strip().upper()[0] + #Fixed the identation no more infinite loop + if answer == 'Y': + logToFile = True + elif answer == 'N': + logToFile = False while(ser!=None): time.sleep(1) - readSerial(ser, data) + readSerial(ser, data, logFilePath if logToFile else None) +