contextlib provides utilities for creating and working with with-statement context managers. from contextlib import contextmanager, asynccontextmanager, ExitStack, suppress, redirect_stdout, nullcontext. contextmanager: @contextmanager def cm(): setup; try: yield value; finally: teardown. asynccontextmanager: @asynccontextmanager async def acm(): setup; try: yield value; finally: await teardown. ExitStack: with ExitStack() as stack: f = stack.enter_context(cm()); stack.callback(fn) — dynamic cleanup. AsyncExitStack: async equivalent. suppress: with suppress(FileNotFoundError, KeyError): risky_op() — swallow specific exceptions. redirect_stdout: with redirect_stdout(buf): fn() — capture stdout. redirect_stderr: with redirect_stderr(io.StringIO()) as buf: fn(). closing: with closing(urllib.request.urlopen(url)) as r: data = r.read() — calls .close() on exit. nullcontext: with nullcontext(val) if cond else real_cm(): — no-op placeholder. chdir: with contextlib.chdir("/tmp"): os.getcwd() (Python 3.11+). AbstractContextManager: base class with __enter__/__exit__. aclosing: async with aclosing(aiter) as it: — calls aclose() on async generator. ContextDecorator: a context manager class that inherits from ContextDecorator can also be used as a function decorator. Claude Code generates resource managers, test fixtures, output capture, and multi-cleanup stacks.
CLAUDE.md for contextlib
## contextlib Stack
- Stdlib: from contextlib import contextmanager, asynccontextmanager, ExitStack, suppress
- Simple CM: @contextmanager def cm(): setup; try: yield val; finally: teardown
- Async CM: @asynccontextmanager async def acm(): setup; try: yield val; finally: await teardown
- Dynamic: with ExitStack() as stack: stack.enter_context(cm()) | stack.callback(fn)
- Suppress: with suppress(ExcType): risky() — preferred over bare except: pass
- Capture: with redirect_stdout(io.StringIO()) as out: fn(); out.getvalue()
contextlib Resource Pipeline
# app/resources.py — contextmanager, ExitStack, suppress, redirect, timer, temp
from __future__ import annotations
import io
import logging
import os
import sys
import tempfile
import threading
import time
from contextlib import (
AbstractContextManager,
AsyncExitStack,
ExitStack,
asynccontextmanager,
contextmanager,
nullcontext,
redirect_stderr,
redirect_stdout,
suppress,
)
from pathlib import Path
from typing import Any, AsyncIterator, Callable, Generator, Iterator
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Timer context managers
# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def timer(name: str = "", logger=None) -> Generator[dict, None, None]:
    """
    Time the enclosed block; yield a dict whose "elapsed" key is set on exit.

    Args:
        name: Optional label; when non-empty, the elapsed time is logged
            at DEBUG level on exit.
        logger: Logger to use for that message; defaults to this module's.

    Example:
        with timer("database query") as t:
            rows = db.fetchall(sql)
        print(f"Query took {t['elapsed']:.3f}s")
    """
    active_log = logger or log
    stats: dict = {"start": time.perf_counter(), "elapsed": None}
    try:
        yield stats
    finally:
        # Populate on the way out so the caller can read it after the block.
        stats["elapsed"] = time.perf_counter() - stats["start"]
        if name:
            active_log.debug("%s: %.3fs", name, stats["elapsed"])
class Stopwatch(AbstractContextManager):
    """
    Reusable lap timer.

    Entering the context (re)starts the clock and clears any previously
    recorded laps; ``lap()`` records the time since the start, and
    ``elapsed`` keeps counting even after the block exits.

    Example:
        sw = Stopwatch()
        with sw:
            do_phase_1()
            sw.lap("phase 1")
            do_phase_2()
            sw.lap("phase 2")
        print(sw.laps)     # [("phase 1", 0.12), ("phase 2", 0.34)]
        print(sw.elapsed)  # 0.46
    """

    def __init__(self) -> None:
        self._start = 0.0
        self._laps: list[tuple[str, float]] = []

    def __enter__(self) -> Stopwatch:
        # Restart: drop laps from any previous run, then zero the clock.
        self._laps = []
        self._start = time.perf_counter()
        return self

    def __exit__(self, *_) -> None:
        # Nothing to tear down; `elapsed` is computed on demand.
        pass

    def lap(self, name: str = "") -> float:
        """Record and return the time since the stopwatch was started."""
        since_start = time.perf_counter() - self._start
        self._laps.append((name, since_start))
        return since_start

    @property
    def elapsed(self) -> float:
        """Seconds since the most recent __enter__ (live, not frozen)."""
        return time.perf_counter() - self._start

    @property
    def laps(self) -> list[tuple[str, float]]:
        # Defensive copy so callers cannot mutate internal state.
        return list(self._laps)
