Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Allow recovering from errors that happen during the preparation part of an aspirate command #16896

Merged
merged 14 commits into from
Nov 25, 2024
Merged
112 changes: 72 additions & 40 deletions api/src/opentrons/protocol_engine/commands/aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from typing_extensions import Literal

from .pipetting_common import (
ErrorLocationInfo,
OverpressureError,
PipetteIdMixin,
AspirateVolumeMixin,
FlowRateMixin,
BaseLiquidHandlingResult,
aspirate_in_place,
prepare_for_aspirate,
)
from .movement_common import (
LiquidHandlingWellLocationMixin,
Expand Down Expand Up @@ -94,6 +96,17 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipette_id = params.pipetteId
labware_id = params.labwareId
well_name = params.wellName
well_location = params.wellLocation

state_update = StateUpdate()

final_location = self._state_view.geometry.get_well_position(
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
operation_volume=-params.volume,
pipette_id=pipette_id,
)

ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate(
pipette_id=pipette_id
Expand All @@ -102,14 +115,34 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
current_well = None

if not ready_to_aspirate:
await self._movement.move_to_well(
move_result = await move_to_well(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(origin=WellOrigin.TOP),
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return DefinedErrorData(move_result.public, state_update=state_update)

# TODO: Figure out what to do for state_update_if_false_positive
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo passthrough and union with previous successes

Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've settled on ignoring state_update_if_false_positive until/unless this command actually uses it for something, in which case the right thing to do will hopefully be more obvious.

The reason I'm not setting up a pattern of "passthrough and union with previous successes" (yet) is just uncertainty on my part. Like, it kind of smells to me like an Implementation.execute() concern that our inner helper functions aren't in a good position to anticipate. And I wouldn't want to go out of our way to set up a pattern that does nothing now and starts doing the wrong thing later on.


await self._pipetting.prepare_for_aspirate(pipette_id=pipette_id)
prepare_result = await prepare_for_aspirate(
pipette_id=pipette_id,
pipetting=self._pipetting,
model_utils=self._model_utils,
# Note that the retryLocation is the final location, inside the liquid,
# because that's where we'd want the client to try re-aspirating if this
# command fails and the run enters error recovery.
location_if_error={"retryLocation": final_location},
)
state_update.append(prepare_result.state_update)
if isinstance(prepare_result, DefinedErrorData):
return DefinedErrorData(
public=prepare_result.public, state_update=state_update
)

# set our current deck location to the well now that we've made
# an intermediate move for the "prepare for aspirate" step
Expand All @@ -125,12 +158,15 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=params.wellLocation,
well_location=well_location,
current_well=current_well,
operation_volume=-params.volume,
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return move_result
return DefinedErrorData(
public=move_result.public, state_update=state_update
)

aspirate_result = await aspirate_in_place(
pipette_id=pipette_id,
Expand All @@ -147,46 +183,42 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipetting=self._pipetting,
model_utils=self._model_utils,
)
state_update.append(aspirate_result.state_update)
if isinstance(aspirate_result, DefinedErrorData):
return DefinedErrorData(
public=aspirate_result.public,
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
state_update_if_false_positive=StateUpdate.reduce(
move_result.state_update,
aspirate_result.state_update_if_false_positive,
state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
else:
return SuccessData(
public=AspirateResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
),
return DefinedErrorData(
public=aspirate_result.public, state_update=state_update
)
# TODO: state_update_if_false_positive?

state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
)

return SuccessData(
public=AspirateResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=state_update,
)


class Aspirate(
Expand Down
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/execution/pipetting.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ def get_is_ready_to_aspirate(self, pipette_id: str) -> bool:
)

async def prepare_for_aspirate(self, pipette_id: str) -> None:
"""Prepare for pipette aspiration."""
"""Prepare for pipette aspiration.

Raises:
PipetteOverpressureError, propagated as-is from the hardware controller.
"""
hw_mount = self._state_view.pipettes.get_mount(pipette_id).to_hw_mount()
await self._hardware_api.prepare_for_aspirate(mount=hw_mount)

Expand Down
13 changes: 13 additions & 0 deletions api/src/opentrons/protocol_engine/state/update_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,19 @@ class StateUpdate:

liquid_class_loaded: LiquidClassLoadedUpdate | NoChangeType = NO_CHANGE

def append(self, other: Self) -> Self:
"""Apply another `StateUpdate` "on top of" this one.

This object is mutated in-place, taking values from `other`.
If an attribute in `other` is `NO_CHANGE`, the value in this object is kept.
"""
fields = dataclasses.fields(other)
for field in fields:
other_value = other.__dict__[field.name]
if other_value != NO_CHANGE:
self.__dict__[field.name] = other_value
return self

Comment on lines +302 to +314
Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is append() a good name for this? Is there a naming scheme that harmonizes better with reduce()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could make this reduce(), continue to take N arguments, and have self as a silent first argument. that is probably the most natural.

@classmethod
def reduce(cls: typing.Type[Self], *args: Self) -> Self:
"""Fuse multiple state updates into a single one.
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
11 changes: 0 additions & 11 deletions api/tests/opentrons/protocol_engine/commands/test_aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def subject(
async def test_aspirate_implementation_no_prep(
decoy: Decoy,
state_view: StateView,
hardware_api: HardwareControlAPI,
movement: MovementHandler,
pipetting: PipettingHandler,
subject: AspirateImplementation,
Expand Down Expand Up @@ -151,7 +150,6 @@ async def test_aspirate_implementation_no_prep(
async def test_aspirate_implementation_with_prep(
decoy: Decoy,
state_view: StateView,
hardware_api: HardwareControlAPI,
movement: MovementHandler,
pipetting: PipettingHandler,
mock_command_note_adder: CommandNoteAdder,
Expand Down Expand Up @@ -416,15 +414,6 @@ async def test_overpressure_error(
pipette_id=pipette_id
),
),
state_update_if_false_positive=update_types.StateUpdate(
pipette_location=update_types.PipetteLocationUpdate(
pipette_id=pipette_id,
new_location=update_types.Well(
labware_id=labware_id, well_name=well_name
),
new_deck_point=DeckPoint(x=position.x, y=position.y, z=position.z),
),
),
)


Expand Down
Loading