
Claude Code for aio-pika: Async RabbitMQ Client for Python

Published: May 14, 2028
Read time: 5 min read
By: Claude Skills 360

aio-pika is an asyncio-native RabbitMQ client built on aiormq. Install with pip install aio-pika. Connect with conn = await aio_pika.connect_robust("amqp://guest:guest@localhost/"), open a channel with ch = await conn.channel(), and declare a queue with q = await ch.declare_queue("tasks", durable=True). Publish with await ch.default_exchange.publish(aio_pika.Message(b"hello", delivery_mode=aio_pika.DeliveryMode.PERSISTENT), routing_key="tasks"). Consume with the iterator pattern — async with q.iterator() as qi: async for msg in qi: async with msg.process(): print(msg.body) — or register a callback with await q.consume(callback). Acknowledge with await msg.ack(), negatively acknowledge with await msg.nack(requeue=True), or reject with await msg.reject(). Declare exchanges with ex = await ch.declare_exchange("logs", aio_pika.ExchangeType.FANOUT) (ExchangeType.TOPIC and ExchangeType.DIRECT are the other common types) and bind with await q.bind(ex, routing_key="info.#"). Set prefetch with await ch.set_qos(prefetch_count=10). connect_robust(url, reconnect_interval=5) auto-reconnects on disconnect, and async with await aio_pika.connect(url) as conn: closes the connection for you. Messages accept headers (Message(body, headers={"x-retry": "1"})) and per-message expiry (Message(body, expiration=30) — numeric values are seconds). Close explicitly with await conn.close(). Claude Code generates aio-pika async producers, consumer workers, topic routers, and FastAPI lifespan integrations.

CLAUDE.md for aio-pika

## aio-pika Stack
- Version: aio-pika >= 9.0 | pip install aio-pika
- Connect: await aio_pika.connect_robust("amqp://guest:guest@localhost/")
- Publish: await exchange.publish(Message(body, delivery_mode=PERSISTENT), routing_key="q")
- Consume: async with queue.iterator() as qi: async for msg in qi: async with msg.process(): ...
- Exchange: await ch.declare_exchange("name", ExchangeType.TOPIC)
- Prefetch: await ch.set_qos(prefetch_count=10)

aio-pika Async RabbitMQ Pipeline

# app/amqp.py — aio-pika producer, consumer, topic exchange, DLQ, and FastAPI integration
from __future__ import annotations

import asyncio
import json
import logging
from contextlib import asynccontextmanager
from dataclasses import asdict, dataclass
from typing import Any, AsyncGenerator, Callable

import aio_pika
from aio_pika import (
    Channel,
    DeliveryMode,
    ExchangeType,
    Message,
    RobustConnection,
)
from aio_pika.abc import AbstractIncomingMessage


log = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────

async def connect(
    url: str = "amqp://guest:guest@localhost/",
    reconnect_interval: float = 5.0,
) -> RobustConnection:
    """
    Open a RobustConnection that auto-reconnects on broker restarts.
    url: AMQP URL — amqp://user:pass@host:5672/vhost

    Usage:
        conn = await connect("amqp://user:pass@rabbitmq:5672/")
        ch   = await conn.channel()
    """
    return await aio_pika.connect_robust(
        url,
        reconnect_interval=reconnect_interval,
    )


@asynccontextmanager
async def connection_ctx(
    url: str = "amqp://guest:guest@localhost/",
    prefetch_count: int = 10,
) -> AsyncGenerator[Channel, None]:
    """
    Async context manager yielding a ready Channel.
    Handles open/close and QoS setup.

    Usage:
        async with connection_ctx("amqp://...") as ch:
            await publish(ch, "tasks", {"job": "send_email"})
    """
    conn = await connect(url)
    async with conn:
        ch = await conn.channel()
        await ch.set_qos(prefetch_count=prefetch_count)
        yield ch


# ─────────────────────────────────────────────────────────────────────────────
# 2. Queue / exchange setup
# ─────────────────────────────────────────────────────────────────────────────

async def declare_queue(
    ch: Channel,
    name: str,
    durable: bool = True,
    exclusive: bool = False,
    auto_delete: bool = False,
    ttl_ms: int | None = None,
    dead_letter_exchange: str | None = None,
) -> aio_pika.Queue:
    """
    Declare a queue.
    ttl_ms: per-message TTL before routing to DLX.
    dead_letter_exchange: DLX name for expired/rejected messages.
    """
    args: dict[str, Any] = {}
    if ttl_ms is not None:
        args["x-message-ttl"] = ttl_ms
    if dead_letter_exchange:
        args["x-dead-letter-exchange"] = dead_letter_exchange

    return await ch.declare_queue(
        name,
        durable=durable,
        exclusive=exclusive,
        auto_delete=auto_delete,
        arguments=args or None,
    )


async def declare_exchange(
    ch: Channel,
    name: str,
    exchange_type: ExchangeType = ExchangeType.DIRECT,
    durable: bool = True,
) -> aio_pika.Exchange:
    """
    Declare an exchange.
    exchange_type: DIRECT | FANOUT | TOPIC | HEADERS
    """
    return await ch.declare_exchange(
        name,
        exchange_type,
        durable=durable,
    )


async def setup_dlq(
    ch: Channel,
    main_queue: str,
    dlq_name: str | None = None,
    ttl_ms: int | None = None,
) -> tuple[aio_pika.Queue, aio_pika.Queue]:
    """
    Set up main queue + Dead Letter Queue pattern.
    Rejected / expired messages in main_queue are routed to dlq.
    Returns (main_queue_obj, dlq_obj).

    Example:
        main_q, dlq = await setup_dlq(ch, "orders", ttl_ms=30_000)
    """
    dlq  = dlq_name or f"{main_queue}.dlq"
    dlx  = f"{main_queue}.dlx"

    await declare_exchange(ch, dlx, ExchangeType.FANOUT)
    dlq_obj  = await declare_queue(ch, dlq, durable=True)
    await dlq_obj.bind(dlx)
    main_obj = await declare_queue(
        ch, main_queue,
        durable=True,
        ttl_ms=ttl_ms,
        dead_letter_exchange=dlx,
    )
    return main_obj, dlq_obj


# ─────────────────────────────────────────────────────────────────────────────
# 3. Publisher
# ─────────────────────────────────────────────────────────────────────────────

def build_message(
    body: dict | str | bytes,
    persistent: bool = True,
    content_type: str = "application/json",
    headers: dict | None = None,
    expiration_ms: int | None = None,
    correlation_id: str | None = None,
    reply_to: str | None = None,
) -> Message:
    """Build an aio_pika Message from Python objects."""
    if isinstance(body, dict):
        raw = json.dumps(body).encode()
        content_type = "application/json"
    elif isinstance(body, str):
        raw = body.encode()
    else:
        raw = body

    return Message(
        raw,
        delivery_mode=DeliveryMode.PERSISTENT if persistent else DeliveryMode.NOT_PERSISTENT,
        content_type=content_type,
        headers=headers,
        # aio-pika treats a numeric `expiration` as seconds, so convert from ms
        expiration=expiration_ms / 1000 if expiration_ms is not None else None,
        correlation_id=correlation_id,
        reply_to=reply_to,
    )


async def publish(
    ch: Channel,
    body: dict | str | bytes,
    queue: str = "",
    exchange: str = "",
    routing_key: str | None = None,
    persistent: bool = True,
    headers: dict | None = None,
    expiration_ms: int | None = None,
) -> None:
    """
    Publish a message to a queue (default exchange) or named exchange.

    Example:
        await publish(ch, {"task": "send_email", "to": "[email protected]"}, queue="tasks")
        await publish(ch, {"event": "user.created"}, exchange="events", routing_key="user.created")
    """
    msg = build_message(body, persistent=persistent, headers=headers,
                        expiration_ms=expiration_ms)
    rk  = routing_key if routing_key is not None else queue

    if exchange:
        ex = await ch.get_exchange(exchange)
        await ex.publish(msg, routing_key=rk)
    else:
        await ch.default_exchange.publish(msg, routing_key=rk)


class Publisher:
    """
    Long-lived publisher with a single RobustConnection.
    Re-opens channel transparently on reconnect.

    Usage:
        pub = Publisher("amqp://guest:guest@localhost/", queue="tasks")
        await pub.start()
        await pub.send({"task": "resize_image", "url": "..."})
        await pub.stop()
    """

    def __init__(
        self,
        url: str = "amqp://guest:guest@localhost/",
        queue: str = "default",
        exchange: str = "",
        routing_key: str | None = None,
    ) -> None:
        self._url    = url
        self._queue  = queue
        self._exc    = exchange
        self._rk     = routing_key
        self._conn: RobustConnection | None = None
        self._ch:   Channel | None = None

    async def start(self) -> None:
        self._conn = await connect(self._url)
        self._ch   = await self._conn.channel()
        if self._queue:
            await declare_queue(self._ch, self._queue)

    async def stop(self) -> None:
        if self._conn and not self._conn.is_closed:
            await self._conn.close()

    async def send(self, body: dict | str | bytes, **kwargs: Any) -> None:
        if self._ch is None:
            raise RuntimeError("Call start() first")
        await publish(self._ch, body, queue=self._queue,
                      exchange=self._exc, routing_key=self._rk, **kwargs)

    async def __aenter__(self) -> "Publisher":
        await self.start()
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.stop()


# ─────────────────────────────────────────────────────────────────────────────
# 4. Consumer
# ─────────────────────────────────────────────────────────────────────────────

async def consume_iter(
    ch: Channel,
    queue: str,
    handler: Callable[[dict | str | bytes, AbstractIncomingMessage], Any],
    prefetch: int = 10,
    decode_json: bool = True,
) -> None:
    """
    Consume messages with async-for iterator pattern.
    handler(body, msg) — awaited if coroutine.
    Acks on success, nacks (no requeue) on exception.

    Example:
        async def process(body, msg):
            await db.insert(body)
        await consume_iter(ch, "tasks", process, prefetch=5)
    """
    await ch.set_qos(prefetch_count=prefetch)
    q = await declare_queue(ch, queue)

    async with q.iterator() as queue_iter:
        async for message in queue_iter:
            try:
                parsed = json.loads(message.body) if decode_json else message.body
            except (json.JSONDecodeError, UnicodeDecodeError):
                parsed = message.body

            try:
                async with message.process():
                    if asyncio.iscoroutinefunction(handler):
                        await handler(parsed, message)
                    else:
                        handler(parsed, message)
            except Exception as e:
                # message.process() already rejected the failed delivery
                # (no requeue), so only log here — a second nack/reject on
                # the same message would raise.
                log.error("Consumer error at delivery tag %s: %s",
                          message.delivery_tag, e)


async def consume_callback(
    ch: Channel,
    queue: str,
    callback: Callable[[AbstractIncomingMessage], Any],
    prefetch: int = 10,
) -> str:
    """
    Register a callback-style consumer.
    Returns consumer tag (use to cancel later).

    Example:
        async def on_message(msg: AbstractIncomingMessage):
            async with msg.process():
                data = json.loads(msg.body)
                ...
        tag = await consume_callback(ch, "events", on_message)
    """
    await ch.set_qos(prefetch_count=prefetch)
    q   = await declare_queue(ch, queue)
    tag = await q.consume(callback)
    return tag
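Both consumers above run until cancelled, so production workers need a clean shutdown path. Here is a broker-free sketch of running any long-lived consumer coroutine as a task that is cancelled when a stop event fires (for instance from a SIGTERM handler); the function name is illustrative:

```python
import asyncio
from typing import Awaitable, Callable


async def run_until_stopped(worker: Callable[[], Awaitable[None]],
                            stop: asyncio.Event) -> None:
    """Run `worker` (e.g. lambda: consume_iter(ch, "tasks", handler)) as a
    background task and cancel it cleanly once `stop` is set."""
    task = asyncio.create_task(worker())
    await stop.wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected when the worker is cancelled mid-iteration
```

In a real worker you would wire the event to signals with loop.add_signal_handler(signal.SIGTERM, stop.set) so in-flight messages finish their msg.process() block before the task dies.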


# ─────────────────────────────────────────────────────────────────────────────
# 5. Event bus pattern
# ─────────────────────────────────────────────────────────────────────────────

class AsyncEventBus:
    """
    Lightweight async event bus backed by a TOPIC exchange.
    Routing key convention: "<entity>.<event>" — e.g. "user.created".
    Subscribers bind with wildcards: "user.#", "*.created", "#".

    Usage:
        bus = AsyncEventBus("amqp://...", exchange="events")
        await bus.start()
        await bus.publish("user.created", {"id": 42})
        # In another coroutine / task:
        await bus.subscribe("user.#", "user-svc-q", handler)
        await bus.stop()
    """

    def __init__(
        self,
        url: str = "amqp://guest:guest@localhost/",
        exchange: str = "events",
    ) -> None:
        self._url = url
        self._exc = exchange
        self._conn: RobustConnection | None = None
        self._pub_ch: Channel | None = None

    async def start(self) -> None:
        self._conn   = await connect(self._url)
        self._pub_ch = await self._conn.channel()
        await declare_exchange(self._pub_ch, self._exc, ExchangeType.TOPIC)

    async def stop(self) -> None:
        if self._conn and not self._conn.is_closed:
            await self._conn.close()

    async def publish(self, routing_key: str, payload: dict) -> None:
        if self._pub_ch is None:
            raise RuntimeError("Call start() first")
        msg = build_message(payload)
        ex  = await self._pub_ch.get_exchange(self._exc)
        await ex.publish(msg, routing_key=routing_key)

    async def subscribe(
        self,
        routing_key_pattern: str,
        queue_name: str,
        handler: Callable[[dict, AbstractIncomingMessage], Any],
        prefetch: int = 10,
    ) -> None:
        """
        Bind queue to exchange and start consuming.
        Runs until cancelled.
        """
        if self._conn is None:
            raise RuntimeError("Call start() first")
        ch      = await self._conn.channel()
        ex      = await declare_exchange(ch, self._exc, ExchangeType.TOPIC)
        q       = await declare_queue(ch, queue_name)
        await q.bind(ex, routing_key=routing_key_pattern)
        await consume_iter(ch, queue_name, handler, prefetch=prefetch)


# ─────────────────────────────────────────────────────────────────────────────
# 6. FastAPI lifespan integration
# ─────────────────────────────────────────────────────────────────────────────

FASTAPI_EXAMPLE = '''
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.amqp import AsyncEventBus

bus = AsyncEventBus("amqp://guest:guest@localhost/", exchange="events")

@asynccontextmanager
async def lifespan(app: FastAPI):
    await bus.start()
    yield
    await bus.stop()

app = FastAPI(lifespan=lifespan)

@app.post("/orders/")
async def create_order(order: dict):
    order_id = 42  # from DB
    await bus.publish("order.created", {**order, "id": order_id})
    return {"id": order_id}

# Background subscriber (run in a separate task at startup):
# async def on_order(body, msg): await fulfillment_svc.process(body)
# asyncio.create_task(bus.subscribe("order.#", "fulfillment-q", on_order))
'''


# ─────────────────────────────────────────────────────────────────────────────
# Demo (API shape only — no RabbitMQ broker needed)
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    async def demo():
        print("aio-pika API demo (no broker needed for this output)")
        print("=== Message construction ===")
        msg = build_message({"event": "user.created", "id": 42})
        print(f"  Message body: {msg.body}")
        print(f"  Delivery mode: {msg.delivery_mode}")

        print("\n=== Publisher class ===")
        pub = Publisher(queue="orders")
        print(f"  Publisher created: {type(pub).__name__}")

        print("\n=== AsyncEventBus class ===")
        bus = AsyncEventBus(exchange="platform-events")
        print(f"  Bus created: {type(bus).__name__}")
        print("\nTo test with real RabbitMQ:")
        print("  docker run -d -p 5672:5672 rabbitmq:3-management")

    asyncio.run(demo())

For the pika alternative: pika's BlockingConnection is synchronous and thread-based, while aio-pika offers a native asyncio API built on aiormq — async with, async for, and await msg.ack() — that integrates naturally with FastAPI, aiohttp, and Quart without blocking the event loop. For the kombu alternative: kombu is Celery's message-transport abstraction supporting RabbitMQ, Redis, SQS, and others through a unified API, whereas aio-pika gives direct asyncio-native access to AMQP 0-9-1 features (DLX, TTL, topic routing, QoS) specifically for RabbitMQ without Celery's dependency weight. The Claude Skills 360 bundle includes aio-pika skill sets covering connect_robust() with reconnect_interval, the connection_ctx() async context manager, declare_queue() with durable/TTL/DLX arguments, declare_exchange() with DIRECT/FANOUT/TOPIC types, the setup_dlq() dead-letter pattern, build_message() with DeliveryMode, publish() to default and named exchanges, the Publisher class with a start/stop lifecycle, the consume_iter() async-for pattern, the consume_callback() tag-based consumer, and AsyncEventBus topic routing. Start with the free tier to try async RabbitMQ AMQP code generation.