# ─────────────────────────────────────────────────────────────────────────────
# 2. Output capture
# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def capture_output() -> Generator[dict[str, str], None, None]:
    """
    Capture stdout and stderr produced inside the block as strings.

    The yielded dict is empty during the block; its "stdout" and "stderr"
    keys are filled in when the block exits (normally or via exception).

    Example:
        with capture_output() as out:
            print("hello")
            print("error", file=sys.stderr)
        print(out["stdout"])  # "hello\n"
        print(out["stderr"])  # "error\n"
    """
    out_sink = io.StringIO()
    err_sink = io.StringIO()
    captured: dict[str, str] = {}
    try:
        with redirect_stdout(out_sink), redirect_stderr(err_sink):
            yield captured
    finally:
        # Snapshot both streams even if the block raised.
        captured["stdout"] = out_sink.getvalue()
        captured["stderr"] = err_sink.getvalue()
@contextmanager
def silence() -> Generator[None, None, None]:
    """
    Suppress all stdout and stderr output within the block.

    Example:
        with silence():
            noisy_library_call()
    """
    # Open devnull via `with` so the file descriptor is closed even if
    # entering the redirect contexts raises (the original opened it bare
    # and would leak the handle in that case). It also guarantees the
    # handle outlives the redirects, instead of being closed while they
    # are still active.
    with open(os.devnull, "w") as devnull:
        with redirect_stdout(devnull), redirect_stderr(devnull):
            yield
# ─────────────────────────────────────────────────────────────────────────────
# 3. Temporary file/directory helpers
# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def temp_file(
    suffix: str = "",
    prefix: str = "tmp_",
    content: str | bytes | None = None,
    encoding: str = "utf-8",
) -> Generator[Path, None, None]:
    """
    Create a named temp file; yield its Path; delete it on exit.

    Args:
        suffix: Filename suffix (e.g. ".json").
        prefix: Filename prefix.
        content: Optional initial content; bytes are written in binary
            mode, str in text mode using `encoding`.
        encoding: Text encoding (ignored for bytes content).

    Example:
        with temp_file(".json", content='{"key":"value"}') as p:
            data = json.loads(p.read_text())
    """
    fd, raw_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
    file_path = Path(raw_path)
    try:
        if content is None:
            # Nothing to write; just release the descriptor mkstemp opened.
            os.close(fd)
        elif isinstance(content, bytes):
            with open(fd, "wb") as handle:
                handle.write(content)
        else:
            with open(fd, "w", encoding=encoding) as handle:
                handle.write(content)
        yield file_path
    finally:
        file_path.unlink(missing_ok=True)
@contextmanager
def temp_dir(prefix: str = "tmp_") -> Generator[Path, None, None]:
    """
    Create a temporary directory; yield it as a Path; remove the whole
    tree (including anything written into it) on exit.

    Example:
        with temp_dir() as d:
            (d / "output.csv").write_text("a,b\n1,2\n")
    """
    with tempfile.TemporaryDirectory(prefix=prefix) as dirname:
        yield Path(dirname)
@contextmanager
def atomic_write(path: str | Path, mode: str = "w", encoding: str = "utf-8") -> Generator:
    """
    Write to a temporary sibling file, then replace the target atomically.

    Readers never observe a half-written target: data lands in
    ``<target>.tmp`` first and is moved into place with ``Path.replace``
    only after the block completes successfully.

    Args:
        path: Destination file; parent directories are created as needed.
        mode: Open mode for the temp file ("w", "wb", ...).
        encoding: Text encoding; ignored when mode is binary.

    Yields:
        The open file object for the temporary file.

    Example:
        with atomic_write("config.json") as f:
            json.dump(config, f, indent=2)
    """
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    tmp = p.with_suffix(p.suffix + ".tmp")
    kwargs = {} if "b" in mode else {"encoding": encoding}
    try:
        with open(tmp, mode, **kwargs) as f:
            yield f
    except BaseException:
        # Catch BaseException (not Exception): KeyboardInterrupt and
        # GeneratorExit previously escaped the cleanup and left a stale
        # .tmp file behind. The target itself is never touched on failure.
        tmp.unlink(missing_ok=True)
        raise
    else:
        tmp.replace(p)
