Claude Code for trio: Structured Concurrency in Python — Claude Skills 360 Blog

Claude Code for trio: Structured Concurrency in Python

Published: January 24, 2028
Read time: 5 min read
By: Claude Skills 360

trio brings structured concurrency to Python async. Install with pip install trio; the entry point is trio.run(main). A quick reference:

- Nursery: async with trio.open_nursery() as nursery: nursery.start_soon(task1); nursery.start_soon(task2, arg). All tasks must finish before the nursery exits, and if any task raises, the siblings are cancelled and the exception propagates.
- Sleep and checkpoints: await trio.sleep(1); await trio.lowlevel.checkpoint() yields to the scheduler and checks for cancellation.
- Cancel scopes: with trio.CancelScope(deadline=trio.current_time() + 5): ... sets an absolute deadline. with trio.move_on_after(5) as scope: await slow() cancels silently; check scope.cancelled_caught afterward. with trio.fail_after(5): await slow() raises trio.TooSlowError on timeout. Scopes nest, and the inner deadline fires first.
- Memory channels: send, recv = trio.open_memory_channel(max_buffer_size=10); await send.send(item); item = await recv.receive(); or iterate with async for item in recv: process(item).
- Threads: result = await trio.to_thread.run_sync(blocking_fn, arg) runs blocking code off the event loop; trio.from_thread.run(async_fn, arg) and trio.from_thread.run_sync(sync_fn) call back in from a thread.
- Capacity limiter: limiter = trio.CapacityLimiter(10); async with limiter: await work().
- Testing: pytest-trio async fixtures (@pytest.fixture async def client(): ...) plus trio.testing.MockClock(autojump_threshold=0) for instant virtual time.
- No callbacks, no futures: everything is await. On Python 3.11+, multiple failures surface as an ExceptionGroup, handled with except*.

Claude Code generates trio nurseries, cancel scopes, memory channels, and pytest-trio fixtures.

CLAUDE.md for trio

## trio Stack
- Version: trio >= 0.26 | pip install trio pytest-trio
- Entry: trio.run(async_main) — single event loop, no asyncio
- Nursery: async with trio.open_nursery() as n: n.start_soon(fn, *args)
- Timeout: with trio.move_on_after(5) as scope: ... / trio.fail_after(5)
- Channel: send, recv = trio.open_memory_channel(N) — typed producer-consumer
- Thread: await trio.to_thread.run_sync(blocking_fn) — no event loop blocking
- Test: @pytest.fixture async def x() + trio.testing.MockClock(autojump_threshold=0)

trio Structured Concurrency Pipeline

# app/trio_patterns.py — nurseries, cancel scopes, channels, and thread interop
from __future__ import annotations

import time

import trio


# ─────────────────────────────────────────────────────────────────────────────
# 1. Basic nursery — parallel tasks with automatic cleanup
# ─────────────────────────────────────────────────────────────────────────────

async def fetch_user(user_id: int, results: list[dict]) -> None:
    """Simulate an async API call."""
    await trio.sleep(0.05)
    results.append({"id": user_id, "name": f"User {user_id}"})


async def fetch_users_parallel(user_ids: list[int]) -> list[dict]:
    """
    Nursery guarantees:
    1. All tasks finish before the nursery block exits.
    2. If any task raises, all remaining tasks are cancelled immediately.
    3. The exception propagates to the nursery's parent scope.
    """
    results: list[dict] = []
    async with trio.open_nursery() as nursery:
        for uid in user_ids:
            nursery.start_soon(fetch_user, uid, results)
    return results


# ─────────────────────────────────────────────────────────────────────────────
# 2. Cancel scopes — timeout and soft cancellation
# ─────────────────────────────────────────────────────────────────────────────

async def slow_operation(name: str, seconds: float) -> str:
    await trio.sleep(seconds)
    return f"{name} done"


async def with_timeout(seconds: float) -> str | None:
    """
    move_on_after cancels silently on timeout: scope.cancelled_caught is True
    and the function falls through to return None. Useful when partial
    results are acceptable.
    """
    with trio.move_on_after(seconds) as scope:
        return await slow_operation("data_fetch", 2.0)

    if scope.cancelled_caught:
        print(f"[TIMEOUT] after {seconds}s")
    return None


async def with_deadline() -> str:
    """
    fail_after: raises trio.TooSlowError on timeout. Use when a timeout
    is an error. (Never catch trio.Cancelled yourself.)
    """
    try:
        with trio.fail_after(0.1):
            return await slow_operation("critical_fetch", 5.0)
    except trio.TooSlowError:
        raise RuntimeError("critical_fetch timed out") from None


async def nested_scopes() -> dict:
    """
    Nested cancel scopes: inner scope timeout fires first.
    Outer scope protects the outer block independently.
    """
    results = {}
    with trio.move_on_after(5) as outer:
        with trio.move_on_after(0.1) as inner:
            await trio.sleep(1)          # inner fires first
        results["inner_cancelled"] = inner.cancelled_caught
        # Execution continues here — outer scope is still active
        results["outer_active"] = not outer.cancelled_caught
    return results


# ─────────────────────────────────────────────────────────────────────────────
# 3. Memory channels — producer/consumer pipelines
# ─────────────────────────────────────────────────────────────────────────────

async def producer(send: trio.MemorySendChannel, count: int) -> None:
    """Send `count` items then close the channel."""
    async with send:
        for i in range(count):
            await trio.sleep(0.01)
            await send.send({"id": i, "value": i * i})


async def consumer(recv: trio.MemoryReceiveChannel, results: list[dict]) -> None:
    """Receive items until the channel closes."""
    async with recv:
        async for item in recv:
            results.append(item)


async def producer_consumer_pipeline(count: int = 10) -> list[dict]:
    """
    open_memory_channel(N) — buffer N items before send blocks.
    Clone channels for multiple producers/consumers.
    """
    results: list[dict] = []
    send, recv = trio.open_memory_channel(max_buffer_size=5)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send, count)
        nursery.start_soon(consumer, recv, results)
    return results


async def fan_out_pipeline(jobs: list[str], workers: int = 3) -> list[str]:
    """
    Single producer → multiple workers via cloned receive channels.
    Each worker gets an independent copy of the receive channel.
    """
    send, recv = trio.open_memory_channel(max_buffer_size=len(jobs))
    results: list[str] = []

    async def worker(worker_recv: trio.MemoryReceiveChannel) -> None:
        async with worker_recv:
            async for job in worker_recv:
                await trio.sleep(0.01)
                results.append(f"done:{job}")

    async with trio.open_nursery() as nursery:
        for _ in range(workers):
            nursery.start_soon(worker, recv.clone())
        recv.close()  # workers hold clones; close the original

        async with send:
            for job in jobs:
                await send.send(job)

    return results


# ─────────────────────────────────────────────────────────────────────────────
# 4. Thread interop — blocking code without blocking the event loop
# ─────────────────────────────────────────────────────────────────────────────

def blocking_db_query(query: str) -> list[dict]:
    """Simulates a synchronous DB call that takes 50ms."""
    time.sleep(0.05)
    return [{"query": query, "rows": 42}]


async def run_blocking_queries(queries: list[str]) -> list[list[dict]]:
    """
    run_sync runs blocking_db_query in a thread pool so the event loop
    stays free. trio runs as many as the default CapacityLimiter allows
    (default: 40 threads).
    """
    results = []
    async with trio.open_nursery() as nursery:
        async def fetch(q: str) -> None:
            result = await trio.to_thread.run_sync(blocking_db_query, q)
            results.append(result)

        for q in queries:
            nursery.start_soon(fetch, q)
    return results


# ─────────────────────────────────────────────────────────────────────────────
# 5. CapacityLimiter — bound concurrency
# ─────────────────────────────────────────────────────────────────────────────

async def rate_limited_fetch(
    urls: list[str],
    max_concurrent: int = 5,
) -> list[dict]:
    """
    CapacityLimiter prevents more than max_concurrent tasks running at once.
    Tasks queue automatically — no manual semaphore bookkeeping needed.
    """
    limiter = trio.CapacityLimiter(max_concurrent)
    results: list[dict] = []

    async def fetch_one(url: str) -> None:
        async with limiter:
            await trio.sleep(0.02)                   # simulated HTTP
            results.append({"url": url, "status": 200})

    async with trio.open_nursery() as nursery:
        for url in urls:
            nursery.start_soon(fetch_one, url)

    return results


# ─────────────────────────────────────────────────────────────────────────────
# 6. Exception handling — ExceptionGroup (Python 3.11+)
# ─────────────────────────────────────────────────────────────────────────────

async def flaky_task(task_id: int, fail: bool) -> None:
    await trio.sleep(0.01)   # same wake-up time, so several failures can race
    if fail:
        raise ValueError(f"Task {task_id} failed")


async def handle_partial_failures(task_count: int = 5) -> dict:
    """
    When multiple tasks raise, trio collects them into an ExceptionGroup.
    Use except* (Python 3.11+) to handle each exception type independently.
    """
    failed_ids: list[int] = []

    try:
        async with trio.open_nursery() as nursery:
            for i in range(task_count):
                nursery.start_soon(flaky_task, i, i % 2 == 1)   # odd IDs fail
    except* ValueError as eg:
        for exc in eg.exceptions:
            failed_ids.append(int(str(exc).split()[1]))

    succeeded = task_count - len(failed_ids)
    return {"succeeded": succeeded, "failed": failed_ids}


# ─────────────────────────────────────────────────────────────────────────────
# 7. start_soon vs start — wait for task to initialize
# ─────────────────────────────────────────────────────────────────────────────

async def server_task(*, task_status=trio.TASK_STATUS_IGNORED) -> None:
    """
    Call task_status.started() after initialization is complete.
    nursery.start() waits for started() before returning — useful for
    ensuring a server is listening before continuing.
    """
    # ... bind socket, set up state ...
    await trio.sleep(0.01)         # simulated init
    task_status.started("ready")   # signal that init is done
    # continue serving...
    await trio.sleep(10_000)


async def start_and_wait() -> str:
    """nursery.start() returns the value passed to task_status.started()."""
    async with trio.open_nursery() as nursery:
        status = await nursery.start(server_task)
        print(f"Server ready: {status}")
        nursery.cancel_scope.cancel()   # shut down demo
    return status


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

async def main() -> None:
    print("=== Parallel fetch ===")
    users = await fetch_users_parallel([1, 2, 3, 4, 5])
    print(f"  fetched {len(users)} users")

    print("\n=== move_on_after ===")
    result = await with_timeout(0.05)
    print(f"  result: {result}")

    print("\n=== Nested scopes ===")
    scopes = await nested_scopes()
    print(f"  {scopes}")

    print("\n=== Producer/consumer ===")
    items = await producer_consumer_pipeline(8)
    print(f"  received {len(items)} items")

    print("\n=== Fan-out pipeline ===")
    done = await fan_out_pipeline([f"job_{i}" for i in range(9)], workers=3)
    print(f"  completed {len(done)} jobs")

    print("\n=== Thread interop ===")
    query_results = await run_blocking_queries(["SELECT 1", "SELECT 2", "SELECT 3"])
    print(f"  {len(query_results)} query results")

    print("\n=== Rate-limited fetch ===")
    fetched = await rate_limited_fetch(
        [f"https://api.example.com/{i}" for i in range(15)],
        max_concurrent=4,
    )
    print(f"  fetched {len(fetched)} URLs")

    print("\n=== start and wait ===")
    status = await start_and_wait()
    print(f"  status: {status}")


if __name__ == "__main__":
    trio.run(main)

For the asyncio.gather alternative: asyncio.gather(*coros) runs tasks concurrently, but when one fails it does not cancel the sibling tasks, which keep running in the background. Tasks can outlive their logical scope, and correct cancellation propagation requires manual shield/cancel boilerplate. trio's nursery guarantees that when any task raises, all sibling tasks are cancelled immediately and the exception propagates exactly once to the nursery's parent. There is no way to create a task that outlives its nursery, so resource leaks and "zombie tasks" are structurally impossible.

For the asyncio.Semaphore alternative: asyncio.Semaphore(N) requires manual async with sem: guards and does not integrate with task trees for automatic release on cancellation. trio.CapacityLimiter(N) participates in trio's cancel scopes, so a cancelled task releases its slot immediately, and the default to_thread limiter enforces a global 40-thread cap without any semaphore declaration.

The Claude Skills 360 bundle includes trio skill sets covering the trio.run entry point, open_nursery for parallel task spawning, move_on_after and fail_after cancel scopes, nested scope timeout composition, open_memory_channel producer-consumer and fan-out pipelines, to_thread.run_sync for blocking code, from_thread.run for calling trio from threads, CapacityLimiter for bounded concurrency, nursery.start with task_status.started() for initialization, ExceptionGroup handling with except*, and pytest-trio async fixtures. Start with the free tier to try structured concurrency code generation.
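The zombie-task difference is easy to demonstrate with stdlib asyncio alone. A minimal sketch (the function names are illustrative) in which a sibling keeps running after gather has already raised:

```python
import asyncio


async def failing() -> None:
    raise ValueError("boom")


async def survivor(log: list[str]) -> None:
    await asyncio.sleep(0.05)
    log.append("survivor finished")


async def demo() -> list[str]:
    log: list[str] = []
    try:
        # gather raises as soon as failing() raises...
        await asyncio.gather(failing(), survivor(log))
    except ValueError:
        pass
    # ...but survivor() was NOT cancelled: it is still running here.
    await asyncio.sleep(0.1)
    return log  # the orphaned task completed anyway


print(asyncio.run(demo()))
```

In a trio nursery, the equivalent sibling would be cancelled the moment failing() raised, and nothing could run past the nursery block.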

