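stamina provides production-safe retries with opinionated defaults. Install it with pip install stamina.

The core API is the @stamina.retry decorator, which requires an on argument naming what to retry: for example @stamina.retry(on=httpx.HTTPStatusError) on an async def fetch(url), or @stamina.retry(on=Exception) on a plain def call_api(). The same decorator handles sync and async functions. Pass a tuple to retry several exception types, as in on=(httpx.HTTPStatusError, TimeoutError), or pass a callable that takes the exception and returns a bool to retry conditionally, as in on=lambda e: isinstance(e, httpx.HTTPStatusError) and e.response.status_code >= 500.

Two limits bound the retrying: attempts caps the number of tries and timeout caps the total time in seconds, for example @stamina.retry(on=Exception, attempts=5) or @stamina.retry(on=Exception, timeout=30). Both apply together, and whichever is reached first ends the retrying. The wait between tries is controlled by wait_initial, wait_max, wait_exp_base, and wait_jitter, as in @stamina.retry(on=Exception, wait_initial=0.1, wait_max=10, wait_jitter=1). The documented defaults are attempts=10, timeout=45 s, wait_initial=0.1 s, wait_max=5 s, wait_exp_base=2 (exponential backoff), and wait_jitter=1 s of random jitter.

To retry a block of code rather than a whole function, use the manual loop: for attempt in stamina.retry_context(on=Exception, attempts=3): with attempt: result = risky_call().

Instrumentation is configured with stamina.instrumentation.set_on_retry_hooks(hooks), which takes an iterable of callables; each one receives a RetryDetails object whenever a retry is scheduled. stamina ships hooks for structlog, Prometheus, and standard-library logging; other backends such as OpenTelemetry need a custom hook. To disable retries in tests, call stamina.set_active(False). Claude Code generates stamina retry decorators, retry_context loops, and observable retry pipelines.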
CLAUDE.md for stamina
## stamina Stack
- Version: stamina >= 24.3 (needed for callable on= predicates) | pip install stamina
- Decorator: @stamina.retry(on=ExcType) | @stamina.retry(on=(A, B), attempts=5)
- Budget: timeout=30 (total seconds) | attempts=N (max tries) | both apply, whichever is reached first stops retrying; None disables a limit
- Backoff: wait_initial=0.1, wait_max=5, wait_exp_base=2, wait_jitter=1 (defaults, alongside attempts=10 and timeout=45)
- Conditional: on=lambda e: isinstance(e, HTTPError) and e.response.status_code >= 500
- Context: for attempt in stamina.retry_context(on=Exception): with attempt: risky()
- Testing: stamina.set_active(False) — disables retries for unit tests
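The backoff parameters in the list above combine into one wait per retry. As a sketch, assuming the formula given in stamina's documentation, min(wait_max, wait_initial * wait_exp_base ** (retry - 1) + random(0, wait_jitter)), the schedule can be previewed in plain Python. backoff_schedule is a hypothetical helper for illustration and is not part of stamina; its parameter names follow stamina's documented ones.

```python
from __future__ import annotations

import random


def backoff_schedule(
    attempts: int = 10,
    wait_initial: float = 0.1,
    wait_max: float = 5.0,
    wait_exp_base: float = 2.0,
    wait_jitter: float = 1.0,
    seed: int | None = None,
) -> list[float]:
    """Return the wait before each retry (attempts - 1 waits in total)."""
    rng = random.Random(seed)
    waits = []
    for retry_num in range(1, attempts):
        # Exponential growth plus a random amount between 0 and wait_jitter...
        raw = wait_initial * wait_exp_base ** (retry_num - 1)
        raw += rng.uniform(0, wait_jitter)
        # ...capped at wait_max.
        waits.append(min(wait_max, raw))
    return waits


if __name__ == "__main__":
    no_jitter = backoff_schedule(wait_jitter=0.0)
    print([round(w, 2) for w in no_jitter])
    print(round(sum(no_jitter), 2))
```

With jitter set to 0 and the default values assumed here, ten attempts spend about 21.3 s waiting in total, so the attempt cap is reached before a 45 s timeout would be. Raising wait_max or attempts shifts that balance toward the timeout.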
stamina Retry Pipeline
# app/resilient.py — stamina retry patterns for APIs, DB, and queue operations
from __future__ import annotations

import logging

import stamina

log = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# Instrumentation — log retry attempts with structlog or stdlib logging
# ─────────────────────────────────────────────────────────────────────────────
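def _on_retry(details: stamina.instrumentation.RetryDetails) -> None:
    """
    Called each time a retry is scheduled, before stamina waits.

    details fields: name, args, kwargs, retry_num, wait_for,
    waited_so_far, caused_by.
    """
    log.warning(
        "retry_attempt",
        extra={
            "function": details.name,
            "retry_num": details.retry_num,
            "wait_secs": round(details.wait_for, 3),
            "exception": type(details.caused_by).__name__,
            # "message" is a reserved LogRecord attribute, so use another key.
            "error_message": str(details.caused_by),
        },
    )


# Replaces the default hooks; stamina expects an iterable of hooks.
stamina.instrumentation.set_on_retry_hooks([_on_retry])
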
# ─────────────────────────────────────────────────────────────────────────────
# 1. HTTP client — retry on transient server errors
# ─────────────────────────────────────────────────────────────────────────────
class _TransientHTTPError(Exception):
    """Raised for 5xx status codes that are worth retrying."""

    def __init__(self, status_code: int, message: str) -> None:
        self.status_code = status_code
        super().__init__(f"HTTP {status_code}: {message}")


def _is_retryable(exc: Exception) -> bool:
    """Only retry on 5xx or connection errors, not 4xx client errors."""
    if isinstance(exc, _TransientHTTPError):
        return exc.status_code >= 500
    return isinstance(exc, (TimeoutError, ConnectionError))


class SimpleAPIClient:
    """API client with stamina retry on transient failures."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url
        self._call_count = 0

    @stamina.retry(
        on=_is_retryable,
        attempts=5,
        wait_initial=0.1,
        wait_max=10.0,
        wait_jitter=0.5,
    )
    def get(self, path: str) -> dict:
        """GET request, tried up to 5 times on 5xx or connection errors."""
        self._call_count += 1
        # Simulate transient 503 on first two calls
        if self._call_count <= 2:
            raise _TransientHTTPError(503, "Service Unavailable")
        return {"data": f"result from {path}", "attempt": self._call_count}

    @stamina.retry(
        on=_is_retryable,
        attempts=None,  # no attempt cap, so only the time budget applies
        timeout=30.0,  # total budget: 30 s
        wait_initial=0.2,
        wait_max=8.0,
    )
    async def async_post(self, path: str, payload: dict) -> dict:
        """Async POST: retry within a 30 s total time budget."""
        import asyncio

        await asyncio.sleep(0.01)  # stand-in for real network I/O
        return {"created": True, "path": path}

# ─────────────────────────────────────────────────────────────────────────────
# 2. Database operations — retry on transient connection errors
# ─────────────────────────────────────────────────────────────────────────────
class DatabaseUnavailableError(Exception):
    pass


_db_fail_count = 0


@stamina.retry(
    on=DatabaseUnavailableError,
    attempts=4,
    wait_initial=0.5,
    wait_max=5.0,
    wait_exp_base=2.0,
    wait_jitter=0.2,
)
def fetch_user(user_id: int) -> dict:
    """
    DB lookup with exponential backoff:
    attempt 1, wait ~0.5s, attempt 2, wait ~1s, attempt 3, wait ~2s, attempt 4.
    wait_exp_base=2.0 doubles the wait each time; wait_jitter adds a random
    0 to 0.2s on top of each wait.
    """
    global _db_fail_count
    _db_fail_count += 1
    # Simulate a failure on the first two calls, then recover.
    if _db_fail_count <= 2:
        raise DatabaseUnavailableError("connection pool exhausted")
    _db_fail_count = 0
    return {"id": user_id, "name": f"User {user_id}"}


@stamina.retry(
    on=DatabaseUnavailableError,
    attempts=3,
    wait_initial=0.1,
)
def write_event(event: dict) -> bool:
    """
    Write an event record: 3 attempts with a fast first retry.
    The write must be idempotent because it may run more than once.
    """
    # In production: INSERT with conflict handling
    return True

# ─────────────────────────────────────────────────────────────────────────────
# 3. retry_context — manual retry loop for conditional control
# ─────────────────────────────────────────────────────────────────────────────
class RateLimitError(Exception):
    def __init__(self, retry_after: float) -> None:
        self.retry_after = retry_after
        super().__init__(f"Rate limited, retry after {retry_after}s")


def call_with_rate_limit_handling(endpoint: str) -> dict:
    """
    retry_context gives a for-loop interface, useful when only part of a
    function should be retried or when state is shared across attempts.
    Which exceptions are retried is still decided by the on argument.
    """
    _attempts = 0
    for attempt in stamina.retry_context(on=RateLimitError, attempts=5, wait_initial=0.05):
        with attempt:
            _attempts += 1
            if _attempts <= 2:
                raise RateLimitError(retry_after=0.1)
            return {"endpoint": endpoint, "result": "ok", "attempts": _attempts}
    return {}  # unreachable: stamina re-raises after the last attempt

# ─────────────────────────────────────────────────────────────────────────────
# 4. Async retry — same decorator works for async functions
# ─────────────────────────────────────────────────────────────────────────────
_queue_fail = 0


@stamina.retry(
    on=ConnectionError,
    attempts=6,
    wait_initial=0.05,
    wait_max=5.0,
)
async def publish_event(topic: str, payload: dict) -> bool:
    """Async message queue publish, retried on connection drops."""
    global _queue_fail
    _queue_fail += 1
    if _queue_fail <= 3:
        raise ConnectionError("queue broker unavailable")
    _queue_fail = 0
    return True

# ─────────────────────────────────────────────────────────────────────────────
# 5. Disabling retries in tests
# ─────────────────────────────────────────────────────────────────────────────
class RetryDisabledContext:
    """Context manager that disables stamina retries; useful in tests."""

    def __enter__(self) -> "RetryDisabledContext":
        self._was_active = stamina.is_active()
        stamina.set_active(False)
        return self

    def __exit__(self, *_) -> None:
        # Restore the previous state instead of forcing retries back on.
        stamina.set_active(self._was_active)


def test_fetch_user_raises_on_db_error() -> None:
    """
    stamina.set_active(False) makes @stamina.retry a no-op, so the function
    raises immediately. The test checks error handling, not retry behavior.
    """
    global _db_fail_count
    with RetryDisabledContext():
        _db_fail_count = 0  # fetch_user fails on its first two calls
        try:
            fetch_user(1)
            assert False, "should have raised"
        except DatabaseUnavailableError:
            pass  # expected
        finally:
            _db_fail_count = 0
    print("test_fetch_user_raises_on_db_error: PASS")

# ─────────────────────────────────────────────────────────────────────────────
# 6. Monitoring — count retries in Prometheus / StatsD
# ─────────────────────────────────────────────────────────────────────────────
_retry_counts: dict[str, int] = {}


def _counting_handler(details: stamina.instrumentation.RetryDetails) -> None:
    _retry_counts[details.name] = _retry_counts.get(details.name, 0) + 1
    # In production, increment a labeled prometheus_client.Counter here,
    # created once at import time rather than on every call.


def install_counting_instrumentation() -> None:
    """Layer a counting hook on top of the hooks that are already installed."""
    existing = stamina.instrumentation.get_on_retry_hooks()
    stamina.instrumentation.set_on_retry_hooks([*existing, _counting_handler])

# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.WARNING,
        format="%(levelname)s %(name)s %(message)s",
    )

    print("=== API client retry ===")
    client = SimpleAPIClient("https://api.example.com")
    result = client.get("/users/1")
    print(f" {result}")

    print("\n=== DB retry ===")
    user = fetch_user(42)
    print(f" {user}")

    print("\n=== retry_context manual loop ===")
    data = call_with_rate_limit_handling("/endpoint")
    print(f" {data}")

    print("\n=== Test isolation ===")
    test_fetch_user_raises_on_db_error()

    print("\n=== Async retry ===")
    import asyncio

    ok = asyncio.run(publish_event("events", {"type": "user.created"}))
    print(f" published: {ok}")

Compared with tenacity: tenacity provides a rich set of stop, wait, and retry combinators (stop_after_attempt, wait_exponential_jitter, retry_if_exception_type) that compose into a single decorator. stamina is a thin, opinionated wrapper around tenacity: exponential backoff with jitter is built in, so no combinators are needed. It also offers first-class test isolation through stamina.set_active(False), where tenacity leaves you to patch the waits yourself, and it passes a structured RetryDetails object to its instrumentation hooks instead of raw event callbacks.

Compared with a time.sleep loop: a hand-rolled for attempt in range(N): try: ... except Err: time.sleep(delay * 2**attempt) loop gets copied into every service that needs retries. It has no jitter (so clients retry in lockstep and stampede a recovering service), no total time budget, and no observability hook. @stamina.retry(on=Err, attempts=N) covers all of this in one line, calls the configured instrumentation hooks for structured logging and metrics, and works the same way for sync and async functions.

The Claude Skills 360 bundle includes stamina skill sets covering @stamina.retry on single and multiple exception types, conditional on=lambda for status-code filtering, attempts vs timeout budget selection, wait_initial/wait_max/wait_exp_base/wait_jitter tuning, retry_context manual loops for retrying a block of code, async retry with the same decorator, set_active(False) for test isolation, custom on-retry hooks for structlog and Prometheus, counting instrumentation for retry rate dashboards, and layering multiple instrumentation hooks. Start with the free tier to try production retry code generation.
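To show what the hand-rolled alternative has to grow into, here is a sketch of such a loop with jitter and a total time budget added. retry_by_hand and all of its parameters are hypothetical names for illustration. It is not how stamina is implemented, and it still lacks instrumentation and async support.

```python
from __future__ import annotations

import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_by_hand(
    fn: Callable[[], T],
    *,
    on: type[Exception] | tuple[type[Exception], ...],
    attempts: int = 5,
    timeout: float = 30.0,
    delay: float = 0.1,
    max_delay: float = 5.0,
    jitter: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn until it succeeds, attempts run out, or the time budget is spent."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    deadline = time.monotonic() + timeout
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except on:
            # Exponential backoff plus random jitter, capped at max_delay.
            wait = min(max_delay, delay * 2 ** (attempt - 1) + random.uniform(0, jitter))
            out_of_tries = attempt == attempts
            out_of_time = time.monotonic() + wait > deadline
            if out_of_tries or out_of_time:
                raise  # give up and surface the last error
            sleep(wait)
    raise AssertionError("unreachable")
```

Even this short version needs an injectable sleep to be testable and separate checks for the attempt cap and the deadline, and every service that copies it has to keep those details consistent.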