diff --git a/api/src/opentrons/hardware_control/backends/flex_protocol.py b/api/src/opentrons/hardware_control/backends/flex_protocol.py index c5294938fa0..03ceb26f2b1 100644 --- a/api/src/opentrons/hardware_control/backends/flex_protocol.py +++ b/api/src/opentrons/hardware_control/backends/flex_protocol.py @@ -60,6 +60,11 @@ def restore_system_constraints(self) -> AsyncIterator[None]: def grab_pressure(self, channels: int, mount: OT3Mount) -> AsyncIterator[None]: ... + def set_pressure_sensor_available( + self, pipette_axis: Axis, available: bool + ) -> None: + ... + def update_constraints_for_gantry_load(self, gantry_load: GantryLoad) -> None: ... diff --git a/api/src/opentrons/hardware_control/backends/ot3controller.py b/api/src/opentrons/hardware_control/backends/ot3controller.py index 1251fcc4adb..8cbef2461a7 100644 --- a/api/src/opentrons/hardware_control/backends/ot3controller.py +++ b/api/src/opentrons/hardware_control/backends/ot3controller.py @@ -362,6 +362,7 @@ def __init__( self._configuration.motion_settings, GantryLoad.LOW_THROUGHPUT ) ) + self._pressure_sensor_available: Dict[NodeId, bool] = {} @asynccontextmanager async def restore_system_constraints(self) -> AsyncIterator[None]: @@ -380,6 +381,12 @@ async def grab_pressure( async with grab_pressure(channels, tool, self._messenger): yield + def set_pressure_sensor_available( + self, pipette_axis: Axis, available: bool + ) -> None: + pip_node = axis_to_node(pipette_axis) + self._pressure_sensor_available[pip_node] = available + def update_constraints_for_calibration_with_gantry_load( self, gantry_load: GantryLoad, @@ -871,7 +878,8 @@ async def home( moving_pipettes = [ axis_to_node(ax) for ax in checked_axes if ax in Axis.pipette_axes() ] - async with self._monitor_overpressure(moving_pipettes): + checked_moving_pipettes = self._pipettes_to_monitor_pressure(moving_pipettes) + async with self._monitor_overpressure(checked_moving_pipettes): positions = await asyncio.gather(*coros) # TODO(CM): default gear motor homing routine to have some acceleration if Axis.Q in checked_axes: @@ -886,6 +894,9 @@ async def home( self._handle_motor_status_response(position) return axis_convert(self._position, 0.0) + def _pipettes_to_monitor_pressure(self, pipettes: List[NodeId]) -> List[NodeId]: + return [pip for pip in pipettes if self._pressure_sensor_available[pip]] + def _filter_move_group(self, move_group: MoveGroup) -> MoveGroup: new_group: MoveGroup = [] for step in move_group: diff --git a/api/src/opentrons/hardware_control/ot3api.py b/api/src/opentrons/hardware_control/ot3api.py index bd828cd525f..85080f3ef66 100644 --- a/api/src/opentrons/hardware_control/ot3api.py +++ b/api/src/opentrons/hardware_control/ot3api.py @@ -32,6 +32,7 @@ ) from opentrons_shared_data.pipette import ( pipette_load_name_conversions as pipette_load_name, + pipette_definition ) from opentrons_shared_data.robot.types import RobotType @@ -634,10 +635,25 @@ async def cache_pipette( self._feature_flags.use_old_aspiration_functions, ) self._pipette_handler.hardware_instruments[mount] = p + if config is not None: + self._set_pressure_sensor_available(mount, instrument_config=config) # TODO (lc 12-5-2022) Properly support backwards compatibility # when applicable return skipped + def _set_pressure_sensor_available( + self, + mount: OT3Mount, + instrument_config: pipette_definition.PipetteConfigurations, + ) -> None: + pressure_sensor_available = ( + "pressure" in instrument_config.available_sensors.sensors + ) + pip_axis = Axis.of_main_tool_actuator(mount) + self._backend.set_pressure_sensor_available( + pipette_axis=pip_axis, available=pressure_sensor_available + ) + async def cache_gripper(self, instrument_data: AttachedGripper) -> bool: """Set up gripper based on scanned information.""" grip_cal = load_gripper_calibration_offset(instrument_data.get("id"))