Skip to content

Commit

Permalink
feat(api): disable pressure sensor capabilities for PEEK pipettes (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj authored and ryanthecoder committed Nov 22, 2024
1 parent 482fd7d commit 4fcf8ab
Show file tree
Hide file tree
Showing 23 changed files with 501 additions and 8 deletions.
8 changes: 8 additions & 0 deletions api/src/opentrons/hardware_control/backends/flex_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ def restore_system_constraints(self) -> AsyncIterator[None]:
def grab_pressure(self, channels: int, mount: OT3Mount) -> AsyncIterator[None]:
...

def set_pressure_sensor_available(
self, pipette_axis: Axis, available: bool
) -> None:
...

def get_pressure_sensor_available(self, pipette_axis: Axis) -> bool:
...

def update_constraints_for_gantry_load(self, gantry_load: GantryLoad) -> None:
...

Expand Down
26 changes: 24 additions & 2 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
PipetteLiquidNotFoundError,
CommunicationError,
PythonException,
UnsupportedHardwareCommand,
)

from .subsystem_manager import SubsystemManager
Expand Down Expand Up @@ -363,6 +364,7 @@ def __init__(
self._configuration.motion_settings, GantryLoad.LOW_THROUGHPUT
)
)
self._pressure_sensor_available: Dict[NodeId, bool] = {}

