pybreaker implements the circuit breaker pattern for Python with thread-safe state management. pip install pybreaker. Basic: import pybreaker; cb = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60). Wrap: cb.call(requests.get, url). Decorator: @cb. Error: raises pybreaker.CircuitBreakerError when open. States: CLOSED (normal), OPEN (blocking calls), HALF_OPEN (probing). Transitions: CLOSED → OPEN after fail_max consecutive failures. OPEN → HALF_OPEN after reset_timeout seconds. HALF_OPEN → CLOSED on success, → OPEN on failure. Current state: cb.state (state object) or cb.current_state (state name string: "closed", "open", "half-open"). Count: cb.fail_counter. Listeners: pybreaker.CircuitBreakerListener subclass with before_call, success, failure, state_change. Custom error: CircuitBreaker(exclude=[ValueError]) — don’t count ValueError as a failure. pybreaker has no expected_exception parameter; to count only certain exceptions (e.g. requests.exceptions.HTTPError), pass a callable in exclude that returns True for every exception that should not count as a failure. Redis state: pybreaker.CircuitBreaker(state_storage=pybreaker.CircuitRedisStorage(pybreaker.STATE_CLOSED, redis_client)) — shared state across processes/containers. from pybreaker import CircuitRedisStorage. Reset manually: cb.close(). Trip manually: cb.open(). Bulkhead: one CircuitBreaker per dependency. payment_cb, email_cb, search_cb — isolation. Claude Code generates pybreaker circuit breakers, HTTP resilience wrappers, and distributed fault-tolerant service calls.
CLAUDE.md for pybreaker
## pybreaker Stack
- Version: pybreaker >= 1.2 | pip install pybreaker
- Create: CircuitBreaker(fail_max=5, reset_timeout=60)
- Call: cb.call(fn, *args) | @cb decorator on function
- States: CLOSED → OPEN (after fail_max failures) → HALF_OPEN (after reset_timeout) → CLOSED
- Error: CircuitBreakerError raised when circuit is OPEN
- Listeners: subclass CircuitBreakerListener and override before_call/success/failure/state_change
- Redis: CircuitBreaker(state_storage=CircuitRedisStorage(pybreaker.STATE_CLOSED, redis)) for distributed
pybreaker Circuit Breaker Pipeline
# app/circuit_breaker.py — pybreaker circuit breakers, listeners, and HTTP wrappers
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import Any, Callable
import pybreaker
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Custom listeners
# ─────────────────────────────────────────────────────────────────────────────
class LoggingListener(pybreaker.CircuitBreakerListener):
    """Log circuit breaker state changes, failures, and successful probes.

    pybreaker notifies listeners through ``before_call``, ``success``,
    ``failure`` and ``state_change``.  ``call_failed`` and ``call_succeeded``
    are not part of that protocol; they are kept only for code that invokes
    them directly.
    """

    def state_change(self, cb, old_state, new_state):
        """Log a state transition together with the current failure count."""
        logger.warning(
            "Circuit '%s' state: %s → %s (fail_count=%d)",
            cb.name,
            old_state.name,
            new_state.name,
            cb.fail_counter,
        )
        # A half-open → closed transition means the probe call went through.
        # Detected here rather than in a success hook because the breaker may
        # already have left the half-open state by the time success hooks run.
        if (
            old_state.name == pybreaker.STATE_HALF_OPEN
            and new_state.name == pybreaker.STATE_CLOSED
        ):
            logger.info(
                "Circuit '%s' probe succeeded — closing",
                cb.name,
            )

    def failure(self, cb, exc):
        """Listener hook invoked by pybreaker when a guarded call fails."""
        logger.error("Circuit '%s' call failed: %s", cb.name, exc)

    def call_failed(self, cb, func, exc):
        """Legacy entry point: log a failure including the callable's name."""
        logger.error(
            "Circuit '%s' call failed (%s): %s",
            cb.name,
            # Not every callable has __name__ (e.g. functools.partial objects).
            getattr(func, "__name__", str(func)),
            exc,
        )

    def call_succeeded(self, cb, func, result):
        """Legacy entry point: log when a call succeeds while half-open."""
        # pybreaker names the state "half-open" (hyphen), not "half_open".
        if cb.state.name == pybreaker.STATE_HALF_OPEN:
            logger.info(
                "Circuit '%s' probe succeeded — closing",
                cb.name,
            )

    def before_call(self, cb, func, *args, **kwargs):
        """Hook run before each guarded call; override for rate limiting."""
        pass
