Claude Code for asyncio: Async I/O in Python — Claude Skills 360 Blog

Claude Code for asyncio: Async I/O in Python

Published: July 20, 2028
Read time: 5 min read
By: Claude Skills 360

Python’s asyncio module runs coroutines on an event loop for I/O-bound concurrency. The core API at a glance:

- Entry point: asyncio.run(main())
- Coroutines: async def fetch(): await asyncio.sleep(1) — define with async def, suspend with await
- create_task: task = asyncio.create_task(coro()) — schedule concurrent execution
- gather: results = await asyncio.gather(coro1(), coro2()) — run concurrently, returns a list in argument order
- wait_for: await asyncio.wait_for(coro(), timeout=5.0) — raises TimeoutError on expiry
- sleep: await asyncio.sleep(0) yields control to the loop; await asyncio.sleep(n) delays
- timeout: async with asyncio.timeout(5.0): — timeout context manager (Python 3.11+)
- Queue: q = asyncio.Queue(maxsize=10); await q.put(item); item = await q.get(); q.task_done() — plus PriorityQueue / LifoQueue variants
- Event: e = asyncio.Event(); await e.wait(); e.set(); e.clear()
- Lock: async with lock:
- Semaphore: async with asyncio.Semaphore(5): — limit concurrency
- TaskGroup: async with asyncio.TaskGroup() as tg: tg.create_task(coro()) — structured concurrency (Python 3.11+)
- to_thread: await asyncio.to_thread(blocking_fn, *args) — run blocking code in the default thread pool
- as_completed: for fut in asyncio.as_completed(coros): result = await fut — results in completion order
- shield: await asyncio.shield(task) — protect the inner task from outer cancellation
- CancelledError: raised inside a task when it is cancelled

Claude Code generates async API clients, producer-consumer pipelines, concurrent scrapers, and TCP servers.
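The whole model fits in a few lines — a minimal sketch of the run/await/gather pattern, where fetch is a hypothetical stand-in for a real I/O call:

```python
import asyncio

async def fetch(name: str, delay: float) -> str:
    # Simulate an I/O-bound call; a stand-in for a real network request.
    await asyncio.sleep(delay)
    return f"{name}: done"

async def main() -> list[str]:
    # gather() runs both coroutines concurrently on the event loop and
    # returns their results in argument order, not completion order.
    return await asyncio.gather(fetch("a", 0.02), fetch("b", 0.01))

results = asyncio.run(main())
print(results)  # ['a: done', 'b: done']
```

Even though "b" finishes first, gather preserves the order the coroutines were passed in.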

CLAUDE.md for asyncio

## asyncio Stack
- Stdlib: import asyncio
- Entry: asyncio.run(main())
- Concurrent: results = await asyncio.gather(*coros, return_exceptions=True)
- Timeout: await asyncio.wait_for(coro(), timeout=30.0)
- Limit: async with asyncio.Semaphore(N): await coro()
- Thread: result = await asyncio.to_thread(blocking_fn, arg)
- Queue: asyncio.Queue(maxsize=N)  — producer/consumer
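The stack above composes directly — a minimal sketch combining gather, wait_for, and Semaphore from the list, with slow_double as a hypothetical stand-in for real I/O:

```python
import asyncio

async def slow_double(n: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for a real I/O call
    return n * 2

async def main() -> list[int]:
    sem = asyncio.Semaphore(3)  # at most 3 calls in flight

    async def bounded(n: int) -> int:
        async with sem:
            # bound each call; raises TimeoutError on expiry
            return await asyncio.wait_for(slow_double(n), timeout=30.0)

    # run all jobs concurrently, collecting exceptions as values
    return await asyncio.gather(*(bounded(n) for n in range(5)),
                                return_exceptions=True)

out = asyncio.run(main())
print(out)  # [0, 2, 4, 6, 8]
```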

asyncio Concurrent I/O Pipeline

# app/asyncutil.py — gather, TaskGroup, Semaphore, Queue, to_thread, retry
from __future__ import annotations

import asyncio
import functools
import logging
import time
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Callable, Coroutine, TypeVar

log = logging.getLogger(__name__)

T = TypeVar("T")
R = TypeVar("R")


# ─────────────────────────────────────────────────────────────────────────────
# 1. Concurrent execution helpers
# ─────────────────────────────────────────────────────────────────────────────

async def gather_safe(
    *coros: Coroutine,
    return_exceptions: bool = True,
) -> list[Any]:
    """
    Run coroutines concurrently; return results list.
    Exceptions are returned as values when return_exceptions=True.

    Example:
        results = await gather_safe(fetch(url1), fetch(url2), fetch(url3))
        values  = [r for r in results if not isinstance(r, Exception)]
    """
    return await asyncio.gather(*coros, return_exceptions=return_exceptions)


async def run_concurrently(
    fn: Callable[..., Coroutine],
    items: list[Any],
    max_concurrency: int = 10,
    timeout: float | None = None,
) -> list[Any]:
    """
    Apply async fn to each item with bounded concurrency.
    Returns list of results or exceptions in input order.

    Example:
        results = await run_concurrently(fetch_page, urls, max_concurrency=5)
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(item):
        async with semaphore:
            if timeout:
                return await asyncio.wait_for(fn(item), timeout=timeout)
            return await fn(item)

    return await asyncio.gather(
        *(bounded(item) for item in items),
        return_exceptions=True,
    )


async def first_success(
    *coros: Coroutine,
) -> Any:
    """
    Return the result of the first coroutine to succeed; cancel the rest.
    Keeps waiting past failed coroutines; if all fail, re-raises the last
    exception seen.

    Example:
        result = await first_success(fetch_primary(url), fetch_mirror(url))
    """
    tasks = [asyncio.create_task(c) for c in coros]
    pending: set[asyncio.Task] = set(tasks)
    last_exc: BaseException | None = None
    try:
        while pending:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
            for t in done:
                if t.exception() is None:
                    return t.result()
                last_exc = t.exception()
        raise last_exc if last_exc is not None else ValueError("no coroutines given")
    finally:
        for t in tasks:
            if not t.done():
                t.cancel()


async def with_timeout(coro: Coroutine, timeout: float, default: Any = None) -> Any:
    """
    Run coro with a timeout; return default if timeout expires.

    Example:
        data = await with_timeout(fetch(url), timeout=5.0, default={})
    """
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except (asyncio.TimeoutError, TimeoutError):
        return default


# ─────────────────────────────────────────────────────────────────────────────
# 2. Async retry decorator
# ─────────────────────────────────────────────────────────────────────────────

def async_retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple[type[Exception], ...] = (Exception,),
):
    """
    Retry an async function on specified exceptions with exponential backoff.

    Example:
        @async_retry(max_attempts=3, delay=0.5, exceptions=(aiohttp.ClientError,))
        async def fetch(url: str) -> bytes:
            ...
    """
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            d = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return await fn(*args, **kwargs)
                except exceptions as exc:
                    if attempt == max_attempts:
                        raise
                    log.warning("%s attempt %d/%d failed: %s — retrying in %.1fs",
                                fn.__name__, attempt, max_attempts, exc, d)
                    await asyncio.sleep(d)
                    d *= backoff
        return wrapper
    return decorator


# ─────────────────────────────────────────────────────────────────────────────
# 3. Producer-consumer queue
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class WorkResult:
    item:      Any
    result:    Any   = None
    error:     Exception | None = None

    @property
    def ok(self) -> bool:
        return self.error is None


async def process_queue(
    items: list[Any],
    worker_fn: Callable[..., Coroutine],
    n_workers: int = 4,
    maxsize: int = 0,
) -> list[WorkResult]:
    """
    Process items through an async queue with n_workers consumers.
    Returns results in completion order.

    Example:
        results = await process_queue(urls, fetch_url, n_workers=8)
        successes = [r for r in results if r.ok]
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    output: list[WorkResult] = []

    async def producer():
        for item in items:
            await queue.put(item)
        for _ in range(n_workers):
            await queue.put(None)  # sentinel

    async def worker():
        while True:
            item = await queue.get()
            if item is None:
                queue.task_done()
                return
            try:
                result = await worker_fn(item)
                output.append(WorkResult(item=item, result=result))
            except Exception as exc:
                output.append(WorkResult(item=item, error=exc))
            finally:
                queue.task_done()

    await asyncio.gather(
        producer(),
        *[worker() for _ in range(n_workers)],
    )
    return output


# ─────────────────────────────────────────────────────────────────────────────
# 4. Running blocking code
# ─────────────────────────────────────────────────────────────────────────────

async def run_blocking(fn: Callable, *args, **kwargs) -> Any:
    """
    Run a blocking function in a thread pool without blocking the event loop.

    Example:
        data = await run_blocking(Path("large.csv").read_text)
        result = await run_blocking(blocking_io_fn, param)  # GIL: threads suit I/O, not CPU
    """
    return await asyncio.to_thread(fn, *args, **kwargs)


async def delay_call(fn: Callable, delay: float, *args, **kwargs) -> Any:
    """
    Call fn after a delay.

    Example:
        asyncio.create_task(delay_call(cleanup, delay=60.0))
    """
    await asyncio.sleep(delay)
    return fn(*args, **kwargs)


# ─────────────────────────────────────────────────────────────────────────────
# 5. Async generator utilities
# ─────────────────────────────────────────────────────────────────────────────

async def arange(start: int, stop: int, step: int = 1) -> AsyncIterator[int]:
    """
    Async generator equivalent of range() — yields values with event-loop yield.

    Example:
        async for i in arange(0, 10):
            await process(i)
    """
    i = start
    while i < stop:
        yield i
        i += step
        await asyncio.sleep(0)  # yield control


async def amap(
    fn: Callable[..., Coroutine],
    items: list[Any],
    max_concurrency: int = 10,
) -> AsyncIterator[Any]:
    """
    Async map with bounded concurrency, yielding results as they complete.

    Example:
        async for result in amap(fetch, urls, max_concurrency=5):
            process(result)
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(item):
        async with semaphore:
            return await fn(item)

    tasks = [asyncio.create_task(bounded(item)) for item in items]
    for coro in asyncio.as_completed(tasks):
        yield await coro


@dataclass
class AsyncRateLimiter:
    """
    Token-bucket rate limiter for async code.

    Example:
        limiter = AsyncRateLimiter(rate=10, per=1.0)  # 10 calls/sec
        for url in urls:
            await limiter.acquire()
            asyncio.create_task(fetch(url))
    """
    rate: float    # max calls
    per:  float    # per this many seconds
    _tokens:    float = field(init=False)
    _last:      float = field(init=False)
    _lock:      asyncio.Lock = field(init=False)

    def __post_init__(self):
        self._tokens = self.rate
        self._last   = time.monotonic()
        self._lock   = asyncio.Lock()

    async def acquire(self, tokens: float = 1.0) -> None:
        async with self._lock:
            while True:
                now = time.monotonic()
                elapsed = now - self._last
                self._tokens = min(self.rate, self._tokens + elapsed * (self.rate / self.per))
                self._last = now
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                wait = (tokens - self._tokens) * (self.per / self.rate)
                await asyncio.sleep(wait)


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

async def _demo():
    print("=== asyncio demo ===")

    print("\n--- gather_safe ---")
    async def task(n, fail=False):
        await asyncio.sleep(n * 0.05)
        if fail:
            raise ValueError(f"task {n} failed")
        return n * 10

    results = await gather_safe(task(1), task(2), task(3, fail=True))
    for r in results:
        print(f"  {r!r}")

    print("\n--- run_concurrently ---")
    async def slow_double(x):
        await asyncio.sleep(0.02)
        return x * 2

    t0 = time.monotonic()
    outputs = await run_concurrently(slow_double, list(range(10)), max_concurrency=4)
    elapsed = time.monotonic() - t0
    print(f"  results: {outputs}")
    print(f"  elapsed: {elapsed:.2f}s (10 tasks at concurrency 4 → 3 waves × 0.02s ≈ 0.06s expected)")

    print("\n--- with_timeout ---")
    async def slow():
        await asyncio.sleep(10)
        return "done"

    val = await with_timeout(slow(), timeout=0.05, default="timed_out")
    print(f"  result: {val!r}")

    print("\n--- async_retry ---")
    call_count = 0

    @async_retry(max_attempts=3, delay=0.01, exceptions=(RuntimeError,))
    async def flaky():
        nonlocal call_count
        call_count += 1
        if call_count < 3:
            raise RuntimeError("not yet")
        return "ok"

    result = await flaky()
    print(f"  result: {result!r}  calls: {call_count}")

    print("\n--- process_queue ---")
    async def compute(n):
        await asyncio.sleep(0.01)
        return n ** 2

    work_results = await process_queue(list(range(8)), compute, n_workers=4)
    print(f"  {len(work_results)} results, successes: {sum(1 for r in work_results if r.ok)}")
    print(f"  values: {sorted(r.result for r in work_results if r.ok)}")

    print("\n--- run_blocking ---")
    import os
    cwd = await run_blocking(os.getcwd)
    print(f"  cwd (from thread): {cwd!r}")

    print("\n--- arange ---")
    nums = []
    async for i in arange(0, 5):
        nums.append(i)
    print(f"  arange(0, 5): {nums}")

    print("\n--- AsyncRateLimiter ---")
    limiter = AsyncRateLimiter(rate=5.0, per=1.0)
    t0 = time.monotonic()
    for _ in range(5):
        await limiter.acquire()
    print(f"  5 calls acquired in {time.monotonic() - t0:.3f}s (should be ~0s, all tokens available)")

    print("\n=== done ===")


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    asyncio.run(_demo())

For the trio alternative — trio (PyPI, claude-code-trio) enforces strict structured concurrency with nurseries (async with trio.open_nursery() as n: n.start_soon(fn)), making task lifetimes explicit and cancellation deterministic. asyncio.TaskGroup (Python 3.11+) borrows the structured-concurrency model but is tied to the asyncio event loop. Use trio for new applications where airtight cancellation semantics and a no-implicit-task-escape discipline matter; use asyncio when depending on libraries that require it (FastAPI, aiohttp, Celery) or when targeting Python versions before 3.11 that lack the TaskGroup API.

For the anyio alternative — anyio (PyPI, claude-code-anyio) is a compatibility layer that runs the same async code on either asyncio or trio, with anyio.create_task_group(), anyio.sleep(), and anyio.to_thread.run_sync() working identically on both backends; stdlib asyncio offers no such portability between event loops. Use anyio for library code that should be backend-agnostic, and asyncio directly for applications where asyncio is the committed runtime.

The Claude Skills 360 bundle includes asyncio skill sets covering the gather_safe()/run_concurrently()/first_success()/with_timeout() parallel helpers, the async_retry() decorator with exponential backoff, process_queue() producer-consumer processing with n_workers, the run_blocking()/delay_call() thread-pool bridges, the arange()/amap() async generators, and the AsyncRateLimiter token-bucket throttle. Start with the free tier to try concurrent I/O and asyncio pipeline code generation.

Keep Reading

AI

Claude Code for email.contentmanager: Python Email Content Accessors

Read and write EmailMessage body content with Python's email.contentmanager module and Claude Code — the ContentManager class that maps content types to get/set handler functions, the raw_data_manager and content_manager instances, EmailMessage.get_content() and set_content() for reading and writing bodies, make_alternative()/make_mixed()/make_related() for converting a simple message into a multipart container, add_attachment(), and integration with email.message, email.policy, email.mime, and io for building email readers, attachment extractors, text and HTML body accessors, and policy-aware MIME construction pipelines.

5 min read Feb 12, 2029
AI

Claude Code for email.charset: Python Email Charset Encoding

Control header and body encoding for international email with Python's email.charset module and Claude Code — the Charset class wrapping a character-set name with its header_encoding, body_encoding, output_codec, and input_codec rules, the header_encode()/body_encode()/convert() methods, the add_charset()/add_alias()/add_codec() registry functions, and integration with email.message, email.mime, email.policy, and email.encoders for building international email senders, non-ASCII header encoders, Content-Transfer-Encoding selectors, and charset-aware MIME pipelines.

5 min read Feb 11, 2029
AI

Claude Code for email.utils: Python Email Address and Header Utilities

Parse and format RFC 2822 email addresses and dates with Python's email.utils module and Claude Code — parseaddr()/formataddr()/getaddresses() for address handling, parsedate()/parsedate_tz()/parsedate_to_datetime()/formatdate()/format_datetime() for dates, make_msgid() for globally unique Message-IDs, the RFC 2231 parameter helpers decode_rfc2231()/encode_rfc2231()/collapse_rfc2231_value(), and integration with email.message, email.headerregistry, datetime, and time for building address parsers, date formatters, and RFC-compliant email construction utilities.

5 min read Feb 10, 2029

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
