Skip to content

Commit

Permalink
refactor(hardware control): start liquid probe 2mm higher, refactor z…
Browse files Browse the repository at this point in the history
… moves (#15564)

Refactors liquid_probe- z distances for each pass are now calculated using appropriate plunger distance for the given pipette model/tip combination
  • Loading branch information
caila-marashaj authored Jul 8, 2024
1 parent e85a47a commit 3479875
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 197 deletions.
12 changes: 6 additions & 6 deletions api/src/opentrons/config/defaults_ot3.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
DEFAULT_MODULE_OFFSET = [0.0, 0.0, 0.0]

DEFAULT_LIQUID_PROBE_SETTINGS: Final[LiquidProbeSettings] = LiquidProbeSettings(
starting_mount_height=100,
mount_speed=10,
plunger_speed=5,
sensor_threshold_pascals=40,
output_option=OutputOptions.stream_to_csv,
plunger_impulse_time=0.2,
sensor_threshold_pascals=15,
output_option=OutputOptions.sync_buffer_to_csv,
aspirate_while_sensing=False,
data_files={InstrumentProbeType.PRIMARY: "/data/pressure_sensor_data.csv"},
)
Expand Down Expand Up @@ -331,11 +331,11 @@ def _build_default_liquid_probe(
from_conf.get("data_files", {}), default.data_files
)
return LiquidProbeSettings(
starting_mount_height=from_conf.get(
"starting_mount_height", default.starting_mount_height
),
mount_speed=from_conf.get("mount_speed", default.mount_speed),
plunger_speed=from_conf.get("plunger_speed", default.plunger_speed),
plunger_impulse_time=from_conf.get(
"plunger_impulse_time", default.plunger_impulse_time
),
sensor_threshold_pascals=from_conf.get(
"sensor_threshold_pascals", default.sensor_threshold_pascals
),
Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/config/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ class ZSenseSettings:

@dataclass
class LiquidProbeSettings:
starting_mount_height: float
mount_speed: float
plunger_speed: float
plunger_impulse_time: float
sensor_threshold_pascals: float
output_option: OutputOptions
aspirate_while_sensing: bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def update_encoder_position(self) -> OT3AxisMap[float]:
async def liquid_probe(
self,
mount: OT3Mount,
max_z_distance: float,
max_p_distance: float,
mount_speed: float,
plunger_speed: float,
threshold_pascals: float,
Expand Down
4 changes: 2 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ def _pop_queue() -> Optional[Tuple[NodeId, ErrorCode]]:
async def liquid_probe(
self,
mount: OT3Mount,
max_z_distance: float,
max_p_distance: float,
mount_speed: float,
plunger_speed: float,
threshold_pascals: float,
Expand Down Expand Up @@ -1395,7 +1395,7 @@ async def liquid_probe(
messenger=self._messenger,
tool=tool,
head_node=head_node,
max_z_distance=max_z_distance,
max_p_distance=max_p_distance,
plunger_speed=plunger_speed,
mount_speed=mount_speed,
threshold_pascals=threshold_pascals,
Expand Down
3 changes: 1 addition & 2 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ async def update_encoder_position(self) -> OT3AxisMap[float]:
async def liquid_probe(
self,
mount: OT3Mount,
max_z_distance: float,
max_p_distance: float,
mount_speed: float,
plunger_speed: float,
threshold_pascals: float,
Expand All @@ -352,7 +352,6 @@ async def liquid_probe(
) -> float:
z_axis = Axis.by_mount(mount)
pos = self._position
pos[z_axis] += max_z_distance
self._position.update(pos)
self._encoder_position.update(pos)
return self._position[z_axis]
Expand Down
90 changes: 44 additions & 46 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2573,13 +2573,13 @@ async def _liquid_probe_pass(
mount: OT3Mount,
probe_settings: LiquidProbeSettings,
probe: InstrumentProbeType,
z_distance: float,
p_travel: float,
force_both_sensors: bool = False,
) -> float:
plunger_direction = -1 if probe_settings.aspirate_while_sensing else 1
await self._backend.liquid_probe(
mount,
z_distance,
p_travel,
probe_settings.mount_speed,
(probe_settings.plunger_speed * plunger_direction),
probe_settings.sensor_threshold_pascals,
Expand All @@ -2591,18 +2591,6 @@ async def _liquid_probe_pass(
end_pos = await self.gantry_position(mount, refresh=True)
return end_pos.z

def _get_probe_distances(
self, mount: OT3Mount, max_z_distance: float, p_speed: float, z_speed: float
) -> List[float]:
z_travels: List[float] = []
plunger_positions = self._pipette_handler.get_pipette(mount).plunger_positions
plunger_travel = plunger_positions.bottom - plunger_positions.top
p_travel_time = plunger_travel / p_speed
while max_z_distance > sum(z_travels):
next_travel = min(p_travel_time * z_speed, max_z_distance - sum(z_travels))
z_travels.append(next_travel)
return z_travels

async def liquid_probe(
self,
mount: Union[top_types.Mount, OT3Mount],
Expand All @@ -2613,8 +2601,9 @@ async def liquid_probe(
) -> float:
"""Search for and return liquid level height.
This function begins by moving the mount the distance specified by starting_mount_height in the
LiquidProbeSettings. After this, the mount and plunger motors will move simultaneously while
This function begins by moving the mount 2 mm upward to protect against a case where the tip starts right at a
liquid meniscus.
After this, the mount and plunger motors will move simultaneously while
reading from the pressure sensor.
If the move is completed without the specified threshold being triggered, a
Expand All @@ -2635,56 +2624,65 @@ async def liquid_probe(
if not probe_settings:
probe_settings = self.config.liquid_sense

pos = await self.gantry_position(checked_mount, refresh=True)
probe_start_pos = pos._replace(z=probe_settings.starting_mount_height)
await self.move_to(checked_mount, probe_start_pos)
total_z_travel = max_z_dist
z_travels = self._get_probe_distances(
checked_mount,
total_z_travel,
probe_settings.plunger_speed,
probe_settings.mount_speed,
probe_start_pos = await self.gantry_position(checked_mount, refresh=True)

p_travel = (
instrument.plunger_positions.bottom - instrument.plunger_positions.top
)
error: Optional[PipetteLiquidNotFoundError] = None
for z_travel in z_travels:
max_speeds = self.config.motion_settings.default_max_speed
p_prep_speed = max_speeds[self.gantry_load][OT3AxisKind.P]

error: Optional[PipetteLiquidNotFoundError] = None
pos = await self.gantry_position(checked_mount, refresh=True)
while (probe_start_pos.z - pos.z) < max_z_dist:
# safe distance so we don't accidentally aspirate liquid if we're already close to liquid
safe_plunger_pos = pos._replace(z=(pos.z + 2))
# overlap amount we want to use between passes
pass_start_pos = pos._replace(z=(pos.z + 0.5))

# Prep the plunger
await self.move_to(checked_mount, safe_plunger_pos)
if probe_settings.aspirate_while_sensing:
await self._move_to_plunger_bottom(checked_mount, rate=1.0)
# TODO(cm, 7/8/24): remove p_prep_speed from the rate at some point
await self._move_to_plunger_bottom(checked_mount, rate=p_prep_speed)
else:
# find the ideal travel distance by multiplying the plunger speed
# by the time it will take to complete the z move.
ideal_travel = probe_settings.plunger_speed * (
z_travel / probe_settings.mount_speed
)
assert (
instrument.plunger_positions.bottom - ideal_travel
>= instrument.plunger_positions.top
)
target_point = instrument.plunger_positions.bottom - ideal_travel
target_pos = target_position_from_plunger(
checked_mount, target_point, self._current_position
)
max_speeds = self.config.motion_settings.default_max_speed
speed = max_speeds[self.gantry_load][OT3AxisKind.P]
await self._move(target_pos, speed=speed, acquire_lock=True)
await self._move_to_plunger_top(checked_mount, rate=p_prep_speed)

try:
# move to where we want to start a pass and run a pass
await self.move_to(checked_mount, pass_start_pos)
height = await self._liquid_probe_pass(
checked_mount,
probe_settings,
probe if probe else InstrumentProbeType.PRIMARY,
z_travel,
p_travel,
)
# if we made it here without an error we found the liquid
error = None
break
except PipetteLiquidNotFoundError as lnfe:
error = lnfe
pos = await self.gantry_position(checked_mount, refresh=True)
await self.move_to(checked_mount, probe_start_pos)
if error is not None:
# if we never found an liquid raise an error
# if we never found liquid raise an error
raise error
return height

async def _move_to_plunger_top(
self,
mount: OT3Mount,
rate: float,
acquire_lock: bool = True,
) -> None:
instrument = self._pipette_handler.get_pipette(mount)
target_pos = target_position_from_plunger(
OT3Mount.from_mount(mount),
instrument.plunger_positions.top,
self._current_position,
)
await self._move(target_pos, speed=rate, acquire_lock=acquire_lock)

async def capacitive_probe(
self,
mount: OT3Mount,
Expand Down
2 changes: 1 addition & 1 deletion api/tests/opentrons/config/ot3_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@
"left_mount_offset": (2, 2, 2),
"gripper_mount_offset": (1, 1, 1),
"liquid_sense": {
"starting_mount_height": 80,
"mount_speed": 10,
"plunger_speed": 10,
"plunger_impulse_time": 0.2,
"sensor_threshold_pascals": 17,
"output_option": OutputOptions.stream_to_csv,
"aspirate_while_sensing": False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ def controller(
@pytest.fixture
def fake_liquid_settings() -> LiquidProbeSettings:
return LiquidProbeSettings(
starting_mount_height=100,
mount_speed=40,
plunger_speed=10,
plunger_impulse_time=0.2,
sensor_threshold_pascals=15,
output_option=OutputOptions.can_bus_only,
aspirate_while_sensing=False,
Expand Down Expand Up @@ -711,11 +711,11 @@ async def test_liquid_probe(
mock_move_group_run: mock.AsyncMock,
mock_send_stop_threshold: mock.AsyncMock,
) -> None:
fake_max_z_dist = 15.0
fake_max_p_dist = 70
try:
await controller.liquid_probe(
mount=mount,
max_z_distance=fake_max_z_dist,
max_p_distance=fake_max_p_dist,
mount_speed=fake_liquid_settings.mount_speed,
plunger_speed=fake_liquid_settings.plunger_speed,
threshold_pascals=fake_liquid_settings.sensor_threshold_pascals,
Expand All @@ -728,11 +728,12 @@ async def test_liquid_probe(
move_groups = mock_move_group_run.call_args_list[0][0][0]._move_groups
head_node = axis_to_node(Axis.by_mount(mount))
tool_node = sensor_node_for_mount(mount)
assert move_groups[0][0][head_node].stop_condition == MoveStopCondition.none
assert len(move_groups) == 3
assert move_groups[0][0][head_node]
assert move_groups[1][0][tool_node]
assert move_groups[2][0][head_node], move_groups[2][0][tool_node]
# in tool_sensors, pipette moves down, then sensor move goes
assert move_groups[0][0][tool_node].stop_condition == MoveStopCondition.none
assert move_groups[1][0][tool_node].stop_condition == MoveStopCondition.sync_line
assert len(move_groups) == 2
assert move_groups[0][0][tool_node]
assert move_groups[1][0][head_node], move_groups[2][0][tool_node]


async def test_tip_action(
Expand Down
Loading

0 comments on commit 3479875

Please sign in to comment.