anyio provides async I/O that runs on asyncio and trio. pip install anyio. Run: import anyio; anyio.run(main, backend="asyncio") or backend="trio". Tasks: async with anyio.create_task_group() as tg: tg.start_soon(job1); tg.start_soon(job2) — both run concurrently; exceptions from any task cancel the group. Cancel: with anyio.move_on_after(5): await slow_io() — silently continues after timeout. with anyio.fail_after(5): await slow_io() — raises TimeoutError. Sleep: await anyio.sleep(1.0). Threads: result = await anyio.to_thread.run_sync(blocking_fn, arg). anyio.from_thread.run_sync(fn_in_event_loop) from a sync thread. await anyio.to_thread.run_sync(fn, abandon_on_cancel=True) — this keyword was named cancellable before anyio 4.1. TCP: async with await anyio.connect_tcp("host", 8080) as stream: await stream.send(b"hello"); data = await stream.receive(4096). Server: async with anyio.create_tcp_listener(local_port=8080) as listener: await listener.serve(handler). Memory streams (channels): send_stream, recv_stream = anyio.create_memory_object_stream[bytes](max_buffer_size=100). await send_stream.send(item). async for item in recv_stream: .... Sync primitives: lock = anyio.Lock(). sema = anyio.Semaphore(10). event = anyio.Event(); await event.wait(); event.set(). CapacityLimiter: limiter = anyio.CapacityLimiter(20). async with limiter: await task(). ASGI: anyio underpins Starlette and FastAPI — they use anyio internally. pytest: pip install "anyio[trio]" — anyio bundles its own pytest plugin, so no separate package is needed. @pytest.mark.anyio async def test_fn(): ... — runs on the backend selected by the anyio_backend fixture (asyncio by default). To cover both backends, parametrize the fixture: @pytest.fixture(params=["asyncio", "trio"]) def anyio_backend(request): return request.param. Claude Code generates anyio task groups, timeout guards, thread bridges, and backend-parametrized tests.
CLAUDE.md for anyio
## anyio Stack
- Version: anyio >= 4.4 | pip install "anyio[trio]" (the pytest plugin ships inside anyio — no separate package)
- Run: anyio.run(main, backend="asyncio") | backend="trio"
- TaskGroup: async with anyio.create_task_group() as tg: tg.start_soon(coro)
- Timeout: with anyio.move_on_after(N): ... | fail_after raises TimeoutError
- Threads: await anyio.to_thread.run_sync(blocking_fn) — blocking work runs in a worker thread so the event loop stays responsive
- Channels: send, recv = anyio.create_memory_object_stream[T](max_buffer_size=N)
- Test: @pytest.mark.anyio — parametrize the anyio_backend fixture to run each test on asyncio + trio
anyio Async Compatibility Pipeline
# app/anyio_patterns.py — anyio task groups, channels, threads, and testing
from __future__ import annotations

import hashlib
import time
from typing import AsyncIterator

import anyio
import anyio.abc
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
# ─────────────────────────────────────────────────────────────────────────────
# 1. TaskGroup — structured concurrency
# ─────────────────────────────────────────────────────────────────────────────
async def fetch_user(user_id: int) -> dict:
    """Pretend to look up one user record asynchronously.

    Stand-in for a real database or HTTP call; the short sleep yields to
    the scheduler the way genuine I/O would.
    """
    await anyio.sleep(0.01)  # simulated network latency
    record = {"id": user_id, "email": f"user{user_id}@example.com"}
    return record
async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    """Fetch several users concurrently inside one task group.

    All lookups run at the same time; if any task raises, the group
    cancels the remaining tasks before the exception propagates.
    """
    slots: list[dict | None] = [None for _ in user_ids]

    async def _into_slot(position: int, uid: int) -> None:
        # Write into a fixed slot so output order matches input order.
        slots[position] = await fetch_user(uid)

    async with anyio.create_task_group() as tg:
        for position, uid in enumerate(user_ids):
            tg.start_soon(_into_slot, position, uid)
    return [user for user in slots if user is not None]
