Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanthecoder committed Nov 15, 2024
1 parent 186ab43 commit 2d1ebf2
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@


class SerialDriver:

@classmethod
def get_com_list(cls):
port_list = serial.tools.list_ports.comports()
Expand Down Expand Up @@ -38,8 +37,14 @@ def init_serial(self, baud):
:param baud:
:return:
"""
self.com = serial.Serial(self.device, baud, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS, timeout=1)
self.com = serial.Serial(
self.device,
baud,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1,
)
if self.com.isOpen():
print(f"{self.device} Opened! \n")
# settings
Expand All @@ -66,15 +71,17 @@ def init(self, baud):
except:
print("Can't find device")

def write_and_get_buffer(self, send: Union[str, int, bytes], only_write=False, delay=None, times=30):
def write_and_get_buffer(
self, send: Union[str, int, bytes], only_write=False, delay=None, times=30
):
"""
send cmd
:return:
"""
if self.com is None:
return
if type(send) is not bytes:
send = (send + "\r\n").encode('utf-8')
send = (send + "\r\n").encode("utf-8")
self.com.flushInput()
self.com.flushOutput()
self.com.write(send)
Expand All @@ -88,12 +95,12 @@ def write_and_get_buffer(self, send: Union[str, int, bytes], only_write=False, d
for i in range(times):
data = self.com.read(ReceiveBuffer)
if type(data) is not bytes:
if "OK" not in data.decode('utf-8') or "busy" in data.decode('utf-8'):
if "OK" not in data.decode("utf-8") or "busy" in data.decode("utf-8"):
time.sleep(1)
continue
else:
return data
return data.decode('utf-8')
return data.decode("utf-8")

def read_buffer(self):
"""
Expand All @@ -110,28 +117,27 @@ def read_buffer(self):
data = self.com.read(length)
self.com.flushInput()
self.com.flushOutput()
return data.decode('utf-8')
return data.decode("utf-8")

def get_pressure(self):
"""
analyze pressure value
"""
for _i in range(5):
try:
respond = self.read_buffer()
respond_list = respond.split('|')
respond_list = respond.split("|")
respond_value = respond_list[1]
average_value = respond_value.split('\r\n')[0].split('\t')[1].strip()

average_value = respond_value.split("\r\n")[0].split("\t")[1].strip()
average_value = float(average_value)
return average_value
except:
print(f"get pressure fail at {_i} times")
pass



if __name__ == '__main__':
if __name__ == "__main__":
s = SerialDriver()
s.init(9600)
for i in range(100):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1101,7 +1101,7 @@ def get_pipette_serial_ot3(pipette: Union[PipetteOT2, PipetteOT3]) -> str:
elif "96" in model:
channels = "H"
else:
channels = "M"
channels = "M"
version = model.split("v")[-1].strip().replace(".", "")
assert pipette.pipette_id, f"no pipette_id found for pipette: {pipette}"
if "P" in pipette.pipette_id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,20 +205,24 @@ async def _find_reservoir_pos() -> None:
if not api.is_simulator:
answer = ui.get_user_answer(f"Test {test_volume}uL")
else:
answer = pipette
answer = True
if not answer:
continue
tip_volume = 50 if test_volume<=50 else pipette
tip_volume = 50 if test_volume <= 50 else pipette
# PICK-UP 96 TIPS
ui.print_header("JOG to 96-Tip RACK")
if not api.is_simulator:
ui.get_user_ready(f"picking up tips, place tip-rack {tip_volume} on slot {TIP_RACK_96_SLOT}")
ui.get_user_ready(
f"picking up tips, place tip-rack {tip_volume} on slot {TIP_RACK_96_SLOT}"
)
await helpers_ot3.move_to_arched_ot3(
api, OT3Mount.LEFT, tip_rack_96_a1_nominal + Point(z=30)
)
await helpers_ot3.jog_mount_ot3(api, OT3Mount.LEFT)

