From 2d1ebf2493aed055940e01b1b39dc51619f2b4da Mon Sep 17 00:00:00 2001 From: Ryan howard Date: Fri, 15 Nov 2024 11:55:49 -0500 Subject: [PATCH] format --- .../drivers/sealed_pressure_fixture.py | 34 +++++---- .../opentrons_api/helpers_ot3.py | 2 +- .../test_droplets.py | 14 ++-- .../test_environmental_sensor.py | 15 +++- .../test_pressure.py | 74 +++++++++++++------ .../ninety_six_assembly_tip_motors.py | 4 + 6 files changed, 98 insertions(+), 45 deletions(-) diff --git a/hardware-testing/hardware_testing/drivers/sealed_pressure_fixture.py b/hardware-testing/hardware_testing/drivers/sealed_pressure_fixture.py index ca8221f2282..712f760a9cd 100644 --- a/hardware-testing/hardware_testing/drivers/sealed_pressure_fixture.py +++ b/hardware-testing/hardware_testing/drivers/sealed_pressure_fixture.py @@ -9,7 +9,6 @@ class SerialDriver: - @classmethod def get_com_list(cls): port_list = serial.tools.list_ports.comports() @@ -38,8 +37,14 @@ def init_serial(self, baud): :param baud: :return: """ - self.com = serial.Serial(self.device, baud, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, - bytesize=serial.EIGHTBITS, timeout=1) + self.com = serial.Serial( + self.device, + baud, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + bytesize=serial.EIGHTBITS, + timeout=1, + ) if self.com.isOpen(): print(f"{self.device} Opened! \n") # settings @@ -66,7 +71,9 @@ def init(self, baud): except: print("Can't find device") - def write_and_get_buffer(self, send: Union[str, int, bytes], only_write=False, delay=None, times=30): + def write_and_get_buffer( + self, send: Union[str, int, bytes], only_write=False, delay=None, times=30 + ): """ send cmd :return: @@ -74,7 +81,7 @@ def write_and_get_buffer(self, send: Union[str, int, bytes], only_write=False, d if self.com is None: return if type(send) is not bytes: - send = (send + "\r\n").encode('utf-8') + send = (send + "\r\n").encode("utf-8") self.com.flushInput() self.com.flushOutput() self.com.write(send) @@ -88,12 +95,12 @@ def write_and_get_buffer(self, send: Union[str, int, bytes], only_write=False, d for i in range(times): data = self.com.read(ReceiveBuffer) if type(data) is not bytes: - if "OK" not in data.decode('utf-8') or "busy" in data.decode('utf-8'): + if "OK" not in data.decode("utf-8") or "busy" in data.decode("utf-8"): time.sleep(1) continue else: return data - return data.decode('utf-8') + return data.decode("utf-8") def read_buffer(self): """ @@ -110,8 +117,8 @@ def read_buffer(self): data = self.com.read(length) self.com.flushInput() self.com.flushOutput() - return data.decode('utf-8') - + return data.decode("utf-8") + def get_pressure(self): """ analyze pressure value @@ -119,10 +126,10 @@ def get_pressure(self): for _i in range(5): try: respond = self.read_buffer() - respond_list = respond.split('|') + respond_list = respond.split("|") respond_value = respond_list[1] - - average_value = respond_value.split('\r\n')[0].split('\t')[1].strip() + + average_value = respond_value.split("\r\n")[0].split("\t")[1].strip() average_value = float(average_value) return average_value except: @@ -130,8 +137,7 @@ def get_pressure(self): pass - -if __name__ == '__main__': +if __name__ == "__main__": s = SerialDriver() s.init(9600) for i in range(100): diff --git a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py index 102e4148a1f..224dcdda108 100644 --- a/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py +++ b/hardware-testing/hardware_testing/opentrons_api/helpers_ot3.py @@ -1101,7 +1101,7 @@ def get_pipette_serial_ot3(pipette: Union[PipetteOT2, PipetteOT3]) -> str: elif "96" in model: channels = "H" else: - channels = "M" + channels = "M" version = model.split("v")[-1].strip().replace(".", "") assert pipette.pipette_id, f"no pipette_id found for pipette: {pipette}" if "P" in pipette.pipette_id: diff --git a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_droplets.py b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_droplets.py index d99d6a3544d..ec773a4b422 100644 --- a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_droplets.py +++ b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_droplets.py @@ -205,20 +205,24 @@ async def _find_reservoir_pos() -> None: if not api.is_simulator: answer = ui.get_user_answer(f"Test {test_volume}uL") else: - answer = pipette + answer = True if not answer: continue - tip_volume = 50 if test_volume<=50 else pipette + tip_volume = 50 if test_volume <= 50 else pipette # PICK-UP 96 TIPS ui.print_header("JOG to 96-Tip RACK") if not api.is_simulator: - ui.get_user_ready(f"picking up tips, place tip-rack {tip_volume} on slot {TIP_RACK_96_SLOT}") + ui.get_user_ready( + f"picking up tips, place tip-rack {tip_volume} on slot {TIP_RACK_96_SLOT}" + ) await helpers_ot3.move_to_arched_ot3( api, OT3Mount.LEFT, tip_rack_96_a1_nominal + Point(z=30) ) await helpers_ot3.jog_mount_ot3(api, OT3Mount.LEFT) - await api.pick_up_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(tip_volume)) + await api.pick_up_tip( + OT3Mount.LEFT, helpers_ot3.get_default_tip_length(tip_volume) + ) await api.home_z(OT3Mount.LEFT) if not api.is_simulator: ui.get_user_ready("about to move to RESERVOIR") @@ -229,7 +233,7 @@ async def _find_reservoir_pos() -> None: ret, duration = await aspirate_and_wait( api, reservoir_a1_actual, test_volume, seconds=NUM_SECONDS_TO_WAIT ) - result = result&ret + result = result & ret await api.home_z(OT3Mount.LEFT) await _drop_tip(api, trash_nominal) report(section, "droplets-96-tips", [duration, CSVResult.from_bool(result)]) diff --git a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_environmental_sensor.py b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_environmental_sensor.py index ac8e6050fca..f3bf239f6e7 100644 --- a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_environmental_sensor.py +++ b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_environmental_sensor.py @@ -26,7 +26,9 @@ def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]: """Build CSV Lines.""" return [ - CSVLine(f"environment-{sensor_id.name}-celsius-humidity", [float, float, CSVResult]) + CSVLine( + f"environment-{sensor_id.name}-celsius-humidity", [float, float, CSVResult] + ) for sensor_id in [SensorId.S0, SensorId.S1] ] @@ -66,10 +68,15 @@ async def run( humidity = _remove_outliers_and_average(humidity_samples) print(f"[{sensor_id.name}] Celsius = {round(celsius, 2)} degrees") print(f"[{sensor_id.name}] Humidity = {round(humidity, 2)} percent") - air_params = air_params & True if TEMPERATURE_THRESHOLD[0] <= celsius <= TEMPERATURE_THRESHOLD[1] and HUMIDITY_THRESHOLD[0] <= humidity <= HUMIDITY_THRESHOLD[1] else False - + air_params = ( + air_params & True + if TEMPERATURE_THRESHOLD[0] <= celsius <= TEMPERATURE_THRESHOLD[1] + and HUMIDITY_THRESHOLD[0] <= humidity <= HUMIDITY_THRESHOLD[1] + else False + ) + report( section, f"environment-{sensor_id.name}-celsius-humidity", - [celsius, humidity, CSVResult.from_bool(air_params)] + [celsius, humidity, CSVResult.from_bool(air_params)], ) diff --git a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_pressure.py b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_pressure.py index c059ad3b949..679695ecfb8 100644 --- a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_pressure.py +++ b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_qc_ot3/test_pressure.py @@ -1,6 +1,8 @@ """Test Pressure.""" from asyncio import sleep -from hardware_testing.drivers.sealed_pressure_fixture import SerialDriver as SealedPressureDriver +from hardware_testing.drivers.sealed_pressure_fixture import ( + SerialDriver as SealedPressureDriver, +) from hardware_testing.opentrons_api import helpers_ot3 from typing import List, Union, Literal @@ -24,9 +26,13 @@ USE_SEALED_FIXTURE = False USE_SEALED_BLOCK = True -PRIMARY_SEALED_PRESSURE_FIXTURE_POS = Point(362.68, 148.83, 49.4) if USE_SEALED_BLOCK else Point(362.68, 148.83, 44.4) # attached tip -SECOND_SEALED_PRESSURE_FIXTURE_POS = Point(264.71, 212.81, 49.4) if USE_SEALED_BLOCK else Point(264.71, 212.81, 44.4) # attached tip -SET_PRESSURE_TARGET = 100 # read air pressure when the force pressure value is over 100 +PRIMARY_SEALED_PRESSURE_FIXTURE_POS = ( + Point(362.68, 148.83, 49.4) if USE_SEALED_BLOCK else Point(362.68, 148.83, 44.4) +) # attached tip +SECOND_SEALED_PRESSURE_FIXTURE_POS = ( + Point(264.71, 212.81, 49.4) if USE_SEALED_BLOCK else Point(264.71, 212.81, 44.4) +) # attached tip +SET_PRESSURE_TARGET = 100 # read air pressure when the force pressure value is over 100 REACHED_PRESSURE = 0 SECONDS_BETWEEN_READINGS = 0.25 @@ -38,7 +44,7 @@ SLOT_FOR_PICK_UP_TIP = 5 TIP_RACK_FOR_PICK_UP_TIP = f"opentrons_flex_96_tiprack_{TIP_VOLUME}ul" A1_OFFSET = Point(x=9 * 11, y=-9 * 7) -H12_OFFSET = Point(x=-9*11, y=9*7) +H12_OFFSET = Point(x=-9 * 11, y=9 * 7) OFFSET_FOR_1_WELL_LABWARE = Point(x=9 * -11 * 0.5, y=9 * 7 * 0.5) THRESHOLDS = { @@ -113,7 +119,10 @@ def check_value(test_value: float, test_name: str) -> CSVResult: else: return CSVResult.FAIL -async def calibrate_to_pressue_fixture(api: OT3API, sensor:SealedPressureDriver, fixture_pos:Point): + +async def calibrate_to_pressue_fixture( + api: OT3API, sensor: SealedPressureDriver, fixture_pos: Point +): """move to suitable height for readding air pressure""" global REACHED_PRESSURE await api.move_to(OT3Mount.LEFT, fixture_pos) @@ -156,11 +165,14 @@ async def _partial_pick_up(api: OT3API, position: Point, current: float) -> None position, safe_height=position.z + 10, ) - await _partial_pick_up_z_motion(api, current=current, distance=12, speed=3) # change distance and speed, in case collision detected error + await _partial_pick_up_z_motion( + api, current=current, distance=12, speed=3 + ) # change distance and speed, in case collision detected error await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME)) await api.prepare_for_aspirate(OT3Mount.LEFT) await api.home_z(OT3Mount.LEFT) + async def run( api: OT3API, report: CSVReport, section: str, pipette: Literal[200, 1000] ) -> None: @@ -177,15 +189,19 @@ async def run( # move to slot if not api.is_simulator: ui.get_user_ready(f"Place tip tack 50ul at slot - {SLOT_FOR_PICK_UP_TIP}") - #await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME)) + # await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME)) - tip_rack_pos = helpers_ot3.get_theoretical_a1_position(SLOT_FOR_PICK_UP_TIP, TIP_RACK_FOR_PICK_UP_TIP) + tip_rack_pos = helpers_ot3.get_theoretical_a1_position( + SLOT_FOR_PICK_UP_TIP, TIP_RACK_FOR_PICK_UP_TIP + ) await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, tip_rack_pos + Point(z=30)) await helpers_ot3.jog_mount_ot3(api, OT3Mount.LEFT) tip_rack_actual_pos = await api.gantry_position(OT3Mount.LEFT) for probe in InstrumentProbeType: - await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, tip_rack_pos + Point(z=50)) + await helpers_ot3.move_to_arched_ot3( + api, OT3Mount.LEFT, tip_rack_pos + Point(z=50) + ) sensor_id = sensor_id_for_instrument(probe) ui.print_header(f"Sensor: {probe}") @@ -221,14 +237,20 @@ async def run( await _partial_pick_up(api, tip_pos, current=0.1) await api.prepare_for_aspirate(OT3Mount.LEFT) if not (USE_SEALED_FIXTURE or USE_SEALED_BLOCK): - ui.get_user_ready("SEAL tip using your FINGER") + ui.get_user_ready("SEAL tip using your FINGER") else: - await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, fixture_pos._replace(z=fixture_pos.z + 50)) + await helpers_ot3.move_to_arched_ot3( + api, OT3Mount.LEFT, fixture_pos._replace(z=fixture_pos.z + 50) + ) ui.get_user_ready("Ready for moving to sealed fixture") if USE_SEALED_FIXTURE: - await calibrate_to_pressue_fixture(api, pressure_sensor, fixture_pos) + await calibrate_to_pressue_fixture( + api, pressure_sensor, fixture_pos + ) else: - await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, fixture_pos) + await helpers_ot3.move_to_arched_ot3( + api, OT3Mount.LEFT, fixture_pos + ) try: sealed_pa = await _read_from_sensor( @@ -238,11 +260,17 @@ async def run( ui.print_error(f"{probe} pressure sensor not working, skipping") break else: - await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME)) + await api.add_tip( + OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME) + ) await api.prepare_for_aspirate(OT3Mount.LEFT) print(f"sealed-pa: {sealed_pa}") sealed_result = check_value(sealed_pa, "sealed-pa") - report(section, _get_test_tag(probe, "sealed-pa"), [sealed_pa, sealed_result, REACHED_PRESSURE]) + report( + section, + _get_test_tag(probe, "sealed-pa"), + [sealed_pa, sealed_result, REACHED_PRESSURE], + ) # ASPIRATE-Pa aspirate_pa = 0.0 @@ -278,17 +306,21 @@ async def run( section, _get_test_tag(probe, "dispense-pa"), [dispense_pa, dispense_result] ) if USE_SEALED_FIXTURE or USE_SEALED_BLOCK: - await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, fixture_pos._replace(z=fixture_pos.z + 50)) + await helpers_ot3.move_to_arched_ot3( + api, OT3Mount.LEFT, fixture_pos._replace(z=fixture_pos.z + 50) + ) if not api.is_simulator: ui.get_user_ready("REMOVE tip") - trash_nominal = helpers_ot3.get_slot_calibration_square_position_ot3(12) + Point(z=40) + trash_nominal = helpers_ot3.get_slot_calibration_square_position_ot3( + 12 + ) + Point(z=40) # center the 96ch of the 1-well labware trash_nominal += OFFSET_FOR_1_WELL_LABWARE - await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, trash_nominal + Point(z=20)) + await helpers_ot3.move_to_arched_ot3( + api, OT3Mount.LEFT, trash_nominal + Point(z=20) + ) await api.move_to(OT3Mount.LEFT, trash_nominal) await api.drop_tip(OT3Mount.LEFT) await api.remove_tip(OT3Mount.LEFT) await api.home() - - diff --git a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_tip_motors.py b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_tip_motors.py index 5096056a8d5..8db39a4cd47 100644 --- a/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_tip_motors.py +++ b/hardware-testing/hardware_testing/production_qc/ninety_six_assembly_tip_motors.py @@ -49,6 +49,7 @@ async def _check_if_jaw_is_aligned_with_endstop(api: OT3API) -> bool: ui.print_error("endstop hit too early") return pass_no_hit + async def jaw_precheck(api: OT3API, ax: Axis, speed: float) -> Tuple[bool, bool]: """Check the LEDs work and jaws are aligned.""" # HOME @@ -73,6 +74,7 @@ async def jaw_precheck(api: OT3API, ax: Axis, speed: float) -> Tuple[bool, bool] ui.print_error("Jaws Misaligned") return led_check, jaws_aligned + async def _run_test_jaw(api: OT3API): ax = Axis.Q settings = helpers_ot3.get_gantry_load_per_axis_motion_settings_ot3(api, ax) @@ -85,6 +87,7 @@ async def _save_result(tag: str, led_check: bool, jaws_aligned: bool) -> bool: else: no_hit = False return led_check and jaws_aligned and no_hit + await api.home_z(OT3Mount.LEFT) slot_5 = helpers_ot3.get_slot_calibration_square_position_ot3(5) home_pos = await api.gantry_position(OT3Mount.LEFT) @@ -122,6 +125,7 @@ async def _save_result(tag: str, led_check: bool, jaws_aligned: bool) -> bool: print("homing...") await api.home([ax]) + async def _main() -> None: print("start") api = await helpers_ot3.build_async_ot3_hardware_api(is_simulating=False)