Skip to content

Commit

Permalink
Merge branch 'edge' into make_push
Browse files Browse the repository at this point in the history
  • Loading branch information
rclarke0 authored Nov 20, 2024
2 parents 42768be + ac051f7 commit eb0d23b
Show file tree
Hide file tree
Showing 49 changed files with 697 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ def _lookup_serial_key(pipette_name: FirmwarePipetteName) -> str:
lookup_name = {
FirmwarePipetteName.p1000_single: "P1KS",
FirmwarePipetteName.p1000_multi: "P1KM",
FirmwarePipetteName.p1000_multi_em: "P1KP",
FirmwarePipetteName.p50_single: "P50S",
FirmwarePipetteName.p50_multi: "P50M",
FirmwarePipetteName.p1000_96: "P1KH",
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/hardware_control/dev_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class PipetteDict(InstrumentDict):
pipette_bounding_box_offsets: PipetteBoundingBoxOffsetDefinition
current_nozzle_map: NozzleMap
lld_settings: Optional[Dict[str, Dict[str, float]]]
plunger_positions: Dict[str, float]
shaft_ul_per_mm: float


class PipetteStateDict(TypedDict):
Expand Down
20 changes: 4 additions & 16 deletions api/src/opentrons/hardware_control/instruments/ot2/pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
CommandPreconditionViolated,
)
from opentrons_shared_data.pipette.ul_per_mm import (
piecewise_volume_conversion,
calculate_ul_per_mm,
PIPETTING_FUNCTION_FALLBACK_VERSION,
PIPETTING_FUNCTION_LATEST_VERSION,
)
Expand Down Expand Up @@ -584,21 +584,9 @@ def get_nominal_tip_overlap_dictionary_by_configuration(
# want this to unbounded.
@functools.lru_cache(maxsize=100)
def ul_per_mm(self, ul: float, action: UlPerMmAction) -> float:
if action == "aspirate":
fallback = self._active_tip_settings.aspirate.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = self._active_tip_settings.aspirate.default.get(
self._pipetting_function_version, fallback
)
else:
fallback = self._active_tip_settings.dispense.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = self._active_tip_settings.dispense.default.get(
self._pipetting_function_version, fallback
)
return piecewise_volume_conversion(ul, sequence)
return calculate_ul_per_mm(
ul, action, self._active_tip_settings, self._pipetting_function_version
)

def __str__(self) -> str:
return "{} current volume {}ul critical point: {} at {}".format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,13 @@ def get_attached_instrument(self, mount: MountType) -> PipetteDict:
"pipette_bounding_box_offsets"
] = instr.config.pipette_bounding_box_offsets
result["lld_settings"] = instr.config.lld_settings
result["plunger_positions"] = {
"top": instr.plunger_positions.top,
"bottom": instr.plunger_positions.bottom,
"blow_out": instr.plunger_positions.blow_out,
"drop_tip": instr.plunger_positions.drop_tip,
}
result["shaft_ul_per_mm"] = instr.config.shaft_ul_per_mm
return cast(PipetteDict, result)

@property
Expand Down
27 changes: 9 additions & 18 deletions api/src/opentrons/hardware_control/instruments/ot3/pipette.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
InvalidInstrumentData,
)
from opentrons_shared_data.pipette.ul_per_mm import (
piecewise_volume_conversion,
calculate_ul_per_mm,
PIPETTING_FUNCTION_FALLBACK_VERSION,
PIPETTING_FUNCTION_LATEST_VERSION,
)
Expand Down Expand Up @@ -529,23 +529,13 @@ def tip_presence_responses(self) -> int:
# want this to unbounded.
@functools.lru_cache(maxsize=100)
def ul_per_mm(self, ul: float, action: UlPerMmAction) -> float:
if action == "aspirate":
fallback = self._active_tip_settings.aspirate.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = self._active_tip_settings.aspirate.default.get(
self._pipetting_function_version, fallback
)
elif action == "blowout":
return self._config.shaft_ul_per_mm
else:
fallback = self._active_tip_settings.dispense.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = self._active_tip_settings.dispense.default.get(
self._pipetting_function_version, fallback
)
return piecewise_volume_conversion(ul, sequence)
return calculate_ul_per_mm(
ul,
action,
self._active_tip_settings,
self._pipetting_function_version,
self._config.shaft_ul_per_mm,
)

