aio-pika is an asyncio-native RabbitMQ client built on aiormq. pip install aio-pika. Connect: import aio_pika; conn = await aio_pika.connect_robust("amqp://guest:guest@localhost/"). Channel: ch = await conn.channel(). Declare queue: q = await ch.declare_queue("tasks", durable=True). Publish: await ch.default_exchange.publish(aio_pika.Message(b"hello", delivery_mode=aio_pika.DeliveryMode.PERSISTENT), routing_key="tasks"). Consume iterator: async with q.iterator() as qi: async for msg in qi: async with msg.process(): print(msg.body). Consume callback: await q.consume(callback). Ack: await msg.ack(). Nack: await msg.nack(requeue=True). Reject: await msg.reject(). Exchange: ex = await ch.declare_exchange("logs", aio_pika.ExchangeType.FANOUT). Topic: ExchangeType.TOPIC. Direct: ExchangeType.DIRECT. Bind: await q.bind(ex, routing_key="info.#"). Prefetch: await ch.set_qos(prefetch_count=10). Robust: aio_pika.connect_robust(url, reconnect_interval=5) — auto-reconnects on disconnect. Context manager: async with await aio_pika.connect(url) as conn:. Message headers: Message(body, headers={"x-retry": "1"}). Expiry: Message(body, expiration=30) — an int/float expiration is in seconds (or pass a timedelta/datetime). Close: await conn.close(). Claude Code generates aio-pika async producers, consumer workers, topic routers, and FastAPI lifespan integrations.
CLAUDE.md for aio-pika
## aio-pika Stack
- Version: aio-pika >= 9.0 | pip install aio-pika
- Connect: await aio_pika.connect_robust("amqp://guest:guest@localhost/")
- Publish: await exchange.publish(Message(body, delivery_mode=PERSISTENT), routing_key="q")
- Consume: async with queue.iterator() as qi: async for msg in qi: async with msg.process(): ...
- Exchange: await ch.declare_exchange("name", ExchangeType.TOPIC)
- Prefetch: await ch.set_qos(prefetch_count=10)
aio-pika Async RabbitMQ Pipeline
# app/amqp.py — aio-pika producer, consumer, topic exchange, DLQ, and FastAPI integration
from __future__ import annotations
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from dataclasses import asdict, dataclass
from typing import Any, AsyncGenerator, Callable
import aio_pika
from aio_pika import (
Channel,
DeliveryMode,
ExchangeType,
Message,
RobustConnection,
)
from aio_pika.abc import AbstractIncomingMessage
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────
async def connect(
    url: str = "amqp://guest:guest@localhost/",
    reconnect_interval: float = 5.0,
) -> RobustConnection:
    """
    Establish a RobustConnection that transparently re-establishes itself
    when the broker drops the link or restarts.

    url: AMQP URL — amqp://user:pass@host:5672/vhost
    reconnect_interval: seconds to wait between reconnection attempts.

    Usage:
        conn = await connect("amqp://user:pass@rabbitmq:5672/")
        ch = await conn.channel()
    """
    connection = await aio_pika.connect_robust(
        url,
        reconnect_interval=reconnect_interval,
    )
    return connection
@asynccontextmanager
async def connection_ctx(
    url: str = "amqp://guest:guest@localhost/",
    prefetch_count: int = 10,
) -> AsyncGenerator[Channel, None]:
    """
    Async context manager that yields a ready-to-use Channel.

    Opens a robust connection, applies QoS, and guarantees the connection
    is closed on exit (including on error) via ``async with``.

    Usage:
        async with connection_ctx("amqp://...") as ch:
            await publish(ch, "tasks", {"job": "send_email"})
    """
    connection = await connect(url)
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=prefetch_count)
        yield channel
# ─────────────────────────────────────────────────────────────────────────────
# 2. Queue / exchange setup
# ─────────────────────────────────────────────────────────────────────────────
async def declare_queue(
    ch: Channel,
    name: str,
    durable: bool = True,
    exclusive: bool = False,
    auto_delete: bool = False,
    ttl_ms: int | None = None,
    dead_letter_exchange: str | None = None,
) -> aio_pika.Queue:
    """
    Declare (idempotently) a queue on the given channel.

    ttl_ms: per-message TTL in milliseconds before routing to the DLX.
    dead_letter_exchange: DLX name that receives expired/rejected messages.
    """
    extra: dict[str, Any] = {}
    if ttl_ms is not None:
        extra["x-message-ttl"] = ttl_ms
    if dead_letter_exchange:
        extra["x-dead-letter-exchange"] = dead_letter_exchange
    # Pass None rather than an empty dict when no x-arguments were set.
    return await ch.declare_queue(
        name,
        durable=durable,
        exclusive=exclusive,
        auto_delete=auto_delete,
        arguments=extra if extra else None,
    )