class MetricsListener(pybreaker.CircuitBreakerListener):
    """Accumulate call statistics — useful for health dashboards.

    Counting happens in ``success`` / ``failure``, the hooks pybreaker
    actually invokes.  ``call_succeeded`` / ``call_failed`` are kept as thin
    aliases for code that calls them directly.
    """

    def __init__(self) -> None:
        self.success_count = 0
        self.failure_count = 0
        self.rejected_count = 0
        self.open_count = 0

    def success(self, cb):
        """Listener hook: count one successful guarded call."""
        self.success_count += 1

    def failure(self, cb, exc):
        """Listener hook: count one failed guarded call.

        A ``CircuitBreakerError`` is counted as a rejection, anything else as
        a failure.
        """
        # NOTE(review): pybreaker appears not to notify listeners for calls it
        # rejects while open, so rejected_count may stay at 0 — confirm.
        if isinstance(exc, pybreaker.CircuitBreakerError):
            self.rejected_count += 1
        else:
            self.failure_count += 1

    def call_succeeded(self, cb, func, result):
        """Legacy alias for :meth:`success`; ``func`` and ``result`` are ignored."""
        self.success(cb)

    def call_failed(self, cb, func, exc):
        """Legacy alias for :meth:`failure`; ``func`` is ignored."""
        self.failure(cb, exc)

    def state_change(self, cb, old_state, new_state):
        """Count every transition into the open state."""
        if new_state.name == "open":
            self.open_count += 1

    @property
    def stats(self) -> dict[str, int]:
        """Return a snapshot of all counters as a plain dict."""
        return {
            "success": self.success_count,
            "failure": self.failure_count,
            "rejected": self.rejected_count,
            "opens": self.open_count,
        }
# ─────────────────────────────────────────────────────────────────────────────
# 2. Circuit breaker factory
# ─────────────────────────────────────────────────────────────────────────────
def make_breaker(
    name: str,
    fail_max: int = 5,
    reset_timeout: int = 60,
    exclude: list[type] | None = None,
    listeners: list[pybreaker.CircuitBreakerListener] | None = None,
    metrics: bool = True,
) -> tuple[pybreaker.CircuitBreaker, MetricsListener | None]:
    """
    Build a named CircuitBreaker with logging (and optionally metrics) attached.

    Listener order is: a fresh LoggingListener, then a fresh MetricsListener
    when ``metrics`` is true, then any caller-supplied ``listeners``.

    Returns a ``(breaker, metrics_listener)`` pair; ``metrics_listener`` is
    None when ``metrics`` is false.
    """
    attached: list[pybreaker.CircuitBreakerListener] = [LoggingListener()]

    metrics_listener = MetricsListener() if metrics else None
    if metrics_listener is not None:
        attached.append(metrics_listener)

    # Caller-supplied listeners always come last.
    attached.extend(listeners or [])

    breaker = pybreaker.CircuitBreaker(
        fail_max=fail_max,
        reset_timeout=reset_timeout,
        name=name,
        exclude=exclude or [],
        listeners=attached,
    )
    return breaker, metrics_listener
# ─────────────────────────────────────────────────────────────────────────────
# 3. HTTP request wrapper
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class HttpCircuitBreaker:
    """
    An HTTP client whose GET/POST calls all go through one circuit breaker.

    Each instance owns a single breaker (built by ``make_breaker``), so create
    one instance per downstream host or service to isolate them.  Exceptions
    raised by ``requests`` and responses with status >= 500 propagate through
    the breaker as errors; ``fail_max`` and ``reset_timeout`` are passed to the
    breaker unchanged.

    Attributes:
        name: Breaker name, used in log messages.
        fail_max: Failure threshold handed to the breaker.
        reset_timeout: Seconds handed to the breaker as its reset timeout.
        timeout: Default per-request timeout in seconds; a ``timeout=`` keyword
            passed to ``get``/``post`` overrides it for that request.
    """

    name: str
    fail_max: int = 5
    reset_timeout: int = 60
    timeout: float = 10.0

    def __post_init__(self):
        # make_breaker returns (breaker, metrics_listener).
        self._breaker, self.metrics = make_breaker(
            self.name,
            fail_max=self.fail_max,
            reset_timeout=self.reset_timeout,
        )

    def get(self, url: str, **kwargs) -> Any:
        """GET a URL through the circuit breaker."""
        return self._breaker.call(self._do_get, url, **kwargs)

    def post(self, url: str, **kwargs) -> Any:
        """POST to a URL through the circuit breaker."""
        return self._breaker.call(self._do_post, url, **kwargs)

    def _do_get(self, url: str, **kwargs):
        """Perform the GET; raises HTTPError on a 5xx response."""
        import requests

        return self._send(requests.get, url, **kwargs)

    def _do_post(self, url: str, **kwargs):
        """Perform the POST; raises HTTPError on a 5xx response."""
        import requests

        return self._send(requests.post, url, **kwargs)

    def _send(self, send: Callable[..., Any], url: str, **kwargs) -> Any:
        """Call ``send(url, **kwargs)`` and turn a 5xx response into HTTPError."""
        # setdefault, not an explicit timeout= argument: a caller-supplied
        # timeout must override the default instead of raising TypeError for
        # a duplicated keyword.
        kwargs.setdefault("timeout", self.timeout)
        resp = send(url, **kwargs)
        if resp.status_code >= 500:
            import requests

            raise requests.exceptions.HTTPError(
                f"HTTP {resp.status_code}", response=resp
            )
        return resp

    @property
    def is_open(self) -> bool:
        """True while the breaker's current state is named "open"."""
        return self._breaker.state.name == "open"

    @property
    def state(self) -> str:
        """Name of the breaker's current state."""
        return self._breaker.state.name

    def close(self) -> None:
        """Manually close (reset) the circuit — e.g. after fixing the downstream service."""
        self._breaker.close()
