Python’s contextvars module provides context-local storage that works correctly across both asyncio tasks and threads. import contextvars. Declare: var = contextvars.ContextVar("name", default=None) — the name is for debugging; default is returned by .get() when no value is set in the current context. Get: var.get() → current value or default; var.get(sentinel) → sentinel if unset (overrides default). Set: token = var.set(value) → Token capturing the prior state. Reset: var.reset(token) — rolls back to the value before set was called; must use the token from the matching set. Copy context: ctx = contextvars.copy_context() → Context — a snapshot of all ContextVar bindings in the current context. Run in context: ctx.run(fn, *args, **kwargs) — calls fn with the snapshot’s bindings; any set calls inside are isolated to that invocation and do not affect the outer context. Iteration: for var, value in ctx.items() — enumerate all bindings. Integration: asyncio automatically propagates context to tasks created with asyncio.create_task(); concurrent.futures.ThreadPoolExecutor copies context to each submitted callable. Claude Code generates request-scoped storage, middleware tracing, per-task configuration, and tenant-aware async handlers.
CLAUDE.md for contextvars
## contextvars Stack
- Stdlib: import contextvars
- Declare: var = contextvars.ContextVar("request_id", default=None)
- Set: token = var.set("req-abc123")
- Get: val = var.get() # "req-abc123"
- Reset: var.reset(token) # restore prior value
- Copy: ctx = contextvars.copy_context()
- ctx.run(fn, *args) # fn sees snapshot; set is isolated
- Note: asyncio.create_task() propagates context automatically
contextvars Context Variable Pipeline
# app/contextvarsutil.py — request context, scoped set, tracing, tenant
from __future__ import annotations
import contextvars
import threading
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Generator
# ─────────────────────────────────────────────────────────────────────────────
# 1. Reusable ContextVar helpers
# ─────────────────────────────────────────────────────────────────────────────
def scoped_set(
var: contextvars.ContextVar,
value: object,
) -> "contextmanager":
"""
Context manager: set var to value for the duration of the block,
then reset to the prior value (even on exception).
Example:
with scoped_set(REQUEST_ID, "req-42"):
print(REQUEST_ID.get()) # "req-42"
print(REQUEST_ID.get()) # previous value
"""
@contextmanager
def _inner() -> Generator[None, None, None]:
token = var.set(value)
try:
yield
finally:
var.reset(token)
return _inner()
def get_or_default(
var: contextvars.ContextVar,
sentinel: object = None,
) -> object:
"""
Return var's current value, or sentinel if unset (ignores ContextVar default).
Example:
val = get_or_default(TENANT_ID, sentinel="<anonymous>")
"""
_missing = object()
result = var.get(_missing)
return sentinel if result is _missing else result
def dump_context() -> dict[str, object]:
"""
Return all ContextVar bindings in the current context as {name: value}.
Useful for structured log output or debugging.
Example:
log.info("context", extra=dump_context())
"""
ctx = contextvars.copy_context()
return {var.name: value for var, value in ctx.items()}
# ─────────────────────────────────────────────────────────────────────────────
# 2. Request-scoped storage
# ─────────────────────────────────────────────────────────────────────────────
REQUEST_ID: contextvars.ContextVar[str | None] = contextvars.ContextVar("request_id", default=None)
TRACE_ID: contextvars.ContextVar[str | None] = contextvars.ContextVar("trace_id", default=None)
USER_ID: contextvars.ContextVar[int | None] = contextvars.ContextVar("user_id", default=None)
TENANT_ID: contextvars.ContextVar[str | None] = contextvars.ContextVar("tenant_id", default=None)
@dataclass
class RequestContext:
"""
Convenience wrapper to set multiple context vars for a request / task.
Example:
ctx = RequestContext.new(user_id=7, tenant_id="acme")
with ctx:
handle_request()
"""
request_id: str
trace_id: str
user_id: int | None = None
tenant_id: str | None = None
_tokens: list = field(default_factory=list, repr=False)
@classmethod
def new(
cls,
user_id: int | None = None,
tenant_id: str | None = None,
) -> "RequestContext":
return cls(
request_id=str(uuid.uuid4()),
trace_id=str(uuid.uuid4()),
user_id=user_id,
tenant_id=tenant_id,
)
def __enter__(self) -> "RequestContext":
self._tokens = [
REQUEST_ID.set(self.request_id),
TRACE_ID.set(self.trace_id),
USER_ID.set(self.user_id),
TENANT_ID.set(self.tenant_id),
]
return self
def __exit__(self, *_: object) -> None:
for tok in reversed(self._tokens):
tok.var.reset(tok)
@staticmethod
def current() -> dict[str, object]:
return {
"request_id": REQUEST_ID.get(),
"trace_id": TRACE_ID.get(),
"user_id": USER_ID.get(),
"tenant_id": TENANT_ID.get(),
}
# ─────────────────────────────────────────────────────────────────────────────
# 3. Isolated context execution
# ─────────────────────────────────────────────────────────────────────────────
def run_isolated(fn: "callable", *args: object, **kwargs: object) -> object:
"""
Run fn(*args, **kwargs) in a copy of the current context.
Any ContextVar.set() calls inside fn are isolated and do not
affect the caller's context.
Example:
result = run_isolated(process_task, task_data)
"""
ctx = contextvars.copy_context()
return ctx.run(fn, *args, **kwargs)
def snapshot_and_run(
overrides: dict[contextvars.ContextVar, object],
fn: "callable",
*args: object,
**kwargs: object,
) -> object:
"""
Copy the current context, apply overrides, then run fn in that snapshot.
The caller's context is not modified.
Example:
result = snapshot_and_run(
{TENANT_ID: "beta", USER_ID: 99},
compute_report,
)
"""
ctx = contextvars.copy_context()
def _with_overrides() -> object:
for var, val in overrides.items():
var.set(val)
return fn(*args, **kwargs)
return ctx.run(_with_overrides)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Thread-safe context propagation
# ─────────────────────────────────────────────────────────────────────────────
def spawn_with_context(
fn: "callable",
*args: object,
**kwargs: object,
) -> threading.Thread:
"""
Start a daemon thread that inherits the current context snapshot.
Example:
with RequestContext.new(user_id=10, tenant_id="acme"):
spawn_with_context(background_job, payload)
"""
ctx = contextvars.copy_context()
t = threading.Thread(
target=lambda: ctx.run(fn, *args, **kwargs),
daemon=True,
)
t.start()
return t
# ─────────────────────────────────────────────────────────────────────────────
# 5. Audit-trail decorator
# ─────────────────────────────────────────────────────────────────────────────
_audit: list[str] = []
def traced(fn: "callable") -> "callable":
"""
Decorator: read REQUEST_ID and TENANT_ID from context, record timing.
Example:
@traced
def place_order(order): ...
"""
def wrapper(*args: object, **kwargs: object) -> object:
rid = REQUEST_ID.get() or "(no-request)"
tid = TENANT_ID.get() or "(no-tenant)"
t0 = time.perf_counter()
try:
result = fn(*args, **kwargs)
elapsed = (time.perf_counter() - t0) * 1000
_audit.append(f"[OK] {fn.__name__} rid={rid} tid={tid} {elapsed:.1f}ms")
return result
except Exception as exc:
elapsed = (time.perf_counter() - t0) * 1000
_audit.append(f"[ERR] {fn.__name__} rid={rid} tid={tid} {elapsed:.1f}ms err={exc}")
raise
return wrapper
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== contextvars demo ===")
# ── RequestContext ────────────────────────────────────────────────────────
print("\n--- RequestContext ---")
print(f" before: {RequestContext.current()}")
with RequestContext.new(user_id=42, tenant_id="acme"):
print(f" inside: user_id={USER_ID.get()} tenant={TENANT_ID.get()}")
print(f" request_id prefix: {REQUEST_ID.get()[:8]}...")
print(f" after: user_id={USER_ID.get()} tenant={TENANT_ID.get()}")
# ── scoped_set ────────────────────────────────────────────────────────────
print("\n--- scoped_set ---")
DEMO_VAR: contextvars.ContextVar[str] = contextvars.ContextVar("demo", default="outer")
with scoped_set(DEMO_VAR, "inner"):
print(f" inside scoped_set: {DEMO_VAR.get()}")
print(f" after scoped_set: {DEMO_VAR.get()}")
# ── run_isolated ──────────────────────────────────────────────────────────
print("\n--- run_isolated ---")
with RequestContext.new(user_id=1, tenant_id="x"):
def mutate() -> str:
USER_ID.set(999)
TENANT_ID.set("y")
return f"inside: user={USER_ID.get()} tenant={TENANT_ID.get()}"
inner = run_isolated(mutate)
print(f" {inner}")
print(f" outside (unchanged): user={USER_ID.get()} tenant={TENANT_ID.get()}")
# ── snapshot_and_run ──────────────────────────────────────────────────────
print("\n--- snapshot_and_run ---")
with RequestContext.new(user_id=5, tenant_id="base"):
def report() -> str:
return f"user={USER_ID.get()} tenant={TENANT_ID.get()}"
r = snapshot_and_run({TENANT_ID: "override", USER_ID: 77}, report)
print(f" snapshot result: {r}")
print(f" caller unchanged: user={USER_ID.get()} tenant={TENANT_ID.get()}")
# ── spawn_with_context ────────────────────────────────────────────────────
print("\n--- spawn_with_context ---")
results: list[str] = []
with RequestContext.new(user_id=10, tenant_id="acme"):
t = spawn_with_context(
lambda: results.append(
f"thread sees user={USER_ID.get()} tenant={TENANT_ID.get()}"
)
)
t.join(timeout=2)
print(f" {results[0] if results else 'no result'}")
# ── traced decorator ──────────────────────────────────────────────────────
print("\n--- traced decorator ---")
@traced
def compute(x: int) -> int:
return x * 2
with RequestContext.new(user_id=3, tenant_id="demo"):
result = compute(21)
print(f" compute(21) = {result}")
print(f" audit log: {_audit[-1]}")
# ── dump_context ──────────────────────────────────────────────────────────
print("\n--- dump_context ---")
with RequestContext.new(user_id=99, tenant_id="z"):
ctx_dump = dump_context()
names = {k: v for k, v in ctx_dump.items() if v is not None}
print(f" context vars with values: {list(names.keys())}")
print("\n=== done ===")
For the threading.local alternative — threading.local() provides per-thread storage via attribute assignment; it is simpler but does not propagate to asyncio tasks or work correctly in async code where multiple coroutines share a single thread — use contextvars.ContextVar for all new code that might run under asyncio, since asyncio.create_task() propagates a context snapshot automatically; use threading.local only in pure-thread code where async is not involved. For the flask.g / werkzeug.local alternative — flask.g (backed by contextvars since Flask 2.2) stores per-request state inside Flask web apps — use flask.g inside Flask request handlers for web-specific per-request storage; use contextvars.ContextVar directly when building framework-agnostic libraries, middleware, gRPC services, or async microservices where Flask is not present. The Claude Skills 360 bundle includes contextvars skill sets covering scoped_set()/get_or_default()/dump_context() helpers, RequestContext with new()/current() multi-var request scoping, run_isolated()/snapshot_and_run() isolated context execution, spawn_with_context() context-propagating thread spawner, and @traced audit-trail decorator. Start with the free tier to try context variable patterns and contextvars pipeline code generation.