trio brings structured concurrency to Python async. pip install trio. Entry: import trio; trio.run(main). Nursery: async with trio.open_nursery() as nursery: nursery.start_soon(task1); nursery.start_soon(task2, arg). All tasks must finish before the nursery exits; if any task raises, the rest are cancelled. Sleep: await trio.sleep(1). Checkpoint: await trio.lowlevel.checkpoint(). Cancel scope: with trio.CancelScope(deadline=trio.current_time() + 5): .... Timeout: with trio.move_on_after(5) as scope: await slow(); if scope.cancelled_caught: .... Fail: with trio.fail_after(5): await slow() raises trio.TooSlowError on timeout. Nested scopes: the inner deadline fires before the outer. Task order: start_soon schedules tasks in FIFO order; the nursery waits for all of them. Memory channel: send, recv = trio.open_memory_channel(max_buffer_size=10). await send.send(item). item = await recv.receive(). async for item in recv: process(item). Run in thread: result = await trio.to_thread.run_sync(blocking_fn, arg). From thread: trio.from_thread.run(async_fn, arg). trio.from_thread.run_sync(sync_fn). Capacity limiter: limiter = trio.CapacityLimiter(10). async with limiter: await work(). Testing: pip install pytest-trio. @pytest.fixture async def client(): .... trio.testing.MockClock(autojump_threshold=0). No callbacks: trio has no callbacks and no futures; everything is await. Exceptions: if any task raises, all siblings are cancelled and the exception propagates. ExceptionGroup: on Python 3.11+, multiple failures surface as an ExceptionGroup; handle with except*. Claude Code generates trio nurseries, cancel scopes, memory channels, and pytest-trio fixtures.
CLAUDE.md for trio
## trio Stack
- Version: trio >= 0.26 | pip install trio pytest-trio
- Entry: trio.run(async_main) — single event loop, no asyncio
- Nursery: async with trio.open_nursery() as n: n.start_soon(fn, *args)
- Timeout: with trio.move_on_after(5) as scope: ... / trio.fail_after(5)
- Channel: send, recv = trio.open_memory_channel(N) — typed producer-consumer
- Thread: await trio.to_thread.run_sync(blocking_fn) — no event loop blocking
- Test: @pytest.fixture async def x() + trio.testing.MockClock(autojump_threshold=0)
## trio Structured Concurrency Pipeline
# app/trio_patterns.py — nurseries, cancel scopes, channels, and thread interop
from __future__ import annotations

import time

import trio

# ─────────────────────────────────────────────────────────────────────────────
# 1. Basic nursery — parallel tasks with automatic cleanup
# ─────────────────────────────────────────────────────────────────────────────

async def fetch_user(user_id: int, results: list[dict]) -> None:
    """Simulate an async API call."""
    await trio.sleep(0.05)
    results.append({"id": user_id, "name": f"User {user_id}"})

async def fetch_users_parallel(user_ids: list[int]) -> list[dict]:
    """
    Nursery guarantees:
    1. All tasks finish before the nursery block exits.
    2. If any task raises, all remaining tasks are cancelled immediately.
    3. The exception propagates to the nursery's parent scope.
    """
    results: list[dict] = []
    async with trio.open_nursery() as nursery:
        for uid in user_ids:
            nursery.start_soon(fetch_user, uid, results)
    return results

# ─────────────────────────────────────────────────────────────────────────────
# 2. Cancel scopes — timeout and soft cancellation
# ─────────────────────────────────────────────────────────────────────────────

async def slow_operation(name: str, seconds: float) -> str:
    await trio.sleep(seconds)
    return f"{name} done"

async def with_timeout(seconds: float) -> str | None:
    """
    move_on_after: cancelled_caught=True if timed out — returns None.
    Non-raising — useful when partial results are acceptable.
    """
    with trio.move_on_after(seconds) as scope:
        return await slow_operation("data_fetch", 2.0)
    # Only reachable if the scope was cancelled before the return ran.
    if scope.cancelled_caught:
        print(f"[TIMEOUT] after {seconds}s")
    return None

async def with_deadline() -> str:
    """
    fail_after: raises trio.TooSlowError if the deadline expires — use when
    a timeout is an error.
    """
    try:
        with trio.fail_after(0.1):
            return await slow_operation("critical_fetch", 5.0)
    except trio.TooSlowError:
        raise RuntimeError("critical_fetch timed out") from None