async def declare_exchange(
    ch: Channel,
    name: str,
    exchange_type: ExchangeType = ExchangeType.DIRECT,
    durable: bool = True,
) -> aio_pika.Exchange:
    """
    Declare (idempotently) an exchange on the given channel.

    exchange_type: DIRECT | FANOUT | TOPIC | HEADERS
    """
    exchange = await ch.declare_exchange(name, exchange_type, durable=durable)
    return exchange
async def setup_dlq(
    ch: Channel,
    main_queue: str,
    dlq_name: str | None = None,
    ttl_ms: int | None = None,
) -> tuple[aio_pika.Queue, aio_pika.Queue]:
    """
    Wire up the main-queue + Dead Letter Queue pattern.

    Messages rejected in — or expired out of — *main_queue* are forwarded
    through a fanout DLX to the DLQ.

    dlq_name: defaults to "<main_queue>.dlq".
    ttl_ms: optional per-message TTL on the main queue.
    Returns (main_queue_obj, dlq_obj).

    Example:
        main_q, dlq = await setup_dlq(ch, "orders", ttl_ms=30_000)
    """
    dead_queue = dlq_name if dlq_name else f"{main_queue}.dlq"
    dead_exchange = f"{main_queue}.dlx"
    # Fanout DLX: every dead-lettered message lands in the DLQ regardless
    # of its original routing key.
    await declare_exchange(ch, dead_exchange, ExchangeType.FANOUT)
    dlq_obj = await declare_queue(ch, dead_queue, durable=True)
    await dlq_obj.bind(dead_exchange)
    main_obj = await declare_queue(
        ch,
        main_queue,
        durable=True,
        ttl_ms=ttl_ms,
        dead_letter_exchange=dead_exchange,
    )
    return main_obj, dlq_obj
# ─────────────────────────────────────────────────────────────────────────────
# 3. Publisher
# ─────────────────────────────────────────────────────────────────────────────
def build_message(
    body: dict | str | bytes,
    persistent: bool = True,
    content_type: str = "application/json",
    headers: dict | None = None,
    expiration_ms: int | None = None,
    correlation_id: str | None = None,
    reply_to: str | None = None,
) -> Message:
    """
    Build an aio_pika Message from a Python object.

    body: dict (JSON-encoded), str (UTF-8 encoded), or raw bytes.
    persistent: PERSISTENT delivery mode so the broker writes it to disk.
    content_type: MIME type; forced to application/json for dict bodies.
    expiration_ms: per-message TTL in milliseconds.
    """
    if isinstance(body, dict):
        raw = json.dumps(body).encode()
        content_type = "application/json"  # dict bodies are always JSON
    elif isinstance(body, str):
        raw = body.encode()
    else:
        raw = body
    # BUGFIX: aio-pika interprets a numeric `expiration` as SECONDS and
    # converts it to the millisecond string AMQP expects internally.
    # Passing our milliseconds value straight through made TTLs 1000x too
    # long, so divide by 1000 first.
    expiration_s = expiration_ms / 1000 if expiration_ms is not None else None
    return Message(
        raw,
        delivery_mode=DeliveryMode.PERSISTENT if persistent else DeliveryMode.NOT_PERSISTENT,
        content_type=content_type,
        headers=headers,
        expiration=expiration_s,
        correlation_id=correlation_id,
        reply_to=reply_to,
    )
async def publish(
    ch: Channel,
    body: dict | str | bytes,
    queue: str = "",
    exchange: str = "",
    routing_key: str | None = None,
    persistent: bool = True,
    headers: dict | None = None,
    expiration_ms: int | None = None,
) -> None:
    """
    Publish a message to a queue (via the default exchange) or to a named
    exchange.

    routing_key: defaults to `queue` when not given. NOTE: wildcards
    ("#", "*") only have meaning in queue BINDINGS — publish with a
    concrete key such as "user.created", never "user.#".

    Example:
        await publish(ch, {"task": "send_email", "to": "[email protected]"}, queue="tasks")
        await publish(ch, {"event": "user.created"}, exchange="events", routing_key="user.created")
    """
    msg = build_message(body, persistent=persistent, headers=headers,
                        expiration_ms=expiration_ms)
    rk = routing_key if routing_key is not None else queue
    if exchange:
        # Named exchange must already be declared; get_exchange looks it up.
        ex = await ch.get_exchange(exchange)
        await ex.publish(msg, routing_key=rk)
    else:
        await ch.default_exchange.publish(msg, routing_key=rk)
