diff --git a/api/src/opentrons/protocol_engine/commands/aspirate.py b/api/src/opentrons/protocol_engine/commands/aspirate.py index 38dbe03c7e0..fa84afbde8c 100644 --- a/api/src/opentrons/protocol_engine/commands/aspirate.py +++ b/api/src/opentrons/protocol_engine/commands/aspirate.py @@ -11,6 +11,7 @@ FlowRateMixin, BaseLiquidHandlingResult, aspirate_in_place, + prepare_for_aspirate, ) from .movement_common import ( LiquidHandlingWellLocationMixin, @@ -94,6 +95,17 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn: pipette_id = params.pipetteId labware_id = params.labwareId well_name = params.wellName + well_location = params.wellLocation + + state_update = StateUpdate() + + final_location = self._state_view.geometry.get_well_position( + labware_id=labware_id, + well_name=well_name, + well_location=well_location, + operation_volume=-params.volume, + pipette_id=pipette_id, + ) ready_to_aspirate = self._pipetting.get_is_ready_to_aspirate( pipette_id=pipette_id @@ -102,14 +114,32 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn: current_well = None if not ready_to_aspirate: - await self._movement.move_to_well( + move_result = await move_to_well( + movement=self._movement, + model_utils=self._model_utils, pipette_id=pipette_id, labware_id=labware_id, well_name=well_name, well_location=WellLocation(origin=WellOrigin.TOP), ) + state_update.append(move_result.state_update) + if isinstance(move_result, DefinedErrorData): + return DefinedErrorData(move_result.public, state_update=state_update) - await self._pipetting.prepare_for_aspirate(pipette_id=pipette_id) + prepare_result = await prepare_for_aspirate( + pipette_id=pipette_id, + pipetting=self._pipetting, + model_utils=self._model_utils, + # Note that the retryLocation is the final location, inside the liquid, + # because that's where we'd want the client to try re-aspirating if this + # command fails and the run enters error recovery. + location_if_error={"retryLocation": final_location}, + ) + state_update.append(prepare_result.state_update) + if isinstance(prepare_result, DefinedErrorData): + return DefinedErrorData( + public=prepare_result.public, state_update=state_update + ) # set our current deck location to the well now that we've made # an intermediate move for the "prepare for aspirate" step @@ -125,12 +155,15 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn: pipette_id=pipette_id, labware_id=labware_id, well_name=well_name, - well_location=params.wellLocation, + well_location=well_location, current_well=current_well, operation_volume=-params.volume, ) + state_update.append(move_result.state_update) if isinstance(move_result, DefinedErrorData): - return move_result + return DefinedErrorData( + public=move_result.public, state_update=state_update + ) aspirate_result = await aspirate_in_place( pipette_id=pipette_id, @@ -147,47 +180,42 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn: pipetting=self._pipetting, model_utils=self._model_utils, ) + state_update.append(aspirate_result.state_update) if isinstance(aspirate_result, DefinedErrorData): - return DefinedErrorData( - public=aspirate_result.public, - state_update=StateUpdate.reduce( - move_result.state_update, aspirate_result.state_update - ).set_liquid_operated( - labware_id=labware_id, - well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well( - labware_id, - well_name, - params.pipetteId, - ), - volume_added=CLEAR, - ), - state_update_if_false_positive=StateUpdate.reduce( - move_result.state_update, - aspirate_result.state_update_if_false_positive, + state_update.set_liquid_operated( + labware_id=labware_id, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well( + labware_id, + well_name, + params.pipetteId, ), + volume_added=CLEAR, ) - else: - return SuccessData( - public=AspirateResult( - volume=aspirate_result.public.volume, - position=move_result.public.position, - ), - state_update=StateUpdate.reduce( - move_result.state_update, aspirate_result.state_update - ).set_liquid_operated( - labware_id=labware_id, - well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well( - labware_id, well_name, pipette_id - ), - volume_added=-aspirate_result.public.volume - * self._state_view.geometry.get_nozzles_per_well( - labware_id, - well_name, - params.pipetteId, - ), - ), + return DefinedErrorData( + public=aspirate_result.public, state_update=state_update ) + state_update.set_liquid_operated( + labware_id=labware_id, + well_names=self._state_view.geometry.get_wells_covered_by_pipette_with_active_well( + labware_id, well_name, pipette_id + ), + volume_added=-aspirate_result.public.volume + * self._state_view.geometry.get_nozzles_per_well( + labware_id, + well_name, + params.pipetteId, + ), + ) + + return SuccessData( + public=AspirateResult( + volume=aspirate_result.public.volume, + position=move_result.public.position, + ), + state_update=state_update, + ) + class Aspirate( BaseCommand[ diff --git a/api/src/opentrons/protocol_engine/execution/pipetting.py b/api/src/opentrons/protocol_engine/execution/pipetting.py index 2964f02d183..10d613e4dcf 100644 --- a/api/src/opentrons/protocol_engine/execution/pipetting.py +++ b/api/src/opentrons/protocol_engine/execution/pipetting.py @@ -91,7 +91,11 @@ def get_is_ready_to_aspirate(self, pipette_id: str) -> bool: ) async def prepare_for_aspirate(self, pipette_id: str) -> None: - """Prepare for pipette aspiration.""" + """Prepare for pipette aspiration. + + Raises: + PipetteOverpressureError, propagated as-is from the hardware controller. + """ hw_mount = self._state_view.pipettes.get_mount(pipette_id).to_hw_mount() await self._hardware_api.prepare_for_aspirate(mount=hw_mount) diff --git a/api/src/opentrons/protocol_engine/state/update_types.py b/api/src/opentrons/protocol_engine/state/update_types.py index 567ba39144c..76f16dadfbe 100644 --- a/api/src/opentrons/protocol_engine/state/update_types.py +++ b/api/src/opentrons/protocol_engine/state/update_types.py @@ -299,6 +299,19 @@ class StateUpdate: liquid_class_loaded: LiquidClassLoadedUpdate | NoChangeType = NO_CHANGE + def append(self, other: Self) -> Self: + """Apply another `StateUpdate` "on top of" this one. + + This object is mutated in-place, taking values from `other`. + If an attribute in `other` is `NO_CHANGE`, the value in this object is kept. + """ + fields = dataclasses.fields(other) + for field in fields: + other_value = other.__dict__[field.name] + if other_value != NO_CHANGE: + self.__dict__[field.name] = other_value + return self + @classmethod def reduce(cls: typing.Type[Self], *args: Self) -> Self: """Fuse multiple state updates into a single one. @@ -306,19 +319,10 @@ def reduce(cls: typing.Type[Self], *args: Self) -> Self: State updates that are later in the parameter list are preferred to those that are earlier; NO_CHANGE is ignored. """ - fields = dataclasses.fields(cls) - changes_dicts = [ - { - field.name: update.__dict__[field.name] - for field in fields - if update.__dict__[field.name] != NO_CHANGE - } - for update in args - ] - changes = {} - for changes_dict in changes_dicts: - changes.update(changes_dict) - return cls(**changes) + accumulator = cls() + for arg in args: + accumulator.append(arg) + return accumulator # These convenience functions let the caller avoid the boilerplate of constructing a # complicated dataclass tree. diff --git a/api/tests/opentrons/protocol_engine/commands/test_aspirate.py b/api/tests/opentrons/protocol_engine/commands/test_aspirate.py index 11078fb43cf..8e50d1825ae 100644 --- a/api/tests/opentrons/protocol_engine/commands/test_aspirate.py +++ b/api/tests/opentrons/protocol_engine/commands/test_aspirate.py @@ -12,7 +12,7 @@ from opentrons.protocol_engine.commands.pipetting_common import OverpressureError from opentrons.protocol_engine.commands.movement_common import StallOrCollisionError from opentrons.protocol_engine.state import update_types -from opentrons.types import MountType, Point +from opentrons.types import Point from opentrons.protocol_engine import ( LiquidHandlingWellLocation, WellOrigin, @@ -36,9 +36,9 @@ from opentrons.protocol_engine.resources.model_utils import ModelUtils from opentrons.protocol_engine.types import ( CurrentWell, - LoadedPipette, AspiratedFluid, FluidKind, + WellLocation, ) from opentrons.hardware_control import HardwareControlAPI from opentrons.protocol_engine.notes import CommandNoteAdder @@ -67,47 +67,50 @@ def subject( async def test_aspirate_implementation_no_prep( decoy: Decoy, state_view: StateView, - hardware_api: HardwareControlAPI, movement: MovementHandler, pipetting: PipettingHandler, subject: AspirateImplementation, mock_command_note_adder: CommandNoteAdder, ) -> None: """An Aspirate should have an execution implementation without preparing to aspirate.""" + pipette_id = "pipette-id" + labware_id = "labware-id" + well_name = "well-name" location = LiquidHandlingWellLocation( origin=WellOrigin.BOTTOM, offset=WellOffset(x=0, y=0, z=1) ) - - data = AspirateParams( - pipetteId="abc", - labwareId="123", - wellName="A3", + params = AspirateParams( + pipetteId=pipette_id, + labwareId=labware_id, + wellName=well_name, wellLocation=location, volume=50, flowRate=1.23, ) - decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id="abc")).then_return(True) + decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id=pipette_id)).then_return( + True + ) decoy.when( state_view.geometry.get_nozzles_per_well( - labware_id="123", - target_well_name="A3", - pipette_id="abc", + labware_id=labware_id, + target_well_name=well_name, + pipette_id=pipette_id, ) ).then_return(2) decoy.when( state_view.geometry.get_wells_covered_by_pipette_with_active_well( - "123", "A3", "abc" + labware_id, well_name, pipette_id ) - ).then_return(["A3", "A4"]) + ).then_return(["covered-well-1", "covered-well-2"]) decoy.when( await movement.move_to_well( - pipette_id="abc", - labware_id="123", - well_name="A3", + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, well_location=location, current_well=None, force_direct=False, @@ -119,30 +122,33 @@ async def test_aspirate_implementation_no_prep( decoy.when( await pipetting.aspirate_in_place( - pipette_id="abc", + pipette_id=pipette_id, volume=50, flow_rate=1.23, command_note_adder=mock_command_note_adder, ), ).then_return(50) - result = await subject.execute(data) + result = await subject.execute(params) assert result == SuccessData( public=AspirateResult(volume=50, position=DeckPoint(x=1, y=2, z=3)), state_update=update_types.StateUpdate( pipette_location=update_types.PipetteLocationUpdate( - pipette_id="abc", - new_location=update_types.Well(labware_id="123", well_name="A3"), + pipette_id=pipette_id, + new_location=update_types.Well( + labware_id=labware_id, well_name=well_name + ), new_deck_point=DeckPoint(x=1, y=2, z=3), ), liquid_operated=update_types.LiquidOperatedUpdate( - labware_id="123", - well_names=["A3", "A4"], + labware_id=labware_id, + well_names=["covered-well-1", "covered-well-2"], volume_added=-100, ), pipette_aspirated_fluid=update_types.PipetteAspiratedFluidUpdate( - pipette_id="abc", fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=50) + pipette_id=pipette_id, + fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=50), ), ), ) @@ -151,104 +157,112 @@ async def test_aspirate_implementation_no_prep( async def test_aspirate_implementation_with_prep( decoy: Decoy, state_view: StateView, - hardware_api: HardwareControlAPI, movement: MovementHandler, pipetting: PipettingHandler, mock_command_note_adder: CommandNoteAdder, subject: AspirateImplementation, ) -> None: """An Aspirate should have an execution implementation with preparing to aspirate.""" + pipette_id = "pipette-id" + labware_id = "labware-id" + well_name = "well-name" location = LiquidHandlingWellLocation( origin=WellOrigin.BOTTOM, offset=WellOffset(x=0, y=0, z=1) ) - - data = AspirateParams( - pipetteId="abc", - labwareId="123", - wellName="A3", + volume = 50 + flow_rate = 1.23 + params = AspirateParams( + pipetteId=pipette_id, + labwareId=labware_id, + wellName=well_name, wellLocation=location, - volume=50, - flowRate=1.23, + volume=volume, + flowRate=flow_rate, ) - decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id="abc")).then_return(False) - - decoy.when(state_view.pipettes.get(pipette_id="abc")).then_return( - LoadedPipette.construct( # type:ignore[call-arg] - mount=MountType.LEFT - ) + decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id=pipette_id)).then_return( + False ) + decoy.when( state_view.geometry.get_nozzles_per_well( - labware_id="123", - target_well_name="A3", - pipette_id="abc", + labware_id=labware_id, + target_well_name=well_name, + pipette_id=pipette_id, ) ).then_return(2) decoy.when( state_view.geometry.get_wells_covered_by_pipette_with_active_well( - "123", "A3", "abc" + labware_id, well_name, pipette_id ) - ).then_return(["A3", "A4"]) + ).then_return(["covered-well-1", "covered-well-2"]) + + decoy.when( + await movement.move_to_well( + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + well_location=WellLocation(origin=WellOrigin.TOP), + current_well=None, + force_direct=False, + minimum_z_height=None, + speed=None, + operation_volume=None, + ), + ).then_return(Point()) + decoy.when( await movement.move_to_well( - pipette_id="abc", - labware_id="123", - well_name="A3", + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, well_location=location, current_well=CurrentWell( - pipette_id="abc", - labware_id="123", - well_name="A3", + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, ), force_direct=False, minimum_z_height=None, speed=None, - operation_volume=-50, + operation_volume=-volume, ), ).then_return(Point(x=1, y=2, z=3)) decoy.when( await pipetting.aspirate_in_place( - pipette_id="abc", - volume=50, - flow_rate=1.23, + pipette_id=pipette_id, + volume=volume, + flow_rate=flow_rate, command_note_adder=mock_command_note_adder, ), - ).then_return(50) + ).then_return(volume) - result = await subject.execute(data) + result = await subject.execute(params) assert result == SuccessData( public=AspirateResult(volume=50, position=DeckPoint(x=1, y=2, z=3)), state_update=update_types.StateUpdate( pipette_location=update_types.PipetteLocationUpdate( - pipette_id="abc", - new_location=update_types.Well(labware_id="123", well_name="A3"), + pipette_id=pipette_id, + new_location=update_types.Well( + labware_id=labware_id, well_name=well_name + ), new_deck_point=DeckPoint(x=1, y=2, z=3), ), liquid_operated=update_types.LiquidOperatedUpdate( - labware_id="123", - well_names=["A3", "A4"], + labware_id=labware_id, + well_names=["covered-well-1", "covered-well-2"], volume_added=-100, ), pipette_aspirated_fluid=update_types.PipetteAspiratedFluidUpdate( - pipette_id="abc", fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=50) + pipette_id=pipette_id, + fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=50), ), ), ) - decoy.verify( - await movement.move_to_well( - pipette_id="abc", - labware_id="123", - well_name="A3", - well_location=LiquidHandlingWellLocation(origin=WellOrigin.TOP), - ), - await pipetting.prepare_for_aspirate(pipette_id="abc"), - ) - async def test_aspirate_raises_volume_error( decoy: Decoy, @@ -259,40 +273,44 @@ async def test_aspirate_raises_volume_error( subject: AspirateImplementation, ) -> None: """Should raise an assertion error for volume larger than working volume.""" + pipette_id = "pipette-id" + labware_id = "labware-id" + well_name = "well-name" location = LiquidHandlingWellLocation( origin=WellOrigin.BOTTOM, offset=WellOffset(x=0, y=0, z=1) ) - - data = AspirateParams( - pipetteId="abc", - labwareId="123", - wellName="A3", + params = AspirateParams( + pipetteId=pipette_id, + labwareId=labware_id, + wellName=well_name, wellLocation=location, volume=50, flowRate=1.23, ) - decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id="abc")).then_return(True) + decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id=pipette_id)).then_return( + True + ) decoy.when( state_view.geometry.get_nozzles_per_well( - labware_id="123", - target_well_name="A3", - pipette_id="abc", + labware_id=labware_id, + target_well_name=well_name, + pipette_id=pipette_id, ) ).then_return(2) decoy.when( state_view.geometry.get_wells_covered_by_pipette_with_active_well( - "123", "A3", "abc" + labware_id, well_name, pipette_id ) - ).then_return(["A3", "A4"]) + ).then_return(["covered-well-1", "covered-well-2"]) decoy.when( await movement.move_to_well( - pipette_id="abc", - labware_id="123", - well_name="A3", + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, well_location=location, current_well=None, force_direct=False, @@ -304,7 +322,7 @@ async def test_aspirate_raises_volume_error( decoy.when( await pipetting.aspirate_in_place( - pipette_id="abc", + pipette_id=pipette_id, volume=50, flow_rate=1.23, command_note_adder=mock_command_note_adder, @@ -312,7 +330,7 @@ async def test_aspirate_raises_volume_error( ).then_raise(AssertionError("blah blah")) with pytest.raises(AssertionError): - await subject.execute(data) + await subject.execute(params) async def test_overpressure_error( @@ -337,7 +355,7 @@ async def test_overpressure_error( error_id = "error-id" error_timestamp = datetime(year=2020, month=1, day=2) - data = AspirateParams( + params = AspirateParams( pipetteId=pipette_id, labwareId=labware_id, wellName=well_name, @@ -348,17 +366,17 @@ async def test_overpressure_error( decoy.when( state_view.geometry.get_nozzles_per_well( - labware_id="labware-id", - target_well_name="well-name", - pipette_id="pipette-id", + labware_id=labware_id, + target_well_name=well_name, + pipette_id=pipette_id, ) ).then_return(2) decoy.when( state_view.geometry.get_wells_covered_by_pipette_with_active_well( - "labware-id", "well-name", "pipette-id" + labware_id, well_name, pipette_id ) - ).then_return(["A3", "A4"]) + ).then_return(["covered-well-1", "covered-well-2"]) decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id=pipette_id)).then_return( True @@ -390,7 +408,7 @@ async def test_overpressure_error( decoy.when(model_utils.generate_id()).then_return(error_id) decoy.when(model_utils.get_timestamp()).then_return(error_timestamp) - result = await subject.execute(data) + result = await subject.execute(params) assert result == DefinedErrorData( public=OverpressureError.construct( @@ -409,22 +427,13 @@ async def test_overpressure_error( ), liquid_operated=update_types.LiquidOperatedUpdate( labware_id=labware_id, - well_names=["A3", "A4"], + well_names=["covered-well-1", "covered-well-2"], volume_added=update_types.CLEAR, ), pipette_aspirated_fluid=update_types.PipetteUnknownFluidUpdate( pipette_id=pipette_id ), ), - state_update_if_false_positive=update_types.StateUpdate( - pipette_location=update_types.PipetteLocationUpdate( - pipette_id=pipette_id, - new_location=update_types.Well( - labware_id=labware_id, well_name=well_name - ), - new_deck_point=DeckPoint(x=position.x, y=position.y, z=position.z), - ), - ), ) @@ -438,15 +447,18 @@ async def test_aspirate_implementation_meniscus( mock_command_note_adder: CommandNoteAdder, ) -> None: """Aspirate should update WellVolumeOffset when called with WellOrigin.MENISCUS.""" + pipette_id = "pipette-id" + labware_id = "labware-id" + well_name = "well-name" location = LiquidHandlingWellLocation( origin=WellOrigin.MENISCUS, offset=WellOffset(x=0, y=0, z=-1), volumeOffset="operationVolume", ) - data = AspirateParams( - pipetteId="abc", - labwareId="123", - wellName="A3", + params = AspirateParams( + pipetteId=pipette_id, + labwareId=labware_id, + wellName=well_name, wellLocation=location, volume=50, flowRate=1.23, @@ -454,25 +466,27 @@ async def test_aspirate_implementation_meniscus( decoy.when( state_view.geometry.get_nozzles_per_well( - labware_id="123", - target_well_name="A3", - pipette_id="abc", + labware_id=labware_id, + target_well_name=well_name, + pipette_id=pipette_id, ) ).then_return(2) decoy.when( state_view.geometry.get_wells_covered_by_pipette_with_active_well( - "123", "A3", "abc" + labware_id, well_name, pipette_id ) - ).then_return(["A3", "A4"]) + ).then_return(["covered-well-1", "covered-well-2"]) - decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id="abc")).then_return(True) + decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id=pipette_id)).then_return( + True + ) decoy.when( await movement.move_to_well( - pipette_id="abc", - labware_id="123", - well_name="A3", + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, well_location=location, current_well=None, force_direct=False, @@ -484,36 +498,39 @@ async def test_aspirate_implementation_meniscus( decoy.when( await pipetting.aspirate_in_place( - pipette_id="abc", + pipette_id=pipette_id, volume=50, flow_rate=1.23, command_note_adder=mock_command_note_adder, ), ).then_return(50) - result = await subject.execute(data) + result = await subject.execute(params) assert result == SuccessData( public=AspirateResult(volume=50, position=DeckPoint(x=1, y=2, z=3)), state_update=update_types.StateUpdate( pipette_location=update_types.PipetteLocationUpdate( - pipette_id="abc", - new_location=update_types.Well(labware_id="123", well_name="A3"), + pipette_id=pipette_id, + new_location=update_types.Well( + labware_id=labware_id, well_name=well_name + ), new_deck_point=DeckPoint(x=1, y=2, z=3), ), liquid_operated=update_types.LiquidOperatedUpdate( - labware_id="123", - well_names=["A3", "A4"], + labware_id=labware_id, + well_names=["covered-well-1", "covered-well-2"], volume_added=-100, ), pipette_aspirated_fluid=update_types.PipetteAspiratedFluidUpdate( - pipette_id="abc", fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=50) + pipette_id=pipette_id, + fluid=AspiratedFluid(kind=FluidKind.LIQUID, volume=50), ), ), ) -async def test_stall_error( +async def test_stall_during_final_movement( decoy: Decoy, movement: MovementHandler, pipetting: PipettingHandler, @@ -521,7 +538,7 @@ async def test_stall_error( model_utils: ModelUtils, state_view: StateView, ) -> None: - """It should return an overpressure error if the hardware API indicates that.""" + """It should propagate a stall error that happens when moving to the final position.""" pipette_id = "pipette-id" labware_id = "labware-id" well_name = "well-name" @@ -535,7 +552,7 @@ async def test_stall_error( True ) - data = AspirateParams( + params = AspirateParams( pipetteId=pipette_id, labwareId=labware_id, wellName=well_name, @@ -561,7 +578,7 @@ async def test_stall_error( decoy.when(model_utils.generate_id()).then_return(error_id) decoy.when(model_utils.get_timestamp()).then_return(error_timestamp) - result = await subject.execute(data) + result = await subject.execute(params) assert result == DefinedErrorData( public=StallOrCollisionError.construct( @@ -571,3 +588,154 @@ async def test_stall_error( ), state_update=update_types.StateUpdate(pipette_location=update_types.CLEAR), ) + + +async def test_stall_during_preparation( + decoy: Decoy, + movement: MovementHandler, + pipetting: PipettingHandler, + subject: AspirateImplementation, + model_utils: ModelUtils, +) -> None: + """It should propagate a stall error that happens during the prepare-to-aspirate part.""" + pipette_id = "pipette-id" + labware_id = "labware-id" + well_name = "well-name" + well_location = LiquidHandlingWellLocation( + origin=WellOrigin.BOTTOM, offset=WellOffset(x=0, y=0, z=1) + ) + + error_id = "error-id" + error_timestamp = datetime(year=2020, month=1, day=2) + + params = AspirateParams( + pipetteId=pipette_id, + labwareId=labware_id, + wellName=well_name, + wellLocation=well_location, + volume=50, + flowRate=1.23, + ) + + decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id=pipette_id)).then_return( + False + ) + + decoy.when( + await movement.move_to_well( + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + well_location=WellLocation(origin=WellOrigin.TOP), + current_well=None, + force_direct=False, + minimum_z_height=None, + speed=None, + operation_volume=None, + ), + ).then_raise(StallOrCollisionDetectedError()) + decoy.when(model_utils.generate_id()).then_return(error_id) + decoy.when(model_utils.get_timestamp()).then_return(error_timestamp) + + result = await subject.execute(params) + assert result == DefinedErrorData( + public=StallOrCollisionError.construct( + id=error_id, createdAt=error_timestamp, wrappedErrors=[matchers.Anything()] + ), + state_update=update_types.StateUpdate( + pipette_location=update_types.CLEAR, + ), + state_update_if_false_positive=update_types.StateUpdate(), + ) + + +async def test_overpressure_during_preparation( + decoy: Decoy, + movement: MovementHandler, + pipetting: PipettingHandler, + subject: AspirateImplementation, + state_view: StateView, + model_utils: ModelUtils, +) -> None: + """It should propagate an overpressure error that happens during the prepare-to-aspirate part.""" + pipette_id = "pipette-id" + labware_id = "labware-id" + well_name = "well-name" + well_location = LiquidHandlingWellLocation( + origin=WellOrigin.BOTTOM, offset=WellOffset(x=0, y=0, z=1) + ) + + error_id = "error-id" + error_timestamp = datetime(year=2020, month=1, day=2) + + params = AspirateParams( + pipetteId=pipette_id, + labwareId=labware_id, + wellName=well_name, + wellLocation=well_location, + volume=50, + flowRate=1.23, + ) + + decoy.when(pipetting.get_is_ready_to_aspirate(pipette_id=pipette_id)).then_return( + False + ) + + retry_location = Point(1, 2, 3) + decoy.when( + state_view.geometry.get_well_position( + labware_id=labware_id, + well_name=well_name, + well_location=well_location, + operation_volume=-params.volume, + pipette_id=pipette_id, + ) + ).then_return(retry_location) + + prep_location = Point(4, 5, 6) + decoy.when( + await movement.move_to_well( + pipette_id=pipette_id, + labware_id=labware_id, + well_name=well_name, + well_location=WellLocation(origin=WellOrigin.TOP), + current_well=None, + force_direct=False, + minimum_z_height=None, + speed=None, + operation_volume=None, + ), + ).then_return(prep_location) + + decoy.when(await pipetting.prepare_for_aspirate(pipette_id)).then_raise( + PipetteOverpressureError() + ) + decoy.when(model_utils.generate_id()).then_return(error_id) + decoy.when(model_utils.get_timestamp()).then_return(error_timestamp) + + result = await subject.execute(params) + assert result == DefinedErrorData( + public=OverpressureError.construct( + id=error_id, + createdAt=error_timestamp, + wrappedErrors=[matchers.Anything()], + errorInfo={ + "retryLocation": (retry_location.x, retry_location.y, retry_location.z) + }, + ), + state_update=update_types.StateUpdate( + pipette_location=update_types.PipetteLocationUpdate( + pipette_id=pipette_id, + new_location=update_types.Well( + labware_id=labware_id, well_name=well_name + ), + new_deck_point=DeckPoint( + x=prep_location.x, y=prep_location.y, z=prep_location.z + ), + ), + pipette_aspirated_fluid=update_types.PipetteUnknownFluidUpdate( + pipette_id=pipette_id + ), + ), + state_update_if_false_positive=update_types.StateUpdate(), + ) diff --git a/api/tests/opentrons/protocol_engine/state/test_update_types.py b/api/tests/opentrons/protocol_engine/state/test_update_types.py new file mode 100644 index 00000000000..741df813e19 --- /dev/null +++ b/api/tests/opentrons/protocol_engine/state/test_update_types.py @@ -0,0 +1,75 @@ +"""Unit tests for the utilities in `update_types`.""" + + +from opentrons.protocol_engine.state import update_types + + +def test_append() -> None: + """Test `StateUpdate.append()`.""" + state_update = update_types.StateUpdate( + absorbance_reader_lid=update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=True + ) + ) + + # Populating a new field should leave the original ones unchanged. + result = state_update.append( + update_types.StateUpdate(pipette_location=update_types.CLEAR) + ) + assert result is state_update + assert state_update.absorbance_reader_lid == update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=True + ) + assert state_update.pipette_location == update_types.CLEAR + + # Populating a field that's already been populated should overwrite it. + result = state_update.append( + update_types.StateUpdate( + absorbance_reader_lid=update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=False + ) + ) + ) + assert result is state_update + assert state_update.absorbance_reader_lid == update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=False + ) + assert state_update.pipette_location == update_types.CLEAR + + +def test_reduce() -> None: + """Test `StateUpdate.reduce()`.""" + assert update_types.StateUpdate.reduce() == update_types.StateUpdate() + + # It should union all the set fields together. + assert update_types.StateUpdate.reduce( + update_types.StateUpdate( + absorbance_reader_lid=update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=True + ) + ), + update_types.StateUpdate(pipette_location=update_types.CLEAR), + ) == update_types.StateUpdate( + absorbance_reader_lid=update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=True + ), + pipette_location=update_types.CLEAR, + ) + + # When one field appears multiple times, the last write wins. + assert update_types.StateUpdate.reduce( + update_types.StateUpdate( + absorbance_reader_lid=update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=True + ) + ), + update_types.StateUpdate( + absorbance_reader_lid=update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=False + ) + ), + ) == update_types.StateUpdate( + absorbance_reader_lid=update_types.AbsorbanceReaderLidUpdate( + module_id="module_id", is_lid_on=False + ) + )