Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 115 additions & 6 deletions src/uipath/runtime/governance/_audit/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any
from typing import Any, Callable

from uipath.core.governance import EnforcementMode

from .metadata import GovernanceRuntimeMetadata

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -226,14 +228,31 @@ class AuditManager:
# Trip a sink after this many consecutive emit failures (circuit-breaker).
_SINK_FAILURE_THRESHOLD = 10

def __init__(self, register_default_sinks: bool = True) -> None:
def __init__(
self,
register_default_sinks: bool = True,
*,
track_event: Callable[..., None] | None = None,
runtime_metadata: GovernanceRuntimeMetadata | None = None,
) -> None:
"""Initialize the audit manager.

Args:
register_default_sinks: If True (default), register the
always-on ``traces`` sink. Tests that want a bare
manager can pass ``False`` and register sinks
explicitly.
always-on ``traces`` and platform-mandated
``track_events`` sinks. Tests that want a bare manager
can pass ``False`` and register sinks explicitly.
track_event: Platform-supplied telemetry callable wired by
the host. When ``None`` (or any sink-construction
error), the ``track_events`` sink is skipped and a
warning is logged — the runtime continues so a wiring
bug never breaks the agent run.
runtime_metadata: Constants stamped on every telemetry
event (execution engine, agent type, agent framework,
runtime version). Defaults to
:class:`GovernanceRuntimeMetadata` () — auto-resolved
version + ``unknown`` agent type / framework. The host
overrides with concrete values.
"""
self._sinks: list[AuditSink] = []
# Guards _sinks, _sink_failures, _tripped_sinks — all read +
Expand All @@ -246,6 +265,7 @@ def __init__(self, register_default_sinks: bool = True) -> None:

if register_default_sinks:
self._register_traces_sink()
self._register_track_event_sink(track_event, runtime_metadata)

def _register_traces_sink(self) -> None:
"""Register the always-on ``traces`` sink.
Expand All @@ -262,6 +282,48 @@ def _register_traces_sink(self) -> None:
self.register_sink(sink)
logger.info("Governance audit sink registered: traces")

def _register_track_event_sink(
self,
track_event: Callable[..., None] | None,
runtime_metadata: GovernanceRuntimeMetadata | None,
) -> None:
"""Register the platform-mandated ``track_events`` sink.

Mirrors :meth:`_register_traces_sink`: deferred import,
construct, register, log. The sink is expected to be wired by
the host's platform layer.

Wrapped in a broad ``except`` so a misconfigured wiring layer
(missing callable, sink construction error) never crashes the
agent — the runtime logs and proceeds without the sink. The
sink-level circuit breaker handles per-emit failures
separately.
"""
try:
from .track_events import TrackEventAuditSink

if track_event is None:
raise ValueError(
"Platform-mandated track_event callable was not supplied; "
"the host wiring layer must pass it to AuditManager(...)."
)
meta = (
runtime_metadata
if runtime_metadata is not None
else GovernanceRuntimeMetadata()
)
sink = TrackEventAuditSink(track_event, meta)
self.register_sink(sink)
logger.info("Governance audit sink registered: track_events")
except Exception as exc: # noqa: BLE001 - registration must not crash the agent
# ``str(exc)`` instead of passing ``exc`` directly: the
# logging LogRecord retains its ``args`` tuple until the
# handler formats the record, and a raw exception there
# carries its ``__traceback__`` → frame chain → ``self``,
# which would keep the AuditManager alive in any
# log-capturing test. Stringifying breaks that ref.
logger.warning("Failed to register track_events sink: %s", str(exc))

def register_sink(self, sink: AuditSink) -> None:
"""Register an audit sink.

