Claude Code for pika: RabbitMQ AMQP Messaging in Python — Claude Skills 360 Blog
AI

Claude Code for pika: RabbitMQ AMQP Messaging in Python

Published: May 11, 2028
Read time: 5 min read
By: Claude Skills 360

pika is a Python client for RabbitMQ implementing AMQP 0-9-1. Install with pip install pika.

- Connect: import pika; conn = pika.BlockingConnection(pika.ConnectionParameters("localhost")); ch = conn.channel()
- Declare a queue: ch.queue_declare(queue="tasks", durable=True)
- Publish: ch.basic_publish(exchange="", routing_key="tasks", body="Hello", properties=pika.BasicProperties(delivery_mode=2))
- Consume: ch.basic_consume(queue="tasks", on_message_callback=callback, auto_ack=False), then ch.start_consuming()
- Callback: def callback(ch, method, properties, body): ...; acknowledge with ch.basic_ack(delivery_tag=method.delivery_tag)
- Prefetch: ch.basic_qos(prefetch_count=1); reject and requeue with ch.basic_nack(delivery_tag=..., requeue=True)
- Exchanges: ch.exchange_declare(exchange="logs", exchange_type="fanout") — also "topic" and "direct"; bind with ch.queue_bind(exchange="logs", queue="q1")
- Auth: pika.PlainCredentials("user", "pass"), or pika.URLParameters("amqp://user:pass@host:5672/vhost"); TLS via an ssl.SSLContext passed to pika.SSLOptions
- Resilience: ConnectionParameters(heartbeat=600, connection_attempts=5, retry_delay=5); close with conn.close()
- Async: pika.SelectConnection; from other threads, use connection.add_callback_threadsafe

Claude Code generates pika publishers, consumer workers, dead-letter queues, and RabbitMQ topology scripts.

CLAUDE.md for pika

## pika Stack
- Version: pika >= 1.3 | pip install pika
- Connect: BlockingConnection(ConnectionParameters("host")) → channel()
- Publish: ch.basic_publish(exchange="", routing_key="q", body=msg, properties=...)
- Consume: ch.basic_consume(queue="q", on_message_callback=cb, auto_ack=False)
- Ack: ch.basic_ack(delivery_tag=method.delivery_tag)
- Durable: queue_declare(durable=True) + BasicProperties(delivery_mode=2)

pika RabbitMQ Messaging Pipeline

# app/rabbit.py — pika publisher, consumer, topic exchange, DLQ, and thread-safe helpers
from __future__ import annotations

import json
import logging
from contextlib import contextmanager
from typing import Any, Callable

import pika
import pika.exceptions
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties


log = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────

def make_parameters(
    host: str = "localhost",
    port: int = 5672,
    vhost: str = "/",
    user: str = "guest",
    password: str = "guest",
    heartbeat: int = 600,
    blocked_connection_timeout: int = 300,
    connection_attempts: int = 5,
    retry_delay: float = 2.0,
) -> pika.ConnectionParameters:
    """
    Build pika ConnectionParameters.
    heartbeat: keeps TCP connection alive (0 = disabled, 600 = 10 min).
    """
    credentials = pika.PlainCredentials(user, password)
    return pika.ConnectionParameters(
        host=host,
        port=port,
        virtual_host=vhost,
        credentials=credentials,
        heartbeat=heartbeat,
        blocked_connection_timeout=blocked_connection_timeout,
        connection_attempts=connection_attempts,
        retry_delay=retry_delay,
    )


def connect(
    host: str = "localhost",
    port: int = 5672,
    user: str = "guest",
    password: str = "guest",
    vhost: str = "/",
    url: str | None = None,
) -> pika.BlockingConnection:
    """
    Create a BlockingConnection.
    url: AMQP URL overrides other params (amqp://user:pass@host:5672/vhost).
    """
    if url:
        params = pika.URLParameters(url)
    else:
        params = make_parameters(host=host, port=port, user=user,
                                 password=password, vhost=vhost)
    return pika.BlockingConnection(params)


@contextmanager
def channel_ctx(conn: pika.BlockingConnection):
    """Context manager that opens a channel and closes it on exit."""
    ch = conn.channel()
    try:
        yield ch
    finally:
        try:
            ch.close()
        except Exception:
            pass


