pika is a Python RabbitMQ AMQP 0-9-1 client. pip install pika. Connect: import pika; conn = pika.BlockingConnection(pika.ConnectionParameters("localhost")); ch = conn.channel(). Declare queue: ch.queue_declare(queue="tasks", durable=True). Publish: ch.basic_publish(exchange="", routing_key="tasks", body="Hello", properties=pika.BasicProperties(delivery_mode=2)). Consume: ch.basic_consume(queue="tasks", on_message_callback=callback, auto_ack=False). ch.start_consuming(). Callback: def callback(ch, method, properties, body): ...; ch.basic_ack(delivery_tag=method.delivery_tag). Prefetch: ch.basic_qos(prefetch_count=1). Nack: ch.basic_nack(delivery_tag=..., requeue=True). Exchange: ch.exchange_declare(exchange="logs", exchange_type="fanout"). Topic: exchange_type="topic". Direct: exchange_type="direct". Bind: ch.queue_bind(exchange="logs", queue="q1"). Credentials: pika.PlainCredentials("user","pass"). URL: pika.URLParameters("amqp://user:pass@host:5672/vhost"). TLS: ssl.SSLContext in pika.SSLOptions. Heartbeat: ConnectionParameters(heartbeat=600). Reconnect: connection_attempts=5, retry_delay=5. Close: conn.close(). Async: pika.SelectConnection. Thread: use connection.add_callback_threadsafe. Claude Code generates pika publishers, consumer workers, dead-letter queues, and RabbitMQ topology scripts.
CLAUDE.md for pika
## pika Stack
- Version: pika >= 1.3 | pip install pika
- Connect: BlockingConnection(ConnectionParameters("host")) → channel()
- Publish: ch.basic_publish(exchange="", routing_key="q", body=msg, properties=...)
- Consume: ch.basic_consume(queue="q", on_message_callback=cb, auto_ack=False)
- Ack: ch.basic_ack(delivery_tag=method.delivery_tag)
- Durable: queue_declare(durable=True) + BasicProperties(delivery_mode=2)
pika RabbitMQ Messaging Pipeline
# app/rabbit.py — pika publisher, consumer, topic exchange, DLQ, and thread-safe helpers
from __future__ import annotations
import json
import logging
import threading
import time
from contextlib import contextmanager
from typing import Any, Callable
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────
def make_parameters(
    host: str = "localhost",
    port: int = 5672,
    vhost: str = "/",
    user: str = "guest",
    password: str = "guest",
    heartbeat: int = 600,
    blocked_connection_timeout: int = 300,
    connection_attempts: int = 5,
    retry_delay: float = 2.0,
) -> pika.ConnectionParameters:
    """Assemble pika ConnectionParameters for a RabbitMQ broker.

    heartbeat: AMQP heartbeat interval in seconds (0 disables; 600 = 10 min),
    used to keep the TCP connection alive.
    connection_attempts / retry_delay: pika's built-in connect retry knobs.
    """
    return pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=vhost,
        credentials=pika.PlainCredentials(user, password),
        heartbeat=heartbeat,
        blocked_connection_timeout=blocked_connection_timeout,
        connection_attempts=connection_attempts,
        retry_delay=retry_delay,
    )
def connect(
    host: str = "localhost",
    port: int = 5672,
    user: str = "guest",
    password: str = "guest",
    vhost: str = "/",
    url: str | None = None,
) -> pika.BlockingConnection:
    """Open a BlockingConnection to RabbitMQ.

    When *url* is given (amqp://user:pass@host:5672/vhost) it takes
    precedence over the individual host/port/credential arguments.
    """
    if url:
        return pika.BlockingConnection(pika.URLParameters(url))
    return pika.BlockingConnection(
        make_parameters(host=host, port=port, user=user,
                        password=password, vhost=vhost))
@contextmanager
def channel_ctx(conn: pika.BlockingConnection):
    """Yield a fresh channel on *conn*; close it (best-effort) on exit."""
    channel = conn.channel()
    try:
        yield channel
    finally:
        try:
            channel.close()
        except Exception:
            # Channel may already be gone (e.g. connection dropped) —
            # closing is best-effort only.
            pass
# ─────────────────────────────────────────────────────────────────────────────
# 2. Queue / exchange setup
# ─────────────────────────────────────────────────────────────────────────────
def declare_queue(
    ch: BlockingChannel,
    queue_name: str,
    durable: bool = True,
    exclusive: bool = False,
    ttl_ms: int | None = None,
    dead_letter_exchange: str | None = None,
) -> str:
    """Declare a queue and return the broker-assigned name.

    The returned name matters for anonymous (server-named) queues.
    durable: queue survives a broker restart.
    ttl_ms: per-message TTL in milliseconds (x-message-ttl).
    dead_letter_exchange: expired/rejected messages are routed here
    (x-dead-letter-exchange).
    """
    extra: dict[str, Any] = {}
    if ttl_ms is not None:
        extra["x-message-ttl"] = ttl_ms
    if dead_letter_exchange:
        extra["x-dead-letter-exchange"] = dead_letter_exchange
    declared = ch.queue_declare(
        queue=queue_name,
        durable=durable,
        exclusive=exclusive,
        arguments=extra or None,  # broker expects None, not {}
    )
    return declared.method.queue
