diff --git a/packages/uipath-platform/pyproject.toml b/packages/uipath-platform/pyproject.toml index 505d139f0..1358b712e 100644 --- a/packages/uipath-platform/pyproject.toml +++ b/packages/uipath-platform/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-platform" -version = "0.1.87" +version = "0.1.88" description = "HTTP client library for programmatic access to UiPath Platform" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/packages/uipath-platform/src/uipath/platform/governance/__init__.py b/packages/uipath-platform/src/uipath/platform/governance/__init__.py index e1f587606..1d9bdf7ee 100644 --- a/packages/uipath-platform/src/uipath/platform/governance/__init__.py +++ b/packages/uipath-platform/src/uipath/platform/governance/__init__.py @@ -10,6 +10,15 @@ from .compensate import FiredRule, GovernRequest from .policy import PolicyContext, PolicyResponse +# ``_live_track_event_dispatcher.LiveTrackEventDispatcher`` is intentionally +# **not** re-exported. It is host-wiring glue (the runtime sink's +# non-blocking ``track_event`` adapter), not a customer-facing API. +# Internal callers import it via the explicit private path: +# +# from uipath.platform.governance._live_track_event_dispatcher import ( +# LiveTrackEventDispatcher, +# ) + __all__ = [ "FiredRule", "GovernRequest", diff --git a/packages/uipath-platform/src/uipath/platform/governance/_live_track_event_dispatcher.py b/packages/uipath-platform/src/uipath/platform/governance/_live_track_event_dispatcher.py new file mode 100644 index 000000000..432fd91c5 --- /dev/null +++ b/packages/uipath-platform/src/uipath/platform/governance/_live_track_event_dispatcher.py @@ -0,0 +1,287 @@ +"""Non-blocking dispatcher for governance track-event telemetry. 
+ +Wraps :meth:`UiPathPlatformGovernanceProvider.track_event_async` on a +private background ``asyncio`` event loop so sync callers can fire +telemetry events without blocking on the underlying ``POST /runtime/log`` +HTTP round-trip. + +:meth:`LiveTrackEventDispatcher.dispatch` is a sync fire-and-forget +method that mirrors the kwargs of ``track_event_async``. Internally it +schedules the async HTTP call onto a dedicated background loop, so the +calling thread never blocks on network I/O and the underlying HTTP call +remains async end-to-end. + +Design notes: + +- **Async HTTP inside, sync interface outside.** ``dispatch`` is a + sync function. Internally it enqueues a coroutine that awaits + ``provider.track_event_async``. + +- **Loop affinity.** ``httpx.AsyncClient`` lazy-binds its connection + pool to the first event loop that awaits on it. This dispatcher + assumes it owns the provider's async HTTP path — nothing else in + the process should await ``track_event_async`` (or any other + ``*_async`` method on the same underlying service) on a *different* + loop. See "one dispatcher per provider" below. + +- **Backpressure.** A ``BoundedSemaphore`` caps in-flight coroutines; + submissions that exceed the cap are dropped with a warning so + memory stays bounded when the backend is slow. + +- **Fire-and-forget contract.** Coroutine exceptions are observed on + the returned ``concurrent.futures.Future`` (to suppress asyncio's + "exception was never retrieved" warning) and logged at debug — they + cannot reach the caller because ``dispatch`` returns before the + coroutine runs. + +One dispatcher per provider. The dispatcher's background loop must be +the only loop that awaits the provider's async methods. 
from __future__ import annotations

import asyncio
import concurrent.futures
import logging
import threading
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Used in annotations only; ``from __future__ import annotations``
    # keeps those unevaluated, so no runtime import is required.
    from ._governance_provider import UiPathPlatformGovernanceProvider

logger = logging.getLogger(__name__)


