kombu is a messaging library that abstracts AMQP, Redis, SQS, and other transports. pip install kombu. Connect: from kombu import Connection; conn = Connection("amqp://guest:guest@localhost//"). Redis: Connection("redis://localhost:6379/0"). SQS: Connection("sqs://"). Exchange: from kombu import Exchange; ex = Exchange("tasks", type="direct"). Queue: from kombu import Queue; q = Queue("emails", exchange=ex, routing_key="emails"). Producer: with conn.Producer() as p: p.publish({"task": "send"}, exchange=ex, routing_key="emails", serializer="json"). Consumer: with conn.Consumer(q, callbacks=[on_message]): conn.drain_events(). SimpleQueue: with conn.SimpleQueue("tasks") as sq: sq.put({"job": "process"}); msg = sq.get(block=True); msg.ack(). Declare: q.declare(). Exchange types: "direct" "fanout" "topic" "headers". Serializer: "json" "msgpack" "yaml" "pickle". Compression: "zlib" "bzip2". Retry: conn.ensure_connection(max_retries=3). Context: with conn: .... Channel: ch = conn.channel(); p = Producer(ch). Ack: message.ack(). Reject: message.reject(requeue=True). conn.as_uri(). conn.transport_cls. Claude Code generates kombu producers, consumer loops, multi-broker setups, and Celery-compatible task pipelines.
CLAUDE.md for kombu
## kombu Stack
- Version: kombu >= 5.3 | pip install kombu
- Connect: Connection("amqp://user:pass@host//") | Connection("redis://host:6379/0")
- Exchange/Queue: Exchange("name", type="direct") | Queue("q", exchange=ex, routing_key="rk")
- Publish: with conn.Producer() as p: p.publish(body, exchange=ex, routing_key="rk", serializer="json")
- Consume: with conn.Consumer(queues, callbacks=[cb]): conn.drain_events(timeout=1)
- SimpleQueue: conn.SimpleQueue("name") — queue-like (put/get/ack), auto-declares the queue
kombu Multi-Broker Messaging Pipeline
# app/messaging.py — kombu producer, consumer, fanout, multi-broker, and helpers
from __future__ import annotations
import json
import logging
import socket
import time
from contextlib import contextmanager
from dataclasses import asdict, dataclass
from typing import Any, Callable
from kombu import Connection, Exchange, Producer, Queue
from kombu.mixins import ConsumerMixin
from kombu.transport import pyamqp
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────
def make_connection(
    url: str = "amqp://guest:guest@localhost//",
    heartbeat: float = 60.0,
    connect_timeout: float = 4.0,
    ssl: bool = False,
    transport_options: dict | None = None,
) -> Connection:
    """Build a kombu Connection for any supported transport.

    Args:
        url: broker URL — AMQP, redis://, sqs://, or memory:// (in-process,
            handy for unit tests).
        heartbeat: heartbeat interval in seconds (transport-dependent).
        connect_timeout: seconds to wait when establishing the connection.
        ssl: enable TLS for the transport.
        transport_options: extra transport-specific options.

    Examples:
        make_connection("amqp://guest:guest@localhost//")
        make_connection("redis://localhost:6379/0")
        make_connection("memory://")  # in-memory for unit tests
    """
    options = {} if transport_options is None else transport_options
    return Connection(
        url,
        heartbeat=heartbeat,
        connect_timeout=connect_timeout,
        ssl=ssl,
        transport_options=options,
    )
@contextmanager
def connection_ctx(url: str = "amqp://guest:guest@localhost//", **kwargs):
    """Context manager yielding a connected Connection, released on exit.

    Usage:
        with connection_ctx("redis://localhost/0") as conn:
            publish(conn, "events", {"type": "user.created"})
    """
    # Connection is itself a context manager; entering it returns the
    # connection and leaving it releases the underlying resources.
    with make_connection(url, **kwargs) as conn:
        yield conn
def ensure_connected(
    conn: Connection,
    max_retries: int = 5,
    interval_start: float = 2.0,
    interval_step: float = 1.0,
    interval_max: float = 10.0,
) -> Connection:
    """Ensure the connection is open, reconnecting with a growing back-off.

    The wait starts at ``interval_start`` seconds and increases by
    ``interval_step`` per attempt, capped at ``interval_max`` — i.e. a
    linear back-off, which is what kombu's ``ensure_connection`` provides.

    Returns:
        The same connection, now established.
    """
    retry_opts = {
        "max_retries": max_retries,
        "interval_start": interval_start,
        "interval_step": interval_step,
        "interval_max": interval_max,
    }
    conn.ensure_connection(**retry_opts)
    return conn