async def nested_scopes() -> dict:
    """
    Nested cancel scopes: the inner scope's deadline fires first.
    The outer scope protects the outer block independently.
    """
    results = {}
    with trio.move_on_after(5) as outer:
        with trio.move_on_after(0.1) as inner:
            await trio.sleep(1)  # inner fires first
        results["inner_cancelled"] = inner.cancelled_caught
        # Execution continues here — the outer scope is still active
        results["outer_active"] = not outer.cancelled_caught
    return results

# ─────────────────────────────────────────────────────────────────────────────
# 3. Memory channels — producer/consumer pipelines
# ─────────────────────────────────────────────────────────────────────────────

async def producer(send: trio.MemorySendChannel, count: int) -> None:
    """Send `count` items, then close the channel."""
    async with send:
        for i in range(count):
            await trio.sleep(0.01)
            await send.send({"id": i, "value": i * i})

async def consumer(recv: trio.MemoryReceiveChannel, results: list[dict]) -> None:
    """Receive items until the channel closes."""
    async with recv:
        async for item in recv:
            results.append(item)

async def producer_consumer_pipeline(count: int = 10) -> list[dict]:
    """
    open_memory_channel(N) — buffer up to N items before send blocks.
    Clone channels for multiple producers/consumers.
    """
    results: list[dict] = []
    send, recv = trio.open_memory_channel(max_buffer_size=5)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send, count)
        nursery.start_soon(consumer, recv, results)
    return results

async def fan_out_pipeline(jobs: list[str], workers: int = 3) -> list[str]:
    """
    Single producer → multiple workers via cloned receive channels.
    Each worker gets an independent clone of the receive channel.
    """
    send, recv = trio.open_memory_channel(max_buffer_size=len(jobs))
    results: list[str] = []

    async def worker(worker_recv: trio.MemoryReceiveChannel) -> None:
        async with worker_recv:
            async for job in worker_recv:
                await trio.sleep(0.01)
                results.append(f"done:{job}")

    async with trio.open_nursery() as nursery:
        for _ in range(workers):
            nursery.start_soon(worker, recv.clone())
        recv.close()  # workers hold clones; close the original
        async with send:
            for job in jobs:
                await send.send(job)
    return results

# ─────────────────────────────────────────────────────────────────────────────
# 4. Thread interop — blocking code without blocking the event loop
# ─────────────────────────────────────────────────────────────────────────────

def blocking_db_query(query: str) -> list[dict]:
    """Simulates a synchronous DB call that takes 50ms."""
    time.sleep(0.05)
    return [{"query": query, "rows": 42}]

async def run_blocking_queries(queries: list[str]) -> list[list[dict]]:
    """
    run_sync runs blocking_db_query in a worker thread so the event loop
    stays free. trio runs as many threads as the default CapacityLimiter
    allows (40 by default).
    """
    results = []

    async def fetch(q: str) -> None:
        result = await trio.to_thread.run_sync(blocking_db_query, q)
        results.append(result)

    async with trio.open_nursery() as nursery:
        for q in queries:
            nursery.start_soon(fetch, q)
    return results

# ─────────────────────────────────────────────────────────────────────────────
# 5. CapacityLimiter — bound concurrency
# ─────────────────────────────────────────────────────────────────────────────

async def rate_limited_fetch(
    urls: list[str],
    max_concurrent: int = 5,
) -> list[dict]:
    """
    CapacityLimiter prevents more than max_concurrent tasks running at once.
    Tasks queue automatically — no manual semaphore bookkeeping needed.
    """
    limiter = trio.CapacityLimiter(max_concurrent)
    results: list[dict] = []

    async def fetch_one(url: str) -> None:
        async with limiter:
            await trio.sleep(0.02)  # simulated HTTP
            results.append({"url": url, "status": 200})

    async with trio.open_nursery() as nursery:
        for url in urls:
            nursery.start_soon(fetch_one, url)
    return results

# ─────────────────────────────────────────────────────────────────────────────
# 6. Exception handling — ExceptionGroup (Python 3.11+)
# ─────────────────────────────────────────────────────────────────────────────

async def flaky_task(task_id: int, fail: bool) -> None:
    await trio.sleep(0.01 * task_id)
    if fail:
        raise ValueError(f"Task {task_id} failed")