class LiveTrackEventDispatcher:
    """Non-blocking sync adapter around ``provider.track_event_async``.

    Owns a daemon thread that runs a private ``asyncio`` event loop.
    :meth:`dispatch` schedules one coroutine per event on that loop and
    returns immediately, so the calling thread is never blocked on the
    platform's ``/runtime/log`` HTTP call — and the HTTP call itself
    is awaited (not run on a sync thread pool).

    .. code-block:: python

        provider = UiPathPlatformGovernanceProvider(config=..., execution_context=...)
        dispatcher = LiveTrackEventDispatcher(provider)
        dispatcher.dispatch(event_name="agent.started")
        # ...
        dispatcher.shutdown()  # at process exit

    The dispatcher is also a context manager: leaving the ``with``
    block calls :meth:`shutdown` with its default arguments.

    ``dispatch`` accepts the keyword arguments it forwards to
    ``track_event_async`` (``event_name``, ``data``, ``operation_id``),
    so it is a drop-in sync callable for anywhere the async method
    would go.

    Concurrency model:

    - A ``BoundedSemaphore`` caps the number of scheduled-but-unfinished
      coroutines; submissions beyond the cap are dropped with a warning.
    - ``_futures_lock`` guards both the pending-future set and the
      transition into the shut-down state, so a submission is either
      registered before :meth:`shutdown` takes its snapshot or rejected.
    """

    _DEFAULT_MAX_INFLIGHT = 40
    # Upper bound (seconds) on how long ``shutdown`` waits for the loop
    # thread to exit after asking the loop to stop.
    _LOOP_JOIN_TIMEOUT = 5.0

    def __init__(
        self,
        provider: UiPathPlatformGovernanceProvider,
        *,
        max_inflight: int = _DEFAULT_MAX_INFLIGHT,
    ) -> None:
        """Construct a dispatcher bound to one provider.

        Starts a daemon thread that runs a private ``asyncio`` event
        loop and blocks until that loop is actually running. All
        awaits of ``provider.track_event_async`` happen on that loop.

        Args:
            provider: Object whose ``track_event_async`` coroutine
                method is awaited on the background loop.
            max_inflight: Cap on concurrent in-flight coroutines. When
                the cap is reached, further ``dispatch`` calls are
                dropped with a warning so memory stays bounded under a
                slow backend.

        Raises:
            ValueError: If ``max_inflight`` is negative (raised by
                ``threading.BoundedSemaphore``).
        """
        self._provider = provider
        self._max_inflight = max_inflight
        self._inflight = threading.BoundedSemaphore(max_inflight)
        self._shutdown_event = threading.Event()
        self._futures_lock = threading.Lock()
        self._futures: set[concurrent.futures.Future[None]] = set()

        self._loop = asyncio.new_event_loop()
        self._loop_ready = threading.Event()
        self._loop_thread = threading.Thread(
            target=self._run_loop,
            name="governance-track-event-loop",
            daemon=True,
        )
        self._loop_thread.start()
        # Released from inside the running loop (see ``_run_loop``), so
        # by the time the constructor returns the loop is processing
        # callbacks.
        self._loop_ready.wait()

    def __enter__(self) -> LiveTrackEventDispatcher:
        """Return the dispatcher itself for use in a ``with`` block."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Shut the dispatcher down (draining pending events) on block exit."""
        self.shutdown()

    def _run_loop(self) -> None:
        """Body of the background loop thread — runs until the loop is stopped."""
        loop = self._loop
        asyncio.set_event_loop(loop)
        # Queue the readiness signal as the loop's first callback so it
        # fires only once ``run_forever`` is really executing.
        loop.call_soon(self._loop_ready.set)
        try:
            loop.run_forever()
        finally:
            # If the loop never got to run its first callback, still
            # unblock the constructor rather than hanging it forever.
            self._loop_ready.set()
            self._finalize_loop()

    def _finalize_loop(self) -> None:
        """Cancel tasks left on the stopped loop, then close it.

        Without the cancel-and-gather pass, ``loop.close()`` warns
        "Task was destroyed but it is pending" for every unfinished
        awaiter. Teardown never raises: failures are logged at debug.
        """
        loop = self._loop
        try:
            leftover = asyncio.all_tasks(loop)
            for task in leftover:
                task.cancel()
            if leftover:
                # Run the loop just long enough for every cancelled task
                # to unwind; ``return_exceptions`` keeps one failure from
                # aborting the rest.
                loop.run_until_complete(
                    asyncio.gather(*leftover, return_exceptions=True)
                )
        except Exception as exc:  # noqa: BLE001 - teardown must not raise
            logger.debug("Loop cleanup swallowed exception: %s", exc)
        finally:
            try:
                loop.close()
            except Exception as exc:  # noqa: BLE001
                logger.debug("Loop close swallowed exception: %s", exc)

    def dispatch(
        self,
        *,
        event_name: str,
        data: dict[str, Any] | None = None,
        operation_id: str | None = None,
    ) -> None:
        """Schedule a track-event call on the background loop — returns immediately.

        Args:
            event_name: Forwarded to ``track_event_async``.
            data: Forwarded to ``track_event_async``; ``None`` by default.
            operation_id: Forwarded to ``track_event_async``; ``None``
                by default.

        Failure modes — all silent, never raised to the caller:

        - **Post-shutdown**: dispatch after :meth:`shutdown` returns
          silently; the provider is not called. This also covers a
          dispatch that races with ``shutdown``: the state is
          re-checked under the lock before scheduling.
        - **Saturated in-flight cap**: when ``max_inflight`` coroutines
          are already scheduled, the call is dropped with a warning.
        - **Loop unavailable**: if scheduling raises ``RuntimeError``
          (loop stopped/closed), the semaphore slot is rolled back, the
          coroutine is closed, and the failure is logged at debug.
        - **Coroutine exception**: ``_run`` catches and logs at debug.
        """
        # Cheap lock-free fast path; the authoritative check is repeated
        # under the lock below.
        if self._shutdown_event.is_set():
            logger.debug(
                "Dispatcher shut down; dropping track_event (event_name=%s)",
                event_name,
            )
            return

        if not self._inflight.acquire(blocking=False):
            logger.warning(
                "Telemetry pool saturated (>%d in flight); dropping track_event "
                "(event_name=%s)",
                self._max_inflight,
                event_name,
            )
            return

        coro = self._run(event_name=event_name, data=data, operation_id=operation_id)
        future: concurrent.futures.Future[None] | None = None
        try:
            # Check-schedule-register is atomic with respect to
            # ``shutdown``: either the future is in ``_futures`` before
            # shutdown snapshots it, or nothing is scheduled at all.
            with self._futures_lock:
                if not self._shutdown_event.is_set():
                    future = asyncio.run_coroutine_threadsafe(coro, self._loop)
                    self._futures.add(future)
        except RuntimeError as exc:
            # Loop is stopped/closed — release the slot we took and
            # close the coroutine so it doesn't warn at GC time.
            coro.close()
            self._inflight.release()
            logger.debug(
                "Telemetry loop unavailable (event_name=%s): %s",
                event_name,
                exc,
            )
            return

        if future is None:
            # ``shutdown`` won the race between the fast-path check and
            # the lock; undo the bookkeeping and drop the event.
            coro.close()
            self._inflight.release()
            logger.debug(
                "Dispatcher shut down; dropping track_event (event_name=%s)",
                event_name,
            )
            return

        # Attached outside the lock: if the future is already done the
        # callback runs inline on this thread and itself needs the
        # (non-reentrant) lock.
        future.add_done_callback(self._on_future_done)

    async def _run(
        self,
        *,
        event_name: str,
        data: dict[str, Any] | None,
        operation_id: str | None,
    ) -> None:
        """Coroutine body — awaits the provider call and swallows its errors."""
        try:
            await self._provider.track_event_async(
                event_name=event_name,
                data=data,
                operation_id=operation_id,
            )
        except Exception as exc:  # noqa: BLE001 - fire-and-forget contract
            logger.debug("Failed to dispatch track_event: %s", exc, exc_info=True)

    def _on_future_done(self, future: concurrent.futures.Future[None]) -> None:
        """Observe the future, drop it from the pending set, release the slot.

        ``concurrent.futures.Future.exception()`` *raises*
        ``CancelledError`` when the future was cancelled, so the
        observation is wrapped in a targeted catch. The accounting —
        pending-set discard and semaphore release — runs in ``finally``
        so success, failure, and cancellation all clean up.
        """
        try:
            future.exception()
        except concurrent.futures.CancelledError:
            # Cancellation during shutdown is expected.
            pass
        finally:
            with self._futures_lock:
                self._futures.discard(future)
            self._inflight.release()

    def shutdown(self, *, wait: bool = True, timeout: float = 30.0) -> None:
        """Stop accepting new submissions; optionally drain pending, then stop the loop.

        Call at process exit to avoid losing in-flight telemetry.
        Safe to call more than once and from several threads — only the
        first call does any work; later calls return immediately.

        Args:
            wait: When ``True`` (default), block until pending
                coroutines finish (bounded by ``timeout``) before
                stopping the loop. When ``False``, stop immediately;
                in-flight coroutines are cancelled by the loop's
                teardown path.
            timeout: Maximum seconds to wait for pending coroutines
                when ``wait=True``. Coroutines still in flight after
                the timeout are cancelled by loop teardown.
        """
        # Flip the flag and take the snapshot under the same lock that
        # ``dispatch`` registers futures under, so no accepted
        # submission can be missing from ``pending``.
        with self._futures_lock:
            if self._shutdown_event.is_set():
                return
            self._shutdown_event.set()
            pending = list(self._futures)

        if wait and pending:
            concurrent.futures.wait(pending, timeout=timeout)

        try:
            self._loop.call_soon_threadsafe(self._loop.stop)
        except RuntimeError:
            # Loop already closed.
            pass
        self._loop_thread.join(timeout=self._LOOP_JOIN_TIMEOUT)
        if self._loop_thread.is_alive():
            # The thread is a daemon, so this cannot block interpreter
            # exit, but it is worth surfacing.
            logger.warning(
                "Telemetry loop thread did not stop within %.1fs",
                self._LOOP_JOIN_TIMEOUT,
            )