# ─────────────────────────────────────────────────────────────────────────────
# 2. Exchange and queue setup
# ─────────────────────────────────────────────────────────────────────────────
def make_exchange(
    name: str,
    exchange_type: str = "direct",
    durable: bool = True,
    auto_delete: bool = False,
) -> Exchange:
    """Construct a kombu Exchange declaration object.

    exchange_type is one of "direct", "fanout", "topic", or "headers".
    Nothing touches the broker until the exchange is declared/bound.
    """
    return Exchange(
        name,
        type=exchange_type,
        durable=durable,
        auto_delete=auto_delete,
    )
def make_queue(
    name: str,
    exchange: Exchange | None = None,
    routing_key: str = "",
    durable: bool = True,
    exclusive: bool = False,
    auto_delete: bool = False,
    message_ttl: int | None = None,
    dead_letter_exchange: str | None = None,
    dead_letter_routing_key: str | None = None,
) -> Queue:
    """Create a kombu Queue, optionally bound to an exchange, with DLX args.

    Args:
        name: queue name.
        exchange: exchange to bind to; the queue is unbound when None.
        routing_key: binding key (meaningful only with an exchange).
        durable / exclusive / auto_delete: standard AMQP queue flags.
        message_ttl: milliseconds before a message expires
            (RabbitMQ ``x-message-ttl``).
        dead_letter_exchange: exchange receiving expired/rejected messages
            (RabbitMQ ``x-dead-letter-exchange``).
        dead_letter_routing_key: routing key used when dead-lettering;
            when omitted the broker reuses the message's original routing
            key (RabbitMQ ``x-dead-letter-routing-key``).
    """
    queue_args: dict = {}
    if message_ttl is not None:
        queue_args["x-message-ttl"] = message_ttl
    if dead_letter_exchange:
        queue_args["x-dead-letter-exchange"] = dead_letter_exchange
    if dead_letter_routing_key:
        queue_args["x-dead-letter-routing-key"] = dead_letter_routing_key

    kwargs: dict = {
        "durable": durable,
        "exclusive": exclusive,
        "auto_delete": auto_delete,
    }
    if queue_args:
        kwargs["queue_arguments"] = queue_args
    # Explicit None check: do not rely on Exchange truthiness.
    if exchange is not None:
        return Queue(name, exchange=exchange, routing_key=routing_key, **kwargs)
    return Queue(name, **kwargs)
def declare_entities(
    conn: Connection,
    *entities: Exchange | Queue,
) -> None:
    """Declare exchanges and queues on the broker.

    Idempotent — safe to call on startup.

    Bug fix: the channel is now always closed (try/finally); previously it
    leaked, and leaked even earlier if a declaration raised.
    """
    ch = conn.channel()
    try:
        for entity in entities:
            entity.declare(channel=ch)
    finally:
        ch.close()
# ─────────────────────────────────────────────────────────────────────────────
# 3. Publishing
# ─────────────────────────────────────────────────────────────────────────────
def publish(
    conn: Connection,
    body: dict | str | bytes,
    exchange: Exchange | str = "",
    routing_key: str = "",
    serializer: str = "json",
    compression: str | None = None,
    headers: dict | None = None,
    delivery_mode: int = 2,
    expiration: str | None = None,
    retry: bool = True,
    retry_policy: dict | None = None,
) -> None:
    """Publish one message through a short-lived Producer.

    delivery_mode 2 marks the message persistent (survives broker restart);
    1 is transient. expiration is a per-message TTL in string milliseconds,
    e.g. "30000". With retry=True, transient connection errors are retried
    according to retry_policy.

    Example:
        publish(conn, {"user_id": 42, "action": "signup"},
                exchange=user_ex, routing_key="user.signup")
    """
    if retry_policy is None:
        retry_policy = {
            "interval_start": 0,
            "interval_step": 1,
            "interval_max": 5,
            "max_retries": 3,
        }
    producer = conn.Producer()
    with producer:
        producer.publish(
            body,
            exchange=exchange,
            routing_key=routing_key,
            serializer=serializer,
            compression=compression,
            headers={} if headers is None else headers,
            delivery_mode=delivery_mode,
            expiration=expiration,
            retry=retry,
            retry_policy=retry_policy,
        )
