Skip to content

Commit

Permalink
feat(api): RobotContext: Add pipette helper functions to convert volu…
Browse files Browse the repository at this point in the history
…me and position type (#16682)
  • Loading branch information
Laura-Danielle authored Nov 20, 2024
1 parent b365e27 commit db8b1e5
Show file tree
Hide file tree
Showing 25 changed files with 601 additions and 51 deletions.
2 changes: 2 additions & 0 deletions api/src/opentrons/hardware_control/dev_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class PipetteDict(InstrumentDict):
pipette_bounding_box_offsets: PipetteBoundingBoxOffsetDefinition
current_nozzle_map: NozzleMap
lld_settings: Optional[Dict[str, Dict[str, float]]]
plunger_positions: Dict[str, float]
shaft_ul_per_mm: float


class PipetteStateDict(TypedDict):
Expand Down
20 changes: 4 additions & 16 deletions api/src/opentrons/hardware_control/instruments/ot2/pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
CommandPreconditionViolated,
)
from opentrons_shared_data.pipette.ul_per_mm import (
piecewise_volume_conversion,
calculate_ul_per_mm,
PIPETTING_FUNCTION_FALLBACK_VERSION,
PIPETTING_FUNCTION_LATEST_VERSION,
)
Expand Down Expand Up @@ -584,21 +584,9 @@ def get_nominal_tip_overlap_dictionary_by_configuration(
# want this to unbounded.
@functools.lru_cache(maxsize=100)
def ul_per_mm(self, ul: float, action: UlPerMmAction) -> float:
if action == "aspirate":
fallback = self._active_tip_settings.aspirate.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = self._active_tip_settings.aspirate.default.get(
self._pipetting_function_version, fallback
)
else:
fallback = self._active_tip_settings.dispense.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = self._active_tip_settings.dispense.default.get(
self._pipetting_function_version, fallback
)
return piecewise_volume_conversion(ul, sequence)
return calculate_ul_per_mm(
ul, action, self._active_tip_settings, self._pipetting_function_version
)

def __str__(self) -> str:
return "{} current volume {}ul critical point: {} at {}".format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ def get_attached_instrument(self, mount: MountType) -> PipetteDict:
"pipette_bounding_box_offsets"
] = instr.config.pipette_bounding_box_offsets
result["lld_settings"] = instr.config.lld_settings
result["plunger_positions"] = {
"top": instr.plunger_positions.top,
"bottom": instr.plunger_positions.bottom,
"blow_out": instr.plunger_positions.blow_out,
"drop_tip": instr.plunger_positions.drop_tip,
}
result["shaft_ul_per_mm"] = instr.config.shaft_ul_per_mm
return cast(PipetteDict, result)

@property
Expand Down
27 changes: 9 additions & 18 deletions api/src/opentrons/hardware_control/instruments/ot3/pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
InvalidInstrumentData,
)
from opentrons_shared_data.pipette.ul_per_mm import (
piecewise_volume_conversion,
calculate_ul_per_mm,
PIPETTING_FUNCTION_FALLBACK_VERSION,
PIPETTING_FUNCTION_LATEST_VERSION,
)
Expand Down Expand Up @@ -529,23 +529,13 @@ def tip_presence_responses(self) -> int:
# want this to unbounded.
@functools.lru_cache(maxsize=100)
def ul_per_mm(self, ul: float, action: UlPerMmAction) -> float:
if action == "aspirate":
fallback = self._active_tip_settings.aspirate.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = self._active_tip_settings.aspirate.default.get(
self._pipetting_function_version, fallback
)
elif action == "blowout":
return self._config.shaft_ul_per_mm
else:
fallback = self._active_tip_settings.dispense.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = self._active_tip_settings.dispense.default.get(
self._pipetting_function_version, fallback
)
return piecewise_volume_conversion(ul, sequence)
return calculate_ul_per_mm(
ul,
action,
self._active_tip_settings,
self._pipetting_function_version,
self._config.shaft_ul_per_mm,
)