"""Tests for LiveTrackEventDispatcher.

The dispatcher schedules ``provider.track_event_async`` on a private
background asyncio loop so a sync caller never blocks on the
underlying HTTP. Tests focus on:

- ``dispatch`` returns immediately (doesn't block on the coroutine)
- The provider's ``track_event_async`` is awaited with the same kwargs
- Exceptions in the coroutine are swallowed (fire-and-forget contract)
- Multiple concurrent submissions all reach the provider
- ``shutdown`` drains pending coroutines
- Post-shutdown dispatch is silent and does not call the provider
- Saturated in-flight cap drops submissions rather than queueing
"""

from __future__ import annotations

import asyncio
import logging
import threading
import time
from collections.abc import Callable, Generator
from unittest.mock import MagicMock

import pytest

from uipath.platform.governance import UiPathPlatformGovernanceProvider
from uipath.platform.governance._live_track_event_dispatcher import (
    LiveTrackEventDispatcher,
)

_DISPATCHER_MODULE = "uipath.platform.governance._live_track_event_dispatcher"
_DISPATCHER_LOGGER = _DISPATCHER_MODULE


@pytest.fixture
def provider() -> MagicMock:
    """Mock provider — ``track_event_async`` becomes an ``AsyncMock`` via spec."""
    # ``MagicMock(spec=...)`` auto-detects coroutine functions on the spec
    # and creates ``AsyncMock`` attributes for them, so
    # ``provider.track_event_async(...)`` returns an awaitable.
    return MagicMock(spec=UiPathPlatformGovernanceProvider)