# ─────────────────────────────────────────────────────────────────────────────
# 4. ExitStack patterns
# ─────────────────────────────────────────────────────────────────────────────
def open_files(paths: list[str | Path], mode: str = "r") -> ExitStack:
    """
    Open multiple files and return an ExitStack that closes them all.

    Fix: if opening the Nth file fails, the previously opened handles are
    now closed. The original registered them on a stack that was never
    entered, so a mid-list failure leaked every handle opened so far.

    Returns the ExitStack; use it as a context manager.

    Example:
        with open_files(["a.csv","b.csv"]) as stack:
            ...  # not typical usage — see open_all()
    """
    with ExitStack() as stack:
        for p in paths:
            stack.enter_context(open(p, mode))
        # Success: transfer ownership to a fresh stack for the caller.
        # On exception, the `with` above closes everything already opened.
        return stack.pop_all()
@contextmanager
def open_all(
    paths: list[str | Path],
    mode: str = "r",
    encoding: str = "utf-8",
) -> Generator[list, None, None]:
    """
    Open every path; yield the list of file handles; close all on exit.

    The ExitStack guarantees each successfully opened handle is closed
    even if a later open, or the block itself, raises.

    Example:
        with open_all(["in1.txt","in2.txt"]) as handles:
            for h in handles:
                process(h.read())
    """
    with ExitStack() as stack:
        opened = []
        for target in paths:
            opened.append(stack.enter_context(open(target, mode, encoding=encoding)))
        yield opened
@contextmanager
def managed_resources(*cms) -> Generator[list, None, None]:
    """
    Enter every context manager; yield their __enter__ values in order;
    exit all of them (in reverse order) on exit.

    Example:
        with managed_resources(open("a.txt"), open("b.txt"), lock) as (fa, fb, _):
            fa.write(fb.read())
    """
    with ExitStack() as stack:
        entered = []
        for cm in cms:
            entered.append(stack.enter_context(cm))
        yield entered
# ─────────────────────────────────────────────────────────────────────────────
# 5. Async context managers
# ─────────────────────────────────────────────────────────────────────────────
@asynccontextmanager
async def async_timer(name: str = "") -> AsyncIterator[dict]:
    """
    Async version of timer(): yield a dict whose "elapsed" key is filled
    in when the block exits.

    Example:
        async with async_timer("fetch") as t:
            data = await client.get("/api/data")
        print(f"took {t['elapsed']:.3f}s")
    """
    stats: dict = {"elapsed": None}
    started = time.perf_counter()
    try:
        yield stats
    finally:
        stats["elapsed"] = time.perf_counter() - started
        if name:
            log.debug("async %s: %.3fs", name, stats["elapsed"])
@asynccontextmanager
async def multi_async(*cms) -> AsyncIterator[list]:
    """
    Enter multiple async context managers via AsyncExitStack; yield their
    entered values in order; exit all (in reverse) on exit.

    Example:
        async with multi_async(aiofiles.open("a"), aiofiles.open("b")) as (fa, fb):
            await fa.write(await fb.read())
    """
    async with AsyncExitStack() as stack:
        entered = [await stack.enter_async_context(cm) for cm in cms]
        yield entered
# ─────────────────────────────────────────────────────────────────────────────
# 6. Convenience wrappers
# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def env_override(**env_vars: str) -> Generator[None, None, None]:
    """
    Temporarily set environment variables; restore prior state on exit.

    Variables that did not exist before are removed; variables that did
    exist get their original values back.

    Example:
        with env_override(DATABASE_URL="sqlite:///:memory:", DEBUG="1"):
            run_tests()
    """
    saved = {name: os.environ.get(name) for name in env_vars}
    os.environ.update(env_vars)
    try:
        yield
    finally:
        for name, prior in saved.items():
            if prior is None:
                # Variable was absent before: remove it again.
                os.environ.pop(name, None)
            else:
                os.environ[name] = prior