async def parallel_pipeline(items: list[int]) -> dict[str, list]:
    """Run two independent transformations over *items* concurrently."""
    doubled: list[int] = []
    squared: list[int] = []

    async def fast_process() -> None:
        # Yields control every step but does no real waiting.
        for value in items:
            await anyio.sleep(0)
            doubled.append(value * 2)

    async def slow_process() -> None:
        # Simulates slightly slower per-item work.
        for value in items:
            await anyio.sleep(0.001)
            squared.append(value ** 2)

    async with anyio.create_task_group() as tg:
        tg.start_soon(fast_process)
        tg.start_soon(slow_process)
    return {"fast": doubled, "slow": squared}
# ─────────────────────────────────────────────────────────────────────────────
# 2. Cancellation — move_on_after and fail_after
# ─────────────────────────────────────────────────────────────────────────────
async def fetch_with_timeout(url: str, timeout: float = 5.0) -> dict | None:
    """Fetch *url*, silently giving up after *timeout* seconds.

    Uses ``anyio.move_on_after``: when the deadline fires, the block is
    cancelled and control resumes after the ``with`` statement instead
    of raising.

    Fix: the original ended with ``if scope.cancelled_caught: return
    None`` followed by ``return None`` — both branches were identical,
    so the conditional (and the ``scope`` binding) were dead weight.

    Returns:
        The response dict, or ``None`` if the deadline expired first.
    """
    with anyio.move_on_after(timeout):
        await anyio.sleep(0.01)  # simulated network call
        return {"url": url, "status": 200}
    # Only reachable when the scope was cancelled by the timeout: a
    # successful fetch returns from inside the block above.
    return None
async def fetch_or_fail(url: str, timeout: float = 5.0) -> dict:
    """Fetch *url*, raising ``TimeoutError`` after *timeout* seconds.

    ``anyio.fail_after`` converts a deadline overrun into an exception,
    so the caller is forced to handle the timeout explicitly.

    Raises:
        TimeoutError: if the block exceeds *timeout* seconds.
    """
    with anyio.fail_after(timeout):
        await anyio.sleep(0.01)  # simulated network call
        return {"url": url, "status": 200}
async def fetch_many_with_deadline(
    urls: list[str], total_timeout: float = 10.0,
) -> list[dict | None]:
    """Fetch *urls* in parallel under one overall deadline.

    Each fetch carries its own 2-second per-request timeout; the outer
    ``move_on_after`` bounds the whole batch, cancelling any fetches
    still running when *total_timeout* expires.

    Fix: the original bound the cancel scope with ``as scope`` but never
    used the variable; the unused binding is removed.

    Returns:
        One entry per URL, ``None`` where a fetch timed out (or was
        cancelled by the batch deadline) before completing.
    """
    results: list[dict | None] = [None] * len(urls)
    with anyio.move_on_after(total_timeout):
        async with anyio.create_task_group() as tg:
            async def _fetch(i: int, url: str) -> None:
                results[i] = await fetch_with_timeout(url, timeout=2.0)
            for i, url in enumerate(urls):
                tg.start_soon(_fetch, i, url)
    return results
# ─────────────────────────────────────────────────────────────────────────────
# 3. Thread bridge — run blocking code without blocking the event loop
# ─────────────────────────────────────────────────────────────────────────────
def read_large_file(path: str) -> bytes:
    """Read an entire file into memory (blocking).

    Plain blocking I/O — run it via ``anyio.to_thread.run_sync`` so the
    event loop is not stalled while the read completes.
    """
    with open(path, "rb") as handle:
        contents = handle.read()
    return contents
def cpu_intensive_hash(data: bytes) -> str:
    """Return the hex SHA-256 digest of *data* (CPU-bound, blocking).

    Offload via ``anyio.to_thread.run_sync`` so the event loop stays
    responsive while the digest is computed.

    Fix: ``hashlib`` was imported inside the function body, against the
    file's top-of-file import convention; the import is now module-level.
    """
    return hashlib.sha256(data).hexdigest()
async def hash_file(path: str) -> str:
    """Read and hash a file off-thread, keeping the event loop free.

    Both the blocking read and the CPU-bound digest run in worker
    threads via ``to_thread.run_sync``.
    """
    contents = await anyio.to_thread.run_sync(read_large_file, path)
    return await anyio.to_thread.run_sync(cpu_intensive_hash, contents)
