refactor(api): Remove concurrency from LegacyContextPlugin (#16098)
## Overview

This removes a potential source of deadlocks and race conditions.

I originally thought this would help with EXEC-417. I'm not sure it
actually will at this point, but maybe we should still do it.

## Background

When the robot executes a Python protocol, the `ProtocolEngine` and
higher-level HTTP stuff live in the main thread, and the user's Python
script runs in a worker thread. For older (`apiLevel`≤2.13) protocols,
`LegacyContextPlugin` transports progress reports from the worker thread
to the main thread.

When we initially built all of this, we had [severe performance
problems](#9133) where the
main thread would get bogged down serializing a big HTTP response or
something, and then the backpressure through `LegacyContextPlugin` would
stall the protocol thread, causing visible jankiness in robot motion. We
did a bunch of stuff to try to fix this. One fix was to [insert an
infinite non-blocking
queue](#9238) between the two
threads to remove the backpressure.
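
To illustrate why an unbounded queue removes backpressure, here is a minimal sketch. It uses the standard library's `queue.Queue` as a stand-in for the project's `ThreadAsyncQueue`, and the names `actions_to_dispatch` and `protocol_thread` are hypothetical. The producer finishes all of its reports even though nothing has consumed them yet.

```python
import queue
import threading
from typing import List

# Assumption: queue.Queue stands in for ThreadAsyncQueue.
# maxsize defaults to 0, which means the queue is unbounded.
actions_to_dispatch: "queue.Queue[List[str]]" = queue.Queue()


def protocol_thread() -> None:
    """Pretend to be the user's protocol script reporting progress."""
    for step in range(3):
        # put() on an unbounded queue never waits for a consumer.
        actions_to_dispatch.put([f"step {step}"])


worker = threading.Thread(target=protocol_thread)
worker.start()
worker.join()  # The protocol thread finishes with nothing consumed yet.

pending_after_protocol_finished = actions_to_dispatch.qsize()

# Only now does the "main thread" catch up on the backlog.
consumed: List[str] = []
while not actions_to_dispatch.empty():
    consumed.extend(actions_to_dispatch.get_nowait())
```

The same property that keeps the protocol thread from stalling also lets the consumer fall arbitrarily far behind.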

I strongly suspect that that fix is unnecessary today. As evidence: It
only ever applied to older (`apiLevel`≤2.13) protocols. Newer
(`apiLevel`≥2.14) protocols have always run through a different pipeline,
which still has backpressure. And the newer pipeline is apparently not
perceived to have janky motion.

Removing the fix would remove concurrency, which would be a meaningful
simplification. For instance, I've seen hints that [this
part](https://github.com/Opentrons/opentrons/blob/45e14ca26359720740db744124b464bdcc84264c/api/src/opentrons/protocol_engine/execution/hardware_stopper.py#L70)
of run cleanup is racy, because it depends on whether all the
`pickUpTip`/`dropTip` commands got consumed from the queue by the time
we reach there.
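
A toy model of that kind of race, under loud assumptions: `EngineState`, `apply`, and the `tip_attached` flag are invented for illustration and are not the real `hardware_stopper.py` logic. The interleaving is written out sequentially so the outcome is deterministic.

```python
import queue
from dataclasses import dataclass


@dataclass
class EngineState:
    """Hypothetical stand-in for the state that cleanup inspects."""

    tip_attached: bool = False


def apply(state: EngineState, command: str) -> None:
    """Update the state once a reported command is consumed."""
    if command == "pickUpTip":
        state.tip_attached = True
    elif command == "dropTip":
        state.tip_attached = False


state = EngineState()
pending: "queue.Queue[str]" = queue.Queue()

# The protocol thread reports a tip pickup and then exits.
pending.put("pickUpTip")

# Cleanup runs before the consumer has drained the queue,
# so it sees stale state.
tip_seen_by_early_cleanup = state.tip_attached

# The consumer catches up afterwards.
while not pending.empty():
    apply(state, pending.get_nowait())
tip_seen_by_late_cleanup = state.tip_attached
```

With a blocking hand-off instead of a queue, the report would already be applied when the protocol thread returns, so the early and late views could not differ.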

## Test Plan and Hands on Testing

* [x] On an OT-2, run some protocols that are movement-intensive (e.g.
lots of `touch_tip()`s) and have `apiLevel` ≤ 2.13. Click around in the
app and induce some network requests. There might be some jankiness, but
make sure it's not severe. Really, I think we just need to make sure
it's not any worse than `apiLevel` ≥ 2.14.
* I'm noticing jank lasting ~0-1 seconds when you: navigate to the
device details page (understandable); navigate through the device
settings (weird); and navigate to the device list page (concerning). All
of this applies equally to `apiLevel≥2.14`, so if we're concerned about
this, retaining this non-blocking queue in `LegacyContextPlugin` is not
the correct solution.

## Changelog

* Delete the non-blocking queue between the Python protocol thread and the
main thread. Replace it with simple inter-thread function calls.
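
A minimal sketch of what such an inter-thread call can look like, using `asyncio.run_coroutine_threadsafe()` as the diff in this commit does. The string "actions", the `dispatched` list, and the helper names are hypothetical stand-ins for the real `ProtocolEngine` actions and dispatcher.

```python
import asyncio
import threading
from typing import List, Optional

dispatched: List[str] = []
dispatch_thread_ids: List[int] = []
loop_thread_id: Optional[int] = None


async def dispatch_action_list(actions: List[str]) -> None:
    """Runs in the event loop's thread. There is no `await` in the body,
    so the whole batch is handled without yielding to the event loop."""
    for action in actions:
        dispatched.append(action)
        dispatch_thread_ids.append(threading.get_ident())


def report_from_worker(loop: asyncio.AbstractEventLoop, actions: List[str]) -> None:
    """Called from the worker thread. Blocks until the loop has run the batch."""
    future = asyncio.run_coroutine_threadsafe(dispatch_action_list(actions), loop)
    future.result()


def protocol_script(loop: asyncio.AbstractEventLoop) -> None:
    """Hypothetical stand-in for the user's protocol running in a worker thread."""
    report_from_worker(loop, ["queue pickUpTip", "run pickUpTip"])
    report_from_worker(loop, ["succeed pickUpTip"])


async def main() -> None:
    global loop_thread_id
    loop_thread_id = threading.get_ident()
    loop = asyncio.get_running_loop()
    # Run the script in a worker thread. The event loop stays free
    # to service the coroutines that the worker submits.
    await loop.run_in_executor(None, protocol_script, loop)


asyncio.run(main())
```

Because `future.result()` blocks the worker until the event loop has run the coroutine, this deliberately brings back the backpressure that the queue had removed.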

## Review requests

Is this a good idea?

## Risk assessment

Medium. There is a risk that this *is* covering up enduring jankiness
problems, and removing it will bring the problems back. This may not be
obvious to us in the office because we're more focused on Flexes, which
don't run `apiLevel`≤2.13 protocols.

But there's also risk to having the concurrency, so... 🤷
SyntaxColoring authored Sep 18, 2024
1 parent 534fb53 commit 6451bdb
Showing 5 changed files with 34 additions and 447 deletions.
98 changes: 27 additions & 71 deletions api/src/opentrons/protocol_runner/legacy_context_plugin.py
@@ -1,9 +1,9 @@
"""Customize the ProtocolEngine to monitor and control legacy (APIv2) protocols."""
from __future__ import annotations

from asyncio import create_task, Task
import asyncio
from contextlib import ExitStack
from typing import List, Optional
from typing import Optional

from opentrons.legacy_commands.types import CommandMessage as LegacyCommand
from opentrons.legacy_broker import LegacyBroker
@@ -12,7 +12,6 @@
from opentrons.util.broker import ReadOnlyBroker

from .legacy_command_mapper import LegacyCommandMapper
from .thread_async_queue import ThreadAsyncQueue


class LegacyContextPlugin(AbstractPlugin):
@@ -21,59 +20,36 @@ class LegacyContextPlugin(AbstractPlugin):
In the legacy ProtocolContext, protocol execution is accomplished
by direct communication with the HardwareControlAPI, as opposed to an
intermediate layer like the ProtocolEngine. This plugin wraps up
and hides this behavior, so the ProtocolEngine can monitor and control
and hides this behavior, so the ProtocolEngine can monitor
the run of a legacy protocol without affecting the execution of
the protocol commands themselves.
This plugin allows a ProtocolEngine to:
1. Play/pause the protocol run using the HardwareControlAPI, as was done before
the ProtocolEngine existed.
2. Subscribe to what is being done with the legacy ProtocolContext,
and insert matching commands into ProtocolEngine state for
purely progress-tracking purposes.
This plugin allows a ProtocolEngine to subscribe to what is being done with the
legacy ProtocolContext, and insert matching commands into ProtocolEngine state for
purely progress-tracking purposes.
"""

def __init__(
self,
engine_loop: asyncio.AbstractEventLoop,
broker: LegacyBroker,
equipment_broker: ReadOnlyBroker[LoadInfo],
legacy_command_mapper: Optional[LegacyCommandMapper] = None,
) -> None:
"""Initialize the plugin with its dependencies."""
self._engine_loop = engine_loop

self._broker = broker
self._equipment_broker = equipment_broker
self._legacy_command_mapper = legacy_command_mapper or LegacyCommandMapper()

# We use a non-blocking queue to communicate activity
# from the APIv2 protocol, which is running in its own thread,
# to the ProtocolEngine, which is running in the main thread's async event loop.
#
# The queue being non-blocking lets the protocol communicate its activity
# instantly *even if the event loop is currently occupied by something else.*
# Various things can accidentally occupy the event loop for too long.
# So if the protocol had to wait for the event loop to be free
# every time it reported some activity,
# it could visibly stall for a moment, making its motion jittery.
#
# TODO(mm, 2024-03-22): See if we can remove this non-blockingness now.
# It was one of several band-aids introduced in ~v5.0.0 to mitigate performance
# problems. v6.3.0 started running some Python protocols directly through
# Protocol Engine, without this plugin, and without any non-blocking queue.
# If performance is sufficient for those, that probably means the
# performance problems have been resolved in better ways elsewhere
# and we don't need this anymore.
self._actions_to_dispatch = ThreadAsyncQueue[List[pe_actions.Action]]()
self._action_dispatching_task: Optional[Task[None]] = None

self._subscription_exit_stack: Optional[ExitStack] = None

def setup(self) -> None:
"""Set up the plugin.
* Subscribe to the APIv2 context's message brokers to be informed
of the APIv2 protocol's activity.
* Kick off a background task to inform Protocol Engine of that activity.
Subscribe to the APIv2 context's message brokers to be informed
of the APIv2 protocol's activity.
"""
# Subscribe to activity on the APIv2 context,
# and arrange to unsubscribe when this plugin is torn down.
@@ -97,24 +73,16 @@ def setup(self) -> None:
# to clean up these subscriptions.
self._subscription_exit_stack = exit_stack.pop_all()

# Kick off a background task to report activity to the ProtocolEngine.
self._action_dispatching_task = create_task(self._dispatch_all_actions())

# todo(mm, 2024-08-21): This no longer needs to be async.
async def teardown(self) -> None:
"""Tear down the plugin, undoing the work done in `setup()`.
Called by Protocol Engine.
At this point, the APIv2 protocol script must have exited.
"""
self._actions_to_dispatch.done_putting()
try:
if self._action_dispatching_task is not None:
await self._action_dispatching_task
self._action_dispatching_task = None
finally:
if self._subscription_exit_stack is not None:
self._subscription_exit_stack.close()
self._subscription_exit_stack = None
if self._subscription_exit_stack is not None:
self._subscription_exit_stack.close()
self._subscription_exit_stack = None

def handle_action(self, action: pe_actions.Action) -> None:
"""React to a ProtocolEngine action."""
@@ -127,34 +95,22 @@ def _handle_legacy_command(self, command: LegacyCommand) -> None:
Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_actions = self._legacy_command_mapper.map_command(command=command)
self._actions_to_dispatch.put(pe_actions)
future = asyncio.run_coroutine_threadsafe(
self._dispatch_action_list(pe_actions), self._engine_loop
)
future.result()

def _handle_equipment_loaded(self, load_info: LoadInfo) -> None:
"""Handle an equipment load reported by the legacy APIv2 protocol.
Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_actions = self._legacy_command_mapper.map_equipment_load(load_info=load_info)
self._actions_to_dispatch.put(pe_actions)

async def _dispatch_all_actions(self) -> None:
"""Dispatch all actions to the `ProtocolEngine`.
Exits only when `self._actions_to_dispatch` is closed
(or an unexpected exception is raised).
"""
async for action_batch in self._actions_to_dispatch.get_async_until_closed():
# It's critical that we dispatch this batch of actions as one atomic
# sequence, without yielding to the event loop.
# Although this plugin only means to use the ProtocolEngine as a way of
# passively exposing the protocol's progress, the ProtocolEngine is still
# theoretically active, which means it's constantly watching in the
# background to execute any commands that it finds `queued`.
#
# For example, one of these action batches will often want to
# instantaneously create a running command by having a queue action
# immediately followed by a run action. We cannot let the
# ProtocolEngine's background task see the command in the `queued` state,
# or it will try to execute it, which the legacy protocol is already doing.
for action in action_batch:
self.dispatch(action)
future = asyncio.run_coroutine_threadsafe(
self._dispatch_action_list(pe_actions), self._engine_loop
)
future.result()

async def _dispatch_action_list(self, actions: list[pe_actions.Action]) -> None:
for action in actions:
self.dispatch(action)
5 changes: 4 additions & 1 deletion api/src/opentrons/protocol_runner/protocol_runner.py
@@ -1,4 +1,5 @@
"""Protocol run control and management."""
import asyncio
from typing import List, NamedTuple, Optional, Union

from abc import ABC, abstractmethod
@@ -220,7 +221,9 @@ async def load(
equipment_broker = Broker[LoadInfo]()
self._protocol_engine.add_plugin(
LegacyContextPlugin(
broker=self._broker, equipment_broker=equipment_broker
engine_loop=asyncio.get_running_loop(),
broker=self._broker,
equipment_broker=equipment_broker,
)
)
self._hardware_api.should_taskify_movement_execution(taskify=True)
174 changes: 0 additions & 174 deletions api/src/opentrons/protocol_runner/thread_async_queue.py

This file was deleted.

@@ -1,4 +1,5 @@
"""Tests for the PythonAndLegacyRunner's LegacyContextPlugin."""
import asyncio
import pytest
from anyio import to_thread
from decoy import Decoy, matchers
@@ -60,7 +61,7 @@ def mock_action_dispatcher(decoy: Decoy) -> pe_actions.ActionDispatcher:


@pytest.fixture
def subject(
async def subject(
mock_legacy_broker: LegacyBroker,
mock_equipment_broker: ReadOnlyBroker[LoadInfo],
mock_legacy_command_mapper: LegacyCommandMapper,
@@ -69,6 +70,7 @@ def subject(
) -> LegacyContextPlugin:
"""Get a configured LegacyContextPlugin with its dependencies mocked out."""
plugin = LegacyContextPlugin(
engine_loop=asyncio.get_running_loop(),
broker=mock_legacy_broker,
equipment_broker=mock_equipment_broker,
legacy_command_mapper=mock_legacy_command_mapper,