# ─────────────────────────────────────────────────────────────────────────────
# 4. Service call with fallback
# ─────────────────────────────────────────────────────────────────────────────
def call_with_fallback(
    breaker: pybreaker.CircuitBreaker,
    fn: Callable[..., Any],
    fallback_fn: Callable[..., Any],
    *args,
    **kwargs,
) -> Any:
    """
    Call `fn` through the circuit breaker, falling back on any failure.

    Pattern: primary service call with circuit breaker + cached/default fallback.

    Args:
        breaker: Circuit breaker guarding the primary call.
        fn: Primary callable, invoked as ``breaker.call(fn, *args, **kwargs)``.
        fallback_fn: Invoked with the same ``*args, **kwargs`` when the breaker
            raises ``CircuitBreakerError`` or `fn` raises any ``Exception``.

    Returns:
        The result of `fn`, or of `fallback_fn` when the primary path failed.

    Raises:
        Whatever `fallback_fn` itself raises.
    """
    try:
        return breaker.call(fn, *args, **kwargs)
    except pybreaker.CircuitBreakerError:
        # getattr, not fn.__name__: callables such as functools.partial have no
        # __name__, and an AttributeError here would skip the fallback entirely.
        logger.warning(
            "Circuit open — using fallback for %s",
            getattr(fn, "__name__", repr(fn)),
        )
    except Exception as exc:
        # Deliberately broad: any primary failure is answered with the fallback.
        logger.error("Call failed, using fallback: %s", exc)
    return fallback_fn(*args, **kwargs)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Registry — one breaker per dependency
