Skip to content

Commit 34bc054

Browse files
Merge branch 'develop' into dependabot/pip/develop/redis-8.0.0
2 parents cbdf293 + a9cffe4 commit 34bc054

30 files changed

Lines changed: 2878 additions & 9 deletions

aws_lambda_powertools/shared/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,6 @@
7676

7777
# Idempotency constants
7878
IDEMPOTENCY_DISABLED_ENV: str = "POWERTOOLS_IDEMPOTENCY_DISABLED"
79+
80+
# Circuit breaker constants
81+
CIRCUIT_BREAKER_DISABLED_ENV: str = "POWERTOOLS_CIRCUIT_BREAKER_DISABLED"
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""
2+
Circuit Breaker utility for protecting unhealthy downstream dependencies.
3+
4+
!!! warning "Alpha / experimental"
5+
This utility is published under the `_alpha` namespace while we collect
6+
feedback. The public API may change in a backwards-incompatible way before it
7+
is promoted to GA. Pin your version and follow the tracking discussion before
8+
relying on it in production.
9+
"""
10+
11+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.circuit_breaker import circuit_breaker
12+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig
13+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import (
14+
CircuitBreakerConfigError,
15+
CircuitBreakerError,
16+
CircuitBreakerOpenError,
17+
CircuitBreakerPersistenceError,
18+
)
19+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import (
20+
CircuitInfo,
21+
CircuitState,
22+
CircuitTransition,
23+
)
24+
25+
__all__ = (
26+
"circuit_breaker",
27+
"CircuitBreakerConfig",
28+
"CircuitInfo",
29+
"CircuitState",
30+
"CircuitTransition",
31+
"CircuitBreakerError",
32+
"CircuitBreakerOpenError",
33+
"CircuitBreakerConfigError",
34+
"CircuitBreakerPersistenceError",
35+
)
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
"""
2+
Orchestrator for the Circuit Breaker utility.
3+
4+
:class:`CircuitBreakerHandler` owns the state machine and the per-environment failure
5+
counter; the persistence layer owns the shared truth. This split keeps the healthy
6+
path write-free: failures are counted locally and only persisted on a state transition.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import datetime
12+
import logging
13+
import uuid
14+
from typing import TYPE_CHECKING, Any
15+
16+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import CircuitBreakerOpenError
17+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitState, CircuitTransition
18+
19+
if TYPE_CHECKING:
20+
from collections.abc import Callable
21+
22+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig
23+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence.base import (
24+
CircuitBreakerPersistenceLayer,
25+
)
26+
from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitInfo
27+
28+
logger = logging.getLogger(__name__)
29+
30+
# Per-environment, per-circuit consecutive counters. Module-level so they survive across
31+
# invocations within the same execution environment, the same way idempotency caches do.
32+
_LOCAL_FAILURES: dict[str, int] = {}
33+
_LOCAL_SUCCESSES: dict[str, int] = {}
34+
35+
# Tracks the last state this environment observed from the store, per circuit. Used to
36+
# detect transitions back to CLOSED that happened externally (another env tripped and
37+
# recovered), so stale local failure streaks can be invalidated.
38+
_LAST_OBSERVED_STATE: dict[str, CircuitState] = {}
39+
40+
# Stable per-environment identifier used to claim the half-open probe lock.
41+
_ENVIRONMENT_ID = uuid.uuid4().hex
42+
43+
44+
class CircuitBreakerHandler:
45+
"""
46+
Drive a single protected call through the circuit breaker state machine.
47+
48+
A new handler is created per invocation by the decorator. It reads the shared state,
49+
routes the call (run, short-circuit, or probe), and records the outcome.
50+
51+
Parameters
52+
----------
53+
function : Callable
54+
The protected function.
55+
name : str
56+
Circuit name.
57+
config : CircuitBreakerConfig
58+
Circuit configuration.
59+
persistence_store : CircuitBreakerPersistenceLayer
60+
Shared state store.
61+
on_circuit_open : Callable | None
62+
Callback invoked with the protected call's own ``*args``/``**kwargs`` plus a
63+
trailing ``circuit`` keyword argument when the circuit is open. If ``None``, an
64+
open circuit raises :class:`CircuitBreakerOpenError`.
65+
function_args : tuple
66+
Positional arguments the protected function was called with.
67+
function_kwargs : dict
68+
Keyword arguments the protected function was called with.
69+
"""
70+
71+
def __init__(
72+
self,
73+
function: Callable,
74+
name: str,
75+
config: CircuitBreakerConfig,
76+
persistence_store: CircuitBreakerPersistenceLayer,
77+
on_circuit_open: Callable | None = None,
78+
on_transition: Callable | None = None,
79+
function_args: tuple | None = None,
80+
function_kwargs: dict | None = None,
81+
):
82+
self.function = function
83+
self.name = name
84+
self.config = config
85+
self.on_circuit_open = on_circuit_open
86+
self.on_transition = on_transition
87+
self.fn_args = function_args or ()
88+
self.fn_kwargs = function_kwargs or {}
89+
90+
persistence_store.configure(config=config, circuit_name=name)
91+
self.persistence_store = persistence_store
92+
93+
def handle(self) -> Any:
94+
"""
95+
Evaluate the circuit and route the call.
96+
97+
Returns
98+
-------
99+
Any
100+
The protected function's result when the call runs, or the
101+
``on_circuit_open`` callback's return value when the circuit is open.
102+
103+
Raises
104+
------
105+
CircuitBreakerOpenError
106+
If the circuit is open and no callback is registered.
107+
"""
108+
record = self.persistence_store.get_state(self.name)
109+
110+
if record.state == CircuitState.CLOSED:
111+
# If we previously observed a non-CLOSED state and the circuit is now back to
112+
# CLOSED, another environment completed the recovery cycle. Reset local counters
113+
# so a stale partial failure streak doesn't immediately re-trip the circuit.
114+
prev = _LAST_OBSERVED_STATE.get(self.name)
115+
if prev is not None and prev != CircuitState.CLOSED:
116+
_LOCAL_FAILURES[self.name] = 0
117+
_LAST_OBSERVED_STATE[self.name] = CircuitState.CLOSED
118+
return self._call_closed()
119+
120+
if record.state == CircuitState.OPEN:
121+
_LAST_OBSERVED_STATE[self.name] = CircuitState.OPEN
122+
# ``opened_at`` may legitimately be 0 (epoch); treat only None as missing.
123+
opened_at = record.opened_at if record.opened_at is not None else self._now()
124+
if self._now() >= opened_at + self.config.recovery_timeout:
125+
# Recovery window elapsed: try to become the single prober.
126+
if self.persistence_store.try_acquire_half_open(self.name, _ENVIRONMENT_ID, opened_at):
127+
self._notify(CircuitState.OPEN, CircuitState.HALF_OPEN, opened_at=opened_at)
128+
return self._call_probe()
129+
return self._open_response(record.to_circuit_info())
130+
131+
# HALF_OPEN: only the environment that owns the probe lock runs.
132+
_LAST_OBSERVED_STATE[self.name] = CircuitState.HALF_OPEN
133+
if record.half_open_owner == _ENVIRONMENT_ID:
134+
return self._call_probe()
135+
136+
# If the probe lease has expired (owner recycled mid-probe), take over.
137+
if record.probe_lease_expiry is not None and self._now() >= record.probe_lease_expiry:
138+
logger.debug("Circuit '%s' probe lease expired; attempting takeover.", self.name)
139+
if self.persistence_store.try_acquire_half_open(self.name, _ENVIRONMENT_ID, record.opened_at or 0):
140+
return self._call_probe()
141+
142+
return self._open_response(record.to_circuit_info())
143+
144+
def _call_closed(self) -> Any:
145+
"""Run the protected call while the circuit is closed, tracking failures."""
146+
try:
147+
result = self.function(*self.fn_args, **self.fn_kwargs)
148+
except Exception as exc:
149+
if not self.config.counts_as_failure(exc):
150+
raise
151+
failures = _LOCAL_FAILURES.get(self.name, 0) + 1
152+
_LOCAL_FAILURES[self.name] = failures
153+
if failures >= self.config.failure_threshold:
154+
logger.debug("Circuit '%s' tripping CLOSED to OPEN after %d failures.", self.name, failures)
155+
opened_at = self._now()
156+
self._safe_persist(
157+
self.persistence_store.save_open,
158+
self.name,
159+
failure_count=failures,
160+
opened_at=opened_at,
161+
)
162+
_LOCAL_FAILURES[self.name] = 0
163+
self._notify(CircuitState.CLOSED, CircuitState.OPEN, opened_at=opened_at)
164+
raise
165+
else:
166+
_LOCAL_FAILURES[self.name] = 0
167+
return result
168+
169+
def _call_probe(self) -> Any:
170+
"""Run a probe during half-open, closing or reopening based on the outcome."""
171+
try:
172+
result = self.function(*self.fn_args, **self.fn_kwargs)
173+
except Exception as exc:
174+
if not self.config.counts_as_failure(exc):
175+
raise
176+
logger.debug("Circuit '%s' probe failed; reopening.", self.name)
177+
opened_at = self._now()
178+
self._safe_persist(self.persistence_store.save_reopen, self.name, opened_at=opened_at)
179+
_LOCAL_SUCCESSES[self.name] = 0
180+
self._notify(CircuitState.HALF_OPEN, CircuitState.OPEN, opened_at=opened_at)
181+
raise
182+
else:
183+
successes = _LOCAL_SUCCESSES.get(self.name, 0) + 1
184+
_LOCAL_SUCCESSES[self.name] = successes
185+
if successes >= self.config.success_threshold:
186+
logger.debug("Circuit '%s' closing after %d probe successes.", self.name, successes)
187+
self._safe_persist(self.persistence_store.save_closed, self.name)
188+
_LOCAL_SUCCESSES[self.name] = 0
189+
_LOCAL_FAILURES[self.name] = 0
190+
self._notify(CircuitState.HALF_OPEN, CircuitState.CLOSED)
191+
return result
192+
193+
def _safe_persist(self, fn: Callable, *args: Any, **kwargs: Any) -> None:
194+
"""
195+
Call a persistence write, swallowing and logging failures.
196+
197+
State-transition writes must never mask the downstream's real result or replace
198+
the downstream's real exception. This mirrors the fail-open read policy in the
199+
persistence layer.
200+
"""
201+
try:
202+
fn(*args, **kwargs)
203+
except Exception:
204+
logger.warning(
205+
"Circuit '%s': persistence write (%s) failed; the transition may be delayed but the "
206+
"downstream result is preserved.",
207+
self.name,
208+
getattr(fn, "__name__", repr(fn)),
209+
exc_info=True,
210+
)
211+
212+
def _open_response(self, circuit: CircuitInfo) -> Any:
213+
"""Produce the response for an open circuit: callback result or raise."""
214+
if self.on_circuit_open is not None:
215+
# Forward the protected call's arguments unchanged: positional stay positional,
216+
# keyword stay keyword. The circuit snapshot is passed as a keyword argument so
217+
# it never collides with positionalized kwargs nor depends on dict ordering.
218+
return self.on_circuit_open(*self.fn_args, **self.fn_kwargs, circuit=circuit)
219+
raise CircuitBreakerOpenError(
220+
f"Circuit '{self.name}' is open.",
221+
circuit=circuit,
222+
)
223+
224+
def _notify(self, from_state: CircuitState, to_state: CircuitState, opened_at: int | None = None) -> None:
225+
"""
226+
Fire the ``on_transition`` hook for a state change.
227+
228+
Called only on real transitions, never on the hot path. Any exception the hook
229+
raises is swallowed and logged: observability must never break the protected call.
230+
"""
231+
if self.on_transition is None:
232+
return
233+
try:
234+
self.on_transition(
235+
CircuitTransition(
236+
circuit_name=self.name,
237+
from_state=from_state,
238+
to_state=to_state,
239+
opened_at=opened_at,
240+
),
241+
)
242+
except Exception:
243+
logger.warning("on_transition hook for circuit '%s' raised; ignoring.", self.name, exc_info=True)
244+
245+
@staticmethod
246+
def _now() -> int:
247+
"""Current unix timestamp in seconds."""
248+
return int(datetime.datetime.now().timestamp())

0 commit comments

Comments
 (0)