Expand Down Expand Up @@ -385,6 +447,8 @@ def emit_rule_evaluation(
detail: str = "",
agent_name: str = "agent",
description: str = "",
duration_ms: float = 0.0,
mapped_to_uipath: bool = False,
) -> None:
"""Convenience method to emit a rule evaluation event.

Expand All @@ -393,6 +457,10 @@ def emit_rule_evaluation(
its own mode — parallel runtimes can run in different modes
simultaneously, and a process-global wouldn't be authoritative
for any of them.

``duration_ms`` and ``mapped_to_uipath`` are stamped on the
event's data dict for telemetry sinks (e.g.
:class:`TrackEventAuditSink`); the OTel traces sink ignores them.
"""
self.emit(
AuditEvent(
Expand All @@ -409,6 +477,8 @@ def emit_rule_evaluation(
"detail": detail,
"description": description,
"status": "MATCHED" if matched else "PASS",
"duration_ms": duration_ms,
"mapped_to_uipath": mapped_to_uipath,
},
)
)
Expand All @@ -421,8 +491,41 @@ def emit_hook_summary(
matched_rules: int,
final_action: str,
enforcement_mode: EnforcementMode,
duration_ms: float = 0.0,
skipped_policy_names: list[str] | None = None,
guardrail_dispatched_count: int = 0,
denied_count: int | None = None,
) -> None:
"""Convenience method to emit a hook summary event."""
"""Convenience method to emit a hook summary event.

``matched_rules`` keeps its historical meaning — any rule whose
checks matched, regardless of the configured action — for
backward compatibility with existing sinks. The newer
``denied_count`` captures only the rules the evaluator actually
wanted to act on (matched **and** configured action ≠
``allow``). A matched rule whose action is ``allow`` is a
positive informational match and is folded into
``passed_count``, not ``denied_count``.

Args:
duration_ms: Total wall time spent evaluating this hook.
skipped_policy_names: Rules in the pack that were not
evaluated (currently disabled). The summary carries
their ids so operators can spot which policies a tenant
turned off.
guardrail_dispatched_count: How many UiPath-mapped
guardrail-fallback rules fired for this hook and were
handed to the compensating path. Lets dashboards
compute the native-vs-dispatched ratio.
denied_count: Rules that matched **and** would have acted
(action ∈ {``deny``, ``escalate``, ``audit``}). When
``None`` (legacy callers), falls back to
``matched_rules`` so the old "matched == denial"
semantic is preserved.
"""
skipped = list(skipped_policy_names or [])
actual_denied = denied_count if denied_count is not None else matched_rules
passed_count = max(total_rules - actual_denied, 0)
self.emit(
AuditEvent(
event_type=EventType.HOOK_END,
Expand All @@ -433,6 +536,12 @@ def emit_hook_summary(
"matched_rules": matched_rules,
"final_action": final_action,
"enforcement_mode": enforcement_mode,
"duration_ms": duration_ms,
"passed_count": passed_count,
"denied_count": actual_denied,
"skipped_count": len(skipped),
"skipped_policy_names": skipped,
"guardrail_dispatched_count": guardrail_dispatched_count,
},
)
)
Expand Down
69 changes: 69 additions & 0 deletions src/uipath/runtime/governance/_audit/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Per-runtime metadata stamped on every governance telemetry event.

The host constructs a :class:`GovernanceRuntimeMetadata` once per
agent run and passes it to the telemetry sink. Every event produced
by :class:`TrackEventAuditSink` carries these fields so downstream
consumers can pivot on engine / agent type / framework / runtime
version.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from importlib.metadata import PackageNotFoundError, version

NATIVE_EXECUTION_ENGINE = "uipath_native_governance_checker"


def _resolve_runtime_version() -> str:
"""Read the ``uipath-runtime`` package version, or ``"unknown"``.

``importlib.metadata.version`` fails when the package is imported
from a source checkout that was never installed (CI fixtures,
editable installs with stripped metadata). Telemetry must keep
flowing in those cases, so the fallback is a sentinel rather than
a raise.
"""
try:
return version("uipath-runtime")
except PackageNotFoundError:
return "unknown"


@dataclass(frozen=True)
class GovernanceRuntimeMetadata:
"""Constants stamped on every governance telemetry event.

Attributes:
execution_engine: Implementation behind the evaluator. Default
``"uipath_native_governance_checker"``. When a future engine
(e.g. AGT) replaces the native checker, the host supplies
its own identifier here so the emitted event records which
engine produced the verdict.
agent_type: Category of agent under governance — e.g.
``"uipath_coded"``, ``"uipath_lowcode"``, ``"servicenow"``,
or any other identifier the host wants to attach. External
agents (ServiceNow, etc.) join this taxonomy when they
land. ``"unknown"`` keeps telemetry flowing if the host
forgets to set it.
agent_framework: Framework that drives the agent — e.g.
``"langchain"``, ``"openai_agents"``, ``"llamaindex"``,
``"google_adk"``, ``"agent_framework"``, ``"mcp"``.
runtime_version: ``uipath-runtime`` package version. Resolved
from installed package metadata at construction; falls
back to ``"unknown"`` for source checkouts.
"""

execution_engine: str = NATIVE_EXECUTION_ENGINE
agent_type: str = "unknown"
agent_framework: str = "unknown"
runtime_version: str = field(default_factory=_resolve_runtime_version)

def as_payload(self) -> dict[str, str]:
"""Return the metadata as a dict ready to merge into an event payload."""
return {
"execution_engine": self.execution_engine,
"agent_type": self.agent_type,
"agent_framework": self.agent_framework,
"runtime_version": self.runtime_version,
}
Loading
Loading