@pytest.fixture
def dispatcher(
    provider: MagicMock,
) -> Generator[LiveTrackEventDispatcher, None, None]:
    """Dispatcher with a small in-flight cap for fast tests."""
    d = LiveTrackEventDispatcher(provider, max_inflight=4)
    yield d
    d.shutdown()


def _wait_for(
    predicate: Callable[[], bool],
    *,
    timeout: float = 2.0,
    interval: float = 0.01,
) -> bool:
    """Spin-wait helper — returns True when predicate passes, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False


# ---------------------------------------------------------------------------
# dispatch is non-blocking
# ---------------------------------------------------------------------------


def test_dispatch_returns_before_provider_completes(
    provider: MagicMock,
    dispatcher: LiveTrackEventDispatcher,
) -> None:
    """dispatch must not wait for the coroutine — the calling thread must not block."""
    started = threading.Event()

    async def _slow_track_event(**_: object) -> None:
        started.set()
        # Sleep long enough that a blocking dispatch would fail the timing bound.
        await asyncio.sleep(0.5)

    provider.track_event_async.side_effect = _slow_track_event

    t0 = time.monotonic()
    dispatcher.dispatch(event_name="agent.started")
    elapsed = time.monotonic() - t0

    # dispatch should return in well under 100ms even though the coroutine
    # is sleeping for 500ms.
    assert elapsed < 0.1, f"dispatch blocked for {elapsed:.3f}s"

    # Coroutine did start (proves the submission landed on the loop).
    assert started.wait(timeout=2.0), "coroutine never started"


# ---------------------------------------------------------------------------
# provider receives the exact kwargs
# ---------------------------------------------------------------------------


def test_dispatch_forwards_kwargs_to_provider(
    provider: MagicMock,
    dispatcher: LiveTrackEventDispatcher,
) -> None:
    """The dispatcher is a thin adapter — every kwarg must reach track_event_async."""
    dispatcher.dispatch(
        event_name="agent.tool_call",
        data={"tool": "browser.open", "url": "https://example.com"},
        operation_id="op-abc-123",
    )

    # Wait for the coroutine to be awaited.
    assert _wait_for(lambda: provider.track_event_async.await_count >= 1), (
        "track_event_async never awaited"
    )

    provider.track_event_async.assert_awaited_once_with(
        event_name="agent.tool_call",
        data={"tool": "browser.open", "url": "https://example.com"},
        operation_id="op-abc-123",
    )


def test_dispatch_passes_none_data_and_operation_id(
    provider: MagicMock,
    dispatcher: LiveTrackEventDispatcher,
) -> None:
    """Defaults — ``data`` and ``operation_id`` flow through as ``None``."""
    dispatcher.dispatch(event_name="agent.idle")

    assert _wait_for(lambda: provider.track_event_async.await_count >= 1)
    provider.track_event_async.assert_awaited_once_with(
        event_name="agent.idle",
        data=None,
        operation_id=None,
    )


# ---------------------------------------------------------------------------
# exceptions are swallowed
# ---------------------------------------------------------------------------


def test_worker_exception_does_not_propagate(
    provider: MagicMock,
    dispatcher: LiveTrackEventDispatcher,
) -> None:
    """Fire-and-forget — dispatch returns before the coroutine runs, so an
    exception raised inside it cannot reach the caller. The dispatcher
    must catch and log internally rather than letting the future
    finalize with an unobserved exception.
    """
    provider.track_event_async.side_effect = RuntimeError("simulated backend 5xx")

    # If the dispatcher leaked the exception, this call would raise.
    dispatcher.dispatch(event_name="agent.deny")

    # And subsequent calls keep working — one bad event doesn't poison
    # the loop.
    provider.track_event_async.side_effect = None
    dispatcher.dispatch(event_name="agent.deny")

    assert _wait_for(lambda: provider.track_event_async.await_count >= 2)
    assert provider.track_event_async.await_count == 2


# ---------------------------------------------------------------------------
# concurrency
# ---------------------------------------------------------------------------


def test_multiple_dispatches_all_reach_provider(provider: MagicMock) -> None:
    """A burst of submissions must all be delivered, in any order."""
    # Use a dedicated dispatcher with headroom well above the burst
    # size so the fast-burst doesn't race the loop into saturation.
    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=64)
    try:
        for i in range(20):
            dispatcher.dispatch(event_name=f"event.{i}")

        assert _wait_for(lambda: provider.track_event_async.await_count == 20)

        seen = {
            call.kwargs["event_name"]
            for call in provider.track_event_async.await_args_list
        }
        assert seen == {f"event.{i}" for i in range(20)}
    finally:
        dispatcher.shutdown()


# ---------------------------------------------------------------------------
# shutdown
# ---------------------------------------------------------------------------


def test_shutdown_waits_for_pending(provider: MagicMock) -> None:
    """``shutdown(wait=True)`` must let in-flight coroutines finish before
    returning so process teardown doesn't lose telemetry.
    """
    completed: list[str] = []

    async def _record(*, event_name: str, **_: object) -> None:
        # Small await so submissions overlap the shutdown call.
        await asyncio.sleep(0.05)
        completed.append(event_name)

    provider.track_event_async.side_effect = _record
    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=10)

    for i in range(5):
        dispatcher.dispatch(event_name=f"event.{i}")

    dispatcher.shutdown(wait=True)

    # Every submission ran to completion by the time shutdown returned.
    assert sorted(completed) == [f"event.{i}" for i in range(5)]


def test_shutdown_is_idempotent(provider: MagicMock) -> None:
    """Calling shutdown twice must not raise — process teardown paths
    sometimes invoke close/shutdown from multiple atexit hooks.
    """
    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=4)
    dispatcher.shutdown()
    dispatcher.shutdown()  # second call: no crash, no exception


# ---------------------------------------------------------------------------
# fire-and-forget safety: dispatch never raises
# ---------------------------------------------------------------------------


def test_dispatch_after_shutdown_is_silent(provider: MagicMock) -> None:
    """After :meth:`shutdown` the dispatcher must silently drop late
    dispatches — a late dispatch (e.g. from an atexit cleanup after
    the loop already stopped) cannot be allowed to raise.
    """
    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=4)
    dispatcher.shutdown(wait=True)

    # If the dispatcher leaked an exception, this call would raise.
    dispatcher.dispatch(event_name="agent.late")

    # And the provider must not have been awaited — the loop is down.
    assert not provider.track_event_async.await_count


def test_shutdown_no_wait_cancels_inflight_cleanly(provider: MagicMock) -> None:
    """``shutdown(wait=False)`` while coroutines are mid-await must cancel
    them and let :meth:`_on_future_done` complete its accounting on the
    cancellation path — no semaphore leak, no leftover future in the
    pending set.

    Regression: ``concurrent.futures.Future.exception()`` *raises*
    ``CancelledError`` when the future was cancelled. If the callback
    doesn't wrap the observation in a targeted ``except``, the discard
    + release calls are skipped → semaphore slots leak (silently,
    because ``Future._invoke_callbacks`` swallows the exception into
    the logger). Without this guard the leak isn't visible at a
    process-level assertion — the loop still stops — so the test must
    check the semaphore state directly.
    """
    n = 3
    release = threading.Event()

    async def _hang_forever(**_: object) -> None:
        # Yield to the loop but never complete — cancelled at teardown.
        while not release.is_set():
            await asyncio.sleep(0.02)

    provider.track_event_async.side_effect = _hang_forever

    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=n)
    try:
        # Fill every in-flight slot so shutdown teardown will cancel
        # every one of them.
        for i in range(n):
            dispatcher.dispatch(event_name=f"event.hang.{i}")
        assert _wait_for(lambda: provider.track_event_async.await_count >= 1)

        # wait=False: don't drain, just stop the loop. The loop's finally
        # block cancels the pending tasks; each cancellation triggers
        # ``_on_future_done`` on the cancelled ``concurrent.futures.Future``.
        dispatcher.shutdown(wait=False)

        # Loop thread joined.
        assert not dispatcher._loop_thread.is_alive(), (
            "loop thread did not stop after shutdown(wait=False)"
        )

        # Accounting cleanly reset — the callback ran to its finally
        # even on the CancelledError path.
        # 1. Pending set drained: every cancelled future was discarded.
        assert not dispatcher._futures, (
            f"cancellation left {len(dispatcher._futures)} future(s) in "
            f"the pending set — semaphore slot(s) leaked"
        )
        # 2. Every in-flight slot released: we can immediately re-acquire
        #    all ``n`` semaphore slots without blocking.
        acquired = 0
        for _ in range(n):
            if dispatcher._inflight.acquire(blocking=False):
                acquired += 1
        for _ in range(acquired):
            dispatcher._inflight.release()
        assert acquired == n, (
            f"expected all {n} semaphore slots free after cancellation, "
            f"only {acquired} were released"
        )
    finally:
        release.set()


def test_dispatch_drops_when_inflight_saturated(provider: MagicMock) -> None:
    """When the in-flight cap is reached, further dispatches are dropped
    rather than queueing unboundedly. The drop must NOT call the
    provider for the saturated submission.
    """
    release = threading.Event()

    async def _blocked(**_: object) -> None:
        # Poll a threading.Event without blocking the loop — a bare
        # ``release.wait()`` would freeze the whole event loop; the
        # small await yields between checks so other coroutines can
        # progress and the semaphore state can be inspected.
        while not release.is_set():
            await asyncio.sleep(0.02)

    provider.track_event_async.side_effect = _blocked

    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=2)
    try:
        # Fill the cap.
        dispatcher.dispatch(event_name="event.1")
        dispatcher.dispatch(event_name="event.2")

        # Wait for at least one coroutine to be awaited (so the
        # semaphore is held by an in-flight task, not just queued).
        assert _wait_for(lambda: provider.track_event_async.await_count >= 1)

        # Third submission should be dropped — semaphore is exhausted.
        dispatcher.dispatch(event_name="event.dropped")

        # event.dropped never reaches the provider.
        assert "event.dropped" not in {
            call.kwargs.get("event_name")
            for call in provider.track_event_async.call_args_list
        }
    finally:
        release.set()
        dispatcher.shutdown(wait=True)


# ---------------------------------------------------------------------------
# construction defaults
# ---------------------------------------------------------------------------


def test_default_max_inflight_matches_module_constant(provider: MagicMock) -> None:
    """Constructor default equals the documented class constant.

    Guards against silent drift between the docstring's cited default
    and the actual value passed to :class:`BoundedSemaphore`.
    """
    dispatcher = LiveTrackEventDispatcher(provider)
    try:
        assert (
            dispatcher._max_inflight == LiveTrackEventDispatcher._DEFAULT_MAX_INFLIGHT
        )
        assert LiveTrackEventDispatcher._DEFAULT_MAX_INFLIGHT == 40
    finally:
        dispatcher.shutdown()


# ---------------------------------------------------------------------------
# uncovered branches: loop-unavailable during dispatch, loop-already-stopped
# during shutdown, and explicit exception-log path
# ---------------------------------------------------------------------------


def test_dispatch_swallows_when_loop_unavailable(
    provider: MagicMock,
    monkeypatch: pytest.MonkeyPatch,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Covers the ``RuntimeError`` catch in :meth:`dispatch`.

    Simulates the race where the loop is stopped/closed between the
    shutdown-event check and ``run_coroutine_threadsafe``. dispatch
    must:

    - not raise
    - not call the provider
    - release the semaphore slot it took (so subsequent live dispatches
      can still acquire)
    - log at debug
    """
    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=2)
    try:
        # Force run_coroutine_threadsafe to raise as if the loop were
        # already closed. Patched via the dispatcher module's namespace
        # so the dispatcher sees the fake at its call site.
        def _raise_runtime(*_args: object, **_kwargs: object) -> object:
            raise RuntimeError("Event loop is closed")

        monkeypatch.setattr(
            f"{_DISPATCHER_MODULE}.asyncio.run_coroutine_threadsafe",
            _raise_runtime,
        )

        with caplog.at_level(logging.DEBUG, logger=_DISPATCHER_LOGGER):
            # Fire more calls than max_inflight — if the semaphore is
            # NOT released on the RuntimeError path, we would see a
            # "pool saturated" warning after 2 calls instead of the
            # expected "loop unavailable" debug on every call.
            for i in range(5):
                dispatcher.dispatch(event_name=f"event.oops.{i}")

        # Provider never invoked — the loop is (simulated) dead.
        assert provider.track_event_async.await_count == 0
        assert provider.track_event_async.call_count == 0

        # Debug log fired for each attempt, and no saturation warning.
        debug_hits = [
            r for r in caplog.records if "Telemetry loop unavailable" in r.message
        ]
        saturation_hits = [
            r for r in caplog.records if "Telemetry pool saturated" in r.message
        ]
        assert len(debug_hits) == 5, (
            f"expected 5 loop-unavailable debug logs, got {len(debug_hits)}"
        )
        assert not saturation_hits, (
            "semaphore was not released on RuntimeError — pool saturated after 2 calls"
        )
    finally:
        dispatcher.shutdown()