def declare_exchange(
    ch: BlockingChannel,
    exchange: str,
    exchange_type: str = "direct",
    durable: bool = True,
) -> None:
    """Declare an exchange of the given type.

    exchange_type: one of "direct", "fanout", "topic", "headers".
    durable: exchange survives a broker restart.
    """
    ch.exchange_declare(exchange=exchange,
                        exchange_type=exchange_type,
                        durable=durable)
def bind_queue(
    ch: BlockingChannel,
    queue: str,
    exchange: str,
    routing_key: str = "",
) -> None:
    """Bind *queue* to *exchange* under *routing_key* (empty key is valid
    for fanout exchanges)."""
    ch.queue_bind(queue=queue,
                  exchange=exchange,
                  routing_key=routing_key)
def setup_dlq(
    ch: BlockingChannel,
    main_queue: str,
    dlq_name: str | None = None,
    ttl_ms: int | None = None,
) -> tuple[str, str]:
    """Create *main_queue* wired to a Dead Letter Queue.

    Expired or rejected messages from the main queue are routed through a
    fanout dead-letter exchange ("<main>.dlx") into the DLQ.
    Returns (main_queue_name, dlq_name).
    """
    dead_queue = dlq_name if dlq_name else f"{main_queue}.dlq"
    dead_exchange = f"{main_queue}.dlx"
    # Dead-letter side first: fanout DLX feeding the DLQ.
    declare_exchange(ch, dead_exchange, "fanout")
    declare_queue(ch, dead_queue, durable=True)
    bind_queue(ch, dead_queue, dead_exchange)
    # Main queue points its x-dead-letter-exchange at the DLX.
    declare_queue(ch, main_queue, durable=True, ttl_ms=ttl_ms,
                  dead_letter_exchange=dead_exchange)
    return main_queue, dead_queue
# ─────────────────────────────────────────────────────────────────────────────
# 3. Publisher
# ─────────────────────────────────────────────────────────────────────────────
def publish(
    ch: BlockingChannel,
    body: str | bytes | dict,
    queue: str = "",
    exchange: str = "",
    routing_key: str | None = None,
    persistent: bool = True,
    content_type: str = "application/json",
    headers: dict | None = None,
    correlation_id: str | None = None,
    reply_to: str | None = None,
) -> None:
    """Publish one message.

    body: str/bytes/dict — dicts are JSON-serialized automatically and
    force content_type to "application/json".
    persistent: delivery_mode=2, so the message survives a broker restart
    (requires a durable queue too).
    routing_key: defaults to *queue* when not given (default-exchange style).
    """
    if isinstance(body, dict):
        payload = json.dumps(body).encode()
        content_type = "application/json"  # dict bodies are always JSON
    else:
        payload = body.encode() if isinstance(body, str) else body
    ch.basic_publish(
        exchange=exchange,
        routing_key=queue if routing_key is None else routing_key,
        body=payload,
        properties=BasicProperties(
            delivery_mode=2 if persistent else 1,
            content_type=content_type,
            headers=headers,
            correlation_id=correlation_id,
            reply_to=reply_to,
        ),
    )
class Publisher:
    """
    Reusable publisher with connection management and reconnect.
    Usage:
        pub = Publisher("localhost", queue="tasks")
        pub.send({"task": "send_email", "to": "[email protected]"})
        pub.close()
    """
    def __init__(
        self,
        host: str = "localhost",
        queue: str = "default",
        exchange: str = "",
        routing_key: str | None = None,
        user: str = "guest",
        password: str = "guest",
        url: str | None = None,
    ) -> None:
        # Connection settings are kept so _connect() can be re-run on drop.
        self._host = host
        self._queue = queue
        self._exchange = exchange
        self._rk = routing_key
        self._user = user
        self._password = password
        self._url = url
        self._conn: pika.BlockingConnection | None = None
        self._ch: BlockingChannel | None = None
        self._connect()

    def _connect(self) -> None:
        # (Re)establish the connection and channel; ensure the target
        # queue exists when one was named.
        self._conn = connect(self._host, user=self._user,
                             password=self._password, url=self._url)
        self._ch = self._conn.channel()
        if self._queue:
            declare_queue(self._ch, self._queue)

    def _publish_once(self, body: str | bytes | dict,
                      extra: dict[str, Any]) -> None:
        # Single publish attempt on the current channel.
        publish(self._ch, body, queue=self._queue,
                exchange=self._exchange, routing_key=self._rk, **extra)

    def send(self, body: str | bytes | dict, **kwargs: Any) -> None:
        """Publish a message, reconnecting once if the connection dropped."""
        try:
            self._publish_once(body, kwargs)
        except (pika.exceptions.AMQPConnectionError,
                pika.exceptions.ChannelWrongStateError):
            log.warning("Connection lost — reconnecting")
            self._connect()
            self._publish_once(body, kwargs)

    def close(self) -> None:
        """Close the underlying connection if it is still open."""
        conn = self._conn
        if conn and not conn.is_closed:
            conn.close()
