Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): Allow recovering from errors that happen during the preparation part of an aspirate command #16896

Merged
merged 14 commits into from
Nov 25, 2024
Merged
108 changes: 68 additions & 40 deletions api/src/opentrons/protocol_engine/commands/aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
FlowRateMixin,
BaseLiquidHandlingResult,
aspirate_in_place,
prepare_for_aspirate,
)
from .movement_common import (
LiquidHandlingWellLocationMixin,
Expand Down Expand Up @@ -94,6 +95,17 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipette_id = params.pipetteId
labware_id = params.labwareId
well_name = params.wellName
well_location = params.wellLocation

state_update = StateUpdate()

final_location = self._state_view.geometry.get_well_position(
labware_id=labware_id,
well_name=well_name,
well_location=well_location,
operation_volume=-params.volume,
pipette_id=pipette_id,
)

ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate(
pipette_id=pipette_id
Expand All @@ -102,14 +114,32 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
current_well = None

if not ready_to_aspirate:
await self._movement.move_to_well(
move_result = await move_to_well(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=WellLocation(origin=WellOrigin.TOP),
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return DefinedErrorData(move_result.public, state_update=state_update)

await self._pipetting.prepare_for_aspirate(pipette_id=pipette_id)
prepare_result = await prepare_for_aspirate(
pipette_id=pipette_id,
pipetting=self._pipetting,
model_utils=self._model_utils,
# Note that the retryLocation is the final location, inside the liquid,
# because that's where we'd want the client to try re-aspirating if this
# command fails and the run enters error recovery.
location_if_error={"retryLocation": final_location},
)
state_update.append(prepare_result.state_update)
if isinstance(prepare_result, DefinedErrorData):
return DefinedErrorData(
public=prepare_result.public, state_update=state_update
)

# set our current deck location to the well now that we've made
# an intermediate move for the "prepare for aspirate" step
Expand All @@ -125,12 +155,15 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipette_id=pipette_id,
labware_id=labware_id,
well_name=well_name,
well_location=params.wellLocation,
well_location=well_location,
current_well=current_well,
operation_volume=-params.volume,
)
state_update.append(move_result.state_update)
if isinstance(move_result, DefinedErrorData):
return move_result
return DefinedErrorData(
public=move_result.public, state_update=state_update
)

aspirate_result = await aspirate_in_place(
pipette_id=pipette_id,
Expand All @@ -147,47 +180,42 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
pipetting=self._pipetting,
model_utils=self._model_utils,
)
state_update.append(aspirate_result.state_update)
if isinstance(aspirate_result, DefinedErrorData):
return DefinedErrorData(
public=aspirate_result.public,
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
),
state_update_if_false_positive=StateUpdate.reduce(
move_result.state_update,
aspirate_result.state_update_if_false_positive,
state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id,
well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
else:
return SuccessData(
public=AspirateResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=StateUpdate.reduce(
move_result.state_update, aspirate_result.state_update
).set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
),
return DefinedErrorData(
public=aspirate_result.public, state_update=state_update
)

state_update.set_liquid_operated(
labware_id=labware_id,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well(
labware_id, well_name, pipette_id
),
volume_added=-aspirate_result.public.volume
* self._state_view.geometry.get_nozzles_per_well(
labware_id,
well_name,
params.pipetteId,
),
)

return SuccessData(
public=AspirateResult(
volume=aspirate_result.public.volume,
position=move_result.public.position,
),
state_update=state_update,
)


class Aspirate(
BaseCommand[
Expand Down
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/execution/pipetting.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ def get_is_ready_to_aspirate(self, pipette_id: str) -> bool:
)

async def prepare_for_aspirate(self, pipette_id: str) -> None:
"""Prepare for pipette aspiration."""
"""Prepare for pipette aspiration.

Raises:
PipetteOverpressureError, propagated as-is from the hardware controller.
"""
hw_mount = self._state_view.pipettes.get_mount(pipette_id).to_hw_mount()
await self._hardware_api.prepare_for_aspirate(mount=hw_mount)

Expand Down
30 changes: 17 additions & 13 deletions api/src/opentrons/protocol_engine/state/update_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,26 +299,30 @@ class StateUpdate:

liquid_class_loaded: LiquidClassLoadedUpdate | NoChangeType = NO_CHANGE

def append(self, other: Self) -> Self:
"""Apply another `StateUpdate` "on top of" this one.

This object is mutated in-place, taking values from `other`.
If an attribute in `other` is `NO_CHANGE`, the value in this object is kept.
"""
fields = dataclasses.fields(other)
for field in fields:
other_value = other.__dict__[field.name]
if other_value != NO_CHANGE:
self.__dict__[field.name] = other_value
return self

Comment on lines +302 to +314
Copy link
Contributor Author

@SyntaxColoring SyntaxColoring Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is append() a good name for this? Is there a naming scheme that harmonizes better with reduce()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could make this reduce(), continue to take N arguments, and have self as a silent first argument. that is probably the most natural.

@classmethod
def reduce(cls: typing.Type[Self], *args: Self) -> Self:
"""Fuse multiple state updates into a single one.
SyntaxColoring marked this conversation as resolved.
Show resolved Hide resolved

State updates that are later in the parameter list are preferred to those that are earlier;
NO_CHANGE is ignored.
"""
fields = dataclasses.fields(cls)
changes_dicts = [
{
field.name: update.__dict__[field.name]
for field in fields
if update.__dict__[field.name] != NO_CHANGE
}
for update in args
]
changes = {}
for changes_dict in changes_dicts:
changes.update(changes_dict)
return cls(**changes)
accumulator = cls()
for arg in args:
accumulator.append(arg)
return accumulator

# These convenience functions let the caller avoid the boilerplate of constructing a
# complicated dataclass tree.
Expand Down
Loading
Loading