Skip to content

Commit

Permalink
refactor(robot-sever,api): make ChangeNotifier thread safe (#15089)
Browse files Browse the repository at this point in the history
Closes EXEC-404
  • Loading branch information
mjhuff authored May 3, 2024
1 parent 72f3200 commit 3b2e900
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 138 deletions.
19 changes: 0 additions & 19 deletions api/src/opentrons/protocol_engine/state/change_notifier.py

This file was deleted.

2 changes: 1 addition & 1 deletion api/src/opentrons/protocol_engine/state/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
from opentrons_shared_data.deck.dev_types import DeckDefinitionV5

from opentrons.protocol_engine.types import ModuleOffsetData
from opentrons.util.change_notifier import ChangeNotifier

from ..resources import DeckFixedLabware
from ..actions import Action, ActionHandler
from .abstract_store import HasState, HandlesActions
from .change_notifier import ChangeNotifier
from .commands import CommandState, CommandStore, CommandView
from .addressable_areas import (
AddressableAreaState,
Expand Down
47 changes: 47 additions & 0 deletions api/src/opentrons/util/change_notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Simple state change notification interface."""
import asyncio


class ChangeNotifier:
"""An interface to emit or subscribe to state change notifications."""

def __init__(self) -> None:
"""Initialize the ChangeNotifier with an internal Event."""
self._event = asyncio.Event()

def notify(self) -> None:
"""Notify all `wait`'ers that the state has changed."""
self._event.set()

async def wait(self) -> None:
"""Wait until the next state change notification."""
self._event.clear()
await self._event.wait()


class ChangeNotifier_ts(ChangeNotifier):
"""ChangeNotifier initialized with Event_ts."""

def __init__(self) -> None:
"""Initialize the ChangeNotifier_Ts with an internal Event_ts."""
super().__init__()
self._event = Event_ts()


class Event_ts(asyncio.Event):
"""asyncio.Event with threadsafe methods."""

def __init__(self) -> None:
"""Initialize Event_ts with the active event_loop or event_loop_policy if not active."""
super().__init__()
if self._loop is None:
self._loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()

def set(self) -> None:
"""Primarily intended for calling from a thread not responsible for the event loop.
Calling set() from the event loop thread will actually delay the execution of the set() until the
calling method either yields, awaits, or exits altogether. This is usually fine but might occasionally cause
unexpected behavior.
"""
self._loop.call_soon_threadsafe(super().set)
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
from decoy import Decoy

from opentrons_shared_data.deck.dev_types import DeckDefinitionV5
from opentrons.util.change_notifier import ChangeNotifier

from opentrons.protocol_engine.actions import PlayAction
from opentrons.protocol_engine.state import State, StateStore, Config
from opentrons.protocol_engine.state.change_notifier import ChangeNotifier
from opentrons.protocol_engine.types import DeckType


Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Tests for the ChangeNotifier interface."""
import asyncio
import pytest
from opentrons.protocol_engine.state.change_notifier import ChangeNotifier
from opentrons.util.change_notifier import ChangeNotifier


async def test_single_subscriber() -> None:
Expand All @@ -24,12 +24,12 @@ async def test_multiple_subscribers(_test_repetition: int) -> None:
"""Test that multiple subscribers can wait for a notification.
This test checks that the subscribers are awoken in the order they
subscribed. This may or may not be guarenteed according to the
subscribed. This may or may not be guaranteed according to the
implementations of both ChangeNotifier and the event loop.
This test functions as a canary, given that our code may relies
This test functions as a canary, given that our code may rely
on this ordering for determinism.
This test runs multiple times to check for flakyness.
This test runs multiple times to check for flakiness.
"""
subject = ChangeNotifier()
results = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
get_deck_configuration_store,
)
from robot_server.deck_configuration.store import DeckConfigurationStore
from robot_server.service.notifications import get_notify_publishers
from robot_server.service.notifications import get_pe_notify_publishers

log = logging.getLogger(__name__)
base_router = APIRouter()
Expand Down Expand Up @@ -156,7 +156,7 @@ async def create_run(
deck_configuration_store: DeckConfigurationStore = Depends(
get_deck_configuration_store
),
notify_publishers: Callable[[], None] = Depends(get_notify_publishers),
notify_publishers: Callable[[], None] = Depends(get_pe_notify_publishers),
) -> PydanticResponse[SimpleBody[MaintenanceRun]]:
"""Create a new maintenance run.
Expand Down
4 changes: 2 additions & 2 deletions robot-server/robot_server/runs/router/base_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
get_deck_configuration_store,
)
from robot_server.deck_configuration.store import DeckConfigurationStore
from robot_server.service.notifications import get_notify_publishers
from robot_server.service.notifications import get_pe_notify_publishers

log = logging.getLogger(__name__)
base_router = APIRouter()
Expand Down Expand Up @@ -144,7 +144,7 @@ async def create_run(
deck_configuration_store: DeckConfigurationStore = Depends(
get_deck_configuration_store
),
notify_publishers: Callable[[], None] = Depends(get_notify_publishers),
notify_publishers: Callable[[], None] = Depends(get_pe_notify_publishers),
) -> PydanticResponse[SimpleBody[Union[Run, BadRun]]]:
"""Create a new run.
Expand Down
6 changes: 2 additions & 4 deletions robot-server/robot_server/service/notifications/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
get_notification_client,
clean_up_notification_client,
)
from .publisher_notifier import PublisherNotifier, get_notify_publishers
from .publisher_notifier import PublisherNotifier, get_pe_notify_publishers
from .publishers import (
MaintenanceRunsPublisher,
RunsPublisher,
Expand All @@ -15,7 +15,6 @@
get_runs_publisher,
get_deck_configuration_publisher,
)
from .change_notifier import ChangeNotifier
from .topics import Topics

__all__ = [
Expand All @@ -30,12 +29,11 @@
"clean_up_notification_client",
# for use by FastAPI
"get_notification_client",
"get_notify_publishers",
"get_pe_notify_publishers",
"get_maintenance_runs_publisher",
"get_runs_publisher",
"get_deck_configuration_publisher",
# for testing
"PublisherNotifier",
"ChangeNotifier",
"Topics",
]
23 changes: 0 additions & 23 deletions robot-server/robot_server/service/notifications/change_notifier.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from server_utils.fastapi_utils.app_state import AppState

from .notification_client import initialize_notification_client
from .publisher_notifier import initialize_publisher_notifier
from .publisher_notifier import initialize_pe_publisher_notifier


async def initialize_notifications(app_state: AppState) -> None:
"""Initialize the notification system for the given app state."""
initialize_notification_client(app_state)
await initialize_publisher_notifier(app_state)
await initialize_pe_publisher_notifier(app_state)
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
"""Provides an interface for alerting notification publishers to events and related lifecycle utilities."""
import asyncio
from fastapi import Depends
from typing import Optional, Callable, List, Awaitable
from typing import Optional, Callable, List, Awaitable, Union

from server_utils.fastapi_utils.app_state import (
AppState,
AppStateAccessor,
get_app_state,
)

from .change_notifier import ChangeNotifier
from opentrons.util.change_notifier import ChangeNotifier, ChangeNotifier_ts


class PublisherNotifier:
"""An interface that invokes notification callbacks whenever a generic notify event occurs."""

def __init__(
self,
change_notifier: Optional[ChangeNotifier] = None,
):
self._change_notifier = change_notifier or ChangeNotifier()
self._pe_notifier: Optional[asyncio.Task[None]] = None
def __init__(self, change_notifier: Union[ChangeNotifier, ChangeNotifier_ts]):
self._change_notifier = change_notifier
self._notifier: Optional[asyncio.Task[None]] = None
self._callbacks: List[Callable[[], Awaitable[None]]] = []

def register_publish_callbacks(
Expand All @@ -31,7 +28,7 @@ def register_publish_callbacks(

async def _initialize(self) -> None:
"""Initializes an instance of PublisherNotifier. This method should only be called once."""
self._pe_notifier = asyncio.create_task(self._wait_for_event())
self._notifier = asyncio.create_task(self._wait_for_event())

def _notify_publishers(self) -> None:
"""A generic notifier, alerting all `waiters` of a change."""
Expand All @@ -45,37 +42,39 @@ async def _wait_for_event(self) -> None:
await callback()


_publisher_notifier_accessor: AppStateAccessor[PublisherNotifier] = AppStateAccessor[
_pe_publisher_notifier_accessor: AppStateAccessor[PublisherNotifier] = AppStateAccessor[
PublisherNotifier
]("publisher_notifier")


def get_publisher_notifier(
def get_pe_publisher_notifier(
app_state: AppState = Depends(get_app_state),
) -> PublisherNotifier:
"""Intended for use by various publishers only."""
publisher_notifier = _publisher_notifier_accessor.get_from(app_state)
"""Intended for use by various publishers only. Intended for protocol engine."""
publisher_notifier = _pe_publisher_notifier_accessor.get_from(app_state)
assert publisher_notifier is not None

return publisher_notifier


def get_notify_publishers(
def get_pe_notify_publishers(
app_state: AppState = Depends(get_app_state),
) -> Callable[[], None]:
"""Provides access to the callback used to notify publishers of changes."""
publisher_notifier = _publisher_notifier_accessor.get_from(app_state)
"""Provides access to the callback used to notify publishers of changes. Intended for protocol engine."""
publisher_notifier = _pe_publisher_notifier_accessor.get_from(app_state)
assert isinstance(publisher_notifier, PublisherNotifier)

return publisher_notifier._notify_publishers


async def initialize_publisher_notifier(app_state: AppState) -> None:
async def initialize_pe_publisher_notifier(app_state: AppState) -> None:
"""Create a new `NotificationClient` and store it on `app_state`.
Intended to be called just once, when the server starts up.
"""
publisher_notifier: PublisherNotifier = PublisherNotifier()
_publisher_notifier_accessor.set_on(app_state, publisher_notifier)
publisher_notifier: PublisherNotifier = PublisherNotifier(
change_notifier=ChangeNotifier()
)
_pe_publisher_notifier_accessor.set_on(app_state, publisher_notifier)

await publisher_notifier._initialize()
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
get_app_state,
)
from ..notification_client import NotificationClient, get_notification_client
from ..publisher_notifier import PublisherNotifier, get_publisher_notifier
from ..publisher_notifier import PublisherNotifier, get_pe_publisher_notifier
from ..topics import Topics


Expand Down Expand Up @@ -151,7 +151,7 @@ async def _handle_engine_status_change(self) -> None:
async def get_runs_publisher(
app_state: AppState = Depends(get_app_state),
notification_client: NotificationClient = Depends(get_notification_client),
publisher_notifier: PublisherNotifier = Depends(get_publisher_notifier),
publisher_notifier: PublisherNotifier = Depends(get_pe_publisher_notifier),
) -> RunsPublisher:
"""Get a singleton RunsPublisher to publish runs topics."""
runs_publisher = _runs_publisher_accessor.get_from(app_state)
Expand Down
56 changes: 0 additions & 56 deletions robot-server/tests/service/notifications/test_change_notifier.py

This file was deleted.

Loading

0 comments on commit 3b2e900

Please sign in to comment.