async def parallel_hashes(paths: list[str]) -> dict[str, str]:
    """Hash several files concurrently, at most four at a time.

    A ``CapacityLimiter`` caps how many hashing tasks (and therefore
    worker threads) run simultaneously.
    """
    digests: dict[str, str] = {}
    limiter = anyio.CapacityLimiter(4)  # cap concurrent worker threads

    async def _one(target: str) -> None:
        async with limiter:
            digests[target] = await hash_file(target)

    async with anyio.create_task_group() as tg:
        for target in paths:
            tg.start_soon(_one, target)
    return digests
# ─────────────────────────────────────────────────────────────────────────────
# 4. Memory streams — in-process producer/consumer channels
# ─────────────────────────────────────────────────────────────────────────────
async def producer_consumer_demo() -> list[int]:
    """Pass integers between two tasks over a typed memory channel.

    Memory object streams are buffered, typed channels — comparable to
    Go channels or ``asyncio.Queue`` — for handing objects between tasks.
    """
    send_stream: MemoryObjectSendStream[int]
    recv_stream: MemoryObjectReceiveStream[int]
    send_stream, recv_stream = anyio.create_memory_object_stream[int](max_buffer_size=10)
    collected: list[int] = []

    async def producer() -> None:
        # Closing the send side (via the context manager) ends the
        # consumer's async-for loop.
        async with send_stream:
            for n in range(5):
                await send_stream.send(n * 2)
                await anyio.sleep(0)

    async def consumer() -> None:
        async with recv_stream:
            async for value in recv_stream:
                collected.append(value)

    async with anyio.create_task_group() as tg:
        tg.start_soon(producer)
        tg.start_soon(consumer)
    return collected
# ─────────────────────────────────────────────────────────────────────────────
# 5. Synchronization primitives
# ─────────────────────────────────────────────────────────────────────────────
async def rate_limited_requests(
    items: list[str], max_concurrent: int = 5,
) -> list[str]:
    """Process *items* with at most *max_concurrent* in flight at once.

    A ``Semaphore`` bounds concurrency so downstream services are not
    overwhelmed; a ``Lock`` serializes appends to the shared result list.
    """
    gate = anyio.Semaphore(max_concurrent)
    guard = anyio.Lock()
    processed: list[str] = []

    async def _process(item: str) -> None:
        async with gate:
            await anyio.sleep(0)  # simulated async work
            async with guard:
                processed.append(f"done:{item}")

    async with anyio.create_task_group() as tg:
        for entry in items:
            tg.start_soon(_process, entry)
    return processed
async def event_coordination() -> list[str]:
    """Demonstrate ``anyio.Event``: one ``set()`` unblocks every waiter.

    Unlike ``asyncio.Event``, this works on both asyncio and trio.

    Fix: the original called ``tg.start_noon(setter)`` — a typo for
    ``start_soon`` that raises AttributeError at runtime, leaving both
    waiters blocked forever inside the task group.

    Returns:
        The action log: the setter's message first, then one entry per
        unblocked waiter.
    """
    ready = anyio.Event()
    log: list[str] = []

    async def waiter(name: str) -> None:
        await ready.wait()
        log.append(f"{name} unblocked")

    async def setter() -> None:
        await anyio.sleep(0.01)
        log.append("setter: setting event")
        ready.set()

    async with anyio.create_task_group() as tg:
        tg.start_soon(waiter, "A")
        tg.start_soon(waiter, "B")
        tg.start_soon(setter)  # was: tg.start_noon(setter)
    return log
# ─────────────────────────────────────────────────────────────────────────────
# 6. Entry point — backend selection
# ─────────────────────────────────────────────────────────────────────────────
async def main() -> None:
    """Exercise each demo once and print a one-line summary per demo."""
    # Task-group fan-out
    users = await fetch_all_users([1, 2, 3, 4, 5])
    print(f"Fetched {len(users)} users concurrently")
    # Two concurrent pipelines
    results = await parallel_pipeline([1, 2, 3])
    print(f"Fast: {results['fast']}")
    # Deliberately tiny deadline so the timeout path is exercised
    result = await fetch_with_timeout("https://example.com", timeout=0.001)
    print(f"Timeout demo: {result}")
    # Memory-object-stream producer/consumer
    items = await producer_consumer_demo()
    print(f"Channel items: {items}")
    # Semaphore-bounded fan-out
    processed = await rate_limited_requests(["a", "b", "c", "d", "e"])
    print(f"Rate limited: {len(processed)} processed")