# ─────────────────────────────────────────────────────────────────────────────
class CircuitBreakerRegistry:
    """
    Keep one named circuit breaker per external dependency.

    Giving each dependency (payment API, email service, DB, ...) its own
    breaker provides bulkhead isolation: one dependency failing does not
    open the circuits of the others.
    """

    def __init__(self) -> None:
        # name -> (breaker, metrics listener or None), as returned by make_breaker
        self._breakers: dict[str, tuple[pybreaker.CircuitBreaker, MetricsListener | None]] = {}

    def get_or_create(
        self,
        name: str,
        fail_max: int = 5,
        reset_timeout: int = 60,
    ) -> pybreaker.CircuitBreaker:
        """Return the breaker registered under `name`, creating it on first use.

        `fail_max` and `reset_timeout` are only used when the breaker is
        created; they are ignored for a name that already exists.
        """
        try:
            entry = self._breakers[name]
        except KeyError:
            entry = make_breaker(name, fail_max=fail_max, reset_timeout=reset_timeout)
            self._breakers[name] = entry
        return entry[0]

    def call(self, name: str, fn: Callable, *args, **kwargs) -> Any:
        """Run `fn(*args, **kwargs)` through the breaker registered as `name`."""
        breaker = self.get_or_create(name)
        return breaker.call(fn, *args, **kwargs)

    def status(self) -> dict[str, dict[str, Any]]:
        """Report state name, failure counter, and metrics for every breaker."""
        return {
            name: {
                "state": breaker.state.name,
                "fail_count": breaker.fail_counter,
                "metrics": listener.stats if listener else {},
            }
            for name, (breaker, listener) in self._breakers.items()
        }

    def all_closed(self) -> bool:
        """Return True when no registered circuit is in a non-closed state."""
        for breaker, _ in self._breakers.values():
            if breaker.state.name != "closed":
                return False
        return True
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo script: exercises make_breaker, call_with_fallback,
    # CircuitBreakerRegistry, and the decorator style, printing the breaker
    # state after each call.

    # --- Demo 1: direct cb.call() against a service that fails 3 times ------
    print("=== Basic circuit breaker ===")
    cb, metrics = make_breaker("demo", fail_max=3, reset_timeout=2)
    # One-element list so the nested function can mutate the counter.
    fail_count = [0]

    def flaky_service() -> str:
        # Raises ConnectionError on its first three invocations, then returns "ok".
        fail_count[0] += 1
        if fail_count[0] <= 3:
            raise ConnectionError(f"connection refused (call #{fail_count[0]})")
        return "ok"

    for i in range(6):
        try:
            result = cb.call(flaky_service)
            print(f" call {i+1}: OK → {result} | state={cb.state.name}")
        except pybreaker.CircuitBreakerError:
            print(f" call {i+1}: CIRCUIT OPEN (fast-fail) | state={cb.state.name}")
        except ConnectionError as e:
            print(f" call {i+1}: FAILURE — {e} | state={cb.state.name}")
    # NOTE(review): the loop never sleeps, so reset_timeout=2 presumably never
    # elapses and the "ok" path is not reached in this demo — confirm.
    print(f"\n Metrics: {metrics.stats}")

    # --- Demo 2: primary always fails, fallback supplies the result ---------
    print("\n=== call_with_fallback ===")
    cb2, _ = make_breaker("fallback_demo", fail_max=2, reset_timeout=60)

    def primary() -> str:
        raise IOError("primary down")

    def fallback() -> str:
        return "cached_response"

    for i in range(4):
        result = call_with_fallback(cb2, primary, fallback)
        print(f" call {i+1}: {result!r} | state={cb2.state.name}")

    # --- Demo 3: registry with one breaker per dependency -------------------
    print("\n=== CircuitBreakerRegistry ===")
    registry = CircuitBreakerRegistry()

    def payment_api() -> str:
        raise TimeoutError("payment timeout")

    def email_api() -> str:
        return "email sent"

    for i in range(3):
        try:
            registry.call("payment", payment_api)
        except (TimeoutError, pybreaker.CircuitBreakerError):
            # Failures are expected here; only the resulting status matters.
            pass
    registry.call("email", email_api)
    print(" Registry status:")
    for name, info in registry.status().items():
        print(f" {name:12}: state={info['state']:8} metrics={info['metrics']}")

    # --- Demo 4: breaker used as a decorator --------------------------------
    print("\n=== Decorator style ===")
    cb3, _ = make_breaker("deco", fail_max=2, reset_timeout=60)
    # One-element list so the decorated function can mutate the counter.
    call_n = [0]

    @cb3
    def search_service(query: str) -> list[str]:
        # Raises RuntimeError on its first two invocations, then returns a result.
        call_n[0] += 1
        if call_n[0] <= 2:
            raise RuntimeError("search unavailable")
        return [f"result for {query}"]

    for i in range(4):
        try:
            r = search_service("python")
            print(f" call {i+1}: {r} | state={cb3.state.name}")
        except (RuntimeError, pybreaker.CircuitBreakerError) as e:
            print(f" call {i+1}: {type(e).__name__}: {e} | state={cb3.state.name}")
For the tenacity alternative — tenacity’s @retry retries the same service call repeatedly with backoff; a circuit breaker stops calling a failing service entirely (fast-fails) while the circuit is open, preventing thread saturation and cascade failures; in production you typically use both: tenacity for transient errors on individual requests and pybreaker to stop hammering a completely down dependency. For the resilience4py alternative — resilience4py has richer configuration mirrors of Java’s resilience4j (bulkhead, time limiter, concurrent call limiting) but is less actively maintained; pybreaker is the most commonly used Python circuit breaker library with a simple API and thread-safe Redis state for distributed deployments. The Claude Skills 360 bundle includes pybreaker skill sets covering CircuitBreaker with fail_max/reset_timeout/name/exclude, CLOSED/OPEN/HALF_OPEN state transitions, LoggingListener and MetricsListener subclasses, make_breaker() factory with metrics, HttpCircuitBreaker for requests-based HTTP, call_with_fallback() primary+fallback pattern, CircuitBreakerRegistry for bulkhead isolation, @cb decorator style, cb.call() explicit call style, and manual cb.close()/cb.open() reset. Start with the free tier to try circuit breaker pattern code generation.