@asynccontextmanager
async def restore_system_constraints(self) -> AsyncIterator[None]:
Expand All @@ -381,6 +383,16 @@ async def grab_pressure(
async with grab_pressure(channels, tool, self._messenger):
yield

def set_pressure_sensor_available(
self, pipette_axis: Axis, available: bool
) -> None:
pip_node = axis_to_node(pipette_axis)
self._pressure_sensor_available[pip_node] = available

def get_pressure_sensor_available(self, pipette_axis: Axis) -> bool:
pip_node = axis_to_node(pipette_axis)
return self._pressure_sensor_available[pip_node]

def update_constraints_for_calibration_with_gantry_load(
self,
gantry_load: GantryLoad,
Expand Down Expand Up @@ -692,7 +704,8 @@ async def move(

pipettes_moving = moving_pipettes_in_move_group(move_group)

async with self._monitor_overpressure(pipettes_moving):
checked_moving_pipettes = self._pipettes_to_monitor_pressure(pipettes_moving)
async with self._monitor_overpressure(checked_moving_pipettes):
positions = await runner.run(can_messenger=self._messenger)
self._handle_motor_status_response(positions)

Expand Down Expand Up @@ -799,7 +812,8 @@ async def home(
moving_pipettes = [
axis_to_node(ax) for ax in checked_axes if ax in Axis.pipette_axes()
]
async with self._monitor_overpressure(moving_pipettes):
checked_moving_pipettes = self._pipettes_to_monitor_pressure(moving_pipettes)
async with self._monitor_overpressure(checked_moving_pipettes):
positions = await asyncio.gather(*coros)
# TODO(CM): default gear motor homing routine to have some acceleration
if Axis.Q in checked_axes:
Expand All @@ -813,6 +827,9 @@ async def home(
self._handle_motor_status_response(position)
return axis_convert(self._position, 0.0)

def _pipettes_to_monitor_pressure(self, pipettes: List[NodeId]) -> List[NodeId]:
return [pip for pip in pipettes if self._pressure_sensor_available[pip]]

def _filter_move_group(self, move_group: MoveGroup) -> MoveGroup:
new_group: MoveGroup = []
for step in move_group:
Expand Down Expand Up @@ -1392,6 +1409,11 @@ async def liquid_probe(
) -> float:
head_node = axis_to_node(Axis.by_mount(mount))
tool = sensor_node_for_pipette(OT3Mount(mount.value))
if tool not in self._pipettes_to_monitor_pressure([tool]):
raise UnsupportedHardwareCommand(
"Liquid Presence Detection not available on this pipette."
)

positions = await liquid_probe(
messenger=self._messenger,
tool=tool,
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/hardware_control/dev_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
PipetteConfigurations,
SupportedTipsDefinition,
PipetteBoundingBoxOffsetDefinition,
AvailableSensorDefinition,
)
from opentrons_shared_data.gripper import (
GripperModel,
Expand Down Expand Up @@ -100,6 +101,7 @@ class PipetteDict(InstrumentDict):
pipette_bounding_box_offsets: PipetteBoundingBoxOffsetDefinition
current_nozzle_map: NozzleMap
lld_settings: Optional[Dict[str, Dict[str, float]]]
available_sensors: AvailableSensorDefinition


class PipetteStateDict(TypedDict):
Expand Down
23 changes: 23 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from opentrons_shared_data.pipette import (
pipette_load_name_conversions as pipette_load_name,
pipette_definition,
)
from opentrons_shared_data.robot.types import RobotType

Expand Down Expand Up @@ -633,8 +634,13 @@ async def cache_pipette(
self._feature_flags.use_old_aspiration_functions,
)
self._pipette_handler.hardware_instruments[mount] = p

if self._pipette_handler.has_pipette(mount):
self._confirm_pipette_motion_constraints(mount)

if config is not None:
self._set_pressure_sensor_available(mount, instrument_config=config)

# TODO (lc 12-5-2022) Properly support backwards compatibility
# when applicable
return skipped
Expand All @@ -648,6 +654,23 @@ def _confirm_pipette_motion_constraints(
mount, self.gantry_load
)

def get_pressure_sensor_available(self, mount: OT3Mount) -> bool:
pip_axis = Axis.of_main_tool_actuator(mount)
return self._backend.get_pressure_sensor_available(pip_axis)

def _set_pressure_sensor_available(
self,
mount: OT3Mount,
instrument_config: pipette_definition.PipetteConfigurations,
) -> None:
pressure_sensor_available = (
"pressure" in instrument_config.available_sensors.sensors
)
pip_axis = Axis.of_main_tool_actuator(mount)
self._backend.set_pressure_sensor_available(
pipette_axis=pip_axis, available=pressure_sensor_available
)

async def cache_gripper(self, instrument_data: AttachedGripper) -> bool:
"""Set up gripper based on scanned information."""
grip_cal = load_gripper_calibration_offset(instrument_data.get("id"))
Expand Down
15 changes: 15 additions & 0 deletions api/src/opentrons/protocol_api/core/engine/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
from opentrons.protocol_engine.clients import SyncClient as EngineClient
from opentrons.protocols.api_support.definitions import MAX_SUPPORTED_VERSION
from opentrons_shared_data.pipette.types import PipetteNameType
from opentrons_shared_data.errors.exceptions import (
UnsupportedHardwareCommand,
)
from opentrons.protocol_api._nozzle_layout import NozzleLayout
from opentrons.hardware_control.nozzle_manager import NozzleConfigurationType
from opentrons.hardware_control.nozzle_manager import NozzleMap
Expand Down Expand Up @@ -86,6 +89,13 @@ def __init__(
self._liquid_presence_detection = bool(
self._engine_client.state.pipettes.get_liquid_presence_detection(pipette_id)
)
if (
self._liquid_presence_detection
and not self._pressure_supported_by_pipette()
):
raise UnsupportedHardwareCommand(
"Pressure sensor not available for this pipette"
)

@property
def pipette_id(self) -> str:
Expand Down Expand Up @@ -847,6 +857,11 @@ def retract(self) -> None:
z_axis = self._engine_client.state.pipettes.get_z_axis(self._pipette_id)
self._engine_client.execute_command(cmd.HomeParams(axes=[z_axis]))

def _pressure_supported_by_pipette(self) -> bool:
return self._engine_client.state.pipettes.get_pipette_supports_pressure(
self.pipette_id
)

def detect_liquid_presence(self, well_core: WellCore, loc: Location) -> bool:
labware_id = well_core.labware_id
well_name = well_core.get_name()
Expand Down
4 changes: 4 additions & 0 deletions api/src/opentrons/protocol_api/core/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ def get_blow_out_flow_rate(self, rate: float = 1.0) -> float:
def get_liquid_presence_detection(self) -> bool:
...

@abstractmethod
def _pressure_supported_by_pipette(self) -> bool:
...

@abstractmethod
def set_liquid_presence_detection(self, enable: bool) -> None:
...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,3 +583,7 @@ def liquid_probe_without_recovery(
) -> float:
"""This will never be called because it was added in API 2.20."""
assert False, "liquid_probe_without_recovery only supported in API 2.20 & later"

def _pressure_supported_by_pipette(self) -> bool:
return False

Original file line number Diff line number Diff line change
Expand Up @@ -501,3 +501,6 @@ def liquid_probe_without_recovery(
) -> float:
"""This will never be called because it was added in API 2.20."""
assert False, "liquid_probe_without_recovery only supported in API 2.20 & later"

def _pressure_supported_by_pipette(self) -> bool:
return False
14 changes: 13 additions & 1 deletion api/src/opentrons/protocol_api/instrument_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
CommandPreconditionViolated,
CommandParameterLimitViolated,
UnexpectedTipRemovalError,
UnsupportedHardwareCommand,
)
from opentrons.legacy_broker import LegacyBroker
from opentrons.hardware_control.dev_types import PipetteDict
Expand Down Expand Up @@ -260,6 +261,7 @@ def aspirate(
and self._96_tip_config_valid()
and self._core.get_current_volume() == 0
):
self._raise_if_pressure_not_supported_by_pipette()
self.require_liquid_presence(well=well)

with publisher.publish_context(
Expand Down Expand Up @@ -1694,6 +1696,8 @@ def liquid_presence_detection(self) -> bool:
@liquid_presence_detection.setter
@requires_version(2, 20)
def liquid_presence_detection(self, enable: bool) -> None:
if enable:
self._raise_if_pressure_not_supported_by_pipette()
self._core.set_liquid_presence_detection(enable)

@property
Expand Down Expand Up @@ -2143,6 +2147,7 @@ def detect_liquid_presence(self, well: labware.Well) -> bool:
.. note::
The pressure sensors for the Flex 8-channel pipette are on channels 1 and 8 (positions A1 and H1). For the Flex 96-channel pipette, the pressure sensors are on channels 1 and 96 (positions A1 and H12). Other channels on multi-channel pipettes do not have sensors and cannot detect liquid.
"""
self._raise_if_pressure_not_supported_by_pipette()
loc = well.top()
self._96_tip_config_valid()
return self._core.detect_liquid_presence(well._core, loc)
Expand All @@ -2156,6 +2161,7 @@ def require_liquid_presence(self, well: labware.Well) -> None:
.. note::
The pressure sensors for the Flex 8-channel pipette are on channels 1 and 8 (positions A1 and H1). For the Flex 96-channel pipette, the pressure sensors are on channels 1 and 96 (positions A1 and H12). Other channels on multi-channel pipettes do not have sensors and cannot detect liquid.
"""
self._raise_if_pressure_not_supported_by_pipette()
loc = well.top()
self._96_tip_config_valid()
self._core.liquid_probe_with_recovery(well._core, loc)
Expand All @@ -2170,7 +2176,7 @@ def measure_liquid_height(self, well: labware.Well) -> float:
This is intended for Opentrons internal use only and is not a guaranteed API.
"""

self._raise_if_pressure_not_supported_by_pipette()
loc = well.top()
self._96_tip_config_valid()
height = self._core.liquid_probe_without_recovery(well._core, loc)
Expand All @@ -2192,6 +2198,12 @@ def _raise_if_configuration_not_supported_by_pipette(
)
# SINGLE, QUADRANT and ALL are supported by all pipettes

def _raise_if_pressure_not_supported_by_pipette(self) -> None:
if not self._core._pressure_supported_by_pipette():
raise UnsupportedHardwareCommand(
"Pressure sensor not available for this pipette"
)

def _handle_aspirate_target(
self, target: validation.ValidTarget
) -> tuple[types.Location, Optional[labware.Well], Optional[bool]]:
Expand Down
8 changes: 8 additions & 0 deletions api/src/opentrons/protocol_engine/commands/liquid_probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from opentrons.types import MountType
from opentrons_shared_data.errors.exceptions import (
PipetteLiquidNotFoundError,
UnsupportedHardwareCommand,
)

from ..types import DeckPoint
Expand Down Expand Up @@ -113,6 +114,13 @@ async def _execute_common(
well_name = params.wellName

state_update = update_types.StateUpdate()
if (
"pressure"
not in state_view.pipettes.get_config(pipette_id).available_sensors.sensors
):
raise UnsupportedHardwareCommand(
"Pressure sensor not available for this pipette"
)

# May raise TipNotAttachedError.
aspirated_volume = state_view.pipettes.get_aspirated_volume(pipette_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class LoadedStaticPipetteData:
back_left_corner_offset: Point
front_right_corner_offset: Point
pipette_lld_settings: Optional[Dict[str, Dict[str, float]]]
available_sensors: pipette_definition.AvailableSensorDefinition


class VirtualPipetteDataProvider:
Expand Down Expand Up @@ -280,6 +281,8 @@ def _get_virtual_pipette_static_config_by_model( # noqa: C901
pip_front_right[0], pip_front_right[1], pip_front_right[2]
),
pipette_lld_settings=config.lld_settings,
available_sensors=config.available_sensors
or pipette_definition.AvailableSensorDefinition(sensors=[]),
)

def get_virtual_pipette_static_config(
Expand All @@ -298,6 +301,11 @@ def get_pipette_static_config(
"""Get the config for a pipette, given the state/config object from the HW API."""
back_left_offset = pipette_dict["pipette_bounding_box_offsets"].back_left_corner
front_right_offset = pipette_dict["pipette_bounding_box_offsets"].front_right_corner
available_sensors = (
pipette_dict["available_sensors"]
if "available_sensors" in pipette_dict.keys()
else pipette_definition.AvailableSensorDefinition(sensors=[])
)
return LoadedStaticPipetteData(
model=pipette_dict["model"],
display_name=pipette_dict["display_name"],
Expand Down Expand Up @@ -327,6 +335,7 @@ def get_pipette_static_config(
front_right_offset[0], front_right_offset[1], front_right_offset[2]
),
pipette_lld_settings=pipette_dict["lld_settings"],
available_sensors=available_sensors,
)


Expand Down
9 changes: 9 additions & 0 deletions api/src/opentrons/protocol_engine/state/pipettes.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class StaticPipetteConfig:
bounding_nozzle_offsets: BoundingNozzlesOffsets
default_nozzle_map: NozzleMap # todo(mm, 2024-10-14): unused, remove?
lld_settings: Optional[Dict[str, Dict[str, float]]]
available_sensors: pipette_definition.AvailableSensorDefinition


@dataclasses.dataclass
Expand Down Expand Up @@ -292,6 +293,7 @@ def _update_pipette_config(self, state_update: update_types.StateUpdate) -> None
),
default_nozzle_map=config.nozzle_map,
lld_settings=config.pipette_lld_settings,
available_sensors=config.available_sensors,
)
self._state.flow_rates_by_id[
state_update.pipette_config.pipette_id
Expand Down Expand Up @@ -723,6 +725,13 @@ def get_pipette_bounds_at_specified_move_to_position(
pip_front_left_bound,
)

def get_pipette_supports_pressure(self, pipette_id: str) -> bool:
"""Return if this pipette supports a pressure sensor."""
return (
"pressure"
in self._state.static_config_by_id[pipette_id].available_sensors.sensors
)

def get_liquid_presence_detection(self, pipette_id: str) -> bool:
"""Determine if liquid presence detection is enabled for this pipette."""
try:
Expand Down
Loading

0 comments on commit 4fcf8ab

Please sign in to comment.