Python’s asyncio module runs coroutines on an event loop for I/O-bound concurrency. import asyncio. run: asyncio.run(main()) — entry point. async def / await: async def fetch(): await asyncio.sleep(1). create_task: task = asyncio.create_task(coro()) — schedule concurrent execution. gather: results = await asyncio.gather(coro1(), coro2()) — concurrent, returns list in argument order. wait_for: await asyncio.wait_for(coro(), timeout=5.0) — raises TimeoutError. sleep: await asyncio.sleep(0) — yield control; await asyncio.sleep(n) — delay. timeout: async with asyncio.timeout(5.0): — context manager (Python 3.11+). Queue: q = asyncio.Queue(maxsize=10); await q.put(item); item = await q.get(); q.task_done(). PriorityQueue / LifoQueue. Event: e = asyncio.Event(); await e.wait(); e.set(); e.clear(). Lock: async with lock:. Semaphore: async with asyncio.Semaphore(5): — limit concurrency. TaskGroup: async with asyncio.TaskGroup() as tg: tg.create_task(coro()) (Python 3.11+). to_thread: await asyncio.to_thread(blocking_fn, *args) — run in thread pool. as_completed: for coro in asyncio.as_completed(coros): result = await coro. shield: await asyncio.shield(task) — protect the inner task from an outer cancellation (the awaiting coroutine still receives CancelledError). CancelledError: raised on cancellation. Claude Code generates async API clients, producer-consumer pipelines, concurrent scrapers, and TCP servers.
CLAUDE.md for asyncio
## asyncio Stack
- Stdlib: import asyncio
- Entry: asyncio.run(main())
- Parallel: results = await asyncio.gather(*coros, return_exceptions=True)
- Timeout: await asyncio.wait_for(coro(), timeout=30.0)
- Limit: async with asyncio.Semaphore(N): await coro()
- Thread: result = await asyncio.to_thread(blocking_fn, arg)
- Queue: asyncio.Queue(maxsize=N) — producer/consumer
asyncio Concurrent I/O Pipeline
# app/asyncutil.py — gather, TaskGroup, Semaphore, Queue, to_thread, retry
from __future__ import annotations
import asyncio
import functools
import logging
import time
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Callable, Coroutine, TypeVar
log = logging.getLogger(__name__)
T = TypeVar("T")
R = TypeVar("R")
# ─────────────────────────────────────────────────────────────────────────────
# 1. Concurrent execution helpers
# ─────────────────────────────────────────────────────────────────────────────
async def gather_safe(
    *coros: Coroutine,
    return_exceptions: bool = True,
) -> list[Any]:
    """
    Await all *coros* concurrently and return their results as a list.

    With return_exceptions=True (the default) a failing coroutine
    contributes its exception object to the result list instead of
    aborting the whole batch.

    Example:
        results = await gather_safe(fetch(url1), fetch(url2), fetch(url3))
        values = [r for r in results if not isinstance(r, Exception)]
    """
    gathered = asyncio.gather(*coros, return_exceptions=return_exceptions)
    return await gathered