# ─────────────────────────────────────────────────────────────────────────────
# 4. Consumer
# ─────────────────────────────────────────────────────────────────────────────
def consume(
    conn: pika.BlockingConnection,
    queue: str,
    callback: Callable[[dict | str | bytes], None],
    prefetch: int = 1,
    auto_ack: bool = False,
    decode_json: bool = True,
) -> None:
    """
    Start consuming messages from a queue (blocking).

    The connection is always closed when consumption stops — on CTRL+C,
    on a broker/channel error, or after stop_consuming() from a handler.

    callback: called with the parsed message body.
    prefetch: number of unacked messages the broker sends at once.
    auto_ack: broker considers messages delivered without an explicit ack.
    decode_json: auto-parse JSON bodies to dict (raw bytes on failure).
    """
    ch = conn.channel()
    declare_queue(ch, queue)
    ch.basic_qos(prefetch_count=prefetch)

    def _on_message(ch: BlockingChannel, method: Basic.Deliver,
                    properties: BasicProperties, body: bytes) -> None:
        try:
            parsed: Any = json.loads(body) if decode_json else body
        except (json.JSONDecodeError, UnicodeDecodeError):
            parsed = body  # not JSON — hand the raw bytes through
        try:
            callback(parsed)
        except Exception as e:
            log.error("Message handler error: %s", e)
            if not auto_ack:
                # requeue=False: a poison message goes to the DLQ (if one
                # is configured) instead of redelivering forever.
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            # Ack only after the handler succeeded. Bug fix: previously a
            # failing basic_ack fell into the except branch and nacked a
            # message that had already been processed.
            if not auto_ack:
                ch.basic_ack(delivery_tag=method.delivery_tag)

    ch.basic_consume(queue=queue, on_message_callback=_on_message,
                     auto_ack=auto_ack)
    log.info("Consuming from %s (CTRL+C to stop)", queue)
    try:
        ch.start_consuming()
    except KeyboardInterrupt:
        ch.stop_consuming()
    finally:
        # Bug fix: the connection used to leak when start_consuming()
        # raised anything other than KeyboardInterrupt.
        conn.close()
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo: publish 3 messages and consume them (requires local RabbitMQ).
    conn = None
    try:
        conn = connect()
        ch = conn.channel()
        declare_queue(ch, "demo_tasks")
        print("=== Publishing 3 messages ===")
        for i in range(3):
            publish(ch, {"task": "process", "id": i})
            print(f"  Published task {i}")
        print("\n=== Consuming (3 messages then stop) ===")
        consumed = [0]  # mutable cell: module level, so nonlocal is unavailable

        def handler(msg) -> None:
            # Count deliveries; stop the consume loop after the third one.
            consumed[0] += 1
            print(f"  Received: {msg}")
            if consumed[0] >= 3:
                ch.stop_consuming()

        def _on_demo(c: BlockingChannel, m: Basic.Deliver,
                     p: BasicProperties, b: bytes) -> None:
            # Proper callback instead of the fragile side-effecting
            # tuple-lambda; ack follows the handler as before.
            handler(json.loads(b))
            c.basic_ack(m.delivery_tag)

        ch.basic_qos(prefetch_count=1)
        ch.basic_consume("demo_tasks", on_message_callback=_on_demo,
                         auto_ack=False)
        try:
            ch.start_consuming()
        except KeyboardInterrupt:
            # Bug fix: CTRL+C used to escape with a traceback and leak
            # the connection; stop the consumer cleanly instead.
            ch.stop_consuming()
    except pika.exceptions.AMQPConnectionError:
        print("RabbitMQ not available — start with: docker run -d -p 5672:5672 rabbitmq")
    finally:
        # Bug fix: always release the connection, whatever path we took.
        if conn is not None and not conn.is_closed:
            conn.close()
For the aio-pika alternative — aio-pika wraps pika in an asyncio-native API with async with connection/channel management and awaitable message acknowledgement; pika’s BlockingConnection is better for synchronous workers and scripts, while aio-pika is the right choice when you need to mix RabbitMQ consumption with other async I/O (FastAPI, aiohttp) in the same event loop. For the kombu alternative — kombu is Celery’s underlying message transport abstraction that supports RabbitMQ, Redis, SQS, and more transports through the same API; pika gives direct, protocol-level access to AMQP 0-9-1 features (dead-letter exchanges, message TTL, custom headers, basic_qos) without the Celery dependency overhead. The Claude Skills 360 bundle includes pika skill sets covering make_parameters()/connect() with heartbeat/retry, declare_queue() with durable/TTL/DLQ args, declare_exchange()/bind_queue(), setup_dlq() dead-letter queue setup, publish() with JSON auto-encoding and delivery_mode, Publisher class with reconnect, consume() blocking consumer, basic_ack/basic_nack, and prefetch_count QoS. Start with the free tier to try RabbitMQ AMQP messaging code generation.