def __str__(self) -> str:
return "{} current volume {}ul critical point: {} at {}".format(
Expand Down Expand Up @@ -585,6 +575,7 @@ def as_dict(self) -> "Pipette.DictType":
"versioned_tip_overlap": self.tip_overlap,
"back_compat_names": self._config.pipette_backcompat_names,
"supported_tips": self.liquid_class.supported_tips,
"shaft_ul_per_mm": self._config.shaft_ul_per_mm,
}
)
return self._config_as_dict
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ def get_attached_instrument(self, mount: OT3Mount) -> PipetteDict:
"pipette_bounding_box_offsets"
] = instr.config.pipette_bounding_box_offsets
result["lld_settings"] = instr.config.lld_settings
result["plunger_positions"] = {
"top": instr.plunger_positions.top,
"bottom": instr.plunger_positions.bottom,
"blow_out": instr.plunger_positions.blow_out,
"drop_tip": instr.plunger_positions.drop_tip,
}
result["shaft_ul_per_mm"] = instr.config.shaft_ul_per_mm
return cast(PipetteDict, result)

@property
Expand Down
21 changes: 20 additions & 1 deletion api/src/opentrons/protocol_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,16 @@
)
from .disposal_locations import TrashBin, WasteChute
from ._liquid import Liquid, LiquidClass
from ._types import OFF_DECK
from ._types import (
OFF_DECK,
PLUNGER_BLOWOUT,
PLUNGER_TOP,
PLUNGER_BOTTOM,
PLUNGER_DROPTIP,
ASPIRATE_ACTION,
DISPENSE_ACTION,
BLOWOUT_ACTION,
)
from ._nozzle_layout import (
COLUMN,
PARTIAL_COLUMN,
Expand Down Expand Up @@ -69,12 +78,22 @@
"Liquid",
"LiquidClass",
"Parameters",
# Partial Tip types
"COLUMN",
"PARTIAL_COLUMN",
"SINGLE",
"ROW",
"ALL",
# Deck location types
"OFF_DECK",
# Pipette plunger types
"PLUNGER_BLOWOUT",
"PLUNGER_TOP",
"PLUNGER_BOTTOM",
"PLUNGER_DROPTIP",
"ASPIRATE_ACTION",
"DISPENSE_ACTION",
"BLOWOUT_ACTION",
"RuntimeParameterRequiredError",
"CSVParameter",
# For internal Opentrons use only:
Expand Down
24 changes: 24 additions & 0 deletions api/src/opentrons/protocol_api/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,27 @@ class OffDeckType(enum.Enum):
See :ref:`off-deck-location` for details on using ``OFF_DECK`` with :py:obj:`ProtocolContext.move_labware()`.
"""


class PlungerPositionTypes(enum.Enum):
PLUNGER_TOP = "top"
PLUNGER_BOTTOM = "bottom"
PLUNGER_BLOWOUT = "blow_out"
PLUNGER_DROPTIP = "drop_tip"


PLUNGER_TOP: Final = PlungerPositionTypes.PLUNGER_TOP
PLUNGER_BOTTOM: Final = PlungerPositionTypes.PLUNGER_BOTTOM
PLUNGER_BLOWOUT: Final = PlungerPositionTypes.PLUNGER_BLOWOUT
PLUNGER_DROPTIP: Final = PlungerPositionTypes.PLUNGER_DROPTIP


class PipetteActionTypes(enum.Enum):
ASPIRATE_ACTION = "aspirate"
DISPENSE_ACTION = "dispense"
BLOWOUT_ACTION = "blowout"


ASPIRATE_ACTION: Final = PipetteActionTypes.ASPIRATE_ACTION
DISPENSE_ACTION: Final = PipetteActionTypes.DISPENSE_ACTION
BLOWOUT_ACTION: Final = PipetteActionTypes.BLOWOUT_ACTION
54 changes: 51 additions & 3 deletions api/src/opentrons/protocol_api/core/engine/robot.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from typing import Optional, Dict
from typing import Optional, Dict, Union
from opentrons.hardware_control import SyncHardwareAPI

from opentrons.types import Mount, MountType, Point, AxisType, AxisMapType
from opentrons_shared_data.pipette import types as pip_types
from opentrons.protocol_api._types import PipetteActionTypes, PlungerPositionTypes
from opentrons.protocol_engine import commands as cmd
from opentrons.protocol_engine.clients import SyncClient as EngineClient
from opentrons.protocol_engine.types import DeckPoint, MotorAxis

from opentrons.protocol_api.core.robot import AbstractRobot


_AXIS_TYPE_TO_MOTOR_AXIS = {
AxisType.X: MotorAxis.X,
AxisType.Y: MotorAxis.Y,
Expand Down Expand Up @@ -39,12 +42,57 @@ def __init__(
def _convert_to_engine_mount(self, axis_map: AxisMapType) -> Dict[MotorAxis, float]:
return {_AXIS_TYPE_TO_MOTOR_AXIS[ax]: dist for ax, dist in axis_map.items()}

def get_pipette_type_from_engine(self, mount: Mount) -> Optional[str]:
def get_pipette_type_from_engine(
self, mount: Union[Mount, str]
) -> Optional[pip_types.PipetteNameType]:
"""Get the pipette attached to the given mount."""
engine_mount = MountType[mount.name]
if isinstance(mount, Mount):
engine_mount = MountType[mount.name]
else:
if mount.lower() == "right":
engine_mount = MountType.RIGHT
else:
engine_mount = MountType.LEFT
maybe_pipette = self._engine_client.state.pipettes.get_by_mount(engine_mount)
return maybe_pipette.pipetteName if maybe_pipette else None

def get_plunger_position_from_name(
self, mount: Mount, position_name: PlungerPositionTypes
) -> float:
engine_mount = MountType[mount.name]
maybe_pipette = self._engine_client.state.pipettes.get_by_mount(engine_mount)
if not maybe_pipette:
return 0.0
return self._engine_client.state.pipettes.lookup_plunger_position_name(
maybe_pipette.id, position_name.value
)

def get_plunger_position_from_volume(
self, mount: Mount, volume: float, action: PipetteActionTypes, robot_type: str
) -> float:
engine_mount = MountType[mount.name]
maybe_pipette = self._engine_client.state.pipettes.get_by_mount(engine_mount)
if not maybe_pipette:
raise RuntimeError(
f"Cannot load plunger position as no pipette is attached to {mount}"
)
convert_volume = (
self._engine_client.state.pipettes.lookup_volume_to_mm_conversion(
maybe_pipette.id, volume, action.value
)
)
plunger_bottom = (
self._engine_client.state.pipettes.lookup_plunger_position_name(
maybe_pipette.id, "bottom"
)
)
mm = volume / convert_volume
if robot_type == "OT-2 Standard":
position = plunger_bottom + mm
else:
position = plunger_bottom - mm
return round(position, 6)

def move_to(self, mount: Mount, destination: Point, speed: Optional[float]) -> None:
engine_mount = MountType[mount.name]
engine_destination = DeckPoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class LegacyProtocolCore(
LegacyInstrumentCore,
LegacyLabwareCore,
legacy_module_core.LegacyModuleCore,
# None,
]
):
def __init__(
Expand Down
20 changes: 18 additions & 2 deletions api/src/opentrons/protocol_api/core/robot.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
from abc import abstractmethod, ABC
from typing import Optional
from typing import Optional, Union

from opentrons.types import AxisMapType, Mount, Point
from opentrons_shared_data.pipette.types import PipetteNameType
from opentrons.protocol_api._types import PlungerPositionTypes, PipetteActionTypes


class AbstractRobot(ABC):
@abstractmethod
def get_pipette_type_from_engine(self, mount: Mount) -> Optional[str]:
def get_pipette_type_from_engine(
self, mount: Union[Mount, str]
) -> Optional[PipetteNameType]:
...

@abstractmethod
def get_plunger_position_from_volume(
self, mount: Mount, volume: float, action: PipetteActionTypes, robot_type: str
) -> float:
...

@abstractmethod
def get_plunger_position_from_name(
self, mount: Mount, position_name: PlungerPositionTypes
) -> float:
...

@abstractmethod
Expand Down
42 changes: 36 additions & 6 deletions api/src/opentrons/protocol_api/robot_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .core.common import ProtocolCore, RobotCore
from .module_contexts import ModuleContext
from .labware import Labware
from ._types import PipetteActionTypes, PlungerPositionTypes


class HardwareManager(NamedTuple):
Expand Down Expand Up @@ -200,14 +201,43 @@ def axis_coordinates_for(
raise TypeError("You must specify a location to move to.")

def plunger_coordinates_for_volume(
self, mount: Union[Mount, str], volume: float
) -> None:
raise NotImplementedError()
self, mount: Union[Mount, str], volume: float, action: PipetteActionTypes
) -> AxisMapType:
"""
Build a :py:class:`.types.AxisMapType` for a pipette plunger motor from volume.
"""
pipette_name = self._core.get_pipette_type_from_engine(mount)
if not pipette_name:
raise ValueError(
f"Expected a pipette to be attached to provided mount {mount}"
)
mount = validation.ensure_mount_for_pipette(mount, pipette_name)
pipette_axis = AxisType.plunger_axis_for_mount(mount)

pipette_position = self._core.get_plunger_position_from_volume(
mount, volume, action, self._protocol_core.robot_type
)
return {pipette_axis: pipette_position}

def plunger_coordinates_for_named_position(
self, mount: Union[Mount, str], position_name: str
) -> None:
raise NotImplementedError()
self, mount: Union[Mount, str], position_name: PlungerPositionTypes
) -> AxisMapType:
"""
Build a :py:class:`.types.AxisMapType` for a pipette plunger motor from position_name.
"""
pipette_name = self._core.get_pipette_type_from_engine(mount)
if not pipette_name:
raise ValueError(
f"Expected a pipette to be attached to provided mount {mount}"
)
mount = validation.ensure_mount_for_pipette(mount, pipette_name)
pipette_axis = AxisType.plunger_axis_for_mount(mount)
pipette_position = self._core.get_plunger_position_from_name(
mount, position_name
)
return {pipette_axis: pipette_position}

def build_axis_map(self, axis_map: StringAxisMap) -> AxisMapType:
"""Take in a :py:class:`.types.StringAxisMap` and output a :py:class:`.types.AxisMapType`.
Expand Down
6 changes: 5 additions & 1 deletion api/src/opentrons/protocol_engine/execution/gantry_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
HardwareAxis.Q: MotorAxis.AXIS_96_CHANNEL_CAM,
}


# The height of the bottom of the pipette nozzle at home position without any tips.
# We rely on this being the same for every OT-3 pipette.
#
Expand Down Expand Up @@ -305,7 +306,6 @@ async def move_mount_to(
) -> Point:
"""Move the given hardware mount to a waypoint."""
assert len(waypoints) > 0, "Must have at least one waypoint"
log.info(f"Moving mount {mount}")
for waypoint in waypoints:
log.info(f"The current waypoint moving is {waypoint}")
await self._hardware_api.move_to(
Expand Down Expand Up @@ -340,6 +340,10 @@ async def move_axes(
mount, refresh=True
)
log.info(f"The current position of the robot is: {current_position}.")
converted_current_position_deck = (
self._hardware_api.get_deck_from_machine(current_position)
)
log.info(f"The current position of the robot is: {current_position}.")

pos_hw = target_axis_map_from_relative(pos_hw, current_position)
log.info(
Expand Down
Loading

0 comments on commit eb0d23b

Please sign in to comment.