async def run_concurrently(
    fn: Callable[..., Coroutine],
    items: list[Any],
    max_concurrency: int = 10,
    timeout: float | None = None,
) -> list[Any]:
    """
    Apply async *fn* to each item with at most *max_concurrency* calls
    in flight; results (or exceptions) are returned in input order.

    Args:
        fn: async callable invoked as fn(item).
        items: inputs, processed concurrently but returned in order.
        max_concurrency: cap on simultaneous fn invocations.
        timeout: optional per-item deadline in seconds; a timed-out item
            yields a TimeoutError in its result slot.

    Example:
        results = await run_concurrently(fetch_page, urls, max_concurrency=5)
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(item: Any) -> Any:
        async with semaphore:
            # `is not None` rather than truthiness: the old `if timeout:`
            # silently disabled an explicit timeout of 0.
            if timeout is not None:
                return await asyncio.wait_for(fn(item), timeout=timeout)
            return await fn(item)

    return await asyncio.gather(
        *(bounded(item) for item in items),
        return_exceptions=True,
    )
async def first_success(
    *coros: Coroutine,
) -> Any:
    """
    Return the result of the first coroutine to *succeed*; cancel the rest.

    Bug fix: the previous implementation used asyncio.wait(FIRST_COMPLETED),
    which reports the first coroutine to *finish* — even one that finished
    by raising — so a fast failure cancelled a slower success. Failures are
    now collected and the next completion is awaited instead.

    Raises:
        ValueError: if called with no coroutines.
        Exception: the last collected failure, when every coroutine fails.

    Example:
        result = await first_success(fetch_primary(url), fetch_mirror(url))
    """
    if not coros:
        raise ValueError("first_success() requires at least one coroutine")
    tasks = [asyncio.create_task(c) for c in coros]
    errors: list[BaseException] = []
    try:
        for future in asyncio.as_completed(tasks):
            try:
                return await future
            except Exception as exc:  # CancelledError is BaseException: propagates
                errors.append(exc)
        # Every coroutine failed — surface the most recent failure.
        raise errors[-1]
    finally:
        # Whether we succeeded, failed, or were cancelled: stop stragglers.
        for t in tasks:
            t.cancel()
async def with_timeout(coro: Coroutine, timeout: float, default: Any = None) -> Any:
    """
    Await *coro* for at most *timeout* seconds.

    Returns the coroutine's result, or *default* when the deadline passes
    (wait_for cancels the underlying task in that case).

    Example:
        data = await with_timeout(fetch(url), timeout=5.0, default={})
    """
    try:
        result = await asyncio.wait_for(coro, timeout=timeout)
    except (asyncio.TimeoutError, TimeoutError):
        return default
    return result
# ─────────────────────────────────────────────────────────────────────────────
# 2. Async retry decorator
# ─────────────────────────────────────────────────────────────────────────────
def async_retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple = (Exception,),
):
    """
    Decorator: retry an async function when it raises one of *exceptions*,
    sleeping between attempts with exponential backoff
    (delay, delay*backoff, delay*backoff**2, ...).

    The final failed attempt re-raises its exception unchanged; other
    exception types propagate immediately.

    Example:
        @async_retry(max_attempts=3, delay=0.5, exceptions=(aiohttp.ClientError,))
        async def fetch(url: str) -> bytes:
            ...
    """
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            pause = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except exceptions as exc:
                    if attempt == max_attempts:
                        raise
                    logging.getLogger(__name__).warning(
                        "%s attempt %d/%d failed: %s — retrying in %.1fs",
                        fn.__name__, attempt, max_attempts, exc, pause)
                    await asyncio.sleep(pause)
                    pause *= backoff
        return wrapper
    return decorator
# ─────────────────────────────────────────────────────────────────────────────
# 3. Producer-consumer queue
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class WorkResult:
    """Outcome of processing one queue item: a result or a captured error."""

    item: Any                       # the input that was processed
    result: Any = None              # worker return value on success
    error: Exception | None = None  # exception raised by the worker, if any

    @property
    def ok(self) -> bool:
        """True when the worker completed without raising."""
        return self.error is None
async def process_queue(
    items: list[Any],
    worker_fn: Callable[..., Coroutine],
    n_workers: int = 4,
    maxsize: int = 0,
) -> list[WorkResult]:
    """
    Fan *items* out to *n_workers* consumer tasks through an asyncio.Queue.

    Each item is passed to worker_fn(item); a per-item exception is captured
    in its WorkResult rather than aborting the run. Results are collected in
    completion order, not input order.

    Args:
        items: work items to process (None is reserved as the shutdown sentinel).
        worker_fn: async callable applied to each item.
        n_workers: number of concurrent consumer tasks.
        maxsize: queue bound (0 = unbounded); a bound applies backpressure
            to the producer.

    Example:
        results = await process_queue(urls, fetch_url, n_workers=8)
        successes = [r for r in results if r.ok]
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    collected: list[WorkResult] = []

    async def feed() -> None:
        # Enqueue all work, then one None sentinel per worker so every
        # consumer sees exactly one shutdown signal.
        for item in items:
            await queue.put(item)
        for _ in range(n_workers):
            await queue.put(None)

    async def drain() -> None:
        while (item := await queue.get()) is not None:
            try:
                outcome = await worker_fn(item)
            except Exception as exc:
                collected.append(WorkResult(item=item, error=exc))
            else:
                collected.append(WorkResult(item=item, result=outcome))
            finally:
                queue.task_done()
        queue.task_done()  # account for the sentinel just consumed

    await asyncio.gather(feed(), *(drain() for _ in range(n_workers)))
    return collected