async def handle_partial_failures(task_count: int = 5) -> dict:
    """
    When multiple tasks raise, trio collects them into an ExceptionGroup.
    Use except* (Python 3.11+) to handle each exception type independently.
    """
    failed_ids: list[int] = []
    try:
        async with trio.open_nursery() as nursery:
            for i in range(task_count):
                nursery.start_soon(flaky_task, i, i % 2 == 1)  # odd IDs fail
    except* ValueError as eg:
        for exc in eg.exceptions:
            failed_ids.append(int(str(exc).split()[1]))
    succeeded = task_count - len(failed_ids)
    return {"succeeded": succeeded, "failed": failed_ids}

# ─────────────────────────────────────────────────────────────────────────────
# 7. start_soon vs start — wait for task to initialize
# ─────────────────────────────────────────────────────────────────────────────

async def server_task(*, task_status=trio.TASK_STATUS_IGNORED) -> None:
    """
    Call task_status.started() after initialization is complete.
    nursery.start() waits for started() before returning — useful for
    ensuring a server is listening before continuing.
    """
    # ... bind socket, set up state ...
    await trio.sleep(0.01)  # simulated init
    task_status.started("ready")  # signal that init is done
    # continue serving...
    await trio.sleep(10_000)

async def start_and_wait() -> str:
    """nursery.start() returns the value passed to task_status.started()."""
    async with trio.open_nursery() as nursery:
        status = await nursery.start(server_task)
        print(f"Server ready: {status}")
        nursery.cancel_scope.cancel()  # shut down demo
    return status

# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

async def main() -> None:
    print("=== Parallel fetch ===")
    users = await fetch_users_parallel([1, 2, 3, 4, 5])
    print(f" fetched {len(users)} users")

    print("\n=== move_on_after ===")
    result = await with_timeout(0.05)
    print(f" result: {result}")

    print("\n=== Nested scopes ===")
    scopes = await nested_scopes()
    print(f" {scopes}")

    print("\n=== Producer/consumer ===")
    items = await producer_consumer_pipeline(8)
    print(f" received {len(items)} items")

    print("\n=== Fan-out pipeline ===")
    done = await fan_out_pipeline([f"job_{i}" for i in range(9)], workers=3)
    print(f" completed {len(done)} jobs")

    print("\n=== Thread interop ===")
    query_results = await run_blocking_queries(["SELECT 1", "SELECT 2", "SELECT 3"])
    print(f" {len(query_results)} query results")

    print("\n=== Rate-limited fetch ===")
    fetched = await rate_limited_fetch(
        [f"https://api.example.com/{i}" for i in range(15)],
        max_concurrent=4,
    )
    print(f" fetched {len(fetched)} URLs")

    print("\n=== start and wait ===")
    status = await start_and_wait()
    print(f" status: {status}")

if __name__ == "__main__":
    trio.run(main)
For the asyncio.gather alternative: asyncio.gather(*coros) runs tasks concurrently, but when one task fails (with the default return_exceptions=False) the first exception propagates while the remaining tasks keep running, so sibling cancellation requires explicit boilerplate and tasks can outlive their logical scope. trio's nursery guarantees that when any task raises, all sibling tasks are cancelled immediately and the exception propagates exactly once to the nursery's parent; there is no way to create a task that outlives its nursery, making resource leaks and "zombie tasks" structurally impossible.

For the asyncio.Semaphore alternative: asyncio.Semaphore(N) requires manual async with sem: guards at every call site and offers no borrower tracking or introspection, while trio.CapacityLimiter(N) tracks which task holds each token, exposes statistics() for debugging, and is the same mechanism trio.to_thread uses internally to enforce its default 40-thread cap without any semaphore declaration.

The Claude Skills 360 bundle includes trio skill sets covering the trio.run entry point, open_nursery for parallel task spawning, move_on_after and fail_after cancel scopes, nested scope timeout composition, open_memory_channel producer-consumer and fan-out pipelines, to_thread.run_sync for blocking code, from_thread.run for calling trio from threads, CapacityLimiter for bounded concurrency, nursery.start with task_status.started() for initialization, ExceptionGroup handling with except*, and pytest-trio async fixtures. Start with the free tier to try structured concurrency code generation.