class MessagePublisher:
    """Reusable publisher holding a persistent connection and producer.

    On a failed publish it reconnects once and retries the same message.

    Usage:
        pub = MessagePublisher("redis://localhost/0", exchange_name="events")
        pub.send({"event": "order.created", "order_id": 123},
                 routing_key="order.created")
        pub.close()
    """

    def __init__(
        self,
        url: str = "amqp://guest:guest@localhost//",
        exchange_name: str = "default",
        exchange_type: str = "topic",
        default_routing_key: str = "#",
        serializer: str = "json",
    ) -> None:
        self._conn = make_connection(url)
        self._exchange = make_exchange(exchange_name, exchange_type)
        self._default_rk = default_routing_key
        self._serializer = serializer
        self._producer: Producer | None = None
        self._connect()

    def _connect(self) -> None:
        # (Re)establish the connection and a fresh producer bound to it.
        ensure_connected(self._conn)
        self._producer = self._conn.Producer()

    def send(
        self,
        body: dict | str | bytes,
        routing_key: str | None = None,
        headers: dict | None = None,
    ) -> None:
        """Publish `body`; on any publish error, reconnect and retry once."""
        rk = routing_key or self._default_rk
        hdrs = headers or {}
        try:
            self._producer.publish(
                body,
                exchange=self._exchange,
                routing_key=rk,
                serializer=self._serializer,
                headers=hdrs,
                delivery_mode=2,
                retry=True,
            )
        except Exception:
            log.warning("Publish failed — reconnecting")
            self._connect()
            # Bug fix: the retry publish used to omit `headers`, silently
            # dropping them on the reconnect path. Resend them here.
            self._producer.publish(
                body,
                exchange=self._exchange,
                routing_key=rk,
                serializer=self._serializer,
                headers=hdrs,
                delivery_mode=2,
                retry=True,
            )

    def close(self) -> None:
        """Release the underlying connection (and its producer channel)."""
        self._conn.release()
# ─────────────────────────────────────────────────────────────────────────────
# 4. Consumer (ConsumerMixin pattern)
# ─────────────────────────────────────────────────────────────────────────────
def make_consumer_worker(
    conn: Connection,
    queues: list[Queue],
    handler: Callable[[dict, Any], None],
    prefetch_count: int = 10,
) -> type:
    """Build and return a ConsumerMixin worker class.

    Instantiate with a connection and call .run() to start consuming.

    Bug fix: the generated class now defines __init__ storing
    ``self.connection``, which ConsumerMixin requires — previously
    ``Worker(conn)`` raised TypeError (object.__init__ takes no args) and
    the mixin had no connection to run on. The pointless ``callable``
    guard was also removed: a non-callable handler now raises (and the
    message is rejected) instead of being silently acked unprocessed.

    Example:
        Worker = make_consumer_worker(conn, [email_q, sms_q], handle_notification)
        Worker(conn).run()
    """
    class Worker(ConsumerMixin):
        def __init__(self, connection: Connection) -> None:
            # ConsumerMixin reads self.connection inside .run().
            self.connection = connection

        def get_consumers(self, Consumer, channel):
            return [Consumer(
                queues,
                callbacks=[self.on_message],
                prefetch_count=prefetch_count,
            )]

        def on_message(self, body, message):
            try:
                handler(body, message)
                message.ack()
            except Exception as e:
                log.error("Message handler error: %s", e)
                message.reject(requeue=False)

    return Worker
def consume_once(
    conn: Connection,
    queues: list[Queue] | Queue,
    handler: Callable[[dict, Any], None],
    timeout: float = 5.0,
    limit: int = 100,
) -> int:
    """Drain up to `limit` messages, stopping when draining times out.

    Each message is acked on success or rejected (not requeued) when the
    handler raises; both outcomes count toward the returned total.
    Useful for batch processing without a long-running loop.

    Example:
        processed = consume_once(conn, [tasks_q], process_task,
                                 timeout=2.0, limit=50)
    """
    queue_list = [queues] if isinstance(queues, Queue) else queues
    count = 0

    def on_message(body, message):
        nonlocal count
        try:
            handler(body, message)
            message.ack()
        except Exception as exc:
            log.error("Handler error: %s", exc)
            message.reject(requeue=False)
        finally:
            # Count every delivery, acked or rejected.
            count += 1

    with conn.Consumer(queue_list, callbacks=[on_message]):
        try:
            while count < limit:
                conn.drain_events(timeout=timeout)
        except socket.timeout:
            # Timeout means the broker had nothing more for us — done.
            pass
    return count