# ─────────────────────────────────────────────────────────────────────────────
# 4. Running blocking code
# ─────────────────────────────────────────────────────────────────────────────
async def run_blocking(fn: Callable, *args, **kwargs) -> Any:
    """
    Execute blocking *fn* on the default thread-pool executor and await
    its result, keeping the event loop responsive in the meantime.

    Example:
        data = await run_blocking(open("large.csv").read)
        result = await run_blocking(expensive_cpu_fn, param)
    """
    result = await asyncio.to_thread(fn, *args, **kwargs)
    return result
async def delay_call(fn: Callable, delay: float, *args, **kwargs) -> Any:
    """
    Sleep for *delay* seconds, then call fn(*args, **kwargs) and return
    its value. Note that fn is invoked synchronously, not awaited.

    Example:
        asyncio.create_task(delay_call(cleanup, delay=60.0))
    """
    await asyncio.sleep(delay)
    outcome = fn(*args, **kwargs)
    return outcome
# ─────────────────────────────────────────────────────────────────────────────
# 5. Async generator utilities
# ─────────────────────────────────────────────────────────────────────────────
async def arange(start: int, stop: int, step: int = 1) -> AsyncIterator[int]:
    """
    Async generator equivalent of range() — yields values, ceding control
    to the event loop between items.

    Fix: now mirrors range() for negative steps (counting down toward
    *stop*), and rejects step == 0, which previously either yielded
    nothing or looped forever re-yielding *start*.

    Raises:
        ValueError: if step is 0 (raised at first iteration).

    Example:
        async for i in arange(0, 10):
            await process(i)
    """
    if step == 0:
        raise ValueError("arange() step must not be zero")
    i = start
    # Direction-aware bound check, matching range() semantics.
    while i < stop if step > 0 else i > stop:
        yield i
        i += step
        await asyncio.sleep(0)  # yield control to other tasks
async def amap(
    fn: Callable[..., Coroutine],
    items: list[Any],
    max_concurrency: int = 10,
) -> AsyncIterator[Any]:
    """
    Concurrency-bounded async map that yields each result as soon as it
    completes — results arrive in completion order, not input order.

    Example:
        async for result in amap(fetch, urls, max_concurrency=5):
            process(result)
    """
    gate = asyncio.Semaphore(max_concurrency)

    async def guarded(item: Any) -> Any:
        async with gate:
            return await fn(item)

    scheduled = [asyncio.create_task(guarded(item)) for item in items]
    for finished in asyncio.as_completed(scheduled):
        yield await finished
@dataclass
class AsyncRateLimiter:
    """
    Token-bucket rate limiter for async code.

    Capacity is *rate* tokens, refilled continuously at rate/per tokens per
    second. acquire() waits until enough tokens have accrued; waiters hold
    an internal lock while waiting, so they are served one at a time.

    Example:
        limiter = AsyncRateLimiter(rate=10, per=1.0)  # 10 calls/sec
        for url in urls:
            await limiter.acquire()
            asyncio.create_task(fetch(url))
    """
    rate: float  # bucket capacity — max calls per window
    per: float   # window length in seconds
    _tokens: float = field(init=False)       # tokens currently available
    _last: float = field(init=False)         # monotonic timestamp of last refill
    _lock: asyncio.Lock = field(init=False)  # serializes concurrent acquirers

    def __post_init__(self):
        self._tokens = self.rate  # bucket starts full
        self._last = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: float = 1.0) -> None:
        """
        Consume *tokens* from the bucket, sleeping until they accrue.

        Raises:
            ValueError: if *tokens* exceeds the bucket capacity (*rate*) —
                such a request could never be satisfied and previously
                made acquire() sleep forever.
        """
        if tokens > self.rate:
            raise ValueError(
                f"cannot acquire {tokens} tokens from a bucket of capacity {self.rate}"
            )
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self._last
                # Refill proportionally to elapsed time, clamped to capacity.
                self._tokens = min(self.rate, self._tokens + elapsed * (self.rate / self.per))
                self._last = now
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                # Sleep just long enough for the deficit to refill.
                wait = (tokens - self._tokens) * (self.per / self.rate)
                await asyncio.sleep(wait)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
