Skip to content

Commit

Permalink
disable overpressure monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj committed Nov 20, 2024
1 parent ac051f7 commit 60c0879
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
5 changes: 5 additions & 0 deletions api/src/opentrons/hardware_control/backends/flex_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def restore_system_constraints(self) -> AsyncIterator[None]:
def grab_pressure(self, channels: int, mount: OT3Mount) -> AsyncIterator[None]:
...

def set_pressure_sensor_available(
self, pipette_axis: Axis, available: bool
) -> None:
...

def update_constraints_for_gantry_load(self, gantry_load: GantryLoad) -> None:
...

Expand Down
13 changes: 12 additions & 1 deletion api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ def __init__(
self._configuration.motion_settings, GantryLoad.LOW_THROUGHPUT
)
)
self._pressure_sensor_available: Dict[NodeId, bool] = {}

@asynccontextmanager
async def restore_system_constraints(self) -> AsyncIterator[None]:
Expand All @@ -380,6 +381,12 @@ async def grab_pressure(
async with grab_pressure(channels, tool, self._messenger):
yield

def set_pressure_sensor_available(
self, pipette_axis: Axis, available: bool
) -> None:
pip_node = axis_to_node(pipette_axis)
self._pressure_sensor_available[pip_node] = available

def update_constraints_for_calibration_with_gantry_load(
self,
gantry_load: GantryLoad,
Expand Down Expand Up @@ -871,7 +878,8 @@ async def home(
moving_pipettes = [
axis_to_node(ax) for ax in checked_axes if ax in Axis.pipette_axes()
]
async with self._monitor_overpressure(moving_pipettes):
checked_moving_pipettes = self._pipettes_to_monitor_pressure(moving_pipettes)
async with self._monitor_overpressure(checked_moving_pipettes):
positions = await asyncio.gather(*coros)
# TODO(CM): default gear motor homing routine to have some acceleration
if Axis.Q in checked_axes:
Expand All @@ -886,6 +894,9 @@ async def home(
self._handle_motor_status_response(position)
return axis_convert(self._position, 0.0)

def _pipettes_to_monitor_pressure(self, pipettes: List[NodeId]) -> List[NodeId]:
return [pip for pip in pipettes if self._pressure_sensor_available[pip]]

def _filter_move_group(self, move_group: MoveGroup) -> MoveGroup:
new_group: MoveGroup = []
for step in move_group:
Expand Down
16 changes: 16 additions & 0 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from opentrons_shared_data.pipette import (
pipette_load_name_conversions as pipette_load_name,
pipette_definition
)
from opentrons_shared_data.robot.types import RobotType

Expand Down Expand Up @@ -634,10 +635,25 @@ async def cache_pipette(
self._feature_flags.use_old_aspiration_functions,
)
self._pipette_handler.hardware_instruments[mount] = p
if config is not None:
self._set_pressure_sensor_available(mount, instrument_config=config)
# TODO (lc 12-5-2022) Properly support backwards compatibility
# when applicable
return skipped

def _set_pressure_sensor_available(
self,
mount: OT3Mount,
instrument_config: pipette_definition.PipetteConfigurations,
) -> None:
pressure_sensor_available = (
"pressure" in instrument_config.available_sensors.sensors
)
pip_axis = Axis.of_main_tool_actuator(mount)
self._backend.set_pressure_sensor_available(
pipette_axis=pip_axis, available=pressure_sensor_available
)

async def cache_gripper(self, instrument_data: AttachedGripper) -> bool:
"""Set up gripper based on scanned information."""
grip_cal = load_gripper_calibration_offset(instrument_data.get("id"))
Expand Down

0 comments on commit 60c0879

Please sign in to comment.