def __str__(self) -> str:
return "{} current volume {}ul critical point: {} at {}".format(
Expand Down Expand Up @@ -585,6 +575,7 @@ def as_dict(self) -> "Pipette.DictType":
"versioned_tip_overlap": self.tip_overlap,
"back_compat_names": self._config.pipette_backcompat_names,
"supported_tips": self.liquid_class.supported_tips,
"shaft_ul_per_mm": self._config.shaft_ul_per_mm,
}
)
return self._config_as_dict
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ def get_attached_instrument(self, mount: OT3Mount) -> PipetteDict:
"pipette_bounding_box_offsets"
] = instr.config.pipette_bounding_box_offsets
result["lld_settings"] = instr.config.lld_settings
result["plunger_positions"] = {
"top": instr.plunger_positions.top,
"bottom": instr.plunger_positions.bottom,
"blow_out": instr.plunger_positions.blow_out,
"drop_tip": instr.plunger_positions.drop_tip,
}
result["shaft_ul_per_mm"] = instr.config.shaft_ul_per_mm
return cast(PipetteDict, result)

@property
Expand Down
21 changes: 20 additions & 1 deletion api/src/opentrons/protocol_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,16 @@
)
from .disposal_locations import TrashBin, WasteChute
from ._liquid import Liquid, LiquidClass
from ._types import OFF_DECK
from ._types import (
OFF_DECK,
PLUNGER_BLOWOUT,
PLUNGER_TOP,
PLUNGER_BOTTOM,
PLUNGER_DROPTIP,
ASPIRATE_ACTION,
DISPENSE_ACTION,
BLOWOUT_ACTION,
)
from ._nozzle_layout import (
COLUMN,
PARTIAL_COLUMN,
Expand Down Expand Up @@ -69,12 +78,22 @@
"Liquid",
"LiquidClass",
"Parameters",
# Partial Tip types
"COLUMN",
"PARTIAL_COLUMN",
"SINGLE",
"ROW",
"ALL",
# Deck location types
"OFF_DECK",
# Pipette plunger types
"PLUNGER_BLOWOUT",
"PLUNGER_TOP",
"PLUNGER_BOTTOM",
"PLUNGER_DROPTIP",
"ASPIRATE_ACTION",
"DISPENSE_ACTION",
"BLOWOUT_ACTION",
"RuntimeParameterRequiredError",
"CSVParameter",
# For internal Opentrons use only:
Expand Down
24 changes: 24 additions & 0 deletions api/src/opentrons/protocol_api/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,27 @@ class OffDeckType(enum.Enum):
See :ref:`off-deck-location` for details on using ``OFF_DECK`` with :py:obj:`ProtocolContext.move_labware()`.
"""


class PlungerPositionTypes(enum.Enum):
PLUNGER_TOP = "top"
PLUNGER_BOTTOM = "bottom"
PLUNGER_BLOWOUT = "blow_out"
PLUNGER_DROPTIP = "drop_tip"


PLUNGER_TOP: Final = PlungerPositionTypes.PLUNGER_TOP
PLUNGER_BOTTOM: Final = PlungerPositionTypes.PLUNGER_BOTTOM
PLUNGER_BLOWOUT: Final = PlungerPositionTypes.PLUNGER_BLOWOUT
PLUNGER_DROPTIP: Final = PlungerPositionTypes.PLUNGER_DROPTIP


class PipetteActionTypes(enum.Enum):
ASPIRATE_ACTION = "aspirate"
DISPENSE_ACTION = "dispense"
BLOWOUT_ACTION = "blowout"


ASPIRATE_ACTION: Final = PipetteActionTypes.ASPIRATE_ACTION
DISPENSE_ACTION: Final = PipetteActionTypes.DISPENSE_ACTION
BLOWOUT_ACTION: Final = PipetteActionTypes.BLOWOUT_ACTION
54 changes: 51 additions & 3 deletions api/src/opentrons/protocol_api/core/engine/robot.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from typing import Optional, Dict
from typing import Optional, Dict, Union
from opentrons.hardware_control import SyncHardwareAPI

from opentrons.types import Mount, MountType, Point, AxisType, AxisMapType
from opentrons_shared_data.pipette import types as pip_types
from opentrons.protocol_api._types import PipetteActionTypes, PlungerPositionTypes
from opentrons.protocol_engine import commands as cmd
from opentrons.protocol_engine.clients import SyncClient as EngineClient
from opentrons.protocol_engine.types import DeckPoint, MotorAxis

from opentrons.protocol_api.core.robot import AbstractRobot


_AXIS_TYPE_TO_MOTOR_AXIS = {
AxisType.X: MotorAxis.X,
AxisType.Y: MotorAxis.Y,
Expand Down Expand Up @@ -39,12 +42,57 @@ def __init__(
def _convert_to_engine_mount(self, axis_map: AxisMapType) -> Dict[MotorAxis, float]:
return {_AXIS_TYPE_TO_MOTOR_AXIS[ax]: dist for ax, dist in axis_map.items()}

def get_pipette_type_from_engine(self, mount: Mount) -> Optional[str]:
def get_pipette_type_from_engine(
self, mount: Union[Mount, str]
) -> Optional[pip_types.PipetteNameType]:
"""Get the pipette attached to the given mount."""
engine_mount = MountType[mount.name]
if isinstance(mount, Mount):
engine_mount = MountType[mount.name]
else:
if mount.lower() == "right":
engine_mount = MountType.RIGHT
else:
engine_mount = MountType.LEFT
maybe_pipette = self._engine_client.state.pipettes.get_by_mount(engine_mount)
return maybe_pipette.pipetteName if maybe_pipette else None

def get_plunger_position_from_name(
self, mount: Mount, position_name: PlungerPositionTypes
) -> float:
engine_mount = MountType[mount.name]
maybe_pipette = self._engine_client.state.pipettes.get_by_mount(engine_mount)
if not maybe_pipette:
return 0.0
return self._engine_client.state.pipettes.lookup_plunger_position_name(
maybe_pipette.id, position_name.value
)

def get_plunger_position_from_volume(
self, mount: Mount, volume: float, action: PipetteActionTypes, robot_type: str
) -> float:
engine_mount = MountType[mount.name]
maybe_pipette = self._engine_client.state.pipettes.get_by_mount(engine_mount)
if not maybe_pipette:
raise RuntimeError(
f"Cannot load plunger position as no pipette is attached to {mount}"
)
convert_volume = (
self._engine_client.state.pipettes.lookup_volume_to_mm_conversion(
maybe_pipette.id, volume, action.value
)
)
plunger_bottom = (
self._engine_client.state.pipettes.lookup_plunger_position_name(
maybe_pipette.id, "bottom"
)
)
mm = volume / convert_volume
if robot_type == "OT-2 Standard":
position = plunger_bottom + mm
else:
position = plunger_bottom - mm
return round(position, 6)

def move_to(self, mount: Mount, destination: Point, speed: Optional[float]) -> None:
engine_mount = MountType[mount.name]
engine_destination = DeckPoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class LegacyProtocolCore(
LegacyInstrumentCore,
LegacyLabwareCore,
legacy_module_core.LegacyModuleCore,
# None,
]
):
def __init__(
Expand Down
20 changes: 18 additions & 2 deletions api/src/opentrons/protocol_api/core/robot.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
from abc import abstractmethod, ABC
from typing import Optional
from typing import Optional, Union

from opentrons.types import AxisMapType, Mount, Point
from opentrons_shared_data.pipette.types import PipetteNameType
from opentrons.protocol_api._types import PlungerPositionTypes, PipetteActionTypes


class AbstractRobot(ABC):
@abstractmethod
def get_pipette_type_from_engine(self, mount: Mount) -> Optional[str]:
def get_pipette_type_from_engine(
self, mount: Union[Mount, str]
) -> Optional[PipetteNameType]:
...

@abstractmethod
def get_plunger_position_from_volume(
self, mount: Mount, volume: float, action: PipetteActionTypes, robot_type: str
) -> float:
...

@abstractmethod
def get_plunger_position_from_name(
self, mount: Mount, position_name: PlungerPositionTypes
) -> float:
...

@abstractmethod
Expand Down
42 changes: 36 additions & 6 deletions api/src/opentrons/protocol_api/robot_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .core.common import ProtocolCore, RobotCore
from .module_contexts import ModuleContext
from .labware import Labware
from ._types import PipetteActionTypes, PlungerPositionTypes


class HardwareManager(NamedTuple):
Expand Down Expand Up @@ -200,14 +201,43 @@ def axis_coordinates_for(
raise TypeError("You must specify a location to move to.")

def plunger_coordinates_for_volume(
self, mount: Union[Mount, str], volume: float
) -> None:
raise NotImplementedError()
self, mount: Union[Mount, str], volume: float, action: PipetteActionTypes
) -> AxisMapType:
"""
Build a :py:class:`.types.AxisMapType` for a pipette plunger motor from volume.
"""
pipette_name = self._core.get_pipette_type_from_engine(mount)
if not pipette_name:
raise ValueError(
f"Expected a pipette to be attached to provided mount {mount}"
)
mount = validation.ensure_mount_for_pipette(mount, pipette_name)
pipette_axis = AxisType.plunger_axis_for_mount(mount)

pipette_position = self._core.get_plunger_position_from_volume(
mount, volume, action, self._protocol_core.robot_type
)
return {pipette_axis: pipette_position}

def plunger_coordinates_for_named_position(
self, mount: Union[Mount, str], position_name: str
) -> None:
raise NotImplementedError()
self, mount: Union[Mount, str], position_name: PlungerPositionTypes
) -> AxisMapType:
"""
Build a :py:class:`.types.AxisMapType` for a pipette plunger motor from position_name.
"""
pipette_name = self._core.get_pipette_type_from_engine(mount)
if not pipette_name:
raise ValueError(
f"Expected a pipette to be attached to provided mount {mount}"
)
mount = validation.ensure_mount_for_pipette(mount, pipette_name)
pipette_axis = AxisType.plunger_axis_for_mount(mount)
pipette_position = self._core.get_plunger_position_from_name(
mount, position_name
)
return {pipette_axis: pipette_position}

def build_axis_map(self, axis_map: StringAxisMap) -> AxisMapType:
"""Take in a :py:class:`.types.StringAxisMap` and output a :py:class:`.types.AxisMapType`.
Expand Down
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/execution/gantry_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
HardwareAxis.Q: MotorAxis.AXIS_96_CHANNEL_CAM,
}


# The height of the bottom of the pipette nozzle at home position without any tips.
# We rely on this being the same for every OT-3 pipette.
#
Expand Down Expand Up @@ -305,7 +306,6 @@ async def move_mount_to(
) -> Point:
"""Move the given hardware mount to a waypoint."""
assert len(waypoints) > 0, "Must have at least one waypoint"
log.info(f"Moving mount {mount}")
for waypoint in waypoints:
log.info(f"The current waypoint moving is {waypoint}")
await self._hardware_api.move_to(
Expand Down Expand Up @@ -340,6 +340,10 @@ async def move_axes(
mount, refresh=True
)
log.info(f"The current position of the robot is: {current_position}.")
converted_current_position_deck = (
self._hardware_api.get_deck_from_machine(current_position)
)
log.info(f"The current position of the robot is: {current_position}.")

pos_hw = target_axis_map_from_relative(pos_hw, current_position)
log.info(
Expand Down
Loading

0 comments on commit db8b1e5

Please sign in to comment.