# ─────────────────────────────────────────────────────────────────────────────
# 2. Queue / exchange setup
# ─────────────────────────────────────────────────────────────────────────────

def declare_queue(
    ch: BlockingChannel,
    queue_name: str,
    durable: bool = True,
    exclusive: bool = False,
    ttl_ms: int | None = None,
    dead_letter_exchange: str | None = None,
) -> str:
    """
    Declare a queue. Returns the actual queue name (useful for anonymous queues).
    durable: survive broker restart.
    ttl_ms: message TTL in milliseconds.
    dead_letter_exchange: route expired/rejected messages here.
    """
    args: dict[str, Any] = {}
    if ttl_ms is not None:
        args["x-message-ttl"] = ttl_ms
    if dead_letter_exchange:
        args["x-dead-letter-exchange"] = dead_letter_exchange

    result = ch.queue_declare(
        queue=queue_name,
        durable=durable,
        exclusive=exclusive,
        arguments=args if args else None,
    )
    return result.method.queue


def declare_exchange(
    ch: BlockingChannel,
    exchange: str,
    exchange_type: str = "direct",
    durable: bool = True,
) -> None:
    """
    Declare an exchange.
    exchange_type: "direct" | "fanout" | "topic" | "headers"
    """
    ch.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=durable)


def bind_queue(
    ch: BlockingChannel,
    queue: str,
    exchange: str,
    routing_key: str = "",
) -> None:
    """Bind a queue to an exchange with a routing key."""
    ch.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)


def setup_dlq(
    ch: BlockingChannel,
    main_queue: str,
    dlq_name: str | None = None,
    ttl_ms: int | None = None,
) -> tuple[str, str]:
    """
    Set up a main queue with a Dead Letter Queue.
    Expired/rejected messages from main_queue go to dlq.
    Returns (main_queue_name, dlq_name).
    """
    dlq = dlq_name or f"{main_queue}.dlq"
    dlx = f"{main_queue}.dlx"

    declare_exchange(ch, dlx, "fanout")
    declare_queue(ch, dlq, durable=True)
    bind_queue(ch, dlq, dlx)
    declare_queue(ch, main_queue, durable=True,
                  ttl_ms=ttl_ms, dead_letter_exchange=dlx)

    return main_queue, dlq


# ─────────────────────────────────────────────────────────────────────────────
# 3. Publisher
# ─────────────────────────────────────────────────────────────────────────────

def publish(
    ch: BlockingChannel,
    body: str | bytes | dict,
    queue: str = "",
    exchange: str = "",
    routing_key: str | None = None,
    persistent: bool = True,
    content_type: str = "application/json",
    headers: dict | None = None,
    correlation_id: str | None = None,
    reply_to: str | None = None,
) -> None:
    """
    Publish a message.
    body: str/bytes/dict (dict is JSON-serialized automatically).
    persistent: delivery_mode=2 — survives broker restart.
    """
    if isinstance(body, dict):
        raw = json.dumps(body).encode()
        content_type = "application/json"
    elif isinstance(body, str):
        raw = body.encode()
    else:
        raw = body

    props = BasicProperties(
        delivery_mode=2 if persistent else 1,
        content_type=content_type,
        headers=headers,
        correlation_id=correlation_id,
        reply_to=reply_to,
    )
    rk = routing_key if routing_key is not None else queue
    ch.basic_publish(exchange=exchange, routing_key=rk, body=raw, properties=props)


class Publisher:
    """
    Reusable publisher with connection management and reconnect.

    Usage:
        pub = Publisher("localhost", queue="tasks")
        pub.send({"task": "send_email", "to": "[email protected]"})
        pub.close()
    """

    def __init__(
        self,
        host: str = "localhost",
        queue: str = "default",
        exchange: str = "",
        routing_key: str | None = None,
        user: str = "guest",
        password: str = "guest",
        url: str | None = None,
    ) -> None:
        self._host     = host
        self._queue    = queue
        self._exchange = exchange
        self._rk       = routing_key
        self._user     = user
        self._password = password
        self._url      = url
        self._conn: pika.BlockingConnection | None = None
        self._ch:   BlockingChannel | None = None
        self._connect()

    def _connect(self) -> None:
        self._conn = connect(self._host, user=self._user,
                             password=self._password, url=self._url)
        self._ch   = self._conn.channel()
        if self._queue:
            declare_queue(self._ch, self._queue)

    def send(self, body: str | bytes | dict, **kwargs: Any) -> None:
        """Publish a message, reconnecting once if the connection dropped."""
        try:
            publish(self._ch, body, queue=self._queue,
                    exchange=self._exchange,
                    routing_key=self._rk, **kwargs)
        except (pika.exceptions.AMQPConnectionError,
                pika.exceptions.ChannelWrongStateError):
            log.warning("Connection lost — reconnecting")
            self._connect()
            publish(self._ch, body, queue=self._queue,
                    exchange=self._exchange, routing_key=self._rk, **kwargs)

    def close(self) -> None:
        if self._conn and not self._conn.is_closed:
            self._conn.close()


