diff --git a/aws_lambda_powertools/shared/constants.py b/aws_lambda_powertools/shared/constants.py
index bc19ff13b30..6c808d38758 100644
--- a/aws_lambda_powertools/shared/constants.py
+++ b/aws_lambda_powertools/shared/constants.py
@@ -76,3 +76,6 @@
# Idempotency constants
IDEMPOTENCY_DISABLED_ENV: str = "POWERTOOLS_IDEMPOTENCY_DISABLED"
+
+# Circuit breaker constants: env var that, when truthy, makes the decorator bypass the breaker (development only)
+CIRCUIT_BREAKER_DISABLED_ENV: str = "POWERTOOLS_CIRCUIT_BREAKER_DISABLED"
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/__init__.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/__init__.py
new file mode 100644
index 00000000000..e931245f9d0
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/__init__.py
@@ -0,0 +1,35 @@
+"""
+Circuit Breaker utility for protecting unhealthy downstream dependencies.
+
+!!! warning "Alpha / experimental"
+ This utility is published under the `_alpha` namespace while we collect
+ feedback. The public API may change in a backwards-incompatible way before it
+ is promoted to GA. Pin your version and follow the tracking discussion before
+ relying on it in production.
+"""
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.circuit_breaker import circuit_breaker
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import (
+ CircuitBreakerConfigError,
+ CircuitBreakerError,
+ CircuitBreakerOpenError,
+ CircuitBreakerPersistenceError,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import (
+ CircuitInfo,
+ CircuitState,
+ CircuitTransition,
+)
+
+__all__ = (  # names re-exported as the public (alpha) API of this package
+ "circuit_breaker",
+ "CircuitBreakerConfig",
+ "CircuitInfo",
+ "CircuitState",
+ "CircuitTransition",
+ "CircuitBreakerError",
+ "CircuitBreakerOpenError",
+ "CircuitBreakerConfigError",
+ "CircuitBreakerPersistenceError",
+)
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/base.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/base.py
new file mode 100644
index 00000000000..b4f66b98dc8
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/base.py
@@ -0,0 +1,248 @@
+"""
+Orchestrator for the Circuit Breaker utility.
+
+:class:`CircuitBreakerHandler` owns the state machine and the per-environment failure
+counter; the persistence layer owns the shared truth. This split keeps the healthy
+path write-free: failures are counted locally and only persisted on a state transition.
+"""
+
+from __future__ import annotations
+
+import datetime
+import logging
+import uuid
+from typing import TYPE_CHECKING, Any
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import CircuitBreakerOpenError
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState, CircuitTransition
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+ from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig
+ from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import (
+ CircuitBreakerPersistenceLayer,
+ )
+ from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitInfo
+
+logger = logging.getLogger(__name__)
+
+# Per-environment, per-circuit consecutive counters. Module-level so they survive across
+# invocations within the same execution environment, the same way idempotency caches do.
+_LOCAL_FAILURES: dict[str, int] = {}  # counted failures while CLOSED; reset on success or after a trip
+_LOCAL_SUCCESSES: dict[str, int] = {}  # probe successes while HALF_OPEN; reset on probe failure or on close
+
+# Tracks the last state this environment observed from the store, per circuit. Used to
+# detect transitions back to CLOSED that happened externally (another env tripped and
+# recovered), so stale local failure streaks can be invalidated.
+_LAST_OBSERVED_STATE: dict[str, CircuitState] = {}
+
+# Random identifier generated once at import time (stable for this environment); used to claim the probe lock.
+_ENVIRONMENT_ID = uuid.uuid4().hex
+
+
+class CircuitBreakerHandler:
+ """
+ Drive a single protected call through the circuit breaker state machine.
+
+ A new handler is created per invocation by the decorator. It reads the shared state,
+ routes the call (run, short-circuit, or probe), and records the outcome.
+
+ Parameters
+ ----------
+ function : Callable
+ The protected function.
+ name : str
+ Circuit name.
+ config : CircuitBreakerConfig
+ Circuit configuration.
+ persistence_store : CircuitBreakerPersistenceLayer
+ Shared state store.
+ on_circuit_open : Callable | None
+ Callback invoked with the protected call's own ``*args``/``**kwargs`` plus a
+ trailing ``circuit`` keyword argument when the circuit is open. If ``None``, an
+ open circuit raises :class:`CircuitBreakerOpenError`.
+ function_args : tuple
+ Positional arguments the protected function was called with.
+ function_kwargs : dict
+ Keyword arguments the protected function was called with.
+ """
+
+ def __init__(
+ self,
+ function: Callable,
+ name: str,
+ config: CircuitBreakerConfig,
+ persistence_store: CircuitBreakerPersistenceLayer,
+ on_circuit_open: Callable | None = None,
+ on_transition: Callable | None = None,  # called with one CircuitTransition per state change (see _notify)
+ function_args: tuple | None = None,
+ function_kwargs: dict | None = None,
+ ):
+ self.function = function
+ self.name = name
+ self.config = config
+ self.on_circuit_open = on_circuit_open
+ self.on_transition = on_transition
+ self.fn_args = function_args or ()  # None is normalized to an empty tuple
+ self.fn_kwargs = function_kwargs or {}  # None is normalized to a new empty dict
+
+ persistence_store.configure(config=config, circuit_name=name)  # rebinds the store to this circuit and config
+ self.persistence_store = persistence_store
+
+ def handle(self) -> Any:
+     """
+     Evaluate the circuit and route the call.
+
+     Returns
+     -------
+     Any
+         The protected function's result when the call runs, or the
+         ``on_circuit_open`` callback's return value when the circuit is open.
+
+     Raises
+     ------
+     CircuitBreakerOpenError
+         If the circuit is open and no callback is registered.
+     """
+     record = self.persistence_store.get_state(self.name)
+
+     if record.state == CircuitState.CLOSED:
+         # Seeing CLOSED after a non-CLOSED observation means a recovery cycle completed (possibly
+         # in another environment); drop the stale failure streak so it cannot re-trip at once.
+         if _LAST_OBSERVED_STATE.get(self.name, CircuitState.CLOSED) != CircuitState.CLOSED:
+             _LOCAL_FAILURES[self.name] = 0
+         _LAST_OBSERVED_STATE[self.name] = CircuitState.CLOSED
+         return self._call_closed()
+
+     if record.state == CircuitState.OPEN:
+         _LAST_OBSERVED_STATE[self.name] = CircuitState.OPEN
+         # ``opened_at`` may legitimately be 0 (epoch); treat only None as missing.
+         opened_at = record.opened_at if record.opened_at is not None else self._now()
+         if self._now() >= opened_at + self.config.recovery_timeout:
+             # Recovery window elapsed: try to become the single prober.
+             if self.persistence_store.try_acquire_half_open(self.name, _ENVIRONMENT_ID, opened_at):
+                 _LOCAL_SUCCESSES[self.name] = 0  # new probe cycle: forget successes from an abandoned one
+                 self._notify(CircuitState.OPEN, CircuitState.HALF_OPEN, opened_at=opened_at)
+                 return self._call_probe()
+         return self._open_response(record.to_circuit_info())
+
+     # HALF_OPEN: only the environment that owns the probe lock runs.
+     _LAST_OBSERVED_STATE[self.name] = CircuitState.HALF_OPEN
+     if record.half_open_owner == _ENVIRONMENT_ID:
+         return self._call_probe()
+
+     # If the probe lease has expired (owner recycled mid-probe), take over.
+     if record.probe_lease_expiry is not None and self._now() >= record.probe_lease_expiry:
+         logger.debug("Circuit '%s' probe lease expired; attempting takeover.", self.name)
+         if self.persistence_store.try_acquire_half_open(self.name, _ENVIRONMENT_ID, record.opened_at or 0):
+             _LOCAL_SUCCESSES[self.name] = 0  # takeover starts this environment's streak from zero
+             return self._call_probe()
+
+     return self._open_response(record.to_circuit_info())
+
+ def _call_closed(self) -> Any:
+ """Run the protected call while the circuit is closed, tracking failures."""
+ try:
+ result = self.function(*self.fn_args, **self.fn_kwargs)
+ except Exception as exc:
+ if not self.config.counts_as_failure(exc):  # excluded exception: the streak is neither bumped nor reset
+ raise
+ failures = _LOCAL_FAILURES.get(self.name, 0) + 1
+ _LOCAL_FAILURES[self.name] = failures
+ if failures >= self.config.failure_threshold:
+ logger.debug("Circuit '%s' tripping CLOSED to OPEN after %d failures.", self.name, failures)
+ opened_at = self._now()
+ self._safe_persist(  # best-effort write: a store failure is logged, not raised
+ self.persistence_store.save_open,
+ self.name,
+ failure_count=failures,
+ opened_at=opened_at,
+ )
+ _LOCAL_FAILURES[self.name] = 0  # start a fresh streak once the trip has been attempted
+ self._notify(CircuitState.CLOSED, CircuitState.OPEN, opened_at=opened_at)
+ raise  # the protected call's own exception always reaches the caller
+ else:
+ _LOCAL_FAILURES[self.name] = 0  # any success ends the failure streak
+ return result
+
+ def _call_probe(self) -> Any:
+ """Run a probe during half-open, closing or reopening based on the outcome."""
+ try:
+ result = self.function(*self.fn_args, **self.fn_kwargs)
+ except Exception as exc:
+ if not self.config.counts_as_failure(exc):  # excluded exception: propagate, circuit state untouched
+ raise
+ logger.debug("Circuit '%s' probe failed; reopening.", self.name)
+ opened_at = self._now()  # a failed probe restarts the recovery window from now
+ self._safe_persist(self.persistence_store.save_reopen, self.name, opened_at=opened_at)
+ _LOCAL_SUCCESSES[self.name] = 0
+ self._notify(CircuitState.HALF_OPEN, CircuitState.OPEN, opened_at=opened_at)
+ raise
+ else:
+ successes = _LOCAL_SUCCESSES.get(self.name, 0) + 1
+ _LOCAL_SUCCESSES[self.name] = successes
+ if successes >= self.config.success_threshold:  # below the threshold nothing is written to the store
+ logger.debug("Circuit '%s' closing after %d probe successes.", self.name, successes)
+ self._safe_persist(self.persistence_store.save_closed, self.name)
+ _LOCAL_SUCCESSES[self.name] = 0
+ _LOCAL_FAILURES[self.name] = 0
+ self._notify(CircuitState.HALF_OPEN, CircuitState.CLOSED)
+ return result  # NOTE(review): the probe lease is not renewed between successes here — confirm intended
+
+ def _safe_persist(self, fn: Callable, *args: Any, **kwargs: Any) -> None:
+ """
+ Call a persistence write, swallowing and logging failures.
+
+ State-transition writes must never mask the downstream's real result or replace
+ the downstream's real exception. This mirrors the fail-open read policy in the
+ persistence layer.
+ """
+ try:
+ fn(*args, **kwargs)  # any return value of the write is discarded
+ except Exception:  # deliberate best-effort: log with traceback, never re-raise
+ logger.warning(
+ "Circuit '%s': persistence write (%s) failed; the transition may be delayed but the "
+ "downstream result is preserved.",
+ self.name,
+ getattr(fn, "__name__", repr(fn)),  # falls back to repr() for callables without __name__
+ exc_info=True,
+ )
+
+ def _open_response(self, circuit: CircuitInfo) -> Any:
+ """Produce the response for an open circuit: callback result or raise."""
+ if self.on_circuit_open is not None:
+ # Forward the protected call's arguments unchanged: positional stay positional,
+ # keyword stay keyword. The circuit snapshot is passed as a keyword argument so
+ # it never collides with positionalized kwargs nor depends on dict ordering.
+ return self.on_circuit_open(*self.fn_args, **self.fn_kwargs, circuit=circuit)  # callback errors propagate
+ raise CircuitBreakerOpenError(  # no callback registered: reject, carrying the snapshot
+ f"Circuit '{self.name}' is open.",
+ circuit=circuit,
+ )
+
+ def _notify(self, from_state: CircuitState, to_state: CircuitState, opened_at: int | None = None) -> None:
+ """
+ Fire the ``on_transition`` hook for a state change.
+
+ Called only on real transitions, never on the hot path. Any exception the hook
+ raises is swallowed and logged: observability must never break the protected call.
+ """
+ if self.on_transition is None:  # no hook registered: nothing to do
+ return
+ try:
+ self.on_transition(  # the hook receives a single CircuitTransition argument
+ CircuitTransition(
+ circuit_name=self.name,
+ from_state=from_state,
+ to_state=to_state,
+ opened_at=opened_at,
+ ),
+ )
+ except Exception:  # deliberate: a failing hook is logged with traceback and ignored
+ logger.warning("on_transition hook for circuit '%s' raised; ignoring.", self.name, exc_info=True)
+
+ @staticmethod
+ def _now() -> int:
+     """Current unix timestamp in whole seconds, read from a timezone-aware UTC clock."""
+     return int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp())
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/circuit_breaker.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/circuit_breaker.py
new file mode 100644
index 00000000000..de4e66b0680
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/circuit_breaker.py
@@ -0,0 +1,119 @@
+"""
+Primary interface for the Circuit Breaker utility.
+"""
+
+from __future__ import annotations
+
+import functools
+import logging
+import os
+import warnings
+from typing import TYPE_CHECKING, Any
+
+from aws_lambda_powertools.shared import constants
+from aws_lambda_powertools.shared.functions import strtobool
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.base import CircuitBreakerHandler
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig
+from aws_lambda_powertools.warnings import PowertoolsUserWarning
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+ from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import (
+ CircuitBreakerPersistenceLayer,
+ )
+
+logger = logging.getLogger(__name__)
+
+
+def circuit_breaker(
+ name: str,
+ persistence_store: CircuitBreakerPersistenceLayer,
+ on_circuit_open: Callable | None = None,
+ on_transition: Callable | None = None,
+ config: CircuitBreakerConfig | None = None,
+) -> Callable:
+ """
+ Protect a function that calls an unhealthy-prone downstream with a circuit breaker.
+
+ Wrap the function that makes the downstream call, not the whole Lambda handler, so a
+ tripped circuit reflects one dependency rather than unrelated handler logic.
+
+ When the circuit is open the protected function is not called. Instead, if an
+ ``on_circuit_open`` callback is registered it runs and its return value becomes the
+ call's result; otherwise :class:`CircuitBreakerOpenError` is raised.
+
+ Parameters
+ ----------
+ name : str
+ Unique circuit name. Each name is an independent circuit; a function calling
+ several backends should use one circuit per backend.
+ persistence_store : CircuitBreakerPersistenceLayer
+ Shared state store (for example ``CircuitBreakerDynamoDBPersistence``).
+ on_circuit_open : Callable | None
+ Called when the circuit is open, with the protected function's own arguments
+ (positional stay positional, keyword stay keyword) plus a trailing ``circuit``
+ keyword argument carrying a ``CircuitInfo``. Its return value becomes the call's
+ result. If ``None``, an open circuit raises ``CircuitBreakerOpenError``.
+ on_transition : Callable | None
+ Called with a single ``CircuitTransition`` argument whenever the circuit changes
+ state (open, probe, close, reopen). Fires only on transitions, never on the
+ per-invocation hot path, so it is a safe place to emit a CloudWatch metric. Any
+ exception it raises is swallowed and logged so observability never breaks the
+ protected call.
+ config : CircuitBreakerConfig | None
+ Tunables. Defaults to ``CircuitBreakerConfig()`` when omitted.
+
+ Returns
+ -------
+ Callable
+ The decorated function.
+
+ Example
+ -------
+ **Protect a payment backend, buffering rejected requests**
+
+ from aws_lambda_powertools.utilities.circuit_breaker_alpha import circuit_breaker, CircuitInfo
+ from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
+ CircuitBreakerDynamoDBPersistence,
+ )
+
+ persistence = CircuitBreakerDynamoDBPersistence(table_name="CircuitBreakerState")
+
+ def buffer(order: dict, circuit: CircuitInfo):
+ sqs.send_message(QueueUrl=url, MessageBody=json.dumps(order))
+
+ @circuit_breaker(name="payment-backend", persistence_store=persistence, on_circuit_open=buffer)
+ def charge(order: dict) -> dict:
+ return payment_api.charge(order)
+ """
+ config = config or CircuitBreakerConfig()  # resolved once at decoration time and shared by every call
+
+ def decorator(function: Callable) -> Callable:
+ @functools.wraps(function)
+ def wrapper(*args, **kwargs) -> Any:
+ # Skip the circuit entirely when disabled (development only); the env var is re-read on every call.
+ if strtobool(os.getenv(constants.CIRCUIT_BREAKER_DISABLED_ENV, "false")):
+ warnings.warn(
+ message="Disabling the circuit breaker is intended for development environments only "
+ "and should not be used in production.",
+ category=PowertoolsUserWarning,
+ stacklevel=2,
+ )
+ return function(*args, **kwargs)  # bypass: no store access, no counters touched
+
+ handler = CircuitBreakerHandler(  # one handler per call; state lives in the store and module-level counters
+ function=function,
+ name=name,
+ config=config,
+ persistence_store=persistence_store,
+ on_circuit_open=on_circuit_open,
+ on_transition=on_transition,
+ function_args=args,
+ function_kwargs=kwargs,
+ )
+ return handler.handle()
+
+ return wrapper
+
+ return decorator
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/config.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/config.py
new file mode 100644
index 00000000000..9425e90ab38
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/config.py
@@ -0,0 +1,128 @@
+"""
+Configuration for the Circuit Breaker utility.
+"""
+
+from __future__ import annotations
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import CircuitBreakerConfigError
+
+
+class CircuitBreakerConfig:
+ """
+ Tunables for a circuit breaker.
+
+ All values have sensible defaults, so ``CircuitBreakerConfig()`` is a valid
+ production configuration. Pass an instance to ``@circuit_breaker(config=...)`` to
+ override them.
+
+ Parameters
+ ----------
+ failure_threshold : int
+ Number of *consecutive* failures that trips a closed circuit to open. Defaults to 5.
+ recovery_timeout : int
+ Seconds the circuit stays open before allowing a half-open probe. Defaults to 30.
+ success_threshold : int
+ Number of *consecutive* probe successes required to close a half-open circuit.
+ Defaults to 3.
+ handled_exceptions : tuple[type[Exception], ...] | None
+ Allowlist: only these exception types count as failures; anything else
+ propagates without affecting the circuit. Mutually exclusive with
+ ``ignored_exceptions``. Defaults to ``None`` (treated as ``(Exception,)``).
+ ignored_exceptions : tuple[type[Exception], ...] | None
+ Denylist: every exception counts as a failure *except* these. Mutually
+ exclusive with ``handled_exceptions``. Defaults to ``None``.
+ local_cache_max_age : int
+ Seconds a circuit's state is cached in the execution environment before a
+ read-through to the store. Matches the Parameters utility default. Defaults to 5.
+
+ Raises
+ ------
+ CircuitBreakerConfigError
+ If both ``handled_exceptions`` and ``ignored_exceptions`` are provided, or a
+ numeric tunable is not a positive integer.
+
+ Example
+ -------
+ **Only count timeouts and connection errors as failures**
+
+ config = CircuitBreakerConfig(
+ failure_threshold=5,
+ recovery_timeout=30,
+ handled_exceptions=(TimeoutError, ConnectionError),
+ )
+ """
+
+ def __init__(
+ self,
+ failure_threshold: int = 5,
+ recovery_timeout: int = 30,
+ success_threshold: int = 3,
+ handled_exceptions: tuple[type[Exception], ...] | None = None,
+ ignored_exceptions: tuple[type[Exception], ...] | None = None,
+ local_cache_max_age: int = 5,
+ ):
+ self._validate(  # raises CircuitBreakerConfigError before any attribute is assigned
+ failure_threshold=failure_threshold,
+ recovery_timeout=recovery_timeout,
+ success_threshold=success_threshold,
+ handled_exceptions=handled_exceptions,
+ ignored_exceptions=ignored_exceptions,
+ local_cache_max_age=local_cache_max_age,
+ )
+
+ self.failure_threshold = failure_threshold
+ self.recovery_timeout = recovery_timeout  # seconds
+ self.success_threshold = success_threshold
+ self.handled_exceptions = handled_exceptions
+ self.ignored_exceptions = ignored_exceptions
+ self.local_cache_max_age = local_cache_max_age  # seconds; 0 is accepted by _validate
+
+ @staticmethod
+ def _validate(
+     failure_threshold: int,
+     recovery_timeout: int,
+     success_threshold: int,
+     handled_exceptions: tuple[type[Exception], ...] | None,
+     ignored_exceptions: tuple[type[Exception], ...] | None,
+     local_cache_max_age: int,
+ ) -> None:
+     """Raise ``CircuitBreakerConfigError`` for contradictory or out-of-range tunables; return ``None`` if valid."""
+     # ``is not None`` (not truthiness) mirrors ``counts_as_failure``: an empty tuple still counts as provided.
+     if handled_exceptions is not None and ignored_exceptions is not None:
+         raise CircuitBreakerConfigError(
+             "handled_exceptions and ignored_exceptions are mutually exclusive; pass only one.",
+         )
+     # Thresholds/timeouts must be strictly positive ints; ``bool`` subclasses ``int``, so reject it explicitly.
+     for field, value in (
+         ("failure_threshold", failure_threshold),
+         ("recovery_timeout", recovery_timeout),
+         ("success_threshold", success_threshold),
+     ):
+         if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
+             raise CircuitBreakerConfigError(f"{field} must be a positive integer, got {value!r}.")
+     if isinstance(local_cache_max_age, bool) or not isinstance(local_cache_max_age, int) or local_cache_max_age < 0:
+         raise CircuitBreakerConfigError(
+             f"local_cache_max_age must be a non-negative integer, got {local_cache_max_age!r}.",
+         )
+
+ def counts_as_failure(self, exception: Exception) -> bool:
+     """
+     Classify an exception raised by the protected call as a circuit failure or not.
+
+     Parameters
+     ----------
+     exception : Exception
+         The exception raised by the protected function.
+
+     Returns
+     -------
+     bool
+         ``True`` when the exception should increment the failure counter, ``False``
+         when it should propagate without affecting the circuit.
+     """
+     allow, deny = self.handled_exceptions, self.ignored_exceptions
+     if allow is not None:  # an allowlist, when configured, takes precedence
+         return isinstance(exception, allow)
+     if deny is None:  # default: any exception counts as a failure
+         return True
+     return not isinstance(exception, deny)
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/exceptions.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/exceptions.py
new file mode 100644
index 00000000000..cf3c350fc83
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/exceptions.py
@@ -0,0 +1,77 @@
+"""
+Circuit Breaker exceptions.
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitInfo
+
+
+class CircuitBreakerError(Exception):
+    """
+    Root of the Circuit Breaker exception hierarchy.
+
+    Keeps the first argument as ``message`` and the remaining ones, concatenated, as
+    ``details`` so the printed exception stays readable.
+    See https://github.com/aws-powertools/powertools-lambda-python/issues/1772
+    """
+
+    def __init__(self, *args: str | Exception | None):
+        head, *extra = args or ("",)
+        self.message, self.details = str(head), ("".join(str(item) for item in extra) if extra else None)
+
+    def __str__(self):
+        """Render ``message - (details)`` when both are non-empty, otherwise just the message."""
+        combined = f"{self.message} - ({self.details})"
+        return combined if self.message and self.details else self.message
+
+
+class CircuitBreakerOpenError(CircuitBreakerError):
+ """
+ Raised when the circuit is open and no ``on_circuit_open`` callback is registered.
+
+ The rejected request never reached the downstream. The circuit snapshot is attached
+ as the ``circuit`` attribute so the caller can decide how to respond.
+
+ Parameters
+ ----------
+ *args : str | Exception | None
+ Standard error message/details.
+ circuit : CircuitInfo | None
+ Snapshot of the circuit at rejection time; ``None`` when the raiser did not supply one.
+
+ Example
+ -------
+ **Handling an open circuit when no callback is registered**
+
+ try:
+ charge(order)
+ except CircuitBreakerOpenError as exc:
+ logger.warning("rejected by circuit %s", exc.circuit.name)
+ return {"statusCode": 202}
+ """
+
+ def __init__(self, *args: str | Exception | None, circuit: CircuitInfo | None = None):
+ self.circuit = circuit  # keyword-only, so it is never folded into the message/details
+ super().__init__(*args)
+
+
+class CircuitBreakerConfigError(CircuitBreakerError):
+ """
+ Raised when ``CircuitBreakerConfig`` is built with invalid options: both ``handled_exceptions``
+ and ``ignored_exceptions`` supplied, or a numeric tunable that is not an integer in its allowed range.
+ """
+
+
+class CircuitBreakerPersistenceError(CircuitBreakerError):
+ """
+ Raised by a persistence backend for an unrecoverable store error on a *write* path
+ (persisting a state transition), where there is no safe local fallback.
+
+ Reads never raise this: ``get_state`` fails open (treats the circuit as closed) and
+ only logs, so a degraded store can never become the outage the breaker is meant to
+ prevent. Custom backends may raise this from their write primitives (``_put_record``, ``_update_record``).
+ """
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/__init__.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/__init__.py
new file mode 100644
index 00000000000..18704bff793
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/__init__.py
@@ -0,0 +1,15 @@
+"""
+Persistence layers for the Circuit Breaker utility.
+"""
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import CircuitBreakerPersistenceLayer
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.dynamodb import (
+ CircuitBreakerDynamoDBPersistence,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
+
+__all__ = (  # abstract base, the DynamoDB backend, and the record type re-exported by this subpackage
+ "CircuitBreakerPersistenceLayer",
+ "CircuitBreakerDynamoDBPersistence",
+ "CircuitStateRecord",
+)
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/base.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/base.py
new file mode 100644
index 00000000000..ec8d305a829
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/base.py
@@ -0,0 +1,289 @@
+"""
+Abstract persistence layer for the Circuit Breaker utility.
+
+Concrete backends (DynamoDB, cache) subclass :class:`CircuitBreakerPersistenceLayer`
+and implement the small set of store primitives. The base class owns the local
+read-through cache and the fail-open policy so every backend behaves identically.
+"""
+
+from __future__ import annotations
+
+import datetime
+import logging
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING
+
+from aws_lambda_powertools.shared.cache_dict import LRUDict
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState
+
+if TYPE_CHECKING:
+ from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig
+
+logger = logging.getLogger(__name__)
+
+# Circuit names are static in user code, so a handful of circuits per environment is the
+# norm. This cap only guards the pathological case of dynamically generated names.
+LOCAL_CACHE_MAX_ITEMS = 1024  # max entries in each layer instance's local LRU cache
+
+# Slack added on top of a recovery cycle when computing the durable store TTL. The item
+# must outlive any in-flight recovery window so a live circuit is never reaped mid-cycle,
+# while an abandoned circuit (no traffic, no further writes) still self-cleans soon after.
+PERSISTED_STATE_TTL_BUFFER = 3600  # seconds; added to now + recovery_timeout in _durable_ttl
+
+
+class CircuitBreakerExistingLockError(Exception):
+ """Internal signal that a conditional half-open probe write lost the race; caught in ``try_acquire_half_open``."""
+
+
+class CircuitBreakerRecordNotFoundError(Exception):
+ """Internal signal that no record exists for a circuit name; ``get_state`` turns it into a CLOSED record."""
+
+
+class CircuitBreakerPersistenceLayer(ABC):
+ """
+ Abstract base class for circuit breaker persistence layers.
+
+ Owns the per-environment read cache and the fail-open behavior. Subclasses
+ implement :meth:`_get_record`, :meth:`_put_record`, and :meth:`_update_record`
+ for a specific store.
+
+ A persistence layer is keyed by **circuit name**, not by a payload hash, which is
+ the main reason it does not reuse the Idempotency persistence layer.
+ """
+
+ def __init__(self) -> None:
+ """Initialize defaults; real configuration happens in :meth:`configure`."""
+ self.circuit_name: str = ""
+ self.local_cache_max_age: int = 5
+ self.recovery_timeout: int = 30
+ # Maps circuit name -> ``(local_expiry, record)``: the unix timestamp at which the cached record
+ # goes stale, plus the record itself. The local expiry is kept apart from the record's durable
+ # ``expiry_timestamp`` (the store TTL) so the short in-memory window is never mistaken for it.
+ self._cache: LRUDict = LRUDict(max_items=LOCAL_CACHE_MAX_ITEMS)
+
+ def configure(self, config: CircuitBreakerConfig, circuit_name: str) -> None:
+ """
+ Bind the layer to a circuit and its configuration.
+
+ Called once per invocation by the handler; the assignments are cheap and the
+ same persistence instance is reused across invocations within an environment.
+
+ Parameters
+ ----------
+ config : CircuitBreakerConfig
+ Configuration providing the local cache TTL and recovery timeout.
+ circuit_name : str
+ The circuit this layer instance serves.
+ """
+ self.circuit_name = circuit_name  # overwritten on every call: the most recently configured circuit wins
+ self.local_cache_max_age = config.local_cache_max_age  # seconds a cached record stays fresh
+ self.recovery_timeout = config.recovery_timeout  # used for the probe lease and the durable TTL
+
+ # ------------------------------------------------------------------ cache
+
+ def _cache_key(self, name: str) -> str:  # the circuit name itself is currently the local cache key
+ return name
+
+ def _durable_ttl(self) -> int:
+ """
+ Compute the absolute unix expiry timestamp (store TTL) stamped on a persisted record.
+
+ Sized to outlive a full recovery window so a live circuit is never reaped
+ mid-cycle, while an abandoned circuit (no further writes) self-cleans soon after.
+ """
+ return int(datetime.datetime.now().timestamp()) + self.recovery_timeout + PERSISTED_STATE_TTL_BUFFER
+
+ def _save_to_cache(self, record: CircuitStateRecord) -> None:
+ """Cache a record locally with a short in-memory freshness window."""
+ local_expiry = int(datetime.datetime.now().timestamp()) + self.local_cache_max_age  # now + max age (seconds)
+ self._cache[self._cache_key(record.name)] = (local_expiry, record)  # replaces any earlier entry for the name
+
+ def _retrieve_from_cache(self, name: str) -> CircuitStateRecord | None:
+     """Look up ``name`` in the local cache, evicting and ignoring an entry whose freshness window lapsed."""
+     entry = self._cache.get(self._cache_key(name))
+     if entry is None:
+         return None
+     stale_at, cached_record = entry
+     if int(datetime.datetime.now().timestamp()) >= stale_at:
+         del self._cache[self._cache_key(name)]  # expired: drop it so the caller reads through to the store
+         return None
+     return cached_record
+
+ # ------------------------------------------------------------- public API
+
+ def get_state(self, name: str) -> CircuitStateRecord:
+ """
+ Return the current circuit state, reading the store only on a cache miss.
+
+ A cache miss (cold start or expired local entry) forces a read-through before
+ the caller routes the request, so a freshly started environment never assumes a
+ circuit is closed without checking.
+
+ Fail-open: if the store read itself raises, the circuit is treated as
+ ``CLOSED``. A circuit breaker must never become the outage it is meant to
+ prevent.
+
+ Parameters
+ ----------
+ name : str
+ Circuit name.
+
+ Returns
+ -------
+ CircuitStateRecord
+ The current record, a synthesized closed record if none exists yet (cached locally), or a
+ synthesized closed record if the store could not be reached (deliberately not cached).
+ """
+ cached = self._retrieve_from_cache(name)
+ if cached is not None:
+ return cached
+
+ try:
+ record = self._get_record(name)
+ except CircuitBreakerRecordNotFoundError:
+ record = CircuitStateRecord(name=name, state=CircuitState.CLOSED)  # falls through to be cached below
+ except Exception:
+ # Fail open without caching, so the next invocation retries the store rather
+ # than serving a synthesized CLOSED for the whole local cache window.
+ logger.warning(
+ "Failed to read circuit state for '%s'; failing open (treating as CLOSED).",
+ name,
+ exc_info=True,
+ )
+ return CircuitStateRecord(name=name, state=CircuitState.CLOSED)
+
+ self._save_to_cache(record)
+ return record
+
+ def save_open(self, name: str, failure_count: int, opened_at: int) -> None:
+ """
+ Persist a CLOSED to OPEN transition.
+
+ Parameters
+ ----------
+ name : str
+ Circuit name.
+ failure_count : int
+ Consecutive failures that tripped the circuit.
+ opened_at : int
+ Unix timestamp the circuit opened; anchors the recovery timeout.
+ """
+ record = CircuitStateRecord(
+ name=name,
+ state=CircuitState.OPEN,
+ failure_count=failure_count,
+ opened_at=opened_at,
+ expiry_timestamp=self._durable_ttl(),
+ )
+ self._put_record(record)  # no condition passed: an unconditional write
+ self._save_to_cache(record)  # reached only when the write did not raise
+
+ def try_acquire_half_open(self, name: str, owner: str, opened_at: int) -> bool:
+ """
+ Atomically elect a single environment to run the half-open probe.
+
+ The backend's conditional write is expected to succeed only when the circuit is OPEN with
+ no lock owner AND the ``opened_at`` matches what the caller observed (guards against stale
+ eventually-consistent reads). A lease expiry is stamped so that if the winning environment
+ is recycled mid-probe, others can take over once the lease lapses. NOTE(review): takeover
+ implies the condition must also admit an expired lease; confirm against the backend.
+
+ Parameters
+ ----------
+ name : str
+ Circuit name.
+ owner : str
+ Identifier of the environment attempting the probe.
+ opened_at : int
+ The ``opened_at`` the caller observed, kept stable across the transition.
+
+ Returns
+ -------
+ bool
+ ``True`` if this environment won the probe lock, ``False`` if another
+ environment already holds it.
+ """
+ # Lease = recovery_timeout gives the probe a full cycle to complete.
+ probe_lease_expiry = int(datetime.datetime.now().timestamp()) + self.recovery_timeout
+ record = CircuitStateRecord(
+ name=name,
+ state=CircuitState.HALF_OPEN,
+ opened_at=opened_at,
+ half_open_owner=owner,
+ probe_lease_expiry=probe_lease_expiry,
+ expiry_timestamp=self._durable_ttl(),
+ )
+ try:
+ self._put_record(record, condition="half_open", expected_opened_at=opened_at)
+ except CircuitBreakerExistingLockError:  # lost the race; any other backend exception propagates to the caller
+ return False
+ self._save_to_cache(record)  # the winner caches its own HALF_OPEN record, including the owner id
+ return True
+
+ def save_closed(self, name: str) -> None:
+ """Persist a transition back to CLOSED with ``failure_count`` reset to 0, then refresh the local cache."""
+ record = CircuitStateRecord(
+ name=name,
+ state=CircuitState.CLOSED,
+ failure_count=0,
+ expiry_timestamp=self._durable_ttl(),
+ )
+ self._update_record(record)  # unconditional update; an exception here skips the cache refresh
+ self._save_to_cache(record)
+
+ def save_reopen(self, name: str, opened_at: int) -> None:
+ """Persist a HALF_OPEN to OPEN transition after a failed probe; ``opened_at`` restarts the recovery window."""
+ record = CircuitStateRecord(
+ name=name,
+ state=CircuitState.OPEN,
+ opened_at=opened_at,
+ expiry_timestamp=self._durable_ttl(),
+ )
+ self._update_record(record)  # unconditional update; an exception here skips the cache refresh
+ self._save_to_cache(record)
+
+ # --------------------------------------------------------- backend hooks
+
+ @abstractmethod
+ def _get_record(self, name: str) -> CircuitStateRecord:
+ """
+ Fetch a circuit record from the store (called by ``get_state`` on a local cache miss).
+
+ Raises
+ ------
+ CircuitBreakerRecordNotFoundError
+ If no record exists for ``name``; ``get_state`` then synthesizes a CLOSED record.
+ """
+ raise NotImplementedError
+
+ @abstractmethod
+ def _put_record(
+ self,
+ record: CircuitStateRecord,
+ condition: str | None = None,
+ expected_opened_at: int | None = None,
+ ) -> None:
+ """
+ Write a circuit record (``save_open`` calls it unconditionally, ``try_acquire_half_open`` conditionally).
+
+ Parameters
+ ----------
+ record : CircuitStateRecord
+ Record to write.
+ condition : str | None
+ When ``"half_open"``, the write must be conditional so only one
+ environment wins the probe lock; on a lost race the backend raises
+ :class:`CircuitBreakerExistingLockError`.
+ expected_opened_at : int | None
+ When set alongside ``condition="half_open"``, the write additionally
+ requires that the stored ``opened_at`` matches this value. This closes
+ a race where an eventually-consistent read could let a stale environment
+ win an election immediately after a failed probe reopened the circuit.
+ """
+ raise NotImplementedError
+
+ @abstractmethod
+ def _update_record(self, record: CircuitStateRecord) -> None:
+ """Update an existing circuit record (unconditional state change); used by ``save_closed`` and ``save_reopen``."""
+ raise NotImplementedError
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py
new file mode 100644
index 00000000000..1642979795f
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/dynamodb.py
@@ -0,0 +1,243 @@
+"""
+DynamoDB persistence backend for the Circuit Breaker utility.
+"""
+
+from __future__ import annotations
+
+import datetime
+import logging
+from typing import TYPE_CHECKING
+
+import boto3
+from boto3.dynamodb.types import TypeDeserializer
+from botocore.exceptions import ClientError
+
+from aws_lambda_powertools.shared import user_agent
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import (
+ CircuitBreakerExistingLockError,
+ CircuitBreakerPersistenceLayer,
+ CircuitBreakerRecordNotFoundError,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState
+
+if TYPE_CHECKING:
+ from botocore.config import Config
+ from mypy_boto3_dynamodb.client import DynamoDBClient
+
+logger = logging.getLogger(__name__)
+
+
+class CircuitBreakerDynamoDBPersistence(CircuitBreakerPersistenceLayer):
+    """
+    Store circuit state in an Amazon DynamoDB table, one item per circuit.
+
+    The class name is prefixed with ``CircuitBreaker`` so a function using both the
+    Idempotency and Circuit Breaker utilities can import both persistence layers
+    without an alias.
+
+    Parameters
+    ----------
+    table_name : str
+        Name of the DynamoDB table that stores circuit state.
+    key_attr : str
+        Partition key attribute holding the circuit name. Defaults to ``"id"``.
+    state_attr : str
+        Attribute holding the circuit state. Defaults to ``"state"``.
+    failure_count_attr : str
+        Attribute holding the consecutive failure count. Defaults to ``"failure_count"``.
+    opened_at_attr : str
+        Attribute holding the open timestamp. Defaults to ``"opened_at"``.
+    half_open_owner_attr : str
+        Attribute holding the half-open probe lock owner. Defaults to ``"half_open_owner"``.
+    probe_lease_expiry_attr : str
+        Attribute holding the probe lease expiry timestamp. Defaults to ``"probe_lease_expiry"``.
+    expiry_attr : str
+        TTL attribute. Defaults to ``"expiration"``.
+    boto_config : botocore.config.Config, optional
+        Botocore configuration used when creating the client.
+    boto3_session : boto3.session.Session, optional
+        Session used to create the client.
+    boto3_client : DynamoDBClient, optional
+        Pre-built client; ``boto3_session`` and ``boto_config`` are ignored if given.
+
+    Example
+    -------
+    **Create a DynamoDB-backed circuit breaker store**
+
+        from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
+            CircuitBreakerDynamoDBPersistence,
+        )
+
+        persistence = CircuitBreakerDynamoDBPersistence(table_name="CircuitBreakerState")
+    """
+
+    def __init__(
+        self,
+        table_name: str,
+        key_attr: str = "id",
+        state_attr: str = "state",
+        failure_count_attr: str = "failure_count",
+        opened_at_attr: str = "opened_at",
+        half_open_owner_attr: str = "half_open_owner",
+        probe_lease_expiry_attr: str = "probe_lease_expiry",
+        expiry_attr: str = "expiration",
+        boto_config: Config | None = None,
+        boto3_session: boto3.session.Session | None = None,
+        boto3_client: DynamoDBClient | None = None,
+    ):
+        # A caller-supplied client wins; otherwise build one from the session/config.
+        if boto3_client is None:
+            boto3_session = boto3_session or boto3.session.Session()
+            boto3_client = boto3_session.client("dynamodb", config=boto_config)
+        self.client = boto3_client
+
+        user_agent.register_feature_to_client(client=self.client, feature="circuit_breaker")
+
+        self.table_name = table_name
+        self.key_attr = key_attr
+        self.state_attr = state_attr
+        self.failure_count_attr = failure_count_attr
+        self.opened_at_attr = opened_at_attr
+        self.half_open_owner_attr = half_open_owner_attr
+        self.probe_lease_expiry_attr = probe_lease_expiry_attr
+        self.expiry_attr = expiry_attr
+
+        self._deserializer = TypeDeserializer()
+
+        super().__init__()
+
+    def _item_to_record(self, item: dict) -> CircuitStateRecord:
+        """
+        Translate a raw DynamoDB item into a :class:`CircuitStateRecord`.
+
+        Parameters
+        ----------
+        item : dict
+            Item in DynamoDB attribute-value format, as returned by ``get_item``.
+
+        Returns
+        -------
+        CircuitStateRecord
+            Record whose numeric fields are plain ``int`` (or ``None`` when the
+            attribute is absent).
+        """
+        data = self._deserializer.deserialize({"M": item})
+        # boto3's TypeDeserializer yields Decimal for DynamoDB numbers, so every
+        # numeric attribute is coerced to int to honour the record's field types.
+        opened_at = data.get(self.opened_at_attr)
+        probe_lease_expiry = data.get(self.probe_lease_expiry_attr)
+        expiry_timestamp = data.get(self.expiry_attr)
+        return CircuitStateRecord(
+            name=data[self.key_attr],
+            state=CircuitState(data[self.state_attr]),
+            failure_count=int(data.get(self.failure_count_attr, 0)),
+            opened_at=int(opened_at) if opened_at is not None else None,
+            half_open_owner=data.get(self.half_open_owner_attr),
+            probe_lease_expiry=int(probe_lease_expiry) if probe_lease_expiry is not None else None,
+            expiry_timestamp=int(expiry_timestamp) if expiry_timestamp is not None else None,
+        )
+
+    def _record_to_item(self, record: CircuitStateRecord) -> dict:
+        """
+        Translate a :class:`CircuitStateRecord` into a DynamoDB item.
+
+        Name, state and failure count are always written; the optional fields are
+        only included when they are not ``None``.
+        """
+        item: dict = {
+            self.key_attr: {"S": record.name},
+            self.state_attr: {"S": str(record.state)},
+            self.failure_count_attr: {"N": str(record.failure_count)},
+        }
+        if record.opened_at is not None:
+            item[self.opened_at_attr] = {"N": str(record.opened_at)}
+        if record.half_open_owner is not None:
+            item[self.half_open_owner_attr] = {"S": record.half_open_owner}
+        if record.probe_lease_expiry is not None:
+            item[self.probe_lease_expiry_attr] = {"N": str(record.probe_lease_expiry)}
+        if record.expiry_timestamp is not None:
+            item[self.expiry_attr] = {"N": str(record.expiry_timestamp)}
+        return item
+
+    def _get_record(self, name: str) -> CircuitStateRecord:
+        """
+        Read the item for circuit ``name`` and convert it to a record.
+
+        Raises
+        ------
+        CircuitBreakerRecordNotFoundError
+            If the ``get_item`` response carries no ``Item``.
+        """
+        # Eventually consistent on purpose: matches the local cache's stale tolerance
+        # and halves the read cost on the hot path.
+        response = self.client.get_item(
+            TableName=self.table_name,
+            Key={self.key_attr: {"S": name}},
+            ConsistentRead=False,
+        )
+        try:
+            item = response["Item"]
+        except KeyError as exc:
+            raise CircuitBreakerRecordNotFoundError from exc
+        return self._item_to_record(item)
+
+    def _build_half_open_condition(self, expected_opened_at: int | None = None) -> dict:
+        """
+        Build the conditional expression kwargs for a half-open probe election.
+
+        The write is allowed when either the circuit is OPEN with no owner attribute
+        present, or it is HALF_OPEN and the stored probe lease expiry is not later
+        than the current time.
+
+        Parameters
+        ----------
+        expected_opened_at : int | None
+            When given, the OPEN clause additionally requires the stored
+            ``opened_at`` to equal this value. The HALF_OPEN clause is unaffected.
+
+        Returns
+        -------
+        dict
+            ``ConditionExpression``, ``ExpressionAttributeNames`` and
+            ``ExpressionAttributeValues`` ready to merge into ``put_item`` kwargs.
+        """
+        open_clause = "#state = :open AND attribute_not_exists(#half_open_owner)"
+        expression_attr_names: dict = {
+            "#state": self.state_attr,
+            "#half_open_owner": self.half_open_owner_attr,
+            "#probe_lease_expiry": self.probe_lease_expiry_attr,
+        }
+        expression_values: dict = {
+            ":open": {"S": str(CircuitState.OPEN)},
+            ":half_open": {"S": str(CircuitState.HALF_OPEN)},
+            ":now": {"N": str(int(datetime.datetime.now().timestamp()))},
+        }
+
+        if expected_opened_at is not None:
+            # Only register the extra name/value when the expression references them.
+            open_clause += " AND #opened_at = :expected_opened_at"
+            expression_attr_names["#opened_at"] = self.opened_at_attr
+            expression_values[":expected_opened_at"] = {"N": str(expected_opened_at)}
+
+        condition_parts = [
+            f"({open_clause})",
+            "(#state = :half_open AND #probe_lease_expiry <= :now)",
+        ]
+        return {
+            "ConditionExpression": " OR ".join(condition_parts),
+            "ExpressionAttributeNames": expression_attr_names,
+            "ExpressionAttributeValues": expression_values,
+        }
+
+    def _put_record(
+        self,
+        record: CircuitStateRecord,
+        condition: str | None = None,
+        expected_opened_at: int | None = None,
+    ) -> None:
+        """
+        Write ``record`` with ``put_item``.
+
+        The write is unconditional unless ``condition == "half_open"``, in which case
+        the probe-election condition is attached (optionally pinned to
+        ``expected_opened_at``).
+
+        Raises
+        ------
+        CircuitBreakerExistingLockError
+            If DynamoDB rejects the write with ``ConditionalCheckFailedException``.
+        botocore.exceptions.ClientError
+            Any other client error is re-raised unchanged.
+        """
+        item = self._record_to_item(record)
+
+        put_kwargs: dict = {"TableName": self.table_name, "Item": item}
+
+        if condition == "half_open":
+            put_kwargs.update(self._build_half_open_condition(expected_opened_at))
+
+        try:
+            self.client.put_item(**put_kwargs)
+        except ClientError as exc:
+            if exc.response.get("Error", {}).get("Code") == "ConditionalCheckFailedException":
+                raise CircuitBreakerExistingLockError from exc
+            raise
+
+    def _update_record(self, record: CircuitStateRecord) -> None:
+        """
+        Apply ``record`` to the stored item with ``update_item``.
+
+        State and failure count are always set, and the TTL attribute is set when
+        ``record.expiry_timestamp`` is not ``None``. ``opened_at`` is set when the
+        record carries one and removed otherwise.
+        """
+        set_clauses = ["#state = :state", "#failure_count = :failure_count"]
+        # Clear the half-open owner lock and probe lease on every state change out of
+        # HALF_OPEN, whether the probe closed the circuit (opened_at is None) or reopened
+        # it (opened_at set). Otherwise the stale owner/lease makes the next probe
+        # election's condition fail forever, stranding the circuit.
+        remove_clauses = ["#half_open_owner", "#probe_lease_expiry"]
+        expression_attr_names = {
+            "#state": self.state_attr,
+            "#failure_count": self.failure_count_attr,
+            # #opened_at is always referenced: either in SET or in REMOVE below.
+            "#opened_at": self.opened_at_attr,
+            "#half_open_owner": self.half_open_owner_attr,
+            "#probe_lease_expiry": self.probe_lease_expiry_attr,
+        }
+        expression_attr_values: dict = {
+            ":state": {"S": str(record.state)},
+            ":failure_count": {"N": str(record.failure_count)},
+        }
+
+        if record.expiry_timestamp is not None:
+            set_clauses.append("#expiration = :expiration")
+            expression_attr_names["#expiration"] = self.expiry_attr
+            expression_attr_values[":expiration"] = {"N": str(record.expiry_timestamp)}
+
+        if record.opened_at is not None:
+            set_clauses.append("#opened_at = :opened_at")
+            expression_attr_values[":opened_at"] = {"N": str(record.opened_at)}
+        else:
+            remove_clauses.insert(0, "#opened_at")
+
+        self.client.update_item(
+            TableName=self.table_name,
+            Key={self.key_attr: {"S": record.name}},
+            UpdateExpression=f"SET {', '.join(set_clauses)} REMOVE {', '.join(remove_clauses)}",
+            ExpressionAttributeNames=expression_attr_names,
+            ExpressionAttributeValues=expression_attr_values,
+        )
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/record.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/record.py
new file mode 100644
index 00000000000..71086c3c1a7
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/record.py
@@ -0,0 +1,63 @@
+"""
+Internal record type for circuit state held in a persistence store.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitInfo, CircuitState
+
+
+@dataclass
+class CircuitStateRecord:
+    """
+    Internal representation of one circuit as held in a persistence store.
+
+    There is one record per circuit name. User code never receives this type; it
+    only sees the ``CircuitInfo`` built by :meth:`to_circuit_info`.
+
+    Parameters
+    ----------
+    name : str
+        Circuit name, used as the partition key in the store.
+    state : CircuitState
+        Current circuit state.
+    failure_count : int
+        Consecutive failures recorded by the environment that last wrote the record.
+    opened_at : int | None
+        Unix timestamp (seconds) the circuit opened. Anchors the recovery timeout;
+        ``None`` while closed.
+    half_open_owner : str | None
+        Identifier of the execution environment that won the half-open probe lock, if any.
+    probe_lease_expiry : int | None
+        Expiry timestamp of the half-open probe lease, if one is held.
+    expiry_timestamp : int | None
+        Unix timestamp (seconds) for the store's TTL attribute.
+    """
+
+    name: str
+    state: CircuitState
+    failure_count: int = 0
+    opened_at: int | None = None
+    half_open_owner: str | None = None
+    probe_lease_expiry: int | None = None
+    expiry_timestamp: int | None = None
+
+    def to_circuit_info(self) -> CircuitInfo:
+        """
+        Build the public ``CircuitInfo`` view of this record.
+
+        Only ``name``, ``state``, ``failure_count`` and ``opened_at`` are copied.
+        The internal fields (``half_open_owner``, ``probe_lease_expiry``,
+        ``expiry_timestamp``) are left out so no persistence detail crosses the
+        public boundary.
+
+        Returns
+        -------
+        CircuitInfo
+            Public snapshot of the circuit.
+        """
+        snapshot = CircuitInfo(
+            name=self.name,
+            state=self.state,
+            failure_count=self.failure_count,
+            opened_at=self.opened_at,
+        )
+        return snapshot
diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/states.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/states.py
new file mode 100644
index 00000000000..cc041e17a36
--- /dev/null
+++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/states.py
@@ -0,0 +1,126 @@
+"""
+Public state types for the Circuit Breaker utility.
+
+These are the only circuit-breaker types handed to user code (callbacks and the
+``CircuitInfo`` attached to ``CircuitBreakerOpenError``). They deliberately expose no
+persistence internals.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from enum import Enum
+
+
+class CircuitState(str, Enum):
+    """
+    The three states a circuit can be in.
+
+    The ``str`` mix-in makes each member a real string, so it can be written to a
+    persistence store (e.g. DynamoDB) as a plain string and compares equal to its
+    string form.
+
+    Attributes
+    ----------
+    CLOSED : str
+        Normal operation. Requests reach the downstream and failures are counted.
+    OPEN : str
+        The downstream is considered unhealthy. The protected call is skipped.
+    HALF_OPEN : str
+        Recovery is being tested. A limited number of probe requests are allowed
+        through to decide whether the circuit should close again.
+    """
+
+    CLOSED = "CLOSED"
+    OPEN = "OPEN"
+    HALF_OPEN = "HALF_OPEN"
+
+    def __str__(self) -> str:
+        """Render the member as its bare value (``"OPEN"``), not ``CircuitState.OPEN``."""
+        return str(self.value)
+
+
+@dataclass(frozen=True)
+class CircuitInfo:
+ """
+ Immutable snapshot of a circuit, passed to user code.
+
+ This is the public boundary of the utility: it is passed to an ``on_circuit_open``
+ callback as the trailing ``circuit`` keyword argument (after the arguments of the
+ rejected call), and it is attached to ``CircuitBreakerOpenError`` as its ``circuit``
+ attribute so a caller can inspect why the circuit rejected the request. No
+ persistence details (probe lock, probe lease, TTL) are exposed.
+
+ Parameters
+ ----------
+ name : str
+ The circuit name, as given to the ``@circuit_breaker`` decorator.
+ state : CircuitState
+ The circuit state at the moment the request was evaluated.
+ failure_count : int
+ A point-in-time snapshot of the *consecutive* failures the environment that
+ last wrote the record had counted, captured at the moment of a state
+ transition. It is **not** a running total of failures across the fleet: the
+ failure counter lives in memory per execution environment (so the healthy path
+ stays write-free), and only the tripping environment's count is persisted when
+ the circuit opens. It is ``0`` in states reached without a fresh trip (for
+ example ``HALF_OPEN``, or ``OPEN`` re-entered after a failed probe). For failure
+ *volume*, emit a CloudWatch metric from your own code or an ``on_transition``
+ hook rather than reading this field.
+ opened_at : int | None
+ Unix timestamp (seconds) at which the circuit opened, or ``None`` while the
+ circuit is closed. Drives the recovery timeout.
+
+ Example
+ -------
+ **Inspecting circuit details inside a callback**
+
+ def on_open(payload: dict, circuit: CircuitInfo):
+ logger.warning("circuit %s open since %s", circuit.name, circuit.opened_at)
+ return {"statusCode": 503}
+ """
+
+ name: str
+ state: CircuitState
+ failure_count: int
+ opened_at: int | None = None
+
+
+@dataclass(frozen=True)
+class CircuitTransition:
+ """
+ Immutable description of a circuit state change, passed to an ``on_transition`` hook.
+
+ The hook fires only on the rare state transitions a circuit makes (open, probe,
+ close, reopen), never on the per-invocation hot path, so emitting a metric from it
+ does not undermine the write-free healthy path.
+
+ Per the utility documentation, an exception raised inside the hook is swallowed
+ and logged rather than propagated to the protected request.
+
+ Parameters
+ ----------
+ circuit_name : str
+ The circuit name, as given to the ``@circuit_breaker`` decorator.
+ from_state : CircuitState
+ The state the circuit was in before the transition.
+ to_state : CircuitState
+ The state the circuit moved to.
+ opened_at : int | None
+ Unix timestamp (seconds) the circuit opened, when relevant to the new state.
+
+ Example
+ -------
+ **Emit a CloudWatch metric per transition**
+
+ from aws_lambda_powertools.metrics import MetricUnit, single_metric
+
+ def emit(transition: CircuitTransition) -> None:
+ with single_metric(
+ namespace="MyApp",
+ name=f"Circuit{transition.to_state}",
+ unit=MetricUnit.Count,
+ value=1,
+ ) as metric:
+ metric.add_dimension(name="circuit", value=transition.circuit_name)
+ """
+
+ circuit_name: str
+ from_state: CircuitState
+ to_state: CircuitState
+ opened_at: int | None = None
diff --git a/docs/api_doc/circuit_breaker_alpha/circuit_breaker.md b/docs/api_doc/circuit_breaker_alpha/circuit_breaker.md
new file mode 100644
index 00000000000..824b40225bc
--- /dev/null
+++ b/docs/api_doc/circuit_breaker_alpha/circuit_breaker.md
@@ -0,0 +1,2 @@
+
+::: aws_lambda_powertools.utilities.circuit_breaker_alpha.circuit_breaker
diff --git a/docs/api_doc/circuit_breaker_alpha/config.md b/docs/api_doc/circuit_breaker_alpha/config.md
new file mode 100644
index 00000000000..20b548082c1
--- /dev/null
+++ b/docs/api_doc/circuit_breaker_alpha/config.md
@@ -0,0 +1,2 @@
+
+::: aws_lambda_powertools.utilities.circuit_breaker_alpha.config
diff --git a/docs/api_doc/circuit_breaker_alpha/exceptions.md b/docs/api_doc/circuit_breaker_alpha/exceptions.md
new file mode 100644
index 00000000000..283374e42e5
--- /dev/null
+++ b/docs/api_doc/circuit_breaker_alpha/exceptions.md
@@ -0,0 +1,2 @@
+
+::: aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions
diff --git a/docs/api_doc/circuit_breaker_alpha/persistence.md b/docs/api_doc/circuit_breaker_alpha/persistence.md
new file mode 100644
index 00000000000..f865dfd10f0
--- /dev/null
+++ b/docs/api_doc/circuit_breaker_alpha/persistence.md
@@ -0,0 +1,2 @@
+
+::: aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence
diff --git a/docs/api_doc/circuit_breaker_alpha/states.md b/docs/api_doc/circuit_breaker_alpha/states.md
new file mode 100644
index 00000000000..c6f232a0e28
--- /dev/null
+++ b/docs/api_doc/circuit_breaker_alpha/states.md
@@ -0,0 +1,2 @@
+
+::: aws_lambda_powertools.utilities.circuit_breaker_alpha.states
diff --git a/docs/utilities/circuit_breaker.md b/docs/utilities/circuit_breaker.md
new file mode 100644
index 00000000000..a59d3fde292
--- /dev/null
+++ b/docs/utilities/circuit_breaker.md
@@ -0,0 +1,228 @@
+---
+title: Circuit Breaker
+description: Utility
+---
+
+
+
+!!! warning "Alpha / experimental"
+ This utility ships under the **`circuit_breaker_alpha`** namespace while we collect
+ feedback. The public API may change in a backwards-incompatible way before it is
+ promoted to GA, at which point the import path becomes `circuit_breaker`. Pin your
+ Powertools version and follow the tracking discussion before relying on it in
+ production.
+
+The circuit breaker utility stops sending traffic to an unhealthy downstream dependency, giving it room to recover while you decide what happens to the rejected requests.
+
+## Key features
+
+* Stops calling an unhealthy downstream after a configurable number of consecutive failures
+* Hands rejected requests to an `on_circuit_open` callback so you decide what happens next (buffer, drop, return a cached value)
+* Tests recovery with an explicit half-open probe rather than blindly retrying everything at once
+* Shares circuit state across execution environments via Amazon DynamoDB
+* Keeps the healthy path write-free: failures are counted in memory and only persisted on a state transition
+
+## Terminology
+
+**Circuit** is a named guard around a single downstream dependency. Each `name` is an independent circuit.
+
+**State** is the circuit's current mode: `CLOSED` (normal), `OPEN` (downstream considered unhealthy, calls skipped), or `HALF_OPEN` (testing recovery).
+
+**Persistence layer** is the shared storage that holds each circuit's state so every execution environment agrees on whether a circuit is open.
+
+**Recovery timeout** is how long a circuit stays open before allowing a half-open probe.
+
+
+```mermaid
+stateDiagram-v2
+ [*] --> CLOSED
+ CLOSED --> OPEN: N consecutive failures
+ OPEN --> HALF_OPEN: recovery timeout elapsed
+ HALF_OPEN --> CLOSED: probe succeeds
+ HALF_OPEN --> OPEN: probe fails
+```
+
+Circuit breaker state transitions
+
+
+## Getting started
+
+We use Amazon DynamoDB as the persistence layer in this documentation.
+
+### IAM Permissions
+
+When using Amazon DynamoDB as the persistence layer, you will need the following IAM permissions:
+
+| IAM Permission | Operation |
+| ------------------------------------ | ------------------------------------------------------- |
+| **`dynamodb:GetItem`**{: .copyMe} | Read shared circuit state |
+| **`dynamodb:PutItem`**{: .copyMe} | Persist an opened circuit and elect the half-open probe |
+| **`dynamodb:UpdateItem`**{: .copyMe} | Close or reopen a circuit after a probe |
+
+### Required resources
+
+To start, you'll need:
+
+
+
+
+* **Persistent storage**
+
+ ---
+
+ [Amazon DynamoDB](#dynamodb-table)
+
+* **AWS Lambda function**
+
+ ---
+
+ With permissions to read and write your persistent storage
+
+
+
+
+
+#### DynamoDB table
+
+Unless you're looking to [customize each attribute](#customizing-the-dynamodb-table), you only need the following:
+
+| Configuration | Value | Notes |
+| ------------------ | ------------ | ------------------------------------------------------------ |
+| Partition key | `id` | Holds the circuit name |
+| TTL attribute name | `expiration` | Using AWS Console? This is configurable after table creation |
+
+You **can** use a single DynamoDB table for all your circuits.
+
+##### DynamoDB IaC example
+
+=== "AWS Serverless Application Model (SAM) example"
+
+ ```yaml hl_lines="3-15 24-29 32-33"
+ --8<-- "examples/circuit_breaker_alpha/templates/sam.yaml"
+ ```
+
+### Circuit breaker in action
+
+The common case is the `@circuit_breaker` decorator wrapping the function that makes the downstream call. With no `config`, sensible defaults apply (open after 5 consecutive failures, probe after 30 seconds, close after 3 probe successes, count any exception as a failure).
+
+=== "getting_started_with_circuit_breaker.py"
+
+ ```python hl_lines="3-6 10 20 29"
+ --8<-- "examples/circuit_breaker_alpha/src/getting_started_with_circuit_breaker.py"
+ ```
+
+!!! note "Wrap the downstream call, not the whole handler"
+ The circuit protects a single dependency. If you decorate a handler that parses the
+ event, validates, and calls two backends, a parsing bug would trip a circuit named
+ after a backend that is perfectly healthy. Decorate the handler directly only when
+ the handler **is** the downstream call (a thin pass-through).
+
+### What the decorated function returns
+
+There is no wrapper type to inspect. The contract is:
+
+| Circuit state | Result |
+| ------------------------------------- | ---------------------------------------------------------------------- |
+| **Closed** | The protected function's return value |
+| **Open**, `on_circuit_open` set | Whatever the callback returns |
+| **Open**, no callback | Raises `CircuitBreakerOpenError` (with the `CircuitInfo` attached) |
+
+## Handling an open circuit
+
+### With a callback
+
+Register an `on_circuit_open` callback to decide what happens to a rejected request. The callback receives the same arguments the protected function was called with (positional arguments stay positional, keyword arguments stay keyword), plus a trailing `circuit` keyword argument carrying a `CircuitInfo` snapshot. Its return value becomes the result of the call.
+
+=== "working_with_callback.py"
+
+ ```python hl_lines="6 25-29 32-36"
+ --8<-- "examples/circuit_breaker_alpha/src/working_with_callback.py"
+ ```
+
+!!! info "Why a callback instead of built-in S3/SQS sinks?"
+ A managed sink would have to own client setup, payload-size handling, retries, and
+ IAM, and it would leak *where* the payload landed back to the caller. A one-line
+ callback does the same thing with full control and no lock-in, so the utility stays
+ out of your way.
+
+### Without a callback
+
+If no callback is registered, an open circuit raises `CircuitBreakerOpenError`. Catch it to decide how to respond. The exception carries a `circuit` attribute (`CircuitInfo`) so you can inspect why the request was rejected.
+
+=== "working_without_callback.py"
+
+ ```python hl_lines="5 16-22 38-43"
+ --8<-- "examples/circuit_breaker_alpha/src/working_without_callback.py"
+ ```
+
+## Configuration
+
+All options live on `CircuitBreakerConfig`. Every value has a default, so `CircuitBreakerConfig()` is a valid production configuration.
+
+| Parameter | Default | Description |
+| ------------------------- | ------- | ------------------------------------------------------------------------------------------------- |
+| **`failure_threshold`** | `5` | Consecutive failures that trip a closed circuit to open |
+| **`recovery_timeout`** | `30` | Seconds the circuit stays open before a half-open probe |
+| **`success_threshold`** | `3` | Consecutive probe successes required to close a half-open circuit |
+| **`handled_exceptions`** | `None` | Allowlist: only these exception types count as failures. Mutually exclusive with the denylist |
+| **`ignored_exceptions`** | `None` | Denylist: every exception counts as a failure except these. Mutually exclusive with the allowlist |
+| **`local_cache_max_age`** | `5` | Seconds a circuit's state is cached per environment before a read-through |
+
+### Choosing which exceptions count as a failure
+
+By default, **any exception** counts as a failure. But not every error means the downstream is unhealthy: a `400` is the caller's fault, a `503` is not. Scope it from either side:
+
+* **`handled_exceptions`** (allowlist): only these count. Everything else propagates without affecting the circuit.
+* **`ignored_exceptions`** (denylist): everything counts except these.
+
+Passing both raises `CircuitBreakerConfigError`. An exception that doesn't count is re-raised to the caller untouched.
+
+## Advanced
+
+### How recovery works
+
+After `recovery_timeout` seconds, the circuit moves to `HALF_OPEN` and a **single** execution environment is elected (via a conditional DynamoDB write) to run a probe. If `success_threshold` consecutive probes succeed, the circuit closes; a single failing probe reopens it. This avoids a thundering herd of every environment hammering a recovering backend at once.
+
+### State coordination across environments
+
+The consecutive-failure counter lives in memory per execution environment, so a healthy circuit performs **no writes**. Only when an environment reaches `failure_threshold` does it persist `OPEN`. The shared state is cached locally for `local_cache_max_age` seconds to avoid a read per invocation. A cache miss (cold start or expired entry) forces a read-through before routing.
+
+!!! note "Fail-open by design"
+ If the persistence store cannot be reached when reading state, the circuit is treated
+ as **closed**. A circuit breaker should never become the outage it is meant to prevent.
+
+### Observability with metrics
+
+Register an `on_transition` hook to be notified whenever the circuit changes state (open, probe, close, reopen). The hook fires **only on transitions**, never on the per-invocation hot path, so it is a safe place to emit a CloudWatch metric without giving up the write-free healthy path. It receives a single `CircuitTransition` (`circuit_name`, `from_state`, `to_state`, `opened_at`).
+
+=== "working_with_metrics.py"
+
+ ```python hl_lines="3 17-25 38"
+ --8<-- "examples/circuit_breaker_alpha/src/working_with_metrics.py"
+ ```
+
+Any exception raised inside the hook is swallowed and logged, so a misbehaving metric call can never break the protected request.
+
+!!! warning "`failure_count` is a trip-time snapshot, not a running total"
+ The `failure_count` on `CircuitInfo` is the number of *consecutive* failures the
+ environment that tripped the circuit had counted at the moment it opened. Because the
+ failure counter lives in memory per execution environment (keeping the healthy path
+ write-free), it is **not** a fleet-wide total and reads `0` in states reached without a
+ fresh trip (such as `HALF_OPEN`). For failure **volume**, emit a metric from your own
+ code or the `on_transition` hook rather than reading this field.
+
+### Disabling the circuit breaker
+
+Set **`POWERTOOLS_CIRCUIT_BREAKER_DISABLED`**{: .copyMe} to a truthy value to bypass the circuit entirely and always call the protected function. This is intended for **development environments only** and emits a warning.
+
+### Customizing the DynamoDB table
+
+`CircuitBreakerDynamoDBPersistence` accepts attribute-name overrides (`key_attr`, `state_attr`, `failure_count_attr`, `opened_at_attr`, `half_open_owner_attr`, `probe_lease_expiry_attr`, `expiry_attr`) and the usual boto3 escape hatches (`boto3_session`, `boto3_client`, `boto_config`) for reusing an existing table layout or client.
+
+## Testing your code
+
+When unit testing the function a circuit protects, set `POWERTOOLS_CIRCUIT_BREAKER_DISABLED=true` to bypass the circuit and persistence layer entirely, so your tests exercise the business logic without needing DynamoDB.
+
+```bash title="Disabling circuit breaker for tests"
+POWERTOOLS_CIRCUIT_BREAKER_DISABLED=true python -m pytest
+```
diff --git a/examples/circuit_breaker_alpha/src/getting_started_with_circuit_breaker.py b/examples/circuit_breaker_alpha/src/getting_started_with_circuit_breaker.py
new file mode 100644
index 00000000000..ad9a4de1066
--- /dev/null
+++ b/examples/circuit_breaker_alpha/src/getting_started_with_circuit_breaker.py
@@ -0,0 +1,29 @@
+import os
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha import circuit_breaker
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
+ CircuitBreakerDynamoDBPersistence,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+table = os.getenv("CIRCUIT_BREAKER_TABLE", "")
+persistence = CircuitBreakerDynamoDBPersistence(table_name=table)
+
+
+class PaymentBackend: # placeholder downstream client; method body intentionally elided
+ def charge(self, order: dict): ...
+
+
+payment_api = PaymentBackend()
+
+
+@circuit_breaker(name="payment-backend", persistence_store=persistence)
+def charge(order: dict) -> dict:
+ return payment_api.charge(order) # the protected downstream call
+
+
+def lambda_handler(event: dict, context: LambdaContext) -> dict:
+ # Circuit closed -> charge() runs and returns the backend response.
+ # Circuit open -> charge() is skipped and CircuitBreakerOpenError is raised,
+ # because no on_circuit_open callback is registered.
+ return charge(event)
diff --git a/examples/circuit_breaker_alpha/src/working_with_callback.py b/examples/circuit_breaker_alpha/src/working_with_callback.py
new file mode 100644
index 00000000000..d2b97ad6ac5
--- /dev/null
+++ b/examples/circuit_breaker_alpha/src/working_with_callback.py
@@ -0,0 +1,44 @@
+import json
+import os
+
+import boto3
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha import CircuitInfo, circuit_breaker
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
+ CircuitBreakerDynamoDBPersistence,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+table = os.getenv("CIRCUIT_BREAKER_TABLE", "")
+queue_url = os.getenv("OVERFLOW_QUEUE_URL", "")
+persistence = CircuitBreakerDynamoDBPersistence(table_name=table)
+sqs = boto3.client("sqs")
+
+
+class PaymentBackend: # placeholder downstream client; method body intentionally elided
+ def charge(self, order: dict): ...
+
+
+payment_api = PaymentBackend()
+
+
+def buffer_payload(order: dict, circuit: CircuitInfo) -> dict:
+ # Circuit is OPEN. The protected call never ran and the payload is yours to handle:
+ # buffer it, drop it, or return a cached value. Here we push it to an overflow queue.
+ sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps(order))
+ return {"statusCode": 202, "circuit": circuit.name}
+
+
+@circuit_breaker(
+ name="payment-backend",
+ persistence_store=persistence,
+ on_circuit_open=buffer_payload,
+)
+def charge(order: dict) -> dict:
+ return payment_api.charge(order) # the protected downstream call
+
+
+def lambda_handler(event: dict, context: LambdaContext) -> dict:
+ # Circuit closed -> returns the backend response.
+ # Circuit open -> buffer_payload(event, circuit) runs and its return value is returned.
+ return charge(event)
diff --git a/examples/circuit_breaker_alpha/src/working_with_metrics.py b/examples/circuit_breaker_alpha/src/working_with_metrics.py
new file mode 100644
index 00000000000..f37f1b68351
--- /dev/null
+++ b/examples/circuit_breaker_alpha/src/working_with_metrics.py
@@ -0,0 +1,45 @@
+import os
+
+from aws_lambda_powertools.metrics import MetricUnit, single_metric
+from aws_lambda_powertools.utilities.circuit_breaker_alpha import (
+ CircuitTransition,
+ circuit_breaker,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
+ CircuitBreakerDynamoDBPersistence,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+table = os.getenv("CIRCUIT_BREAKER_TABLE", "")
+persistence = CircuitBreakerDynamoDBPersistence(table_name=table)
+
+
+def emit_transition_metric(transition: CircuitTransition) -> None:
+ # Fires only when the circuit changes state, so this never runs on the hot path.
+ with single_metric(
+ namespace="MyApplication",
+ name=f"Circuit_{transition.to_state}",
+ unit=MetricUnit.Count,
+ value=1,
+ ) as metric:
+ metric.add_dimension(name="circuit", value=transition.circuit_name)
+
+
+class PaymentBackend:
+ def charge(self, order: dict): ...
+
+
+payment_api = PaymentBackend()
+
+
+@circuit_breaker(
+ name="payment-backend",
+ persistence_store=persistence,
+ on_transition=emit_transition_metric,
+)
+def charge(order: dict) -> dict:
+ return payment_api.charge(order)
+
+
+def lambda_handler(event: dict, context: LambdaContext):
+ return charge(event)
diff --git a/examples/circuit_breaker_alpha/src/working_without_callback.py b/examples/circuit_breaker_alpha/src/working_without_callback.py
new file mode 100644
index 00000000000..a5f166438c8
--- /dev/null
+++ b/examples/circuit_breaker_alpha/src/working_without_callback.py
@@ -0,0 +1,44 @@
+import os
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha import (
+ CircuitBreakerConfig,
+ CircuitBreakerOpenError,
+ circuit_breaker,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
+ CircuitBreakerDynamoDBPersistence,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+table = os.getenv("CIRCUIT_BREAKER_TABLE", "")
+persistence = CircuitBreakerDynamoDBPersistence(table_name=table)
+
+config = CircuitBreakerConfig(
+ failure_threshold=5, # consecutive failures before opening
+ recovery_timeout=30, # seconds in OPEN before a half-open probe
+ success_threshold=3, # consecutive probe successes before closing
+ # Only these exceptions count as a failure. A ValueError (caller's fault) is
+ # re-raised without affecting the circuit.
+ handled_exceptions=(TimeoutError, ConnectionError),
+)
+
+
+class PaymentBackend:
+ def charge(self, order: dict): ...
+
+
+payment_api = PaymentBackend()
+
+
+@circuit_breaker(name="payment-backend", persistence_store=persistence, config=config)
+def charge(order: dict) -> dict:
+ return payment_api.charge(order)
+
+
+def lambda_handler(event: dict, context: LambdaContext):
+    try:
+        return charge(event)
+    except CircuitBreakerOpenError as exc:
+        # No callback registered and nothing was buffered, so report the rejection (503), not 202.
+        circuit_name = exc.circuit.name if exc.circuit else "unknown"
+        return {"statusCode": 503, "circuit": circuit_name}
diff --git a/examples/circuit_breaker_alpha/templates/sam.yaml b/examples/circuit_breaker_alpha/templates/sam.yaml
new file mode 100644
index 00000000000..686a2caaecf
--- /dev/null
+++ b/examples/circuit_breaker_alpha/templates/sam.yaml
@@ -0,0 +1,33 @@
+Transform: AWS::Serverless-2016-10-31
+Resources:
+ CircuitBreakerTable:
+ Type: AWS::DynamoDB::Table
+ Properties:
+ AttributeDefinitions:
+ - AttributeName: id
+ AttributeType: S
+ KeySchema:
+ - AttributeName: id
+ KeyType: HASH
+ TimeToLiveSpecification:
+ AttributeName: expiration
+ Enabled: true
+ BillingMode: PAY_PER_REQUEST
+
+  HelloWorldFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      Runtime: python3.12
+      Handler: app.lambda_handler
+      Policies:
+        - Statement:
+            - Sid: AllowDynamodbReadWrite
+              Effect: Allow
+              Action:
+                - dynamodb:GetItem
+                - dynamodb:PutItem
+                - dynamodb:UpdateItem
+              Resource: !GetAtt CircuitBreakerTable.Arn
+      Environment:
+        Variables:
+          CIRCUIT_BREAKER_TABLE: !Ref CircuitBreakerTable
diff --git a/mkdocs.yml b/mkdocs.yml
index 6bea356bac6..265560a55d5 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -33,6 +33,7 @@ nav:
- utilities/data_classes.md
- utilities/parser.md
- utilities/idempotency.md
+ - utilities/circuit_breaker.md
- utilities/data_masking.md
- utilities/feature_flags.md
- utilities/metadata.md
@@ -105,6 +106,12 @@ nav:
- Exceptions: api_doc/feature_flags/exceptions.md
- Feature flags: api_doc/feature_flags/feature_flags.md
- Schema: api_doc/feature_flags/schema.md
+ - Circuit Breaker (alpha):
+ - Circuit Breaker: api_doc/circuit_breaker_alpha/circuit_breaker.md
+ - Config: api_doc/circuit_breaker_alpha/config.md
+ - States: api_doc/circuit_breaker_alpha/states.md
+ - Exceptions: api_doc/circuit_breaker_alpha/exceptions.md
+ - Persistence: api_doc/circuit_breaker_alpha/persistence.md
- Idempotency:
- Base: api_doc/idempotency/base.md
- Config: api_doc/idempotency/config.md
@@ -248,6 +255,7 @@ plugins:
- utilities/data_classes.md
- utilities/parser.md
- utilities/idempotency.md
+ - utilities/circuit_breaker.md
- utilities/data_masking.md
- utilities/feature_flags.md
- utilities/metadata.md
diff --git a/tests/functional/circuit_breaker_alpha/__init__.py b/tests/functional/circuit_breaker_alpha/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/functional/circuit_breaker_alpha/conftest.py b/tests/functional/circuit_breaker_alpha/conftest.py
new file mode 100644
index 00000000000..5ea47ee6cd0
--- /dev/null
+++ b/tests/functional/circuit_breaker_alpha/conftest.py
@@ -0,0 +1,104 @@
+from __future__ import annotations
+
+import pytest
+
+import aws_lambda_powertools.utilities.circuit_breaker_alpha.base as base_module
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.base import CircuitBreakerHandler
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import (
+ CircuitBreakerExistingLockError,
+ CircuitBreakerPersistenceLayer,
+ CircuitBreakerRecordNotFoundError,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState
+
+
+class FakePersistence(CircuitBreakerPersistenceLayer):
+ """In-memory store for exercising the handler state machine without DynamoDB."""
+
+ def __init__(self):
+ self.db: dict[str, CircuitStateRecord] = {}
+ super().__init__()
+
+ def _get_record(self, name: str) -> CircuitStateRecord:
+ if name not in self.db:
+ raise CircuitBreakerRecordNotFoundError
+ stored = self.db[name]
+ # Return a copy so the handler can't mutate stored state by reference.
+ return CircuitStateRecord(
+ name=stored.name,
+ state=stored.state,
+ failure_count=stored.failure_count,
+ opened_at=stored.opened_at,
+ half_open_owner=stored.half_open_owner,
+ probe_lease_expiry=stored.probe_lease_expiry,
+ )
+
+ def _put_record(
+ self,
+ record: CircuitStateRecord,
+ condition: str | None = None,
+ expected_opened_at: int | None = None,
+ ) -> None:
+ if condition == "half_open":
+ existing = self.db.get(record.name)
+ now = CircuitBreakerHandler._now()
+
+ # Mirror the DynamoDB condition: two valid paths
+ # Path 1: state=OPEN AND no owner (AND opened_at matches if provided)
+ # Path 2: state=HALF_OPEN AND probe_lease_expiry <= now (lease takeover)
+ fresh_election_ok = existing is None or (
+ existing.state == CircuitState.OPEN
+ and existing.half_open_owner is None
+ and (expected_opened_at is None or existing.opened_at == expected_opened_at)
+ )
+ lease_takeover_ok = (
+ existing is not None
+ and existing.state == CircuitState.HALF_OPEN
+ and existing.probe_lease_expiry is not None
+ and now >= existing.probe_lease_expiry
+ )
+
+ if not fresh_election_ok and not lease_takeover_ok:
+ raise CircuitBreakerExistingLockError
+ self.db[record.name] = record
+
+ def _update_record(self, record: CircuitStateRecord) -> None:
+        # Mirror DynamoDB UpdateItem semantics: merge into the stored item in place
+        # rather than replacing it wholesale. Note that half_open_owner and
+        # probe_lease_expiry are cleared unconditionally below, so this fake cannot
+        # detect a real backend whose update expression forgets to remove them.
+ existing = self.db.get(record.name)
+ if existing is None:
+ self.db[record.name] = record
+ return
+ existing.state = record.state
+ existing.failure_count = record.failure_count
+ existing.expiry_timestamp = record.expiry_timestamp
+ # Leaving HALF_OPEN (close or reopen) always releases the probe-owner lock and
+ # probe lease; only opened_at differs between the two transitions.
+ existing.half_open_owner = None
+ existing.probe_lease_expiry = None
+ existing.opened_at = record.opened_at
+
+
+@pytest.fixture
+def store() -> FakePersistence:
+ return FakePersistence()
+
+
+@pytest.fixture(autouse=True)
+def reset_local_counters():
+ """Clear the per-environment module-level counters between tests."""
+ base_module._LOCAL_FAILURES.clear()
+ base_module._LOCAL_SUCCESSES.clear()
+ base_module._LAST_OBSERVED_STATE.clear()
+ yield
+ base_module._LOCAL_FAILURES.clear()
+ base_module._LOCAL_SUCCESSES.clear()
+ base_module._LAST_OBSERVED_STATE.clear()
+
+
+@pytest.fixture
+def now() -> int:
+ return base_module.CircuitBreakerHandler._now()
diff --git a/tests/functional/circuit_breaker_alpha/test_circuit_breaker.py b/tests/functional/circuit_breaker_alpha/test_circuit_breaker.py
new file mode 100644
index 00000000000..1227291ee66
--- /dev/null
+++ b/tests/functional/circuit_breaker_alpha/test_circuit_breaker.py
@@ -0,0 +1,604 @@
+from __future__ import annotations
+
+import warnings
+
+import pytest
+
+import aws_lambda_powertools.utilities.circuit_breaker_alpha.base as base_module
+from aws_lambda_powertools.utilities.circuit_breaker_alpha import (
+ CircuitBreakerConfig,
+ CircuitBreakerOpenError,
+ CircuitState,
+ CircuitTransition,
+ circuit_breaker,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import CircuitBreakerError
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
+
+# Unless noted otherwise, tests disable the local read cache (max_age=0) so each call re-reads the fake store.
+
+
+def test_closed_circuit_returns_result_and_writes_nothing(store):
+ @circuit_breaker(name="c", persistence_store=store, config=CircuitBreakerConfig(local_cache_max_age=0))
+ def call(value):
+ return value * 2
+
+ assert call(21) == 42
+ assert store.db == {}, "healthy path must not write to the store"
+
+
+def test_trips_open_after_failure_threshold(store):
+ config = CircuitBreakerConfig(failure_threshold=3, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ raise ConnectionError("downstream down")
+
+ for _ in range(3):
+ with pytest.raises(ConnectionError):
+ call()
+
+ assert store.db["c"].state == CircuitState.OPEN
+ assert store.db["c"].opened_at is not None
+
+
+def test_open_with_callback_returns_callback_value_without_calling_protected(store, now):
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=5, opened_at=now)
+ protected_ran = {"value": False}
+
+ def on_open(order, circuit):
+ return {"buffered": order, "state": str(circuit.state)}
+
+ config = CircuitBreakerConfig(recovery_timeout=9999, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, on_circuit_open=on_open, config=config)
+ def charge(order):
+ protected_ran["value"] = True
+ return f"charged {order}"
+
+ result = charge({"id": 1})
+ assert result == {"buffered": {"id": 1}, "state": "OPEN"}
+ assert protected_ran["value"] is False
+
+
+def test_open_without_callback_raises_with_circuit_info(store, now):
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=7, opened_at=now)
+ config = CircuitBreakerConfig(recovery_timeout=9999, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def charge(order):
+ return f"charged {order}"
+
+ with pytest.raises(CircuitBreakerOpenError) as exc_info:
+ charge({"id": 1})
+
+ assert exc_info.value.circuit.name == "c"
+ assert exc_info.value.circuit.failure_count == 7
+
+
+def test_half_open_probe_success_closes_after_success_threshold(store, now):
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=5, opened_at=now - 100)
+ config = CircuitBreakerConfig(recovery_timeout=30, success_threshold=2, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ return "ok"
+
+ call() # wins the probe lock, state becomes HALF_OPEN
+ assert store.db["c"].state == CircuitState.HALF_OPEN
+
+ call() # second consecutive probe success closes the circuit
+ assert store.db["c"].state == CircuitState.CLOSED
+
+
+def test_half_open_probe_failure_reopens(store, now):
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=5, opened_at=now - 100)
+ config = CircuitBreakerConfig(recovery_timeout=30, success_threshold=2, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ raise ConnectionError("still down")
+
+ with pytest.raises(ConnectionError):
+ call()
+
+ assert store.db["c"].state == CircuitState.OPEN
+
+
+# --------------------------------------------------------------------------- bug regressions
+
+
+def test_half_open_can_be_reacquired_after_failed_probe_reopens(store, now):
+ # Bug #1: a failed probe (HALF_OPEN -> OPEN) must clear the half_open_owner so a
+ # later recovery window can elect a prober again. Otherwise the circuit is stuck
+ # OPEN forever because attribute_not_exists(half_open_owner) never holds again.
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=5, opened_at=now - 100)
+ config = CircuitBreakerConfig(recovery_timeout=30, success_threshold=2, local_cache_max_age=0)
+
+ outcomes = iter([ConnectionError("still down"), "recovered"])
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ result = next(outcomes)
+ if isinstance(result, Exception):
+ raise result
+ return result
+
+ # First recovery window: env wins the probe, probe fails, circuit reopens.
+ with pytest.raises(ConnectionError):
+ call()
+ assert store.db["c"].state == CircuitState.OPEN
+ assert store.db["c"].half_open_owner is None, "reopen must release the probe lock"
+
+ # Second recovery window (opened_at is fresh, push it into the past again).
+ store.db["c"].opened_at = now - 100
+ # Downstream recovered: the env must be able to acquire the probe again and run it.
+ assert call() == "recovered"
+ assert store.db["c"].state == CircuitState.HALF_OPEN
+
+
+def test_open_callback_receives_keyword_arguments_intact(store, now):
+ # Bug #3: the callback must receive the same kwargs the protected function got,
+ # as kwargs, not flattened into positional values.
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=5, opened_at=now)
+ config = CircuitBreakerConfig(recovery_timeout=9999, local_cache_max_age=0)
+
+ captured = {}
+
+ def on_open(order, customer, circuit):
+ captured["order"] = order
+ captured["customer"] = customer
+ return "buffered"
+
+ @circuit_breaker(name="c", persistence_store=store, on_circuit_open=on_open, config=config)
+ def charge(order, customer):
+ return "charged"
+
+ # Called entirely with keyword arguments, deliberately out of signature order.
+ assert charge(customer="alice", order={"id": 1}) == "buffered"
+ assert captured["order"] == {"id": 1}
+ assert captured["customer"] == "alice"
+
+
+def test_consecutive_failures_trip_but_a_success_resets_the_streak(store):
+ # The failure counter is per-environment and counts *consecutive* failures as this
+ # env sees them: any success in between must reset the streak.
+ config = CircuitBreakerConfig(failure_threshold=3, local_cache_max_age=0)
+
+ should_fail = {"value": True}
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ if should_fail["value"]:
+ raise ConnectionError("down")
+ return "ok"
+
+ # Two failures, then a success: the streak resets, so the circuit must NOT be tripping.
+ for _ in range(2):
+ with pytest.raises(ConnectionError):
+ call()
+ should_fail["value"] = False
+ assert call() == "ok"
+ assert "c" not in store.db
+
+ # A fresh run of 3 consecutive failures from here must trip it.
+ should_fail["value"] = True
+ for _ in range(3):
+ with pytest.raises(ConnectionError):
+ call()
+ assert store.db["c"].state == CircuitState.OPEN
+
+
+def test_circuit_can_retrip_after_a_previous_close(store):
+ # Regression guard: a healthy circuit's steady state is a *persisted CLOSED* record.
+ # Reading that record must not reset the running failure counter, or the circuit
+ # could never trip again after it has closed once.
+ config = CircuitBreakerConfig(failure_threshold=3, local_cache_max_age=0)
+
+ # Pretend a prior recovery left a persisted CLOSED record behind.
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.CLOSED, failure_count=0)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ raise ConnectionError("down again")
+
+ for _ in range(3):
+ with pytest.raises(ConnectionError):
+ call()
+
+ assert store.db["c"].state == CircuitState.OPEN, "circuit must re-trip even with a prior CLOSED record present"
+
+
+def test_full_lifecycle_survives_multiple_recovery_cycles(store):
+ # End-to-end guard tying #1 together: the circuit must cycle OPEN -> HALF_OPEN ->
+ # CLOSED, re-trip, survive a failed probe (which reopens and must release the lock),
+ # and then recover again. Before the owner-release fix the second recovery dead-locked.
+ config = CircuitBreakerConfig(
+ failure_threshold=2,
+ recovery_timeout=30,
+ success_threshold=2,
+ local_cache_max_age=0,
+ )
+ mode = {"value": "fail"}
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ if mode["value"] == "fail":
+ raise ConnectionError("down")
+ return "ok"
+
+ def elapse_recovery_window():
+ store.db["c"].opened_at -= 100
+
+ # Trip open.
+ for _ in range(2):
+ with pytest.raises(ConnectionError):
+ call()
+ assert store.db["c"].state == CircuitState.OPEN
+
+ # Recover through two successful probes.
+ mode["value"] = "ok"
+ elapse_recovery_window()
+ assert call() == "ok"
+ assert store.db["c"].state == CircuitState.HALF_OPEN
+ assert call() == "ok"
+ assert store.db["c"].state == CircuitState.CLOSED
+
+ # Re-trip from the now-CLOSED state.
+ mode["value"] = "fail"
+ for _ in range(2):
+ with pytest.raises(ConnectionError):
+ call()
+ assert store.db["c"].state == CircuitState.OPEN
+
+ # A failed probe reopens it and must release the probe lock.
+ elapse_recovery_window()
+ with pytest.raises(ConnectionError):
+ call()
+ assert store.db["c"].state == CircuitState.OPEN
+ assert store.db["c"].half_open_owner is None
+
+ # The lock being free, recovery must be possible again.
+ mode["value"] = "ok"
+ elapse_recovery_window()
+ assert call() == "ok"
+ assert store.db["c"].state == CircuitState.HALF_OPEN
+ assert call() == "ok"
+ assert store.db["c"].state == CircuitState.CLOSED
+
+
+def test_opened_at_zero_is_treated_as_a_real_timestamp(store):
+ # Bug #7: opened_at == 0 is a valid (if pathological) epoch timestamp, not "missing".
+ # `record.opened_at or self._now()` wrongly re-anchors it to now, pinning OPEN forever.
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=5, opened_at=0)
+ config = CircuitBreakerConfig(recovery_timeout=30, success_threshold=1, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ return "recovered"
+
+ # opened_at=0 is far in the past, so the recovery window has long elapsed:
+ # the call must be allowed to probe, not short-circuited.
+ assert call() == "recovered"
+ assert store.db["c"].state in (CircuitState.HALF_OPEN, CircuitState.CLOSED)
+
+
+def test_ignored_exception_does_not_trip_circuit(store):
+ config = CircuitBreakerConfig(
+ failure_threshold=2,
+ handled_exceptions=(ConnectionError,),
+ local_cache_max_age=0,
+ )
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ raise ValueError("caller error, not a downstream failure")
+
+ for _ in range(5):
+ with pytest.raises(ValueError):
+ call()
+
+ assert "c" not in store.db
+
+
+def test_disabled_env_bypasses_circuit(store, now, monkeypatch):
+ monkeypatch.setenv("POWERTOOLS_CIRCUIT_BREAKER_DISABLED", "true")
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=9, opened_at=now)
+ config = CircuitBreakerConfig(recovery_timeout=9999, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ return "ran anyway"
+
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ assert call() == "ran anyway"
+
+
+def test_config_is_optional(store):
+ @circuit_breaker(name="c", persistence_store=store)
+ def call():
+ return "ok"
+
+ assert call() == "ok"
+
+
+# --------------------------------------------------------------------------- on_transition hook
+
+
+def test_on_transition_fires_for_each_state_change(store):
+ transitions: list[CircuitTransition] = []
+ config = CircuitBreakerConfig(
+ failure_threshold=2,
+ recovery_timeout=30,
+ success_threshold=1,
+ local_cache_max_age=0,
+ )
+ mode = {"value": "fail"}
+
+ @circuit_breaker(
+ name="c",
+ persistence_store=store,
+ on_transition=transitions.append,
+ config=config,
+ )
+ def call():
+ if mode["value"] == "fail":
+ raise ConnectionError("down")
+ return "ok"
+
+ # CLOSED -> OPEN after 2 failures.
+ for _ in range(2):
+ with pytest.raises(ConnectionError):
+ call()
+ # OPEN -> HALF_OPEN (election) -> CLOSED (success_threshold=1) on the recovery probe.
+ mode["value"] = "ok"
+ store.db["c"].opened_at -= 100
+ assert call() == "ok"
+
+ pairs = [(t.from_state, t.to_state) for t in transitions]
+ assert pairs == [
+ (CircuitState.CLOSED, CircuitState.OPEN),
+ (CircuitState.OPEN, CircuitState.HALF_OPEN),
+ (CircuitState.HALF_OPEN, CircuitState.CLOSED),
+ ]
+ assert all(t.circuit_name == "c" for t in transitions)
+ # opened_at carried on the open/probe transitions, absent on close.
+ assert transitions[0].opened_at is not None
+ assert transitions[-1].opened_at is None
+
+
+def test_on_transition_fires_on_failed_probe_reopen(store, now):
+ transitions: list[CircuitTransition] = []
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=5, opened_at=now - 100)
+ config = CircuitBreakerConfig(recovery_timeout=30, success_threshold=2, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, on_transition=transitions.append, config=config)
+ def call():
+ raise ConnectionError("still down")
+
+ with pytest.raises(ConnectionError):
+ call()
+
+ pairs = [(t.from_state, t.to_state) for t in transitions]
+ assert pairs == [
+ (CircuitState.OPEN, CircuitState.HALF_OPEN),
+ (CircuitState.HALF_OPEN, CircuitState.OPEN),
+ ]
+
+
+def test_raising_on_transition_hook_is_swallowed(store):
+ config = CircuitBreakerConfig(failure_threshold=1, local_cache_max_age=0)
+
+ def boom(_transition):
+ raise RuntimeError("hook blew up")
+
+ @circuit_breaker(name="c", persistence_store=store, on_transition=boom, config=config)
+ def call():
+ raise ConnectionError("down")
+
+ # The hook raises during the CLOSED->OPEN notify, but the protected call's own
+ # ConnectionError must surface unchanged, not the hook's RuntimeError.
+ with pytest.raises(ConnectionError):
+ call()
+ assert store.db["c"].state == CircuitState.OPEN
+
+
+# --------------------------------------------------------------------------- resilience regressions
+
+
+def test_stale_local_failures_reset_on_external_close(store, now):
+ """Bug 1: a partial failure streak in env A must not survive an external recovery cycle."""
+ config = CircuitBreakerConfig(failure_threshold=3, local_cache_max_age=0)
+ mode = {"value": "fail"}
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ if mode["value"] == "fail":
+ raise ConnectionError("down")
+ return "ok"
+
+ # Env A accumulates 2 failures (below threshold of 3).
+ for _ in range(2):
+ with pytest.raises(ConnectionError):
+ call()
+ assert base_module._LOCAL_FAILURES["c"] == 2
+
+ # Simulate: another env trips the circuit, then a third closes it.
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=3, opened_at=now)
+ # Force env A to observe OPEN so _LAST_OBSERVED_STATE tracks the transition.
+ with pytest.raises(CircuitBreakerOpenError):
+ call()
+ # Now the circuit is externally closed.
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.CLOSED, failure_count=0)
+
+ # A single failure in env A must NOT re-trip (stale streak was invalidated).
+ mode["value"] = "fail"
+ with pytest.raises(ConnectionError):
+ call()
+ assert "c" not in store.db or store.db["c"].state == CircuitState.CLOSED
+ assert base_module._LOCAL_FAILURES["c"] == 1 # fresh count, not 3
+
+
+def test_store_write_failure_does_not_mask_downstream_result(store, now):
+ """Bug 2: if save_closed() fails, the successful probe result must still be returned."""
+ config = CircuitBreakerConfig(failure_threshold=3, recovery_timeout=30, success_threshold=1, local_cache_max_age=0)
+
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=3, opened_at=now - 100)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ return "payment_charged"
+
+ # Make the persistence write fail.
+ original_update = store._update_record
+
+ def failing_update(record):
+ raise RuntimeError("DynamoDB timeout")
+
+ store._update_record = failing_update
+
+ # The probe should still return the downstream result despite the write failure.
+ result = call()
+ assert result == "payment_charged"
+
+ store._update_record = original_update
+
+
+def test_store_write_failure_on_trip_does_not_replace_downstream_exception(store):
+ """Bug 2 (trip path): if save_open() fails, the downstream's own exception must propagate."""
+ config = CircuitBreakerConfig(failure_threshold=2, local_cache_max_age=0)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ raise ConnectionError("the real error")
+
+ # Make the persistence write fail.
+ def failing_put(record, condition=None, expected_opened_at=None):
+ raise RuntimeError("DynamoDB throttle")
+
+ store._put_record = failing_put
+
+ # Two failures should raise the ORIGINAL ConnectionError, not a RuntimeError from the store.
+ for _ in range(2):
+ with pytest.raises(ConnectionError, match="the real error"):
+ call()
+
+
+def test_probe_lease_takeover_when_owner_recycled(store, now):
+ """Design issue 1: a stranded HALF_OPEN probe with expired lease can be taken over."""
+ config = CircuitBreakerConfig(recovery_timeout=30, success_threshold=1, local_cache_max_age=0)
+
+ # Simulate: the probe owner ("dead-env") was recycled, lease has expired.
+ store.db["c"] = CircuitStateRecord(
+ name="c",
+ state=CircuitState.HALF_OPEN,
+ opened_at=now - 200,
+ half_open_owner="dead-env",
+ probe_lease_expiry=now - 10, # expired
+ )
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ return "recovered"
+
+ # Current environment should take over the expired lease and succeed.
+ result = call()
+ assert result == "recovered"
+ assert store.db["c"].state == CircuitState.CLOSED
+
+
+def test_half_open_non_owner_with_active_lease_is_rejected(store, now):
+ """Design issue 1 negative: active lease must NOT allow takeover."""
+ config = CircuitBreakerConfig(recovery_timeout=30, success_threshold=1, local_cache_max_age=0)
+
+ # Probe owner is alive (lease not expired yet).
+ store.db["c"] = CircuitStateRecord(
+ name="c",
+ state=CircuitState.HALF_OPEN,
+ opened_at=now - 50,
+ half_open_owner="other-env",
+ probe_lease_expiry=now + 999, # far in the future
+ )
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ return "should not run"
+
+ with pytest.raises(CircuitBreakerOpenError):
+ call()
+
+
+def test_open_lost_election_returns_open_response(store, now):
+ """Branch: try_acquire_half_open returns False (another env won the race)."""
+ config = CircuitBreakerConfig(recovery_timeout=30, success_threshold=1, local_cache_max_age=0)
+
+ # Circuit is OPEN and recovery window has elapsed, but election will fail
+ # because another env already holds the lock.
+ store.db["c"] = CircuitStateRecord(
+ name="c",
+ state=CircuitState.OPEN,
+ failure_count=5,
+ opened_at=now - 100,
+ half_open_owner="winner-env",
+ )
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ return "should not run"
+
+ with pytest.raises(CircuitBreakerOpenError):
+ call()
+
+
+def test_probe_ignored_exception_propagates_without_affecting_circuit(store, now):
+ """Branch: probe raises an exception that doesn't count as failure."""
+ config = CircuitBreakerConfig(
+ recovery_timeout=30,
+ success_threshold=1,
+ handled_exceptions=(ConnectionError,),
+ local_cache_max_age=0,
+ )
+
+ store.db["c"] = CircuitStateRecord(name="c", state=CircuitState.OPEN, failure_count=5, opened_at=now - 100)
+
+ @circuit_breaker(name="c", persistence_store=store, config=config)
+ def call():
+ raise ValueError("not a counted failure")
+
+ with pytest.raises(ValueError, match="not a counted failure"):
+ call()
+ # Circuit should still be HALF_OPEN (not reopened), the ValueError didn't count.
+ assert store.db["c"].state == CircuitState.HALF_OPEN
+
+
+def test_local_cache_serves_state_without_store_read(store):
+    """Covers the cache hit path: a fresh cache entry is served without reading the store."""
+    config = CircuitBreakerConfig(failure_threshold=3, local_cache_max_age=60)
+    call_count = {"value": 0}
+
+    @circuit_breaker(name="c", persistence_store=store, config=config)
+    def call():
+        call_count["value"] += 1
+        return "ok"
+
+    # First call reads the store (cache miss).
+    assert call() == "ok"
+    # Second call should serve from cache; wrap _get_record to detect store reads.
+    original_get = store._get_record
+    get_called = {"value": False}
+
+    def tracked_get(name):
+        get_called["value"] = True
+        return original_get(name)
+
+    store._get_record = tracked_get
+    assert call() == "ok"
+    assert not get_called["value"], "second call should have used the cache, not the store"
+    assert call_count["value"] == 2, "the protected function must still run on a cache hit"
+
+
+def test_error_with_details_formatting():
+    """Covers CircuitBreakerError.__str__ when extra details are supplied."""
+    err = CircuitBreakerError("main message", "extra detail")
+    assert str(err) == "main message - (extra detail)"
diff --git a/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py b/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py
new file mode 100644
index 00000000000..d1617f26617
--- /dev/null
+++ b/tests/functional/circuit_breaker_alpha/test_dynamodb_persistence.py
@@ -0,0 +1,270 @@
+from __future__ import annotations
+
+import boto3
+import pytest
+from botocore.config import Config
+from botocore.stub import Stubber
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
+ CircuitBreakerDynamoDBPersistence,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState
+
+TABLE_NAME = "CircuitBreakerState"
+
+
+@pytest.fixture
+def persistence():
+    dynamodb = boto3.client("dynamodb", config=Config(region_name="us-east-1"))
+    backend = CircuitBreakerDynamoDBPersistence(table_name=TABLE_NAME, boto3_client=dynamodb)
+    backend.configure(CircuitBreakerConfig(local_cache_max_age=0), circuit_name="payment")  # circuit under test
+    return backend
+
+
+def test_get_state_missing_item_returns_closed(persistence):
+    """A GetItem response without an "Item" key is reported by get_state as a CLOSED circuit."""
+    expected_request = {"TableName": TABLE_NAME, "Key": {"id": {"S": "payment"}}, "ConsistentRead": False}
+    dynamodb_stub = Stubber(persistence.client)
+    dynamodb_stub.add_response("get_item", service_response={}, expected_params=expected_request)
+
+    with dynamodb_stub:
+        state = persistence.get_state("payment").state
+
+    assert state == CircuitState.CLOSED
+
+
+def test_get_state_failing_store_fails_open(persistence):
+    """A GetItem client error is absorbed by get_state, which reports the circuit as CLOSED."""
+    with Stubber(persistence.client) as failing_dynamodb:
+        failing_dynamodb.add_client_error("get_item", service_error_code="InternalServerError")
+        state = persistence.get_state("payment").state
+    assert state == CircuitState.CLOSED, "store failure must fail open (CLOSED)"
+
+
+def _capture_put_item(persistence):
+    """Swap client.put_item for a recorder; return (recorded kwargs dict, restore callable)."""
+    real_put_item = persistence.client.put_item
+    recorded = {}
+
+    def record_call(**params):
+        recorded.update(params)
+        return {}
+
+    persistence.client.put_item = record_call
+    return recorded, lambda: setattr(persistence.client, "put_item", real_put_item)
+
+
+def test_save_open_writes_open_item(persistence):
+    """save_open issues a PutItem whose Item holds the OPEN state, its counters and a TTL attribute."""
+    put_kwargs, undo_patch = _capture_put_item(persistence)
+    try:
+        persistence.save_open("payment", failure_count=5, opened_at=1000)
+    finally:
+        undo_patch()
+
+    written = put_kwargs["Item"]
+    want = {"id": {"S": "payment"}, "state": {"S": "OPEN"}, "failure_count": {"N": "5"}, "opened_at": {"N": "1000"}}
+    for attribute, value in want.items():
+        assert written[attribute] == value
+    assert "expiration" in written, "open item must carry a TTL"
+
+
+def test_try_acquire_half_open_wins(persistence):
+    """A winning try_acquire_half_open returns True and writes a HALF_OPEN item under a condition."""
+    put_kwargs, undo_patch = _capture_put_item(persistence)
+    try:
+        assert persistence.try_acquire_half_open("payment", "env-a", 1000) is True
+    finally:
+        undo_patch()
+
+    written = put_kwargs["Item"]
+    assert written["state"] == {"S": "HALF_OPEN"}
+    assert written["half_open_owner"] == {"S": "env-a"}
+    assert written["opened_at"] == {"N": "1000"}
+    assert {"expiration", "probe_lease_expiry"} <= written.keys()
+    # Condition covers a fresh election (OPEN, no owner, matching opened_at) and an expired-lease takeover.
+    condition, values = put_kwargs["ConditionExpression"], put_kwargs["ExpressionAttributeValues"]
+    assert "#state = :open AND attribute_not_exists(#half_open_owner)" in condition
+    assert "#probe_lease_expiry <= :now" in condition
+    assert values[":open"] == {"S": "OPEN"}
+    assert values[":expected_opened_at"] == {"N": "1000"}
+
+
+def test_try_acquire_half_open_loses_on_conditional_failure(persistence):
+    """A ConditionalCheckFailedException from PutItem makes try_acquire_half_open return False."""
+    with Stubber(persistence.client) as dynamodb_stub:
+        dynamodb_stub.add_client_error("put_item", service_error_code="ConditionalCheckFailedException")
+        assert persistence.try_acquire_half_open("payment", "env-b", 1000) is False
+
+
+def test_save_closed_updates_record(persistence):
+    """save_closed must consume the single queued UpdateItem response."""
+    with Stubber(persistence.client) as dynamodb_stub:
+        dynamodb_stub.add_response("update_item", service_response={})
+        persistence.save_closed("payment")
+        dynamodb_stub.assert_no_pending_responses()
+
+
+# --------------------------------------------------------------------------- bug regressions
+
+
+def test_save_reopen_removes_half_open_owner(persistence):
+    # Regression for bug #1: the HALF_OPEN -> OPEN write has to REMOVE half_open_owner, or the
+    # next probe election (attribute_not_exists(half_open_owner)) can never succeed.
+    client = persistence.client
+    real_update_item, update_kwargs = client.update_item, {}
+
+    def record_update(**params):
+        update_kwargs.update(params)
+        return {}
+
+    client.update_item = record_update
+    try:
+        persistence.save_reopen("payment", opened_at=2000)
+    finally:
+        client.update_item = real_update_item
+
+    expression = update_kwargs["UpdateExpression"]
+    assert "REMOVE" in expression
+    assert update_kwargs["ExpressionAttributeNames"]["#half_open_owner"] == "half_open_owner"
+    assert "#half_open_owner" in expression.partition("REMOVE")[2], "owner must be in the REMOVE clause"
+
+
+def test_save_open_item_contains_expiration_attribute(persistence):
+    """Regression (bug #2): the item written by save_open must carry the TTL attribute.
+
+    Without "expiration" the documented self-cleaning of abandoned circuits never
+    happens. The PutItem parameters are captured, not stubbed, so the test does not
+    have to assert an exact (time-dependent) TTL value.
+    """
+    # NOTE(review): override kept from the original test; whether save_open depends on
+    # local_cache_max_age is not visible here - confirm it is still needed.
+    persistence.local_cache_max_age = 5
+
+    # Shared helper instead of a second hand-rolled put_item patch, so every
+    # PutItem-capturing test in this module patches and restores the same way.
+    captured, restore = _capture_put_item(persistence)
+    try:
+        persistence.save_open("payment", failure_count=5, opened_at=1000)
+    finally:
+        restore()
+
+    item = captured["Item"]
+    assert "expiration" in item, "open item must carry a DynamoDB TTL attribute"
+
+
+# --------------------------------------------------------------------------- serialization coverage
+
+
+def test_item_to_record_full_item(persistence):
+    """_item_to_record maps every attribute of a fully populated DynamoDB item onto the record."""
+    dynamodb_item = {
+        "id": {"S": "payment"},
+        "state": {"S": "HALF_OPEN"},
+        "failure_count": {"N": "3"},
+        "opened_at": {"N": "1000"},
+        "half_open_owner": {"S": "env-x"},
+        "probe_lease_expiry": {"N": "2000"},
+        "expiration": {"N": "9999"},
+    }
+
+    parsed = persistence._item_to_record(dynamodb_item)
+
+    assert (parsed.name, parsed.state) == ("payment", CircuitState.HALF_OPEN)
+    assert (parsed.failure_count, parsed.opened_at) == (3, 1000)
+    assert (parsed.half_open_owner, parsed.probe_lease_expiry) == ("env-x", 2000)
+    # The "expiration" attribute surfaces on the record as expiry_timestamp.
+    assert parsed.expiry_timestamp == 9999
+
+
+def test_item_to_record_minimal_item(persistence):
+    """_item_to_record yields 0 / None defaults when the item holds only "id" and "state"."""
+    minimal_item = {"id": {"S": "payment"}, "state": {"S": "CLOSED"}}
+    parsed = persistence._item_to_record(minimal_item)
+
+    # Attributes present in the item are carried over.
+    assert parsed.name == "payment"
+    assert parsed.state == CircuitState.CLOSED
+
+    # Attributes missing from the item fall back to a zero counter and None.
+    assert parsed.failure_count == 0
+    for optional_field in ("opened_at", "half_open_owner", "probe_lease_expiry"):
+        assert getattr(parsed, optional_field) is None
+
+
+def test_record_to_item_full_record(persistence):
+    """_record_to_item serializes every populated record field into a DynamoDB attribute value."""
+    from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
+
+    full_record = CircuitStateRecord(
+        name="payment",
+        state=CircuitState.HALF_OPEN,
+        failure_count=5,
+        opened_at=1000,
+        half_open_owner="env-a",
+        probe_lease_expiry=2000,
+        expiry_timestamp=9999,
+    )
+
+    serialized = persistence._record_to_item(full_record)
+    strings = {"id": "payment", "state": "HALF_OPEN", "half_open_owner": "env-a"}
+    numbers = {"failure_count": "5", "opened_at": "1000", "probe_lease_expiry": "2000", "expiration": "9999"}
+    for attribute, text in strings.items():
+        assert serialized[attribute] == {"S": text}
+    for attribute, digits in numbers.items():
+        assert serialized[attribute] == {"N": digits}
+
+
+def test_record_to_item_minimal_record(persistence):
+    """_record_to_item leaves out optional attributes when the record was built without them."""
+    from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
+
+    minimal_record = CircuitStateRecord(name="payment", state=CircuitState.CLOSED)
+    serialized = persistence._record_to_item(minimal_record)
+
+    # Exact match: only the key, the state and the failure counter are written.
+    assert serialized == {
+        "id": {"S": "payment"},
+        "state": {"S": "CLOSED"},
+        "failure_count": {"N": "0"},
+    }
+    for optional_attribute in ("opened_at", "half_open_owner", "probe_lease_expiry", "expiration"):
+        assert optional_attribute not in serialized
+
+
+def test_get_record_returns_deserialized_item(persistence):
+    """_get_record turns the "Item" of a GetItem response into a record."""
+    stored_item = {
+        "id": {"S": "payment"},
+        "state": {"S": "OPEN"},
+        "failure_count": {"N": "5"},
+        "opened_at": {"N": "1000"},
+    }
+    expected_request = {"TableName": TABLE_NAME, "Key": {"id": {"S": "payment"}}, "ConsistentRead": False}
+    dynamodb_stub = Stubber(persistence.client)
+    dynamodb_stub.add_response(
+        "get_item",
+        service_response={"Item": stored_item},
+        expected_params=expected_request,
+    )
+    with dynamodb_stub:
+        fetched = persistence._get_record("payment")
+    assert fetched.state == CircuitState.OPEN
+    assert fetched.failure_count == 5
+    assert fetched.opened_at == 1000
+
+
+def test_build_half_open_condition_without_expected_opened_at(persistence):
+    """With expected_opened_at=None, neither ":expected_opened_at" nor "#opened_at" is emitted."""
+    condition = persistence._build_half_open_condition(expected_opened_at=None)
+    assert "#opened_at" not in condition["ExpressionAttributeNames"]
+    assert ":expected_opened_at" not in condition["ConditionExpression"]
+
+
+def test_build_half_open_condition_with_expected_opened_at(persistence):
+    """With expected_opened_at set, the condition pins opened_at through name and value placeholders."""
+    condition = persistence._build_half_open_condition(expected_opened_at=5000)
+    assert condition["ExpressionAttributeNames"]["#opened_at"] == "opened_at"
+    assert condition["ExpressionAttributeValues"][":expected_opened_at"] == {"N": "5000"}
+    assert "#opened_at = :expected_opened_at" in condition["ConditionExpression"]
diff --git a/tests/unit/circuit_breaker_alpha/__init__.py b/tests/unit/circuit_breaker_alpha/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/unit/circuit_breaker_alpha/test_config_and_states.py b/tests/unit/circuit_breaker_alpha/test_config_and_states.py
new file mode 100644
index 00000000000..a0b811e38c0
--- /dev/null
+++ b/tests/unit/circuit_breaker_alpha/test_config_and_states.py
@@ -0,0 +1,93 @@
+from __future__ import annotations
+
+import dataclasses
+
+import pytest
+
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import (
+ CircuitBreakerConfigError,
+ CircuitBreakerOpenError,
+)
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.record import CircuitStateRecord
+from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitInfo, CircuitState
+
+
+def test_circuit_state_serializes_to_plain_string():
+    """CircuitState.OPEN both stringifies to and compares equal with the plain string "OPEN"."""
+    assert str(CircuitState.OPEN) == "OPEN" and CircuitState.OPEN == "OPEN"
+
+
+def test_circuit_info_is_immutable():
+    snapshot = CircuitInfo(name="payment", state=CircuitState.OPEN, failure_count=5, opened_at=123)
+    with pytest.raises(dataclasses.FrozenInstanceError):  # assigning to a field must be rejected
+        snapshot.name = "other"  # type: ignore[misc]
+
+
+def test_config_defaults():
+    """A CircuitBreakerConfig built without arguments exposes the expected default values."""
+    defaults = CircuitBreakerConfig()
+    numeric_defaults = (defaults.failure_threshold, defaults.recovery_timeout, defaults.success_threshold)
+    assert numeric_defaults == (5, 30, 3)
+    assert defaults.local_cache_max_age == 5
+    assert defaults.handled_exceptions is None
+    assert defaults.ignored_exceptions is None
+
+
+def test_config_rejects_both_exception_lists():
+    with pytest.raises(CircuitBreakerConfigError, match="mutually exclusive"):  # both lists passed together
+        CircuitBreakerConfig(ignored_exceptions=(ValueError,), handled_exceptions=(TimeoutError,))
+
+
+@pytest.mark.parametrize("field", ("failure_threshold", "recovery_timeout", "success_threshold"))
+def test_config_rejects_non_positive_thresholds(field):
+    with pytest.raises(CircuitBreakerConfigError, match="positive integer"):  # 0 is rejected for each field
+        CircuitBreakerConfig(**{field: 0})
+
+
+def test_config_allows_zero_cache_age():
+    assert CircuitBreakerConfig(local_cache_max_age=0).local_cache_max_age == 0  # 0 is accepted and kept as given
+
+
+def test_config_rejects_negative_cache_age():
+    with pytest.raises(CircuitBreakerConfigError, match="non-negative"):  # a negative age is invalid
+        CircuitBreakerConfig(local_cache_max_age=-1)
+
+
+def test_counts_as_failure_default_any_exception():
+    """With no exception lists configured, counts_as_failure returns True for both sample errors."""
+    counts_as_failure = CircuitBreakerConfig().counts_as_failure
+    assert all(counts_as_failure(error) is True for error in (ValueError(), TimeoutError()))
+
+
+def test_counts_as_failure_allowlist():
+    allow_listed = CircuitBreakerConfig(handled_exceptions=(TimeoutError, ConnectionError))
+    assert allow_listed.counts_as_failure(TimeoutError()) is True  # listed -> counted
+    assert allow_listed.counts_as_failure(ValueError()) is False  # not listed -> not counted
+
+
+def test_counts_as_failure_denylist():
+    deny_listed = CircuitBreakerConfig(ignored_exceptions=(ValueError,))
+    assert deny_listed.counts_as_failure(ValueError()) is False  # listed -> not counted
+    assert deny_listed.counts_as_failure(KeyError()) is True  # not listed -> counted
+
+
+def test_open_error_carries_circuit_info():
+    """CircuitBreakerOpenError exposes the very CircuitInfo object it was constructed with."""
+    snapshot = CircuitInfo(name="payment", state=CircuitState.OPEN, failure_count=5, opened_at=123)
+    assert CircuitBreakerOpenError("open", circuit=snapshot).circuit is snapshot
+
+
+def test_record_to_circuit_info_strips_internal_fields():
+    """to_circuit_info exposes only the public circuit fields.
+
+    half_open_owner and expiry_timestamp are set on the record but absent from the returned CircuitInfo.
+    """
+    public_fields = {"name": "payment", "state": CircuitState.OPEN, "failure_count": 5, "opened_at": 123}
+    internal_fields = {"half_open_owner": "env-abc", "expiry_timestamp": 999}
+
+    info = CircuitStateRecord(**public_fields, **internal_fields).to_circuit_info()
+
+    assert info == CircuitInfo(**public_fields)
+    for hidden in internal_fields:
+        assert not hasattr(info, hidden)