def test_shutdown_swallows_when_loop_already_stopped(provider: MagicMock) -> None:
    """Covers the ``RuntimeError`` catch in :meth:`shutdown`.

    If the loop is already stopped and closed by the time ``shutdown``
    is called (e.g. the ``_run_loop`` finally block has completed after
    a direct external stop), ``call_soon_threadsafe`` raises. shutdown
    must swallow and complete cleanly.
    """
    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=2)

    # Stop the loop directly, bypassing shutdown(). The ``_run_loop``
    # finally block will run: cancel tasks, gather, then close the
    # loop. When our shutdown() then tries call_soon_threadsafe, the
    # loop is already closed → RuntimeError.
    dispatcher._loop.call_soon_threadsafe(dispatcher._loop.stop)
    dispatcher._loop_thread.join(timeout=5.0)
    assert not dispatcher._loop_thread.is_alive()

    # Must not raise; the internal RuntimeError from the already-closed
    # loop is swallowed.
    dispatcher.shutdown()

    # And is still idempotent afterwards.
    dispatcher.shutdown()


def test_worker_exception_is_logged_at_debug(
    provider: MagicMock,
    dispatcher: LiveTrackEventDispatcher,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Covers the ``except Exception`` branch inside ``_run``.

    Complements :func:`test_worker_exception_does_not_propagate` by
    asserting the log record (with ``exc_info``) actually fires, so
    coverage records the debug-log line inside the coroutine.
    """
    provider.track_event_async.side_effect = ValueError("bad payload")

    def _debug_records() -> list[logging.LogRecord]:
        # ``getMessage()`` rather than ``.message``: the latter is only
        # set once a handler has formatted the record, and this is
        # polled from the test thread while the loop thread is logging.
        return [
            r
            for r in caplog.records
            if r.levelno == logging.DEBUG
            and "Failed to dispatch track_event" in r.getMessage()
        ]

    with caplog.at_level(logging.DEBUG, logger=_DISPATCHER_LOGGER):
        dispatcher.dispatch(event_name="agent.bad")
        # Poll for the log record itself instead of sleeping a fixed
        # interval: ``await_count`` is incremented before the mock
        # raises, so the record may not exist yet when the await is
        # first observed.
        assert _wait_for(lambda: bool(_debug_records())), (
            "expected a debug log for the swallowed exception"
        )

    matching = _debug_records()
    assert provider.track_event_async.await_count >= 1
    # exc_info is attached so operators can trace the failure.
    assert matching[0].exc_info is not None
    assert isinstance(matching[0].exc_info[1], ValueError)


# ---------------------------------------------------------------------------
# thread safety
# ---------------------------------------------------------------------------


def test_dispatch_is_safe_from_many_threads(provider: MagicMock) -> None:
    """dispatches from many caller threads all reach the provider.

    Exercises the semaphore, the futures set, and
    ``run_coroutine_threadsafe`` under concurrent access from
    non-loop threads. If the futures-set mutation weren't locked, this
    would race and drop or duplicate futures under load.
    """
    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=128)
    try:
        n_threads = 50
        barrier = threading.Barrier(n_threads)

        def _fire(name: str) -> None:
            # Wait until all threads are ready, then fire together.
            barrier.wait(timeout=2.0)
            dispatcher.dispatch(event_name=name)

        threads = [
            threading.Thread(target=_fire, args=(f"burst.{i}",))
            for i in range(n_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=5.0)

        assert _wait_for(lambda: provider.track_event_async.await_count == n_threads), (
            f"expected {n_threads} awaits, got {provider.track_event_async.await_count}"
        )

        seen = {
            c.kwargs["event_name"] for c in provider.track_event_async.await_args_list
        }
        assert seen == {f"burst.{i}" for i in range(n_threads)}
    finally:
        dispatcher.shutdown()


# ---------------------------------------------------------------------------
# shutdown timeout
# ---------------------------------------------------------------------------


def test_shutdown_respects_timeout_when_drain_stalls(provider: MagicMock) -> None:
    """``shutdown(wait=True, timeout=…)`` must return within the window
    even if pending coroutines are stuck.

    Ensures a stalled backend cannot hang process teardown. Coroutines
    still in flight past the timeout are cancelled by the loop's
    teardown path.
    """
    release = threading.Event()

    async def _never_finish(**_: object) -> None:
        while not release.is_set():
            await asyncio.sleep(0.02)

    provider.track_event_async.side_effect = _never_finish
    dispatcher = LiveTrackEventDispatcher(provider, max_inflight=4)

    dispatcher.dispatch(event_name="stuck.1")
    dispatcher.dispatch(event_name="stuck.2")
    assert _wait_for(lambda: provider.track_event_async.await_count >= 1)

    t0 = time.monotonic()
    try:
        dispatcher.shutdown(wait=True, timeout=0.1)
    finally:
        release.set()
    elapsed = time.monotonic() - t0

    # shutdown(timeout=0.1) waits ≤0.1s on the futures, then stops the
    # loop and joins the thread (5s cap). Total must be well under a
    # few seconds — a hang here would freeze the whole test suite.
    assert elapsed < 3.0, f"shutdown took {elapsed:.3f}s with timeout=0.1s"
    assert not dispatcher._loop_thread.is_alive()