@contextmanager
def chdir(path: str | Path) -> Generator[Path, None, None]:
    """
    Temporarily change the working directory; restore the previous one
    on exit, even if the block raises.

    Example:
        with chdir("/repo"):
            subprocess.run(["make", "test"])
    """
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield Path(path)
    finally:
        os.chdir(previous)
def optional_cm(condition: bool, cm):
    """
    Return the given context manager when `condition` is true, otherwise
    a no-op nullcontext() (which yields None).

    Example:
        with optional_cm(verbose, timer("load")) as t:
            data = load_file(path)
        if verbose:
            print(f"loaded in {t['elapsed']:.3f}s")
    """
    if condition:
        return cm
    return nullcontext()
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Self-demo: exercise each helper in sequence and print what it does.
    import json
    print("=== contextlib demo ===")
    # timer() fills in the dict on exit, so it is read after the block.
    print("\n--- timer ---")
    with timer("sleep test") as t:
        time.sleep(0.05)
    print(f" elapsed: {t['elapsed']:.3f}s")
    # Stopwatch records named laps measured from __enter__.
    print("\n--- Stopwatch ---")
    sw = Stopwatch()
    with sw:
        time.sleep(0.02)
        sw.lap("phase 1")
        time.sleep(0.03)
        sw.lap("phase 2")
    print(f" laps: {[(n, f'{e:.3f}s') for n,e in sw.laps]}")
    # capture_output() exposes both streams as strings after the block.
    print("\n--- capture_output ---")
    with capture_output() as out:
        print("hello stdout")
        print("hello stderr", file=sys.stderr)
    print(f" stdout: {out['stdout']!r}")
    print(f" stderr: {out['stderr']!r}")
    # temp_file() deletes the file on exit (exists flips to False).
    print("\n--- temp_file ---")
    with temp_file(".json", content='{"ok": true}') as p:
        data = json.loads(p.read_text())
        print(f" read: {data} exists: {p.exists()}")
    print(f" after: {p.exists()}")
    # temp_dir() removes the whole tree on exit.
    print("\n--- temp_dir ---")
    with temp_dir() as d:
        (d / "test.txt").write_text("hello")
        print(f" files: {[f.name for f in d.iterdir()]}")
    print(f" dir exists after: {d.exists()}")
    # atomic_write() stages into a .tmp file, then renames into place.
    print("\n--- atomic_write ---")
    with temp_dir() as d:
        target = d / "config.json"
        with atomic_write(target) as f:
            json.dump({"env": "test"}, f)
        print(f" written: {json.loads(target.read_text())}")
    # suppress() swallows only the listed exception types.
    print("\n--- suppress ---")
    with suppress(FileNotFoundError):
        Path("/nonexistent/path/file.txt").read_text()
    print(" suppress worked — no crash")
    # env_override() restores (or removes) the variables on exit.
    print("\n--- env_override ---")
    os.environ.pop("TEST_VAR", None)
    with env_override(TEST_VAR="hello", PATH=os.environ["PATH"]):
        print(f" inside: TEST_VAR={os.environ.get('TEST_VAR')!r}")
    print(f" after: TEST_VAR={os.environ.get('TEST_VAR')!r}")
    print("\n=== done ===")
For the trio alternative — trio is a third-party async library built around structured concurrency with native support for async context managers, nurseries (which replace manual AsyncExitStack-style task management), and cancel scopes (which replace manual timeout management); Python’s stdlib contextlib works with both asyncio and any async framework, requiring no additional dependencies — use trio when building async applications where structured concurrency and nursery-based task lifetimes are first-class concerns, contextlib.AsyncExitStack + asynccontextmanager when adding async cleanup to existing asyncio code. For the anyio alternative — anyio provides a compatibility layer across asyncio and trio, including its own create_task_group() (structured concurrency) and CancelScope; contextlib.suppress and asynccontextmanager work regardless of async backend — use anyio when writing async library code that must be backend-agnostic, stdlib contextlib for application-level resource management where backend choice is fixed. The Claude Skills 360 bundle includes contextlib skill sets covering timer()/Stopwatch() elapsed timing, capture_output()/silence() output redirection, temp_file()/temp_dir()/atomic_write() temporary resources, open_all()/managed_resources() ExitStack patterns, async_timer()/multi_async() async context managers, env_override()/chdir()/optional_cm() convenience wrappers. Start with the free tier to try resource lifecycle management and contextlib pipeline code generation.