Skip to content

Commit

Permalink
refactor(api): split backend tip_action into two functions (#13381)
Browse files Browse the repository at this point in the history
  • Loading branch information
caila-marashaj authored and TamarZanzouri committed Sep 13, 2023
1 parent 3d48a00 commit 26469d4
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 96 deletions.
44 changes: 24 additions & 20 deletions api/src/opentrons/hardware_control/backends/ot3controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
create_gripper_jaw_home_group,
create_gripper_jaw_hold_group,
create_tip_action_group,
create_gear_motor_home_group,
create_tip_motor_home_group,
motor_nodes,
LIMIT_SWITCH_OVERTRAVEL_DISTANCE,
map_pipette_type_to_sensor_id,
Expand Down Expand Up @@ -641,12 +641,11 @@ async def home(
positions = await asyncio.gather(*coros)
# TODO(CM): default gear motor homing routine to have some acceleration
if Axis.Q in checked_axes:
await self.tip_action(
await self.home_tip_motors(
distance=self.axis_bounds[Axis.Q][1] - self.axis_bounds[Axis.Q][0],
velocity=self._configuration.motion_settings.max_speed_discontinuity.high_throughput[
Axis.to_kind(Axis.Q)
],
tip_action="home",
)
for position in positions:
self._handle_motor_status_response(position)
Expand All @@ -664,26 +663,31 @@ def _filter_move_group(self, move_group: MoveGroup) -> MoveGroup:
)
return new_group

async def home_tip_motors(
self,
distance: float,
velocity: float,
back_off: bool = True,
) -> None:
move_group = create_tip_motor_home_group(distance, velocity, back_off)

runner = MoveGroupRunner(
move_groups=[move_group],
ignore_stalls=True if not ff.stall_detection_enabled() else False,
)
positions = await runner.run(can_messenger=self._messenger)
if NodeId.pipette_left in positions:
self._gear_motor_position = {
NodeId.pipette_left: positions[NodeId.pipette_left][0]
}
else:
log.debug("no position returned from NodeId.pipette_left")

async def tip_action(
self,
moves: Optional[List[Move[Axis]]] = None,
distance: Optional[float] = None,
velocity: Optional[float] = None,
tip_action: str = "home",
back_off: Optional[bool] = False,
moves: List[Move[Axis]],
) -> None:
# TODO: split this into two functions for homing and 'clamp'
move_group = []
# make sure either moves or distance and velocity is not None
assert bool(moves) ^ (bool(distance) and bool(velocity))
if moves is not None:
move_group = create_tip_action_group(
moves, [NodeId.pipette_left], tip_action
)
elif distance is not None and velocity is not None:
move_group = create_gear_motor_home_group(
float(distance), float(velocity), back_off
)
move_group = create_tip_action_group(moves, [NodeId.pipette_left], "clamp")

runner = MoveGroupRunner(
move_groups=[move_group],
Expand Down
14 changes: 9 additions & 5 deletions api/src/opentrons/hardware_control/backends/ot3simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,15 @@ async def get_tip_present_state(self, mount: OT3Mount) -> int:

async def tip_action(
self,
moves: Optional[List[Move[Axis]]] = None,
distance: Optional[float] = None,
velocity: Optional[float] = None,
tip_action: str = "home",
back_off: Optional[bool] = False,
moves: List[Move[Axis]],
) -> None:
pass

async def home_tip_motors(
self,
distance: float,
velocity: float,
back_off: bool = True,
) -> None:
pass

Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/hardware_control/backends/ot3utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def create_tip_action_group(
return move_group


def create_gear_motor_home_group(
def create_tip_motor_home_group(
distance: float,
velocity: float,
backoff: Optional[bool] = False,
Expand Down
91 changes: 41 additions & 50 deletions api/src/opentrons/hardware_control/ot3api.py
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,41 @@ async def home_plunger(self, mount: Union[top_types.Mount, OT3Mount]) -> None:
checked_mount, rate=1.0, acquire_lock=False
)

async def home_gear_motors(self) -> None:
homing_velocity = self._config.motion_settings.max_speed_discontinuity[
GantryLoad.HIGH_THROUGHPUT
][OT3AxisKind.Q]

# if position is not known, move toward limit switch at a constant velocity
if not any(self._backend.gear_motor_position):
await self._backend.home_tip_motors(
distance=self._backend.axis_bounds[Axis.Q][1],
velocity=homing_velocity,
)
return

current_pos_float = axis_convert(self._backend.gear_motor_position, 0.0)[
Axis.P_L
]

if current_pos_float > self._config.safe_home_distance:
fast_home_moves = self._build_moves(
{Axis.Q: current_pos_float}, {Axis.Q: self._config.safe_home_distance}
)
# move toward home until a safe distance
await self._backend.tip_action(moves=fast_home_moves[0])

# update current position
current_pos_float = axis_convert(self._backend.gear_motor_position, 0.0)[
Axis.P_L
]

# move until the limit switch is triggered, with no acceleration
await self._backend.home_tip_motors(
distance=(current_pos_float + self._config.safe_home_distance),
velocity=homing_velocity,
)

@lru_cache(1)
def _carriage_offset(self) -> top_types.Point:
return top_types.Point(*self._config.carriage_offset)
Expand Down Expand Up @@ -1790,17 +1825,10 @@ async def _motor_pick_up_tip(
self._current_position,
)
await self._move(target_down)
homing_velocity = self._config.motion_settings.max_speed_discontinuity[
GantryLoad.HIGH_THROUGHPUT
][OT3AxisKind.Q]
# check if position is known before pick up tip
if not any(self._backend.gear_motor_position):
# home gear motor if position not known
await self._backend.tip_action(
distance=self._backend.axis_bounds[Axis.Q][1],
velocity=homing_velocity,
tip_action="home",
)
await self.home_gear_motors()
pipette_axis = Axis.of_main_tool_actuator(mount)
gear_origin_float = axis_convert(self._backend.gear_motor_position, 0.0)[
pipette_axis
Expand All @@ -1809,23 +1837,9 @@ async def _motor_pick_up_tip(
clamp_moves = self._build_moves(
{Axis.Q: gear_origin_float}, {Axis.Q: clamp_move_target}
)
await self._backend.tip_action(moves=clamp_moves[0], tip_action="clamp")

gear_pos_float = axis_convert(self._backend.gear_motor_position, 0.0)[
Axis.P_L
]
await self._backend.tip_action(moves=clamp_moves[0])

fast_home_moves = self._build_moves(
{Axis.Q: gear_pos_float}, {Axis.Q: self._config.safe_home_distance}
)
# move toward home until a safe distance
await self._backend.tip_action(moves=fast_home_moves[0], tip_action="clamp")
# move the rest of the way home with no acceleration
await self._backend.tip_action(
distance=(self._config.safe_home_distance + pipette_spec.home_buffer),
velocity=homing_velocity,
tip_action="home",
)
await self.home_gear_motors()

async def pick_up_tip(
self,
Expand Down Expand Up @@ -1894,9 +1908,6 @@ async def drop_tip(
realmount = OT3Mount.from_mount(mount)
spec, _remove = self._pipette_handler.plan_check_drop_tip(realmount, home_after)

homing_velocity = self._config.motion_settings.max_speed_discontinuity[
GantryLoad.HIGH_THROUGHPUT
][OT3AxisKind.Q]
for move in spec.drop_moves:
await self._backend.set_active_current(move.current)

Expand All @@ -1905,37 +1916,17 @@ async def drop_tip(
# Not sure why
if not any(self._backend.gear_motor_position):
# home gear motor if position not known
await self._backend.tip_action(
distance=self._backend.axis_bounds[Axis.Q][1],
velocity=homing_velocity,
tip_action="home",
)
await self.home_gear_motors()

gear_start_position = axis_convert(
self._backend.gear_motor_position, 0.0
)[Axis.P_L]
drop_moves = self._build_moves(
{Axis.Q: gear_start_position}, {Axis.Q: move.target_position}
)
await self._backend.tip_action(moves=drop_moves[0], tip_action="clamp")
await self._backend.tip_action(moves=drop_moves[0])

gear_pos_float = axis_convert(self._backend.gear_motor_position, 0.0)[
Axis.P_L
]

fast_home_moves = self._build_moves(
{Axis.Q: gear_pos_float}, {Axis.Q: self._config.safe_home_distance}
)
# move toward home until a safe distance
await self._backend.tip_action(
moves=fast_home_moves[0], tip_action="clamp"
)
# move the rest of the way home with no acceleration
await self._backend.tip_action(
distance=(self._config.safe_home_distance + move.home_buffer),
velocity=homing_velocity,
tip_action="home",
)
await self.home_gear_motors()

else:
target_pos = target_position_from_plunger(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ async def test_tip_action(
controller: OT3Controller,
mock_move_group_run: mock.AsyncMock,
) -> None:
await controller.tip_action(distance=33, velocity=-5.5, tip_action="home")
await controller.home_tip_motors(distance=33, velocity=-5.5, back_off=False)
for call in mock_move_group_run.call_args_list:
move_group_runner = call[0][0]
for move_group in move_group_runner._move_groups:
Expand All @@ -713,9 +713,7 @@ async def test_tip_action(

mock_move_group_run.reset_mock()

await controller.tip_action(
distance=33, velocity=-5.5, tip_action="home", back_off=True
)
await controller.home_tip_motors(distance=33, velocity=-5.5, back_off=True)
for call in mock_move_group_run.call_args_list:
move_group_runner = call[0][0]
move_groups = move_group_runner._move_groups
Expand Down
33 changes: 21 additions & 12 deletions api/tests/opentrons/hardware_control/test_ot3_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,19 @@ def mock_ungrip(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]:
yield mock_move


@pytest.fixture
def mock_home_gear_motors(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]:
with patch.object(
ot3_hardware.managed_obj,
"home_gear_motors",
AsyncMock(
spec=ot3_hardware.managed_obj.home_gear_motors,
wraps=ot3_hardware.managed_obj.home_gear_motors,
),
) as mock_home_gear:
yield mock_home_gear


@pytest.fixture
def mock_hold_jaw_width(ot3_hardware: ThreadManager[OT3API]) -> Iterator[AsyncMock]:
with patch.object(
Expand Down Expand Up @@ -1318,6 +1331,7 @@ async def test_pick_up_tip_full_tiprack(
mock_instrument_handlers: Tuple[Mock],
mock_ungrip: AsyncMock,
mock_move_to_plunger_bottom: AsyncMock,
mock_home_gear_motors: AsyncMock,
) -> None:
mock_ungrip.return_value = None
await ot3_hardware.home()
Expand Down Expand Up @@ -1355,8 +1369,6 @@ def _fake_function():
def _update_gear_motor_pos(
moves: Optional[List[Move[Axis]]] = None,
distance: Optional[float] = None,
velocity: Optional[float] = None,
tip_action: str = "home",
) -> None:
if NodeId.pipette_left not in backend._gear_motor_position:
backend._gear_motor_position = {NodeId.pipette_left: 0.0}
Expand All @@ -1376,19 +1388,18 @@ def _update_gear_motor_pos(
OT3Mount.LEFT, 40.0, None, None
)
# first call should be "clamp", moving down
assert tip_action.call_args_list[0][-1]["tip_action"] == "clamp"
assert tip_action.call_args_list[0][-1]["moves"][0].unit_vector == {Axis.Q: 1}
# next call should be "clamp", moving back up
assert tip_action.call_args_list[1][-1]["tip_action"] == "clamp"
assert tip_action.call_args_list[1][-1]["moves"][0].unit_vector == {Axis.Q: -1}
# last call should be "home"
assert tip_action.call_args_list[2][-1]["tip_action"] == "home"
assert len(tip_action.call_args_list) == 3
assert len(tip_action.call_args_list) == 2
# home should be called after tip_action is done
assert len(mock_home_gear_motors.call_args_list) == 1


async def test_drop_tip_full_tiprack(
ot3_hardware: ThreadManager[OT3API],
mock_instrument_handlers: Tuple[Mock],
mock_home_gear_motors: AsyncMock,
) -> None:
_, pipette_handler = mock_instrument_handlers
backend = ot3_hardware.managed_obj._backend
Expand Down Expand Up @@ -1439,14 +1450,12 @@ def _update_gear_motor_pos(
await ot3_hardware.drop_tip(Mount.LEFT, home_after=True)
pipette_handler.plan_check_drop_tip.assert_called_once_with(OT3Mount.LEFT, True)
# first call should be "clamp", moving down
assert tip_action.call_args_list[0][-1]["tip_action"] == "clamp"
assert tip_action.call_args_list[0][-1]["moves"][0].unit_vector == {Axis.Q: 1}
# next call should be "clamp", moving back up
assert tip_action.call_args_list[1][-1]["tip_action"] == "clamp"
assert tip_action.call_args_list[1][-1]["moves"][0].unit_vector == {Axis.Q: -1}
# last call should be "home"
assert tip_action.call_args_list[2][-1]["tip_action"] == "home"
assert len(tip_action.call_args_list) == 3
assert len(tip_action.call_args_list) == 2
# home should be called after tip_action is done
assert len(mock_home_gear_motors.call_args_list) == 1


@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,7 @@ async def move_tip_motor_relative_ot3(

tip_motor_move = api._build_moves(current_gear_pos_dict, target_pos_dict)

_move_coro = api._backend.tip_action(
moves=tip_motor_move[0],
tip_action="clamp",
)
_move_coro = api._backend.tip_action(moves=tip_motor_move[0])
if motor_current is None:
await _move_coro
else:
Expand Down

0 comments on commit 26469d4

Please sign in to comment.