await api.pick_up_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(tip_volume))
await api.pick_up_tip(
OT3Mount.LEFT, helpers_ot3.get_default_tip_length(tip_volume)
)
await api.home_z(OT3Mount.LEFT)
if not api.is_simulator:
ui.get_user_ready("about to move to RESERVOIR")
Expand All @@ -229,7 +233,7 @@ async def _find_reservoir_pos() -> None:
ret, duration = await aspirate_and_wait(
api, reservoir_a1_actual, test_volume, seconds=NUM_SECONDS_TO_WAIT
)
result = result&ret
result = result & ret
await api.home_z(OT3Mount.LEFT)
await _drop_tip(api, trash_nominal)
report(section, "droplets-96-tips", [duration, CSVResult.from_bool(result)])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
def build_csv_lines() -> List[Union[CSVLine, CSVLineRepeating]]:
"""Build CSV Lines."""
return [
CSVLine(f"environment-{sensor_id.name}-celsius-humidity", [float, float, CSVResult])
CSVLine(
f"environment-{sensor_id.name}-celsius-humidity", [float, float, CSVResult]
)
for sensor_id in [SensorId.S0, SensorId.S1]
]

Expand Down Expand Up @@ -66,10 +68,15 @@ async def run(
humidity = _remove_outliers_and_average(humidity_samples)
print(f"[{sensor_id.name}] Celsius = {round(celsius, 2)} degrees")
print(f"[{sensor_id.name}] Humidity = {round(humidity, 2)} percent")
air_params = air_params & True if TEMPERATURE_THRESHOLD[0] <= celsius <= TEMPERATURE_THRESHOLD[1] and HUMIDITY_THRESHOLD[0] <= humidity <= HUMIDITY_THRESHOLD[1] else False

air_params = (
air_params & True
if TEMPERATURE_THRESHOLD[0] <= celsius <= TEMPERATURE_THRESHOLD[1]
and HUMIDITY_THRESHOLD[0] <= humidity <= HUMIDITY_THRESHOLD[1]
else False
)

report(
section,
f"environment-{sensor_id.name}-celsius-humidity",
[celsius, humidity, CSVResult.from_bool(air_params)]
[celsius, humidity, CSVResult.from_bool(air_params)],
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Test Pressure."""
from asyncio import sleep
from hardware_testing.drivers.sealed_pressure_fixture import SerialDriver as SealedPressureDriver
from hardware_testing.drivers.sealed_pressure_fixture import (
SerialDriver as SealedPressureDriver,
)
from hardware_testing.opentrons_api import helpers_ot3
from typing import List, Union, Literal

Expand All @@ -24,9 +26,13 @@

USE_SEALED_FIXTURE = False
USE_SEALED_BLOCK = True
PRIMARY_SEALED_PRESSURE_FIXTURE_POS = Point(362.68, 148.83, 49.4) if USE_SEALED_BLOCK else Point(362.68, 148.83, 44.4) # attached tip
SECOND_SEALED_PRESSURE_FIXTURE_POS = Point(264.71, 212.81, 49.4) if USE_SEALED_BLOCK else Point(264.71, 212.81, 44.4) # attached tip
SET_PRESSURE_TARGET = 100 # read air pressure when the force pressure value is over 100
PRIMARY_SEALED_PRESSURE_FIXTURE_POS = (
Point(362.68, 148.83, 49.4) if USE_SEALED_BLOCK else Point(362.68, 148.83, 44.4)
) # attached tip
SECOND_SEALED_PRESSURE_FIXTURE_POS = (
Point(264.71, 212.81, 49.4) if USE_SEALED_BLOCK else Point(264.71, 212.81, 44.4)
) # attached tip
SET_PRESSURE_TARGET = 100 # read air pressure when the force pressure value is over 100
REACHED_PRESSURE = 0

SECONDS_BETWEEN_READINGS = 0.25
Expand All @@ -38,7 +44,7 @@
SLOT_FOR_PICK_UP_TIP = 5
TIP_RACK_FOR_PICK_UP_TIP = f"opentrons_flex_96_tiprack_{TIP_VOLUME}ul"
A1_OFFSET = Point(x=9 * 11, y=-9 * 7)
H12_OFFSET = Point(x=-9*11, y=9*7)
H12_OFFSET = Point(x=-9 * 11, y=9 * 7)
OFFSET_FOR_1_WELL_LABWARE = Point(x=9 * -11 * 0.5, y=9 * 7 * 0.5)

THRESHOLDS = {
Expand Down Expand Up @@ -113,7 +119,10 @@ def check_value(test_value: float, test_name: str) -> CSVResult:
else:
return CSVResult.FAIL

async def calibrate_to_pressue_fixture(api: OT3API, sensor:SealedPressureDriver, fixture_pos:Point):

async def calibrate_to_pressue_fixture(
api: OT3API, sensor: SealedPressureDriver, fixture_pos: Point
):
"""move to suitable height for readding air pressure"""
global REACHED_PRESSURE
await api.move_to(OT3Mount.LEFT, fixture_pos)
Expand Down Expand Up @@ -156,11 +165,14 @@ async def _partial_pick_up(api: OT3API, position: Point, current: float) -> None
position,
safe_height=position.z + 10,
)
await _partial_pick_up_z_motion(api, current=current, distance=12, speed=3) # change distance and speed, in case collision detected error
await _partial_pick_up_z_motion(
api, current=current, distance=12, speed=3
) # change distance and speed, in case collision detected error
await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME))
await api.prepare_for_aspirate(OT3Mount.LEFT)
await api.home_z(OT3Mount.LEFT)


async def run(
api: OT3API, report: CSVReport, section: str, pipette: Literal[200, 1000]
) -> None:
Expand All @@ -177,15 +189,19 @@ async def run(
# move to slot
if not api.is_simulator:
ui.get_user_ready(f"Place tip tack 50ul at slot - {SLOT_FOR_PICK_UP_TIP}")
#await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME))
# await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME))

tip_rack_pos = helpers_ot3.get_theoretical_a1_position(SLOT_FOR_PICK_UP_TIP, TIP_RACK_FOR_PICK_UP_TIP)
tip_rack_pos = helpers_ot3.get_theoretical_a1_position(
SLOT_FOR_PICK_UP_TIP, TIP_RACK_FOR_PICK_UP_TIP
)
await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, tip_rack_pos + Point(z=30))
await helpers_ot3.jog_mount_ot3(api, OT3Mount.LEFT)
tip_rack_actual_pos = await api.gantry_position(OT3Mount.LEFT)

for probe in InstrumentProbeType:
await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, tip_rack_pos + Point(z=50))
await helpers_ot3.move_to_arched_ot3(
api, OT3Mount.LEFT, tip_rack_pos + Point(z=50)
)
sensor_id = sensor_id_for_instrument(probe)
ui.print_header(f"Sensor: {probe}")

Expand Down Expand Up @@ -221,14 +237,20 @@ async def run(
await _partial_pick_up(api, tip_pos, current=0.1)
await api.prepare_for_aspirate(OT3Mount.LEFT)
if not (USE_SEALED_FIXTURE or USE_SEALED_BLOCK):
ui.get_user_ready("SEAL tip using your FINGER")
ui.get_user_ready("SEAL tip using your FINGER")
else:
await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, fixture_pos._replace(z=fixture_pos.z + 50))
await helpers_ot3.move_to_arched_ot3(
api, OT3Mount.LEFT, fixture_pos._replace(z=fixture_pos.z + 50)
)
ui.get_user_ready("Ready for moving to sealed fixture")
if USE_SEALED_FIXTURE:
await calibrate_to_pressue_fixture(api, pressure_sensor, fixture_pos)
await calibrate_to_pressue_fixture(
api, pressure_sensor, fixture_pos
)
else:
await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, fixture_pos)
await helpers_ot3.move_to_arched_ot3(
api, OT3Mount.LEFT, fixture_pos
)

try:
sealed_pa = await _read_from_sensor(
Expand All @@ -238,11 +260,17 @@ async def run(
ui.print_error(f"{probe} pressure sensor not working, skipping")
break
else:
await api.add_tip(OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME))
await api.add_tip(
OT3Mount.LEFT, helpers_ot3.get_default_tip_length(TIP_VOLUME)
)
await api.prepare_for_aspirate(OT3Mount.LEFT)
print(f"sealed-pa: {sealed_pa}")
sealed_result = check_value(sealed_pa, "sealed-pa")
report(section, _get_test_tag(probe, "sealed-pa"), [sealed_pa, sealed_result, REACHED_PRESSURE])
report(
section,
_get_test_tag(probe, "sealed-pa"),
[sealed_pa, sealed_result, REACHED_PRESSURE],
)

# ASPIRATE-Pa
aspirate_pa = 0.0
Expand Down Expand Up @@ -278,17 +306,21 @@ async def run(
section, _get_test_tag(probe, "dispense-pa"), [dispense_pa, dispense_result]
)
if USE_SEALED_FIXTURE or USE_SEALED_BLOCK:
await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, fixture_pos._replace(z=fixture_pos.z + 50))
await helpers_ot3.move_to_arched_ot3(
api, OT3Mount.LEFT, fixture_pos._replace(z=fixture_pos.z + 50)
)
if not api.is_simulator:
ui.get_user_ready("REMOVE tip")

trash_nominal = helpers_ot3.get_slot_calibration_square_position_ot3(12) + Point(z=40)
trash_nominal = helpers_ot3.get_slot_calibration_square_position_ot3(
12
) + Point(z=40)
# center the 96ch of the 1-well labware
trash_nominal += OFFSET_FOR_1_WELL_LABWARE
await helpers_ot3.move_to_arched_ot3(api, OT3Mount.LEFT, trash_nominal + Point(z=20))
await helpers_ot3.move_to_arched_ot3(
api, OT3Mount.LEFT, trash_nominal + Point(z=20)
)
await api.move_to(OT3Mount.LEFT, trash_nominal)
await api.drop_tip(OT3Mount.LEFT)
await api.remove_tip(OT3Mount.LEFT)
await api.home()


Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async def _check_if_jaw_is_aligned_with_endstop(api: OT3API) -> bool:
ui.print_error("endstop hit too early")
return pass_no_hit


async def jaw_precheck(api: OT3API, ax: Axis, speed: float) -> Tuple[bool, bool]:
"""Check the LEDs work and jaws are aligned."""
# HOME
Expand All @@ -73,6 +74,7 @@ async def jaw_precheck(api: OT3API, ax: Axis, speed: float) -> Tuple[bool, bool]
ui.print_error("Jaws Misaligned")
return led_check, jaws_aligned


async def _run_test_jaw(api: OT3API):
ax = Axis.Q
settings = helpers_ot3.get_gantry_load_per_axis_motion_settings_ot3(api, ax)
Expand All @@ -85,6 +87,7 @@ async def _save_result(tag: str, led_check: bool, jaws_aligned: bool) -> bool:
else:
no_hit = False
return led_check and jaws_aligned and no_hit

await api.home_z(OT3Mount.LEFT)
slot_5 = helpers_ot3.get_slot_calibration_square_position_ot3(5)
home_pos = await api.gantry_position(OT3Mount.LEFT)
Expand Down Expand Up @@ -122,6 +125,7 @@ async def _save_result(tag: str, led_check: bool, jaws_aligned: bool) -> bool:
print("homing...")
await api.home([ax])


async def _main() -> None:
print("start")
api = await helpers_ot3.build_async_ot3_hardware_api(is_simulating=False)
Expand Down

0 comments on commit 2d1ebf2

Please sign in to comment.