class Publisher:
    """
    Long-lived publisher holding a single RobustConnection.

    The robust connection re-opens its channel transparently after a
    broker reconnect, so one Publisher instance can outlive broker
    restarts.

    Usage:
        pub = Publisher("amqp://guest:guest@localhost/", queue="tasks")
        await pub.start()
        await pub.send({"task": "resize_image", "url": "..."})
        await pub.stop()
    """

    def __init__(
        self,
        url: str = "amqp://guest:guest@localhost/",
        queue: str = "default",
        exchange: str = "",
        routing_key: str | None = None,
    ) -> None:
        self._url = url
        self._queue = queue
        self._exc = exchange
        self._rk = routing_key
        self._conn: RobustConnection | None = None  # set by start()
        self._ch: Channel | None = None             # set by start()

    async def start(self) -> None:
        """Open the connection/channel and declare the target queue."""
        self._conn = await connect(self._url)
        self._ch = await self._conn.channel()
        if self._queue:
            await declare_queue(self._ch, self._queue)

    async def stop(self) -> None:
        """Close the connection; safe to call if never started."""
        if self._conn and not self._conn.is_closed:
            await self._conn.close()

    async def send(self, body: dict | str | bytes, **kwargs: Any) -> None:
        """
        Publish *body* using the configured queue/exchange/routing key.

        Raises RuntimeError if start() was never called.
        """
        if self._ch is None:
            # BUGFIX: fail fast with a clear message instead of an opaque
            # error on a None channel — consistent with AsyncEventBus.publish.
            raise RuntimeError("Call start() first")
        await publish(self._ch, body, queue=self._queue,
                      exchange=self._exc, routing_key=self._rk, **kwargs)

    async def __aenter__(self) -> "Publisher":
        await self.start()
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.stop()
# ─────────────────────────────────────────────────────────────────────────────
# 4. Consumer
# ─────────────────────────────────────────────────────────────────────────────
async def consume_iter(
    ch: Channel,
    queue: str,
    handler: Callable[[dict | str | bytes, AbstractIncomingMessage], Any],
    prefetch: int = 10,
    decode_json: bool = True,
) -> None:
    """
    Consume messages with the async-for iterator pattern.

    handler(body, msg) — awaited if it is a coroutine function.
    message.process() acks on success and rejects (requeue=False) when the
    handler raises, so failed messages route to the DLX if one is set up.

    Example:
        async def process(body, msg):
            await db.insert(body)
        await consume_iter(ch, "tasks", process, prefetch=5)
    """
    await ch.set_qos(prefetch_count=prefetch)
    q = await declare_queue(ch, queue)
    async with q.iterator() as queue_iter:
        async for message in queue_iter:
            try:
                parsed = json.loads(message.body) if decode_json else message.body
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Not valid JSON — hand the raw bytes to the handler.
                parsed = message.body
            try:
                async with message.process():
                    if asyncio.iscoroutinefunction(handler):
                        await handler(parsed, message)
                    else:
                        handler(parsed, message)
            except Exception as e:
                # BUGFIX: message.process() has already rejected the
                # delivery (requeue=False) when the handler raised.
                # The previous explicit `message.nack()` here re-settled
                # an already-settled message, which raises
                # MessageProcessError / an unknown-delivery-tag channel
                # error. Just log and continue consuming.
                log.error("Consumer error at delivery tag %s: %s",
                          message.delivery_tag, e)
async def consume_callback(
    ch: Channel,
    queue: str,
    callback: Callable[[AbstractIncomingMessage], Any],
    prefetch: int = 10,
) -> str:
    """
    Attach a callback-style consumer to *queue*.

    Returns the consumer tag, which can later be used to cancel the
    consumer.

    Example:
        async def on_message(msg: AbstractIncomingMessage):
            async with msg.process():
                data = json.loads(msg.body)
                ...
        tag = await consume_callback(ch, "events", on_message)
    """
    await ch.set_qos(prefetch_count=prefetch)
    target = await declare_queue(ch, queue)
    return await target.consume(callback)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Event bus pattern