def simple_queue_send(conn: Connection, name: str, body: dict) -> None:
    """Put one JSON-serialized message on a SimpleQueue.

    The queue is auto-declared; works with any transport including Redis.
    """
    queue = conn.SimpleQueue(name)
    with queue:
        queue.put(body, serializer="json")
def simple_queue_recv(
    conn: Connection,
    name: str,
    block: bool = True,
    timeout: float = 5.0,
) -> dict | None:
    """Receive and ack one message from a SimpleQueue.

    Returns the decoded payload, or None when nothing arrives before
    the timeout (or the queue is empty with block=False).
    """
    with conn.SimpleQueue(name) as queue:
        try:
            message = queue.get(block=block, timeout=timeout)
        except queue.Empty:
            return None
        payload = message.payload
        message.ack()
        return payload
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo entry point: exercises the helpers end-to-end without a broker.
    # In-memory transport — no broker required for this demo
    BROKER_URL = "memory://"
    print("=== Exchanges and queues ===")
    # Build declaration objects only; nothing is sent to a broker here.
    task_ex = make_exchange("tasks", "direct")
    email_q = make_queue("emails", exchange=task_ex, routing_key="emails")
    sms_q = make_queue("sms", exchange=task_ex, routing_key="sms")
    print(f" Exchange: {task_ex}")
    print(f" Queues: {email_q}, {sms_q}")
    print("\n=== SimpleQueue round-trip (in-memory) ===")
    # Round-trip one message through the in-memory transport.
    conn = make_connection(BROKER_URL)
    simple_queue_send(conn, "demo_queue", {"task": "send_email", "to": "[email protected]"})
    msg = simple_queue_recv(conn, "demo_queue", block=True, timeout=2.0)
    print(f" Received: {msg}")
    conn.release()
    print("\n=== MessagePublisher (in-memory) ===")
    # Persistent-connection publisher against the same in-memory broker.
    pub = MessagePublisher(BROKER_URL, exchange_name="events", exchange_type="topic")
    pub.send({"event": "user.created", "id": 42}, routing_key="user.created")
    print(" Published user.created event")
    pub.close()
    # Pointers for running the same demo against real brokers.
    print("\nTo test with real RabbitMQ:")
    print(" docker run -d -p 5672:5672 rabbitmq:3-management")
    print(" BROKER_URL = 'amqp://guest:guest@localhost//'")
    print("\nTo test with Redis:")
    print(" docker run -d -p 6379:6379 redis:7")
    print(" BROKER_URL = 'redis://localhost:6379/0'")
For the pika / aio-pika alternative — pika and aio-pika are AMQP-only clients giving direct RabbitMQ protocol access; kombu adds a transport abstraction layer so the same Producer/Consumer code runs against RabbitMQ, Redis, SQS, MongoDB, or an in-memory broker by changing only the connection URL — ideal for applications that need broker portability or test isolation with memory://. For the aiokafka alternative — aiokafka provides asyncio-native Kafka streaming with consumer groups and offset management; kombu targets traditional task queue patterns (direct/topic exchange, ack/nack, DLX) with synchronous or Celery-integrated usage — use aiokafka for high-throughput event streaming, kombu when you want Celery-compatible task queues that can swap brokers. The Claude Skills 360 bundle includes kombu skill sets covering make_connection() with all transport URLs, connection_ctx() manager, ensure_connected() with retry back-off, make_exchange()/make_queue() with type/DLX options, declare_entities() idempotent setup, publish() with retry policy and serialization, MessagePublisher reusable class, make_consumer_worker() ConsumerMixin builder, consume_once() batch drainer, simple_queue_send/recv() Redis-compatible helpers, and in-memory transport for unit testing. Start with the free tier to try multi-broker message transport code generation.