diff --git a/api/src/opentrons/protocol_engine/commands/prepare_to_aspirate.py b/api/src/opentrons/protocol_engine/commands/prepare_to_aspirate.py index d427b38dc1e..d63e42a7f90 100644 --- a/api/src/opentrons/protocol_engine/commands/prepare_to_aspirate.py +++ b/api/src/opentrons/protocol_engine/commands/prepare_to_aspirate.py @@ -1,18 +1,28 @@ """Prepare to aspirate command request, result, and implementation models.""" from __future__ import annotations +from opentrons_shared_data.errors.exceptions import PipetteOverpressureError from pydantic import BaseModel -from typing import TYPE_CHECKING, Optional, Type +from typing import TYPE_CHECKING, Optional, Type, Union from typing_extensions import Literal from .pipetting_common import ( + OverpressureError, PipetteIdMixin, ) -from .command import AbstractCommandImpl, BaseCommand, BaseCommandCreate, SuccessData +from .command import ( + AbstractCommandImpl, + BaseCommand, + BaseCommandCreate, + DefinedErrorData, + SuccessData, +) from ..errors.error_occurrence import ErrorOccurrence if TYPE_CHECKING: - from ..execution.pipetting import PipettingHandler + from ..execution import PipettingHandler, GantryMover + from ..resources import ModelUtils + PrepareToAspirateCommandType = Literal["prepareToAspirate"] @@ -29,25 +39,60 @@ class PrepareToAspirateResult(BaseModel): pass +_ExecuteReturn = Union[ + SuccessData[PrepareToAspirateResult, None], + DefinedErrorData[OverpressureError], +] + + class PrepareToAspirateImplementation( - AbstractCommandImpl[ - PrepareToAspirateParams, SuccessData[PrepareToAspirateResult, None] - ] + AbstractCommandImpl[PrepareToAspirateParams, _ExecuteReturn] ): """Prepare for aspirate command implementation.""" - def __init__(self, pipetting: PipettingHandler, **kwargs: object) -> None: + def __init__( + self, + pipetting: PipettingHandler, + model_utils: ModelUtils, + gantry_mover: GantryMover, + **kwargs: object, + ) -> None: self._pipetting_handler = pipetting + self._model_utils = model_utils + self._gantry_mover = gantry_mover - async def execute( - self, params: PrepareToAspirateParams - ) -> SuccessData[PrepareToAspirateResult, None]: + async def execute(self, params: PrepareToAspirateParams) -> _ExecuteReturn: """Prepare the pipette to aspirate.""" - await self._pipetting_handler.prepare_for_aspirate( - pipette_id=params.pipetteId, - ) - - return SuccessData(public=PrepareToAspirateResult(), private=None) + current_position = await self._gantry_mover.get_position(params.pipetteId) + try: + await self._pipetting_handler.prepare_for_aspirate( + pipette_id=params.pipetteId, + ) + except PipetteOverpressureError as e: + return DefinedErrorData( + public=OverpressureError( + id=self._model_utils.generate_id(), + createdAt=self._model_utils.get_timestamp(), + wrappedErrors=[ + ErrorOccurrence.from_failed( + id=self._model_utils.generate_id(), + createdAt=self._model_utils.get_timestamp(), + error=e, + ) + ], + errorInfo=( + { + "retryLocation": ( + current_position.x, + current_position.y, + current_position.z, + ) + } + ), + ), + ) + else: + return SuccessData(public=PrepareToAspirateResult(), private=None) class PrepareToAspirate( diff --git a/api/tests/opentrons/protocol_engine/commands/test_prepare_to_aspirate.py b/api/tests/opentrons/protocol_engine/commands/test_prepare_to_aspirate.py index b11254af481..45e8db96837 100644 --- a/api/tests/opentrons/protocol_engine/commands/test_prepare_to_aspirate.py +++ b/api/tests/opentrons/protocol_engine/commands/test_prepare_to_aspirate.py @@ -1,25 +1,41 @@ """Test prepare to aspirate commands.""" - -from decoy import Decoy +from datetime import datetime +from opentrons.types import Point +import pytest +from decoy import Decoy, matchers from opentrons.protocol_engine.execution import ( PipettingHandler, ) -from opentrons.protocol_engine.commands.command import SuccessData +from opentrons.protocol_engine.commands.command import DefinedErrorData, SuccessData from opentrons.protocol_engine.commands.prepare_to_aspirate import ( PrepareToAspirateParams, PrepareToAspirateImplementation, PrepareToAspirateResult, ) +from opentrons.protocol_engine.execution.gantry_mover import GantryMover +from opentrons.protocol_engine.resources.model_utils import ModelUtils +from opentrons.protocol_engine.commands.pipetting_common import OverpressureError +from opentrons_shared_data.errors.exceptions import PipetteOverpressureError + + +@pytest.fixture +def subject( + pipetting: PipettingHandler, + model_utils: ModelUtils, + gantry_mover: GantryMover, +) -> PrepareToAspirateImplementation: + """Get the implementation subject.""" + return PrepareToAspirateImplementation( + pipetting=pipetting, model_utils=model_utils, gantry_mover=gantry_mover + ) async def test_prepare_to_aspirate_implmenetation( - decoy: Decoy, pipetting: PipettingHandler + decoy: Decoy, subject: PrepareToAspirateImplementation, pipetting: PipettingHandler ) -> None: """A PrepareToAspirate command should have an executing implementation.""" - subject = PrepareToAspirateImplementation(pipetting=pipetting) - data = PrepareToAspirateParams(pipetteId="some id") decoy.when(await pipetting.prepare_for_aspirate(pipette_id="some id")).then_return( @@ -28,3 +44,44 @@ async def test_prepare_to_aspirate_implmenetation( result = await subject.execute(data) assert result == SuccessData(public=PrepareToAspirateResult(), private=None) + + +async def test_overpressure_error( + decoy: Decoy, + gantry_mover: GantryMover, + pipetting: PipettingHandler, + subject: PrepareToAspirateImplementation, + model_utils: ModelUtils, +) -> None: + """It should return an overpressure error if the hardware API indicates that.""" + pipette_id = "pipette-id" + + position = Point(x=1, y=2, z=3) + + error_id = "error-id" + error_timestamp = datetime(year=2020, month=1, day=2) + + data = PrepareToAspirateParams( + pipetteId=pipette_id, + ) + + decoy.when( + await pipetting.prepare_for_aspirate( + pipette_id=pipette_id, + ), + ).then_raise(PipetteOverpressureError()) + + decoy.when(model_utils.generate_id()).then_return(error_id) + decoy.when(model_utils.get_timestamp()).then_return(error_timestamp) + decoy.when(await gantry_mover.get_position(pipette_id)).then_return(position) + + result = await subject.execute(data) + + assert result == DefinedErrorData( + public=OverpressureError.construct( + id=error_id, + createdAt=error_timestamp, + wrappedErrors=[matchers.Anything()], + errorInfo={"retryLocation": (position.x, position.y, position.z)}, + ), + )