
Claude Code for anyio: Async Framework Compatibility Layer

Published: January 16, 2028
Read time: 5 min read
By: Claude Skills 360

anyio provides async I/O that runs on both asyncio and trio. Install: pip install anyio (use "anyio[trio]" for the trio backend). Run: import anyio; anyio.run(main, backend="asyncio") or backend="trio".

Tasks: async with anyio.create_task_group() as tg: tg.start_soon(job1); tg.start_soon(job2) — both run concurrently; an exception in any task cancels the rest of the group. Cancellation and timeouts: with anyio.move_on_after(5): await slow_io() silently continues after the timeout, while with anyio.fail_after(5): await slow_io() raises TimeoutError. Sleep: await anyio.sleep(1.0).

Threads: result = await anyio.to_thread.run_sync(blocking_fn, arg) runs blocking code in a worker thread; anyio.from_thread.run_sync(fn_in_event_loop) calls back into the event loop from a sync thread. await anyio.to_thread.run_sync(fn, abandon_on_cancel=True) lets a cancelled scope abandon the worker thread (the keyword was named cancellable before anyio 4).

TCP: async with await anyio.connect_tcp("host", 8080) as stream: await stream.send(b"hello"); data = await stream.receive(4096). Server: async with anyio.create_tcp_listener(local_port=8080) as listener: await listener.serve(handler).

Memory streams (channels): send_stream, recv_stream = anyio.create_memory_object_stream[bytes](max_buffer_size=100); await send_stream.send(item); async for item in recv_stream: .... Sync primitives: lock = anyio.Lock(); sema = anyio.Semaphore(10); event = anyio.Event(); await event.wait(); event.set(). CapacityLimiter: limiter = anyio.CapacityLimiter(20); async with limiter: await task().

ASGI: anyio underpins Starlette and FastAPI — both use it internally. pytest: anyio ships its own pytest plugin, so pip install "anyio[trio]" is all you need. @pytest.mark.anyio async def test_fn(): ... runs on whatever the anyio_backend fixture provides; parametrize that fixture with ["asyncio", "trio"] to cover both backends. Claude Code generates anyio task groups, timeout guards, thread bridges, and backend-parametrized tests.

CLAUDE.md for anyio

## anyio Stack
- Version: anyio >= 4.4 | pip install "anyio[trio]" (pytest plugin is bundled with anyio)
- Run: anyio.run(main, backend="asyncio") | backend="trio"
- TaskGroup: async with anyio.create_task_group() as tg: tg.start_soon(coro)
- Timeout: with anyio.move_on_after(N): ... | fail_after raises TimeoutError
- Threads: await anyio.to_thread.run_sync(blocking_fn) — keeps the event loop responsive (the GIL still applies to pure-Python CPU work)
- Channels: send, recv = anyio.create_memory_object_stream[T](max_buffer_size=N)
- Test: @pytest.mark.anyio — parametrize the anyio_backend fixture to cover asyncio + trio
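The Run line above, made concrete — sniffio (a dependency anyio already pulls in) reports which backend the coroutine actually landed on:

```python
import anyio
import sniffio  # installed automatically as an anyio dependency


async def which_backend() -> str:
    await anyio.sleep(0)                    # prove we are inside the event loop
    return sniffio.current_async_library()  # "asyncio" or "trio"


backend = anyio.run(which_backend, backend="asyncio")
print(backend)  # asyncio
# anyio.run(which_backend, backend="trio") works after pip install "anyio[trio]"
```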

anyio Async Compatibility Pipeline

# app/anyio_patterns.py — anyio task groups, channels, threads, and testing
from __future__ import annotations

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream


# ─────────────────────────────────────────────────────────────────────────────
# 1. TaskGroup — structured concurrency
# ─────────────────────────────────────────────────────────────────────────────

async def fetch_user(user_id: int) -> dict:
    """Simulated async fetch — replace with real DB / HTTP call."""
    await anyio.sleep(0.01)
    return {"id": user_id, "email": f"user{user_id}@example.com"}


async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    """
    Fetch multiple users concurrently with a TaskGroup.
    All tasks run concurrently; if any raises, the group is cancelled.
    """
    results: list[dict | None] = [None] * len(user_ids)

    async with anyio.create_task_group() as tg:
        async def _fetch(index: int, uid: int) -> None:
            results[index] = await fetch_user(uid)

        for i, uid in enumerate(user_ids):
            tg.start_soon(_fetch, i, uid)

    return [r for r in results if r is not None]


async def parallel_pipeline(items: list[int]) -> dict[str, list]:
    """Run two independent processing pipelines concurrently."""
    fast_results: list[int] = []
    slow_results: list[int] = []

    async def fast_process() -> None:
        for item in items:
            await anyio.sleep(0)
            fast_results.append(item * 2)

    async def slow_process() -> None:
        for item in items:
            await anyio.sleep(0.001)
            slow_results.append(item ** 2)

    async with anyio.create_task_group() as tg:
        tg.start_soon(fast_process)
        tg.start_soon(slow_process)

    return {"fast": fast_results, "slow": slow_results}


# ─────────────────────────────────────────────────────────────────────────────
# 2. Cancellation — move_on_after and fail_after
# ─────────────────────────────────────────────────────────────────────────────

async def fetch_with_timeout(url: str, timeout: float = 5.0) -> dict | None:
    """
    move_on_after: if the block takes longer than `timeout` seconds,
    silently exit the block (returns None here).
    """
    with anyio.move_on_after(timeout) as scope:
        await anyio.sleep(0.01)   # simulated network call
        return {"url": url, "status": 200}
    # reached only if the timeout cancelled the block (scope.cancelled_caught is True)
    return None


async def fetch_or_fail(url: str, timeout: float = 5.0) -> dict:
    """
    fail_after: raises TimeoutError if the block exceeds timeout.
    Use when the caller must handle the timeout explicitly.
    """
    with anyio.fail_after(timeout):
        await anyio.sleep(0.01)
        return {"url": url, "status": 200}


async def fetch_many_with_deadline(
    urls: list[str], total_timeout: float = 10.0,
) -> list[dict | None]:
    """Apply a single deadline to a group of parallel fetches."""
    results: list[dict | None] = [None] * len(urls)

    with anyio.move_on_after(total_timeout):
        async with anyio.create_task_group() as tg:
            async def _fetch(i: int, url: str) -> None:
                results[i] = await fetch_with_timeout(url, timeout=2.0)

            for i, url in enumerate(urls):
                tg.start_soon(_fetch, i, url)

    return results


# ─────────────────────────────────────────────────────────────────────────────
# 3. Thread bridge — run blocking code without blocking the event loop
# ─────────────────────────────────────────────────────────────────────────────

def read_large_file(path: str) -> bytes:
    """Blocking file I/O — safe to run via to_thread.run_sync."""
    with open(path, "rb") as f:
        return f.read()


def cpu_intensive_hash(data: bytes) -> str:
    """CPU-bound — offload to a thread so the event loop stays responsive."""
    import hashlib
    return hashlib.sha256(data).hexdigest()


async def hash_file(path: str) -> str:
    """Read and hash a file without blocking the event loop."""
    data = await anyio.to_thread.run_sync(read_large_file, path)
    digest = await anyio.to_thread.run_sync(cpu_intensive_hash, data)
    return digest


async def parallel_hashes(paths: list[str]) -> dict[str, str]:
    """Hash multiple files concurrently using threads for I/O."""
    digests: dict[str, str] = {}
    limiter = anyio.CapacityLimiter(4)   # max 4 concurrent threads

    async def _hash(p: str) -> None:
        async with limiter:
            digests[p] = await hash_file(p)

    async with anyio.create_task_group() as tg:
        for path in paths:
            tg.start_soon(_hash, path)

    return digests


# ─────────────────────────────────────────────────────────────────────────────
# 4. Memory streams — in-process producer/consumer channels
# ─────────────────────────────────────────────────────────────────────────────

async def producer_consumer_demo() -> list[int]:
    """
    Memory object streams are typed, buffered channels for passing objects
    between tasks — similar to Go channels or asyncio.Queue but typed.
    """
    send_stream: MemoryObjectSendStream[int]
    recv_stream: MemoryObjectReceiveStream[int]
    send_stream, recv_stream = anyio.create_memory_object_stream[int](max_buffer_size=10)

    results: list[int] = []

    async def producer() -> None:
        async with send_stream:
            for i in range(5):
                await send_stream.send(i * 2)
                await anyio.sleep(0)

    async def consumer() -> None:
        async with recv_stream:
            async for item in recv_stream:
                results.append(item)

    async with anyio.create_task_group() as tg:
        tg.start_soon(producer)
        tg.start_soon(consumer)

    return results


# ─────────────────────────────────────────────────────────────────────────────
# 5. Synchronization primitives
# ─────────────────────────────────────────────────────────────────────────────

async def rate_limited_requests(
    items: list[str], max_concurrent: int = 5,
) -> list[str]:
    """Limit concurrency with a Semaphore — avoid overwhelming downstream services."""
    semaphore = anyio.Semaphore(max_concurrent)
    results: list[str] = []
    lock = anyio.Lock()

    async def _process(item: str) -> None:
        async with semaphore:
            await anyio.sleep(0)   # simulated async work
            async with lock:
                results.append(f"done:{item}")

    async with anyio.create_task_group() as tg:
        for item in items:
            tg.start_soon(_process, item)

    return results


async def event_coordination() -> list[str]:
    """
    anyio.Event: set() signals all waiters.
    Unlike asyncio.Event, it works on both asyncio and trio.
    """
    ready = anyio.Event()
    log: list[str] = []

    async def waiter(name: str) -> None:
        await ready.wait()
        log.append(f"{name} unblocked")

    async def setter() -> None:
        await anyio.sleep(0.01)
        log.append("setter: setting event")
        ready.set()

    async with anyio.create_task_group() as tg:
        tg.start_soon(waiter, "A")
        tg.start_soon(waiter, "B")
        tg.start_soon(setter)

    return log


# ─────────────────────────────────────────────────────────────────────────────
# 6. Entry point — backend selection
# ─────────────────────────────────────────────────────────────────────────────

async def main() -> None:
    # Concurrent fetches
    users = await fetch_all_users([1, 2, 3, 4, 5])
    print(f"Fetched {len(users)} users concurrently")

    # Pipeline
    results = await parallel_pipeline([1, 2, 3])
    print(f"Fast: {results['fast']}")

    # Timeout
    result = await fetch_with_timeout("https://example.com", timeout=0.001)
    print(f"Timeout demo: {result}")

    # Channels
    items = await producer_consumer_demo()
    print(f"Channel items: {items}")

    # Concurrency limit
    processed = await rate_limited_requests(["a", "b", "c", "d", "e"])
    print(f"Rate limited: {len(processed)} processed")


if __name__ == "__main__":
    # Switch to trio by changing backend="trio"
    anyio.run(main, backend="asyncio")


# ─────────────────────────────────────────────────────────────────────────────
# 7. Tests — pytest-anyio runs on both asyncio and trio
# ─────────────────────────────────────────────────────────────────────────────

# tests/test_anyio_patterns.py
#
# anyio bundles its own pytest plugin — no extra package needed.
# To run every test on both backends, parametrize the anyio_backend fixture
# in conftest.py:
#
#   @pytest.fixture(params=["asyncio", "trio"])
#   def anyio_backend(request):
#       return request.param
#
# Mark tests per-file:
#   pytestmark = pytest.mark.anyio

import pytest

pytestmark = pytest.mark.anyio  # runs once per backend the anyio_backend fixture provides


async def test_fetch_all_users() -> None:
    users = await fetch_all_users([1, 2, 3])
    assert len(users) == 3
    assert all("email" in u for u in users)


async def test_fetch_with_timeout_succeeds() -> None:
    result = await fetch_with_timeout("https://example.com", timeout=5.0)
    assert result is not None
    assert result["status"] == 200


async def test_fetch_with_timeout_exceeded() -> None:
    """move_on_after cancels the block and sets cancelled_caught."""
    result = None
    with anyio.move_on_after(0.01) as scope:
        await anyio.sleep(10)
        result = {"ok": True}
    assert scope.cancelled_caught
    assert result is None


async def test_fail_after_raises() -> None:
    with pytest.raises(TimeoutError):
        with anyio.fail_after(0.01):
            await anyio.sleep(10)


async def test_producer_consumer_channel() -> None:
    items = await producer_consumer_demo()
    assert items == [0, 2, 4, 6, 8]


async def test_rate_limited_requests() -> None:
    results = await rate_limited_requests(["a", "b", "c"], max_concurrent=2)
    assert len(results) == 3
    assert all(r.startswith("done:") for r in results)


async def test_parallel_pipeline() -> None:
    result = await parallel_pipeline([1, 2, 3])
    assert result["fast"] == [2, 4, 6]
    assert result["slow"] == [1, 4, 9]

Compared with using asyncio directly: asyncio.gather() and asyncio.create_task() are asyncio-only APIs, while anyio.create_task_group() runs identically on asyncio and trio. Task groups give you structured concurrency with automatic cancellation propagation — if task A raises, the group cancels task B before re-raising, preventing the resource leaks that asyncio.gather(return_exceptions=True) can mask.

Compared with asyncio.wait_for: asyncio.wait_for(coro, timeout) only works on asyncio, while anyio.move_on_after(N) and anyio.fail_after(N) work on both backends, compose correctly inside nested task groups, and expose scope.cancelled_caught to distinguish a timeout from a normal exit. Likewise, anyio.to_thread.run_sync(fn, abandon_on_cancel=True) (named cancellable= before anyio 4) abandons a blocking thread when the surrounding scope is cancelled — something asyncio.wait_for plus concurrent.futures cannot do.

The Claude Skills 360 bundle includes anyio skill sets covering anyio.run with asyncio/trio backend selection, create_task_group for structured concurrency, move_on_after/fail_after timeout scopes, scope.cancelled_caught for timeout detection, to_thread.run_sync for blocking code, CapacityLimiter for thread pool control, create_memory_object_stream typed channels, Lock/Semaphore/Event synchronization primitives, backend-parametrized pytest tests, and anyio integration with FastAPI and Starlette. Start with the free tier to try structured async code generation.