if __name__ == "__main__":
    # Script entry point: blocks until main() completes.
    # Switch to trio by changing backend="trio"
    anyio.run(main, backend="asyncio")
# ─────────────────────────────────────────────────────────────────────────────
# 7. Tests — pytest-anyio runs on both asyncio and trio
# ─────────────────────────────────────────────────────────────────────────────
# tests/test_anyio_patterns.py
#
# conftest.py — run every test on both backends by parametrizing the
# anyio_backend fixture (anyio's pytest plugin has no pytest.ini option):
#   @pytest.fixture(params=["asyncio", "trio"])
#   def anyio_backend(request):
#       return request.param
#
# Or mark tests per-file:
# pytestmark = pytest.mark.anyio
import pytest
# Module-wide marker: every async test below is collected by anyio's
# pytest plugin without a per-test decorator. The backend(s) used are
# chosen by the anyio_backend fixture (asyncio by default; parametrize
# the fixture to also run on trio).
pytestmark = pytest.mark.anyio  # applies pytest.mark.anyio to all tests here
async def test_fetch_all_users() -> None:
    """Concurrent fetch yields one record, carrying an email, per id."""
    fetched = await fetch_all_users([1, 2, 3])
    assert len(fetched) == 3
    assert all("email" in record for record in fetched)
async def test_fetch_with_timeout_succeeds() -> None:
    """A generous deadline lets the fetch complete normally."""
    response = await fetch_with_timeout("https://example.com", timeout=5.0)
    assert response is not None
    assert response["status"] == 200
async def test_fetch_with_timeout_exceeded() -> None:
    """move_on_after cancels the block, leaving ``result`` unset.

    Fix: the original defined a ``slow()`` helper but never called it,
    duplicating ``await anyio.sleep(10)`` inline instead; the helper is
    now actually awaited.
    """
    async def slow() -> dict:
        await anyio.sleep(10)
        return {}
    result = None
    with anyio.move_on_after(0.01) as scope:
        result = await slow()  # cancelled before it can assign
    assert scope.cancelled_caught
    assert result is None
async def test_fail_after_raises() -> None:
    """fail_after raises TimeoutError once the deadline is exceeded."""
    with pytest.raises(TimeoutError), anyio.fail_after(0.01):
        await anyio.sleep(10)
async def test_producer_consumer_channel() -> None:
    """The consumer drains exactly the five doubled values, in order."""
    received = await producer_consumer_demo()
    assert received == [0, 2, 4, 6, 8]
async def test_rate_limited_requests() -> None:
    """Every item is processed and tagged, whatever the concurrency cap."""
    done = await rate_limited_requests(["a", "b", "c"], max_concurrent=2)
    assert len(done) == 3
    assert all(entry.startswith("done:") for entry in done)
async def test_parallel_pipeline() -> None:
    """Both pipelines see every item: doubles and squares, input order."""
    outcome = await parallel_pipeline([1, 2, 3])
    assert outcome["fast"] == [2, 4, 6]
    assert outcome["slow"] == [1, 4, 9]
For the asyncio directly alternative — asyncio.gather() and asyncio.create_task() are asyncio-only APIs, while anyio.create_task_group() runs identically on asyncio and trio — structured concurrency with automatic cancellation propagation: if task A raises, the group cancels task B before re-raising, preventing resource leaks that asyncio.gather(return_exceptions=True) masks. For the asyncio.wait_for alternative — asyncio.wait_for(coro, timeout) only works with asyncio, while anyio.move_on_after(N) and anyio.fail_after(N) work on asyncio and trio, compose correctly inside nested task groups, and expose scope.cancelled_caught to distinguish a timeout from a normal exit — and anyio.to_thread.run_sync(fn, cancellable=True) interrupts a blocking thread when the surrounding scope is cancelled, something asyncio.wait_for + concurrent.futures cannot do. The Claude Skills 360 bundle includes anyio skill sets covering anyio.run with asyncio/trio backend selection, create_task_group for structured concurrency, move_on_after/fail_after timeout scopes, scope.cancelled_caught for timeout detection, to_thread.run_sync for blocking code, CapacityLimiter for thread pool control, create_memory_object_stream typed channels, Lock/Semaphore/Event synchronization primitives, pytest-anyio backend-parametrized tests, and anyio integration with FastAPI and Starlette. Start with the free tier to try structured async code generation.