From 09c86e65052fa38c9339fb3cd993cad75732469c Mon Sep 17 00:00:00 2001 From: Carlos Date: Wed, 28 Feb 2024 14:47:13 -0500 Subject: [PATCH] changed name of file --- .../scripts/96_channel_partial_pickup.py | 613 ++++++++++++++++++ 1 file changed, 613 insertions(+) create mode 100644 hardware-testing/hardware_testing/scripts/96_channel_partial_pickup.py diff --git a/hardware-testing/hardware_testing/scripts/96_channel_partial_pickup.py b/hardware-testing/hardware_testing/scripts/96_channel_partial_pickup.py new file mode 100644 index 00000000000..2c2a6396957 --- /dev/null +++ b/hardware-testing/hardware_testing/scripts/96_channel_partial_pickup.py @@ -0,0 +1,613 @@ +"""Partial Tip Pick up For the 96 Channel.""" +import argparse +# import ast +import asyncio +import csv +import time +from typing import Tuple, Dict, Optional +from threading import Thread +import datetime +import os, sys +import termios +import tty +import json + +from hardware_testing.opentrons_api.types import ( + OT3Mount, + Axis, + Point, + CriticalPoint, +) +from hardware_testing.opentrons_api.helpers_ot3 import ( + build_async_ot3_hardware_api, + home_ot3, + move_plunger_absolute_ot3, + get_plunger_positions_ot3, + update_pick_up_current, + update_pick_up_speed, + update_pick_up_distance, + # update_drop_tip_speed, + _get_pipette_from_mount, +) + +from opentrons.config.types import LiquidProbeSettings + +from hardware_testing import data +from hardware_testing.drivers.mark10 import Mark10 +from hardware_testing.drivers import mitutoyo_digimatic_indicator + +aspirate_depth = 10 +dispense_depth = 3 +liquid_retract_dist = 12 +liquid_retract_speed = 5 +retract_dist = 100 +retract_speed = 60 + +leak_test_time = 30 +test_volume = 1000 + +def dict_keys_to_line(dict): + return str.join(",", list(dict.keys())) + "\n" + + +def file_setup(test_data, details): + today = datetime.date.today() + test_name = "{}-pick_up-up-test-{}Amps".format( + details[0], # Pipette model + details[1], # Motor Current + ) + test_header = dict_keys_to_line(test_data) + test_tag = "-{}".format(today.strftime("%b-%d-%Y")) + test_id = data.create_run_id() + test_path = data.create_folder_for_test_data(test_name) + test_file = data.create_file_name(test_name, test_id, test_tag) + data.append_data_to_file(test_name, test_file, test_header) + print("FILE PATH = ", test_path) + print("FILE NAME = ", test_file) + return test_name, test_file + + +def dial_indicator_setup(port): + gauge = mitutoyo_digimatic_indicator.Mitutoyo_Digimatic_Indicator(port=port) + gauge.connect() + return gauge + + +def getch(): + """ + fd: file descriptor stdout, stdin, stderr + This functions gets a single input keyboard character from the user + """ + + def _getch(): + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(fd) + ch = sys.stdin.read(1) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + return ch + + return _getch() + + +async def jog(api, position, cp) -> Dict[Axis, float]: + step_size = [0.01, 0.05, 0.1, 0.5, 1, 10, 20, 50] + step_length_index = 3 + step = step_size[step_length_index] + xy_speed = 60 + za_speed = 65 + information_str = """ + Click >> i << to move up + Click >> k << to move down + Click >> a << to move left + Click >> d << to move right + Click >> w << to move forward + Click >> s << to move back + Click >> + << to Increase the length of each step + Click >> - << to decrease the length of each step + Click >> Enter << to save position + Click >> q << to quit the test script + """ + print(information_str) + while True: + input = getch() + if input == "a": + # minus x direction + sys.stdout.flush() + await api.move_rel( + mount, Point(-step_size[step_length_index], 0, 0), speed=xy_speed + ) + + elif input == "d": + # plus x direction + sys.stdout.flush() + await api.move_rel( + mount, Point(step_size[step_length_index], 0, 0), speed=xy_speed + ) + + elif input == "w": + # minus y direction + sys.stdout.flush() + await api.move_rel( + mount, Point(0, step_size[step_length_index], 0), speed=xy_speed + ) + + elif input == "s": + # plus y direction + sys.stdout.flush() + await api.move_rel( + mount, Point(0, -step_size[step_length_index], 0), speed=xy_speed + ) + + elif input == "i": + sys.stdout.flush() + await api.move_rel( + mount, Point(0, 0, step_size[step_length_index]), speed=za_speed + ) + + elif input == "k": + sys.stdout.flush() + await api.move_rel( + mount, Point(0, 0, -step_size[step_length_index]), speed=za_speed + ) + + elif input == "q": + sys.stdout.flush() + print("TEST CANCELLED") + quit() + + elif input == "+": + sys.stdout.flush() + step_length_index = step_length_index + 1 + if step_length_index >= 7: + step_length_index = 7 + step = step_size[step_length_index] + + elif input == "-": + sys.stdout.flush() + step_length_index = step_length_index - 1 + if step_length_index <= 0: + step_length_index = 0 + step = step_size[step_length_index] + + elif input == "\r": + sys.stdout.flush() + position = await api.current_position_ot3( + mount, refresh=True, critical_point=cp + ) + print("\r\n") + return position + position = await api.current_position_ot3( + mount, refresh=True, critical_point=cp + ) + + print( + "Coordinates: ", + round(position[Axis.X], 2), + ",", + round(position[Axis.Y], 2), + ",", + round(position[Axis.by_mount(mount)], 2), + " Motor Step: ", + step_size[step_length_index], + end="", + ) + print("\r", end="") + + +async def countdown(count_time: float): + """ + This function loops through a countdown before checking the leak visually + """ + time_suspend = 0 + while time_suspend < count_time: + await asyncio.sleep(1) + time_suspend += 1 + print(f"Remaining: {count_time-time_suspend} (s)", end="") + print("\r", end="") + print("") + + +async def update_pickup_tip_speed(api, mount, speed) -> None: + """Update drop-tip current.""" + pipette = _get_pipette_from_mount(api, mount) + config_model = pipette.pick_up_configurations + config_model.speed = speed + pipette.pick_up_configurations = config_model + print(pipette.pick_up_configurations) + +async def move_to_point(api, mount, point, cp): + home_pos = api.get_instrument_max_height(mount, cp) + pos = await api.current_position_ot3(mount, refresh=True, critical_point = cp) + await api.move_to(mount, + Point(pos[Axis.X], + pos[Axis.Y], + home_pos)) + await api.move_to(mount, + Point(point.x, + point.y, + home_pos)) + await api.move_to(mount, + Point(point.x, + point.y, + point.z)) + +def load_config_(filename: str) -> Dict: + """This function loads a given config file""" + try: + with open(filename, 'r') as file: + data = json.load(file) + except FileNotFoundError: + print('Warning: {0} not found'.format(filename)) + data = {} + except json.decoder.JSONDecodeError: + print('Error: {0} is corrupt'.format(filename)) + data = {} + return data + +def save_config_(filename: str, data: str) -> Dict: + """This function saves a given config file with data""" + try: + with open(filename, 'w') as file: + json.dump( + data, file, sort_keys=True, indent=4, separators=(',', ': ') + ) + except FileNotFoundError: + print('Warning: {0} not found'.format(filename)) + data = {} + except json.decoder.JSONDecodeError: + print('Error: {0} is corrupt'.format(filename)) + data = {} + return data + +async def calibrate_tiprack(api, home_position, mount): + cp = CriticalPoint.NOZZLE + + tiprack_loc = Point( + deck_slot['deck_slot'][args.tiprack_slot]['X'], + deck_slot['deck_slot'][args.tiprack_slot]['Y'], + deck_slot['deck_slot'][args.tiprack_slot]['Z']) + print(tiprack_loc) + print("Calibrate for Pick up tip") + await move_to_point(api, mount, tiprack_loc, cp) + current_position = await api.current_position_ot3(mount, cp) + tiprack_loc = await jog(api, current_position, cp) + tiprack_loc = Point(tiprack_loc[Axis.X], + tiprack_loc[Axis.Y], + tiprack_loc[Axis.by_mount(mount)]) + await api.pick_up_tip( + mount, tip_length=tip_length[args.tip_size], + presses = 1, + increment = 0) + await api.home([Axis.Z_L]) + cp = CriticalPoint.TIP + await asyncio.sleep(1) + home_with_tip = await api.current_position(mount, cp) + print("Calibrate Drop Tip Position") + drop_tip_loc = await jog(api, home_with_tip, cp) + drop_tip_loc = Point(drop_tip_loc[Axis.X], + drop_tip_loc[Axis.Y], + drop_tip_loc[Axis.by_mount(mount)]) + return tiprack_loc, drop_tip_loc + +async def update_nozzle_manager(api, mount, tip_count): + if args.nozzles == 1: + await api.update_nozzle_configuration_for_mount(OT3Mount.LEFT, 'A1', 'A1') + elif args.nozzles == 8: + await api.update_nozzle_configuration_for_mount(OT3Mount.LEFT, 'A1', 'H1') + elif args.nozzles == 16: + await api.update_nozzle_configuration_for_mount(OT3Mount.LEFT, 'A1', 'H2') + elif args.nozzles == 24: + await api.update_nozzle_configuration_for_mount(OT3Mount.LEFT, 'A1', 'H1') + elif args.nozzles == 12: + await api.update_nozzle_configuration_for_mount(OT3Mount.LEFT, 'A1', 'A12') + +async def _main() -> None: + today = datetime.date.today() + hw_api = await build_async_ot3_hardware_api( + is_simulating=args.simulate, use_defaults=True + ) + await asyncio.sleep(1) + await hw_api.cache_instruments() + pipette_model = hw_api.get_all_attached_instr()[OT3Mount.LEFT]["pipette_id"] + dial_data = {"Column_1": None, "Column_2": None, "Column_3": None, + "Column_4": None, "Column_5": None, "Column_6": None, + "Column_7": None, "Column_8": None, "Column_9": None, + "Column_10": None, "Column_11": None, "Column_12": None} + m_current = float(input("motor_current in amps: ")) + await update_nozzle_manager(hw_api, OT3Mount.LEFT, args.nozzles) + instrument = hw_api._pipette_handler.get_pipette(OT3Mount.LEFT) + # pick_up_speed = float(input("pick up tip speed in mm/s: ")) + details = [pipette_model, m_current] + test_n, test_f = file_setup(dial_data, details) + file_name = "/home/root/.opentrons/testing_data/pickup_tip_test/pu_96_pipette_%s-%s.csv" % ( + m_current, + datetime.datetime.now().strftime("%m-%d-%y_%H-%M"), + ) + lp_file_name = '/var/{}-P-{}_Z-{}-{}.csv'.format( pipette_model, + args.plunger_speed, + args.mount_speed, + today.strftime("%b-%d-%Y")) + liquid_probe_settings = LiquidProbeSettings( + # starting_mount_height = 100, + max_z_distance = args.max_z_distance, + min_z_distance = args.min_z_distance, + mount_speed = args.mount_speed, + plunger_speed = args.plunger_speed, + sensor_threshold_pascals = args.sensor_threshold, + expected_liquid_height = args.expected_liquid_height, + log_pressure = args.log_pressure, + aspirate_while_sensing = False, + auto_zero_sensor = False, + num_baseline_reads = 10, + data_file = lp_file_name, + ) + try: + await hw_api.home() + await asyncio.sleep(1) + # await hw_api.home_plunger(mount) + await hw_api.set_lights(rails=True) + plunger_pos = get_plunger_positions_ot3(hw_api, mount) + print(plunger_pos) + home_position = await hw_api.current_position_ot3(mount) + start_time = time.perf_counter() + m_current = float(input("motor_current in amps: ")) + pick_up_speed = float(input("pick up tip speed in mm/s: ")) + hw_api.clamp_tip_speed = float(input("clamp pick up Speed: ")) + pick_up_distance = float(input("pick up distance in mm: ")) + await update_pick_up_current(hw_api, mount, m_current) + await update_pick_up_speed(hw_api, mount, pick_up_speed) + await update_pick_up_distance(hw_api, mount, pick_up_distance) + # if (args.calibrate): + # cp = CriticalPoint.NOZZLE + # home_w_tip = await hw_api.current_position_ot3(mount, cp) + # initial_dial_loc = Point( + # deck_slot['deck_slot'][args.dial_slot]['X'], + # deck_slot['deck_slot'][args.dial_slot]['Y'], + # home_w_tip[Axis.by_mount(mount)] + # ) + # print("Move Nozzle to Dial Indicator") + # await move_to_point(hw_api, mount, initial_dial_loc, cp) + # current_position = await hw_api.current_position_ot3(mount, cp) + # nozzle_loc = await jog(hw_api, current_position, cp) + # number_of_channels = 96 + # nozzle_count = 0 + # x_offset = 0 + # y_offset = 0 + # measurements = [] + # num_of_columns = 12 + # for tip in range(1, number_of_channels + 1): + # cp = CriticalPoint.NOZZLE + # nozzle_count += 1 + # nozzle_position = Point(nozzle_loc[Axis.X] + x_offset, + # nozzle_loc[Axis.Y] + y_offset, + # nozzle_loc[Axis.by_mount(mount)]) + # await move_to_point(hw_api, mount, nozzle_position, cp) + # await asyncio.sleep(1) + # nozzle_measurement = gauge.read() + # print("nozzle-",nozzle_count, "(mm): " , nozzle_measurement, end="") + # print("\r", end="") + # measurements.append(nozzle_measurement) + # if nozzle_count % num_of_columns == 0: + # d_str = '' + # for m in measurements: + # d_str += str(m) + ',' + # d_str = d_str[:-1] + '\n' + # print(f"{d_str}") + # data.append_data_to_file(test_n, test_f, d_str) + # # Reset Measurements list + # measurements = [] + # print("\r\n") + # x_offset -= 9 + # if nozzle_count % num_of_columns == 0: + # y_offset += 9 + # if nozzle_count % num_of_columns == 0: + # x_offset = 0 + # Calibrate to tiprack + if (args.calibrate): + pickup_loc, droptip_loc = await calibrate_tiprack(hw_api, home_position, mount) + print(pickup_loc) + deck_slot['deck_slot'][args.tiprack_slot][Axis.X.name] = pickup_loc.x + deck_slot['deck_slot'][args.tiprack_slot][Axis.Y.name] = pickup_loc.y + deck_slot['deck_slot'][args.tiprack_slot]['Z'] = pickup_loc.z + save_config_(path+cal_fn, deck_slot) + + await hw_api.home_z(mount) + cp = CriticalPoint.TIP + home_w_tip = await hw_api.current_position_ot3(mount, cp) + # Calibrate Dial Indicator with single tip + if (args.calibrate): + # cp = CriticalPoint.TIP + initial_dial_loc = Point( + deck_slot['deck_slot'][args.dial_slot]['X'], + deck_slot['deck_slot'][args.dial_slot]['Y'], + home_w_tip[Axis.by_mount(mount)]) + print("Move to Dial Indicator") + await move_to_point(hw_api, mount, initial_dial_loc, cp) + current_position = await hw_api.current_position_ot3(mount, cp) + dial_loc = await jog(hw_api, current_position, cp) + dial_loc = Point(dial_loc[Axis.X], + dial_loc[Axis.Y], + dial_loc[Axis.by_mount(mount)]) + deck_slot['deck_slot'][args.dial_slot][Axis.X.name] = dial_loc.x + deck_slot['deck_slot'][args.dial_slot][Axis.Y.name] = dial_loc.y + deck_slot['deck_slot'][args.dial_slot]['Z'] = dial_loc.z + save_config_(path+cal_fn, deck_slot) + if (args.trough): + cp = CriticalPoint.TIP + trough_loc = Point(deck_slot['deck_slot'][args.trough_slot]['X'], + deck_slot['deck_slot'][args.trough_slot]['Y'], + home_w_tip[Axis.by_mount(mount)]) + print("Move to Trough") + await move_to_point(hw_api, mount, trough_loc, cp) + current_position = await hw_api.current_position_ot3(mount, cp) + trough_loc = await jog(hw_api, current_position, cp) + trough_loc = Point(trough_loc[Axis.X], + trough_loc[Axis.Y], + trough_loc[Axis.by_mount(mount)]) + deck_slot['deck_slot'][args.trough_slot][Axis.X.name] = dial_loc.x + deck_slot['deck_slot'][args.trough_slot][Axis.Y.name] = dial_loc.y + deck_slot['deck_slot'][args.trough_slot]['Z'] = dial_loc.z + save_config_(path+cal_fn, deck_slot) + + num_of_columns = int(input("How many Columns: ")) + num_of_rows = int(input("Number of Rows: ")) + tips_to_use = (num_of_rows * num_of_columns) + # tips_to_use = (num_of_columns * 8) + while True: + measurements = [] + tip_count = 0 + x_offset = 0 + y_offset = 0 + cp = CriticalPoint.TIP + if args.dial_indicator: + for tip in range(1, tips_to_use + 1): + cp = CriticalPoint.TIP + tip_count += 1 + tip_position = Point(dial_loc[0] + x_offset, + dial_loc[1] + y_offset, + dial_loc[2]) + await move_to_point(hw_api, mount, tip_position, cp) + await asyncio.sleep(1) + tip_measurement = gauge.read() + print("tip-",tip_count, "(mm): " ,tip_measurement, end="") + print("\r", end="") + measurements.append(tip_measurement) + if tip_count % num_of_columns == 0: + d_str = '' + for m in measurements: + d_str += str(m) + ',' + d_str = d_str[:-1] + '\n' + print(f"{d_str}") + data.append_data_to_file(test_n, test_f, d_str) + # Reset Measurements list + measurements = [] + print("\r\n") + x_offset -= 9 + # if tip_count % num_of_column == 0: + if tip_count % num_of_columns == 0: + y_offset += 9 + if tip_count % num_of_columns == 0: + x_offset = 0 + + if args.trough: + await hw_api.prepare_for_aspirate(mount) + await move_to_point(hw_api, mount, trough_loc, cp) + await hw_api.aspirate(mount, test_volume) + await hw_api.home_z(mount) + await countdown(leak_test_time) + await move_to_point(hw_api, mount, trough_loc, cp) + await hw_api.dispense(mount) + # await hw_api.home_z(mount) + # hw_api.clamp_drop_tip_speed = float(input("Drop tip speed: ")) + # await update_drop_tip_speed(hw_api, mount, hw_api.clamp_drop_tip_speed ) + cp = CriticalPoint.TIP + await move_to_point(hw_api, mount, droptip_loc, cp) + input("Feel the Tip!") + await hw_api.drop_tip(mount) + await hw_api.home_z(mount) + + m_current = float(input("motor_current in amps: ")) + pick_up_speed = float(input("prep pick up tip speed in mm/s: ")) + # Pick up distance i originally used was 16.5 + pick_up_distance = float(input("pick up distance in mm: ")) + hw_api.clamp_tip_speed = float(input("clamp pick up Speed: ")) + num_of_columns = int(input("How many Columns: ")) + num_of_rows = int(input("Number of Rows: ")) + tips_to_use = (num_of_rows * num_of_columns) + # tips_to_use = num_of_columns * 8 + await update_pick_up_current(hw_api, mount, m_current) + await update_pick_up_speed(hw_api, mount, pick_up_speed) + await update_pick_up_distance(hw_api, mount, pick_up_distance) + cp = CriticalPoint.NOZZLE + if args.columns: + column = float(input("How many Columns to Move: ")) + column = column*9 + pickup_loc = Point(pickup_loc[0] - column, + pickup_loc[1], + pickup_loc[2]) + else: + row = float(input("How many Row to Move: ")) + row = row*9 + pickup_loc = Point(pickup_loc[0], + pickup_loc[1] + row, + pickup_loc[2]) + await move_to_point(hw_api, mount, pickup_loc, cp) + await hw_api.pick_up_tip(mount, + tip_length=tip_length[args.tip_size], + presses = 1, + increment = 0) + await hw_api.home_z(mount.LEFT) + cp = CriticalPoint.TIP + current_position = await hw_api.current_position_ot3(mount, cp) + this_position = await jog(hw_api, current_position, cp) + input("Press Enter to continue") + + except KeyboardInterrupt: + await hw_api.disengage_axes([Axis.X, Axis.Y]) + finally: + await hw_api.disengage_axes([Axis.X, Axis.Y]) + await hw_api.clean_up() + + +if __name__ == "__main__": + slot_locs = [ + "A1", + "A2", + "A3", + "B1", + "B2", + "B3:", + "C1", + "C2", + "C3", + "D1", + "D2", + "D3", + ] + parser = argparse.ArgumentParser() + parser.add_argument("--simulate", action="store_true") + parser.add_argument("--trough", action="store_true") + parser.add_argument("--tiprack", action="store_true") + parser.add_argument("--mount", type=str, choices=["left", "right"], default="left") + parser.add_argument("--tiprack_slot", type=str, choices=slot_locs, default="B2") + parser.add_argument("--dial_slot", type=str, choices=slot_locs, default="C2") + parser.add_argument("--trough_slot", type=str, choices=slot_locs, default="B3") + parser.add_argument("--dial_indicator", action="store_true") + parser.add_argument("--calibrate", action="store_true") + parser.add_argument("--columns", action="store_true") + parser.add_argument("--tip_size", type=str, default="T1K", help="Tip Size") + parser.add_argument("--max_z_distance", type=float, default=40) + parser.add_argument("--nozzles", type=int, default=96) + parser.add_argument("--min_z_distance", type=float, default=5) + parser.add_argument("--mount_speed", type=float, default=5) + parser.add_argument("--plunger_speed", type=float, default=10) + parser.add_argument( + "--sensor_threshold", type=float, default=100, help="Threshold in Pascals" + ) + parser.add_argument("--expected_liquid_height", type=int, default=0) + parser.add_argument("--log_pressure", action="store_true") + parser.add_argument( + "--dial_port", type=str, default="/dev/ttyUSB0", help="Dial indicator Port" + ) + args = parser.parse_args() + path = '/data/testing_data/' + cal_fn = 'calibrations.json' + if args.calibrate: + with open(path + cal_fn, 'r') as openfile: + deck_slot = json.load(openfile) + print(deck_slot) + else: + with open(path + cal_fn, 'r') as openfile: + deck_slot = json.load(openfile) + tip_length = {"T1K": 95.7, "T200": 58.35, "T50": 57.9} + if args.mount == "left": + mount = OT3Mount.LEFT + else: + mount = OT3Mount.RIGHT + + if args.dial_indicator: + gauge = dial_indicator_setup(port=args.dial_port) + asyncio.run(_main())