async def _demo():
    """Smoke-test each helper in this module, printing results to stdout."""
    print("=== asyncio demo ===")
    # gather_safe: the failing task comes back as an exception value.
    print("\n--- gather_safe ---")
    async def task(n, fail=False):
        await asyncio.sleep(n * 0.05)
        if fail:
            raise ValueError(f"task {n} failed")
        return n * 10
    results = await gather_safe(task(1), task(2), task(3, fail=True))
    for r in results:
        print(f"  {r!r}")
    # run_concurrently: bounded fan-out, results in input order.
    print("\n--- run_concurrently ---")
    async def slow_double(x):
        await asyncio.sleep(0.02)
        return x * 2
    t0 = time.monotonic()
    outputs = await run_concurrently(slow_double, list(range(10)), max_concurrency=4)
    elapsed = time.monotonic() - t0
    print(f"  results: {outputs}")
    print(f"  elapsed: {elapsed:.2f}s (4 concurrent × 0.02s = ~0.05s expected)")
    # with_timeout: the default value is returned when the deadline passes.
    print("\n--- with_timeout ---")
    async def slow():
        await asyncio.sleep(10)
        return "done"
    val = await with_timeout(slow(), timeout=0.05, default="timed_out")
    print(f"  result: {val!r}")
    # async_retry: two failures, then success on the third attempt.
    print("\n--- async_retry ---")
    call_count = 0
    @async_retry(max_attempts=3, delay=0.01, exceptions=(RuntimeError,))
    async def flaky():
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise RuntimeError("not yet")
        return "ok"
    result = await flaky()
    print(f"  result: {result!r}  calls: {call_count}")
    # process_queue: producer/consumer fan-out across 4 worker tasks.
    print("\n--- process_queue ---")
    async def compute(n):
        await asyncio.sleep(0.01)
        return n ** 2
    work_results = await process_queue(list(range(8)), compute, n_workers=4)
    print(f"  {len(work_results)} results, successes: {sum(1 for r in work_results if r.ok)}")
    print(f"  values: {sorted(r.result for r in work_results if r.ok)}")
    # run_blocking: off-loop execution in the default thread pool.
    print("\n--- run_blocking ---")
    import os
    cwd = await run_blocking(os.getcwd)
    print(f"  cwd (from thread): {cwd!r}")
    # arange: async iteration over a numeric range.
    print("\n--- arange ---")
    nums = []
    async for i in arange(0, 5):
        nums.append(i)
    print(f"  arange(0, 5): {nums}")
    # AsyncRateLimiter: the bucket starts full, so 5 acquires are immediate.
    print("\n--- AsyncRateLimiter ---")
    limiter = AsyncRateLimiter(rate=5.0, per=1.0)
    t0 = time.monotonic()
    for _ in range(5):
        await limiter.acquire()
    print(f"  5 calls acquired in {time.monotonic() - t0:.3f}s (should be ~0s, all tokens available)")
    print("\n=== done ===")
if __name__ == "__main__":
    # Surface retry warnings from async_retry without debug noise, then run the demo.
    logging.basicConfig(level=logging.WARNING)
    asyncio.run(_demo())
For the trio alternative — trio (PyPI, claude-code-trio) enforces strict structured concurrency with nurseries (async with trio.open_nursery() as n: n.start_soon(coro)), making task lifetimes explicit and cancellation deterministic; asyncio.TaskGroup (Python 3.11+) borrows the structured-concurrency model but is tied to the asyncio event loop — use trio for new applications where airtight cancellation semantics and a no-implicit-task-escape discipline matter, asyncio when using libraries that require it (FastAPI, aiohttp, Celery) or targeting Python <3.11 without the TaskGroup API. For the anyio alternative — anyio (PyPI, claude-code-anyio) provides a compatibility shim running the same async code on asyncio or trio, with anyio.create_task_group(), anyio.sleep(), and anyio.to_thread.run_sync() that work identically on both backends; stdlib asyncio has no portability between event loops — use anyio for library code that should be backend-agnostic, asyncio directly for applications where asyncio is the committed runtime. The Claude Skills 360 bundle includes asyncio skill sets covering gather_safe()/run_concurrently()/first_success()/with_timeout() parallel helpers, async_retry() decorator with exponential backoff, process_queue() producer-consumer with n_workers, run_blocking()/delay_call() thread-pool bridges, arange()/amap() async generators, and AsyncRateLimiter token-bucket throttle. Start with the free tier to try concurrent I/O and asyncio pipeline code generation.