# ─────────────────────────────────────────────────────────────────────────────
# 4. Consumer
# ─────────────────────────────────────────────────────────────────────────────

def consume(
    conn: pika.BlockingConnection,
    queue: str,
    callback: Callable[[dict | str | bytes], None],
    prefetch: int = 1,
    auto_ack: bool = False,
    decode_json: bool = True,
) -> None:
    """
    Start consuming messages from a queue (blocking).
    callback: called with the parsed message body.
    prefetch: number of unacked messages the broker sends at once.
    decode_json: auto-parse JSON bodies to dict.
    """
    ch = conn.channel()
    declare_queue(ch, queue)
    ch.basic_qos(prefetch_count=prefetch)

    def _on_message(ch: BlockingChannel, method: Basic.Deliver,
                    properties: BasicProperties, body: bytes) -> None:
        try:
            parsed: Any = json.loads(body) if decode_json else body
        except (json.JSONDecodeError, UnicodeDecodeError):
            parsed = body
        try:
            callback(parsed)
            if not auto_ack:
                ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            log.error("Message handler error: %s", e)
            if not auto_ack:
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    ch.basic_consume(queue=queue, on_message_callback=_on_message,
                     auto_ack=auto_ack)
    log.info("Consuming from %s (CTRL+C to stop)", queue)
    try:
        ch.start_consuming()
    except KeyboardInterrupt:
        ch.stop_consuming()
    conn.close()


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Demo: publish 3 messages and consume them (requires local RabbitMQ)
    try:
        conn = connect()
        ch   = conn.channel()
        declare_queue(ch, "demo_tasks")

        print("=== Publishing 3 messages ===")
        for i in range(3):
            publish(ch, {"task": "process", "id": i})
            print(f"  Published task {i}")

        print("\n=== Consuming (3 messages then stop) ===")
        consumed = [0]

        def handler(msg):
            consumed[0] += 1
            print(f"  Received: {msg}")
            if consumed[0] >= 3:
                ch.stop_consuming()

        ch.basic_qos(prefetch_count=1)

        def on_message(c: BlockingChannel, m: Basic.Deliver,
                       p: BasicProperties, b: bytes) -> None:
            handler(json.loads(b))
            c.basic_ack(delivery_tag=m.delivery_tag)

        ch.basic_consume(queue="demo_tasks", on_message_callback=on_message,
                         auto_ack=False)
        ch.start_consuming()
        conn.close()

    except pika.exceptions.AMQPConnectionError:
        print("RabbitMQ not available — start with: docker run -d -p 5672:5672 rabbitmq")

For the aio-pika alternative: aio-pika wraps pika in an asyncio-native API with async with connection/channel management and awaitable message acknowledgement. pika's BlockingConnection is the better fit for synchronous workers and scripts, while aio-pika is the right choice when you need to mix RabbitMQ consumption with other async I/O (FastAPI, aiohttp) in the same event loop.

For the kombu alternative: kombu is Celery's underlying message-transport abstraction and supports RabbitMQ, Redis, SQS, and more through the same API. pika gives direct, protocol-level access to AMQP 0-9-1 features (dead-letter exchanges, message TTL, custom headers, basic_qos) without the Celery dependency overhead.

The Claude Skills 360 bundle includes pika skill sets covering make_parameters()/connect() with heartbeat/retry, declare_queue() with durable/TTL/DLQ arguments, declare_exchange()/bind_queue(), setup_dlq() dead-letter queue setup, publish() with JSON auto-encoding and delivery_mode, the Publisher class with reconnect, the consume() blocking consumer, basic_ack/basic_nack, and prefetch_count QoS. Start with the free tier to try RabbitMQ AMQP messaging code generation.