# ─────────────────────────────────────────────────────────────────────────────
class AsyncEventBus:
    """
    Lightweight async event bus backed by a TOPIC exchange.

    Routing key convention: "<entity>.<event>" — e.g. "user.created".
    Subscribers bind with wildcards: "user.#", "*.created", "#".

    Usage:
        bus = AsyncEventBus("amqp://...", exchange="events")
        await bus.start()
        await bus.publish("user.created", {"id": 42})
        # In another coroutine / task:
        await bus.subscribe("user.#", "user-svc-q", handler)
        await bus.stop()
    """

    def __init__(
        self,
        url: str = "amqp://guest:guest@localhost/",
        exchange: str = "events",
    ) -> None:
        self._url = url
        self._exc = exchange
        self._conn: RobustConnection | None = None  # set by start()
        self._pub_ch: Channel | None = None         # dedicated publish channel

    async def start(self) -> None:
        """Connect and declare the topic exchange used for publishing."""
        self._conn = await connect(self._url)
        self._pub_ch = await self._conn.channel()
        await declare_exchange(self._pub_ch, self._exc, ExchangeType.TOPIC)

    async def stop(self) -> None:
        """Close the connection; safe to call if never started."""
        if self._conn and not self._conn.is_closed:
            await self._conn.close()

    async def publish(self, routing_key: str, payload: dict) -> None:
        """Publish *payload* as JSON under *routing_key*. Requires start()."""
        if self._pub_ch is None:
            raise RuntimeError("Call start() first")
        msg = build_message(payload)
        ex = await self._pub_ch.get_exchange(self._exc)
        await ex.publish(msg, routing_key=routing_key)

    async def subscribe(
        self,
        routing_key_pattern: str,
        queue_name: str,
        handler: Callable[[dict, AbstractIncomingMessage], Any],
        prefetch: int = 10,
    ) -> None:
        """
        Bind *queue_name* to the exchange and start consuming.

        Runs until cancelled. Raises RuntimeError if start() was never
        called.
        """
        if self._conn is None:
            # BUGFIX: previously dereferenced self._conn while still None,
            # dying with an AttributeError — mirror publish()'s guard.
            raise RuntimeError("Call start() first")
        # Each subscriber gets its own channel so its QoS / consumer state
        # is independent of the publish channel.
        ch = await self._conn.channel()
        ex = await declare_exchange(ch, self._exc, ExchangeType.TOPIC)
        q = await declare_queue(ch, queue_name)
        await q.bind(ex, routing_key=routing_key_pattern)
        await consume_iter(ch, queue_name, handler, prefetch=prefetch)
# ─────────────────────────────────────────────────────────────────────────────
# 6. FastAPI lifespan integration
# ─────────────────────────────────────────────────────────────────────────────
FASTAPI_EXAMPLE = '''
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.amqp import AsyncEventBus
bus = AsyncEventBus("amqp://guest:guest@localhost/", exchange="events")
@asynccontextmanager
async def lifespan(app: FastAPI):
await bus.start()
yield
await bus.stop()
app = FastAPI(lifespan=lifespan)
@app.post("/orders/")
async def create_order(order: dict):
order_id = 42 # from DB
await bus.publish("order.created", {**order, "id": order_id})
return {"id": order_id}
# Background subscriber (run in a separate task at startup):
# async def on_order(body, msg): await fulfillment_svc.process(body)
# asyncio.create_task(bus.subscribe("order.#", "fulfillment-q", on_order))
'''
# ─────────────────────────────────────────────────────────────────────────────
# Demo (API shape only — no RabbitMQ broker needed)
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    async def _demo() -> None:
        # Smoke-test the public API surface; no broker connection is made.
        print("aio-pika API demo (no broker needed for this output)")
        print("=== Message construction ===")
        sample = build_message({"event": "user.created", "id": 42})
        print(f" Message body: {sample.body}")
        print(f" Delivery mode: {sample.delivery_mode}")
        print("\n=== Publisher class ===")
        publisher = Publisher(queue="orders")
        print(f" Publisher created: {type(publisher).__name__}")
        print("\n=== AsyncEventBus class ===")
        event_bus = AsyncEventBus(exchange="platform-events")
        print(f" Bus created: {type(event_bus).__name__}")
        print("\nTo test with real RabbitMQ:")
        print(" docker run -d -p 5672:5672 rabbitmq:3-management")

    asyncio.run(_demo())
For the pika alternative — pika’s BlockingConnection is synchronous and thread-model based; aio-pika offers a native asyncio API (built on aiormq) with async with, async for, and await msg.ack() — it integrates naturally with FastAPI, aiohttp, and Quart without blocking the event loop. For the kombu alternative — kombu is Celery’s message transport abstraction supporting RabbitMQ, Redis, SQS, and others through a unified API; aio-pika gives direct asyncio-native access to AMQP 0-9-1 protocol features (DLX, TTL, topic routing, QoS) specifically for RabbitMQ without Celery’s dependency weight. The Claude Skills 360 bundle includes aio-pika skill sets covering connect_robust() with reconnect_interval, connection_ctx() async context manager, declare_queue() with durable/TTL/DLX args, declare_exchange() with DIRECT/FANOUT/TOPIC types, setup_dlq() dead-letter queue, build_message() with DeliveryMode, publish() to default and named exchanges, Publisher class with start/stop lifecycle, consume_iter() async-for pattern, consume_callback() tag-based consumer, and AsyncEventBus topic routing. Start with the free tier to try async RabbitMQ AMQP code generation.