
Claude Code for pyzmq: ZeroMQ Messaging in Python

Published: April 4, 2028
Read time: 5 min
By: Claude Skills 360

pyzmq provides ZeroMQ bindings for Python: lightweight, brokerless messaging for distributed systems. Install with pip install pyzmq, then create a context: import zmq; ctx = zmq.Context().

REQ-REP: the server binds a REP socket (sock = ctx.socket(zmq.REP); sock.bind("tcp://*:5555")) and alternates msg = sock.recv(); sock.send(reply). The client connects a REQ socket (sock = ctx.socket(zmq.REQ); sock.connect("tcp://localhost:5555")) and alternates sock.send(b"Hello"); reply = sock.recv().

PUB-SUB: the publisher binds (pub = ctx.socket(zmq.PUB); pub.bind("tcp://*:5556")) and broadcasts with pub.send_multipart([topic.encode(), msg]). Subscribers connect (sub = ctx.socket(zmq.SUB); sub.connect(...)) and filter with sub.setsockopt(zmq.SUBSCRIBE, b"topic").

PUSH-PULL distributes work round-robin: push.send(work); result = pull.recv(). JSON helpers serialize automatically: sock.send_json({"cmd": "ping"}); data = sock.recv_json(). Multipart framing keeps frames atomic: sock.send_multipart([id, b"", payload]). A Poller multiplexes sockets: poller = zmq.Poller(); poller.register(sock, zmq.POLLIN); events = dict(poller.poll(timeout=1000)).

Common socket options: sock.setsockopt(zmq.LINGER, 0) drops queued messages on close, sock.setsockopt(zmq.SNDHWM, 1000) caps the send queue, and sock.setsockopt(zmq.RCVTIMEO, 5000) bounds recv waits. For asyncio, use import zmq.asyncio; ctx = zmq.asyncio.Context() and await sock.recv(). DEALER-ROUTER gives you async routing without REQ-REP lockstep; set a stable identity with sock.setsockopt(zmq.IDENTITY, b"worker-1"). For compact binary payloads: sock.send(msgpack.packb(data)); msgpack.unpackb(sock.recv()). Call ctx.term() when done. Claude Code generates pyzmq messaging servers, work queues, and pub-sub pipelines.
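The multipart call above (send_multipart([id, b"", payload])) frames a message the way ROUTER sockets expect: identity, empty delimiter, payload. A minimal sketch of atomic multipart framing, using PAIR sockets over the inproc transport so both ends live in one process (socket and endpoint names are illustrative):

```python
import zmq

# PAIR sockets over inproc: deterministic, single-process, no TCP port needed
ctx = zmq.Context.instance()
a = ctx.socket(zmq.PAIR)
a.bind("inproc://frames-demo")
b = ctx.socket(zmq.PAIR)
b.connect("inproc://frames-demo")

# All frames of a multipart message arrive together or not at all
a.send_multipart([b"client-7", b"", b'{"cmd": "ping"}'])
identity, empty, payload = b.recv_multipart()

a.close()
b.close()
```

Because frames are atomic, the receiver never sees a partial message: recv_multipart returns all three frames in one call.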

CLAUDE.md for pyzmq

## pyzmq Stack
- Version: pyzmq >= 26.0 | pip install pyzmq
- Context: ctx = zmq.Context() | async: zmq.asyncio.Context()
- REQ-REP: server bind tcp://*:port; client connect; send/recv pairs
- PUB-SUB: pub.send_multipart([topic_bytes, msg]); sub.setsockopt(SUBSCRIBE, b"topic")
- PUSH-PULL: push.send(work_bytes); pull.recv() — round-robin distribution
- JSON: sock.send_json(dict) | sock.recv_json() — auto-serializes
- Poller: zmq.Poller + poll(timeout_ms) — multiplex multiple sockets
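The REQ-REP bullet above can be sketched as a complete round trip in a single process, using the inproc transport (endpoint name is illustrative; with inproc the REP side must bind before the REQ side connects):

```python
import zmq

ctx = zmq.Context.instance()
rep = ctx.socket(zmq.REP)
rep.bind("inproc://reqrep-demo")   # bind first: inproc requires it
req = ctx.socket(zmq.REQ)
req.connect("inproc://reqrep-demo")

req.send_json({"ping": "hello"})   # REQ must strictly alternate send/recv
request = rep.recv_json()          # REP must strictly alternate recv/send
rep.send_json({"pong": request["ping"]})
reply = req.recv_json()
print(reply)                       # {'pong': 'hello'}

req.close()
rep.close()
```

The strict alternation is the defining REQ-REP constraint: sending twice in a row on either socket raises zmq.ZMQError.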

pyzmq Messaging Pipeline

# app/messaging.py — pyzmq REQ/REP, PUB/SUB, PUSH/PULL, async, and poller
from __future__ import annotations

import asyncio
import json
import threading
import time
from typing import Any, Callable, Iterator

import zmq
import zmq.asyncio


# ─────────────────────────────────────────────────────────────────────────────
# 1. Context management
# ─────────────────────────────────────────────────────────────────────────────

def make_context(io_threads: int = 1) -> zmq.Context:
    """Create a ZeroMQ context."""
    return zmq.Context(io_threads)


def configure_socket(
    sock: zmq.Socket,
    linger: int = 0,
    rcvtimeo: int = -1,
    sndtimeo: int = -1,
    sndhwm: int = 0,
    rcvhwm: int = 0,
) -> zmq.Socket:
    """
    Apply common socket options.
    linger=0: socket closes immediately on ctx.term() (no queued messages).
    rcvtimeo: milliseconds to wait on recv before raising EAGAIN (-1 = infinite).
    """
    sock.setsockopt(zmq.LINGER, linger)
    if rcvtimeo >= 0:
        sock.setsockopt(zmq.RCVTIMEO, rcvtimeo)
    if sndtimeo >= 0:
        sock.setsockopt(zmq.SNDTIMEO, sndtimeo)
    if sndhwm > 0:
        sock.setsockopt(zmq.SNDHWM, sndhwm)
    if rcvhwm > 0:
        sock.setsockopt(zmq.RCVHWM, rcvhwm)
    return sock


# ─────────────────────────────────────────────────────────────────────────────
# 2. REQ-REP (request-reply)
# ─────────────────────────────────────────────────────────────────────────────

class ReplyServer:
    """
    REP socket server — receives requests and sends replies synchronously.

    Usage:
        def handle(msg: dict) -> dict:
            return {"pong": msg.get("ping")}

        server = ReplyServer("tcp://*:5555", handle)
        server.run()  # blocks
    """

    def __init__(
        self,
        address: str,
        handler: Callable[[dict], dict],
        ctx: zmq.Context | None = None,
    ):
        self._address = address
        self._handler = handler
        self._ctx = ctx or make_context()
        self._running = False

    def run(self, max_messages: int | None = None) -> None:
        sock = configure_socket(self._ctx.socket(zmq.REP))
        sock.bind(self._address)
        self._running = True
        count = 0
        try:
            while self._running:
                if max_messages and count >= max_messages:
                    break
                try:
                    msg = sock.recv_json(flags=zmq.NOBLOCK)
                    reply = self._handler(msg)
                    sock.send_json(reply)
                    count += 1
                except zmq.Again:
                    time.sleep(0.01)
        finally:
            sock.close()

    def stop(self):
        self._running = False


class RequestClient:
    """
    REQ socket client — sends a JSON request and waits for a reply.

    Usage:
        with RequestClient("tcp://localhost:5555") as client:
            reply = client.request({"ping": "hello"})
    """

    def __init__(self, address: str, timeout: int = 5000, ctx: zmq.Context | None = None):
        self._address = address
        self._timeout = timeout
        self._ctx = ctx or make_context()
        self._sock: zmq.Socket | None = None

    def __enter__(self) -> "RequestClient":
        self._sock = configure_socket(
            self._ctx.socket(zmq.REQ), rcvtimeo=self._timeout
        )
        self._sock.connect(self._address)
        return self

    def __exit__(self, *_):
        if self._sock:
            self._sock.close()

    def request(self, data: dict) -> dict:
        """Send a request dict and return the reply dict."""
        if self._sock is None:
            raise RuntimeError("RequestClient must be used as a context manager")
        self._sock.send_json(data)
        return self._sock.recv_json()


# ─────────────────────────────────────────────────────────────────────────────
# 3. PUB-SUB (publish-subscribe)
# ─────────────────────────────────────────────────────────────────────────────

class Publisher:
    """
    PUB socket — broadcasts messages to all subscribers on a topic.

    Usage:
        with Publisher("tcp://*:5556") as pub:
            pub.publish("metrics", {"cpu": 42.1})
    """

    def __init__(self, address: str, ctx: zmq.Context | None = None):
        self._ctx = ctx or make_context()
        self._sock = configure_socket(self._ctx.socket(zmq.PUB))
        self._sock.bind(address)
        time.sleep(0.05)  # allow subscribers to connect

    def __enter__(self) -> "Publisher":
        return self

    def __exit__(self, *_):
        self._sock.close()

    def publish(self, topic: str, data: Any) -> None:
        """Publish a JSON message to a topic."""
        payload = json.dumps(data).encode()
        self._sock.send_multipart([topic.encode(), payload])

    def publish_raw(self, topic: bytes, data: bytes) -> None:
        self._sock.send_multipart([topic, data])


class Subscriber:
    """
    SUB socket — receives messages matching subscribed topics.

    Usage:
        with Subscriber("tcp://localhost:5556", topics=["metrics"]) as sub:
            for topic, msg in sub.messages(10):
                print(topic, msg)
    """

    def __init__(
        self,
        address: str,
        topics: list[str] | None = None,
        timeout: int = 1000,
        ctx: zmq.Context | None = None,
    ):
        self._ctx = ctx or make_context()
        self._sock = configure_socket(
            self._ctx.socket(zmq.SUB), rcvtimeo=timeout
        )
        self._sock.connect(address)
        for topic in (topics or [""]):
            self._sock.setsockopt(zmq.SUBSCRIBE, topic.encode())

    def __enter__(self) -> "Subscriber":
        return self

    def __exit__(self, *_):
        self._sock.close()

    def recv(self) -> tuple[str, Any] | None:
        """Receive one message. Returns (topic, parsed_json) or None on timeout."""
        try:
            parts = self._sock.recv_multipart()
            topic = parts[0].decode()
            data  = json.loads(parts[1])
            return topic, data
        except zmq.Again:
            return None

    def messages(self, count: int | None = None) -> Iterator[tuple[str, Any]]:
        """Yield (topic, msg) tuples up to count (or forever)."""
        received = 0
        while count is None or received < count:
            msg = self.recv()
            if msg is not None:
                yield msg
                received += 1


# ─────────────────────────────────────────────────────────────────────────────
# 4. PUSH-PULL (work distribution)
# ─────────────────────────────────────────────────────────────────────────────

class WorkQueue:
    """
    PUSH socket — distributes work items round-robin to PULL workers.

    Usage:
        with WorkQueue("tcp://*:5557") as q:
            for item in items:
                q.push(item)
            q.push_sentinel(n_workers)  # tell workers to stop
    """

    SENTINEL = {"__stop__": True}

    def __init__(self, address: str, ctx: zmq.Context | None = None):
        self._ctx = ctx or make_context()
        self._sock = configure_socket(self._ctx.socket(zmq.PUSH))
        self._sock.bind(address)

    def __enter__(self) -> "WorkQueue":
        return self

    def __exit__(self, *_):
        self._sock.close()

    def push(self, item: Any) -> None:
        self._sock.send_json(item)

    def push_sentinel(self, n: int = 1) -> None:
        """Push n stop sentinels — one per worker."""
        for _ in range(n):
            self.push(self.SENTINEL)


class Worker:
    """
    PULL socket — receives and processes work items.

    Usage:
        def process(item: dict) -> None:
            print("Processing:", item)

        with Worker("tcp://localhost:5557", process) as w:
            w.run()
    """

    def __init__(
        self,
        address: str,
        fn: Callable[[dict], None],
        timeout: int = 1000,
        ctx: zmq.Context | None = None,
    ):
        self._ctx = ctx or make_context()
        self._sock = configure_socket(
            self._ctx.socket(zmq.PULL), rcvtimeo=timeout
        )
        self._sock.connect(address)
        self._fn = fn

    def __enter__(self) -> "Worker":
        return self

    def __exit__(self, *_):
        self._sock.close()

    def run(self) -> None:
        """Process items until sentinel received."""
        while True:
            try:
                item = self._sock.recv_json()
                if item == WorkQueue.SENTINEL:
                    break
                self._fn(item)
            except zmq.Again:
                continue


# ─────────────────────────────────────────────────────────────────────────────
# 5. Async REQ-REP with zmq.asyncio
# ─────────────────────────────────────────────────────────────────────────────

async def async_server(
    address: str,
    handler: Callable[[dict], dict],
    max_messages: int | None = None,
) -> None:
    """
    Async REP server with zmq.asyncio.
    Run with: asyncio.run(async_server("tcp://*:5558", handler))
    """
    ctx  = zmq.asyncio.Context()
    sock = ctx.socket(zmq.REP)
    sock.bind(address)
    count = 0
    try:
        while max_messages is None or count < max_messages:
            msg = await sock.recv_json()
            reply = handler(msg)
            await sock.send_json(reply)
            count += 1
    finally:
        sock.close()
        ctx.term()


async def async_request(address: str, data: dict, timeout: float = 5.0) -> dict:
    """
    Async REQ client — send a request and await reply.
    Note: timeout is in seconds (asyncio convention), unlike the
    millisecond socket options elsewhere in this module.
    """
    ctx  = zmq.asyncio.Context()
    sock = ctx.socket(zmq.REQ)
    sock.connect(address)
    try:
        await sock.send_json(data)
        reply = await asyncio.wait_for(sock.recv_json(), timeout=timeout)
        return reply
    finally:
        sock.close(linger=0)
        ctx.term()


# ─────────────────────────────────────────────────────────────────────────────
# 6. Poller — multiplex sockets
# ─────────────────────────────────────────────────────────────────────────────

class MultiPoller:
    """
    Poll multiple sockets simultaneously.

    Usage:
        poller = MultiPoller()
        poller.add(sock_a, zmq.POLLIN)
        poller.add(sock_b, zmq.POLLIN)
        ready = poller.poll(timeout=1000)
        if sock_a in ready:
            msg = sock_a.recv()
    """

    def __init__(self):
        self._poller = zmq.Poller()
        self._sockets: list[zmq.Socket] = []

    def add(self, sock: zmq.Socket, events: int = zmq.POLLIN) -> None:
        self._poller.register(sock, events)
        self._sockets.append(sock)

    def poll(self, timeout: int = 1000) -> dict[zmq.Socket, int]:
        """Returns {socket: event_mask} for sockets with activity."""
        return dict(self._poller.poll(timeout))

    def remove(self, sock: zmq.Socket) -> None:
        self._poller.unregister(sock)
        self._sockets.remove(sock)


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

def demo_req_rep():
    """REQ-REP demo using threads."""
    print("=== REQ-REP ===")
    ctx = make_context()
    results: list[dict] = []

    def _server():
        def echo(msg: dict) -> dict:
            return {"echo": msg.get("data"), "ok": True}
        server = ReplyServer("tcp://*:5599", echo, ctx=ctx)
        server.run(max_messages=3)

    t = threading.Thread(target=_server, daemon=True)
    t.start()
    time.sleep(0.1)

    with RequestClient("tcp://localhost:5599", ctx=ctx) as client:
        for i in range(3):
            reply = client.request({"data": f"message-{i}"})
            results.append(reply)
            print(f"  Reply: {reply}")


def demo_pub_sub():
    """PUB-SUB demo."""
    print("\n=== PUB-SUB ===")
    ctx = make_context()
    received: list[Any] = []

    def _subscriber():
        with Subscriber("tcp://localhost:5600", topics=["metrics"], timeout=500, ctx=ctx) as sub:
            for topic, msg in sub.messages(3):
                received.append((topic, msg))

    t = threading.Thread(target=_subscriber, daemon=True)
    t.start()
    time.sleep(0.1)

    with Publisher("tcp://*:5600", ctx=ctx) as pub:
        for i in range(3):
            pub.publish("metrics", {"cpu": i * 10, "mem": 60})
            time.sleep(0.05)

    t.join(timeout=2)
    for topic, msg in received:
        print(f"  [{topic}] {msg}")


def demo_push_pull():
    """PUSH-PULL work queue demo."""
    print("\n=== PUSH-PULL ===")
    ctx = make_context()
    processed: list[Any] = []

    def _worker():
        def process(item: dict) -> None:
            processed.append(item)

        with Worker("tcp://localhost:5601", process, ctx=ctx) as w:
            w.run()

    t = threading.Thread(target=_worker, daemon=True)
    t.start()
    time.sleep(0.1)

    with WorkQueue("tcp://*:5601", ctx=ctx) as q:
        for i in range(4):
            q.push({"task": i, "payload": f"data-{i}"})
        q.push_sentinel(1)

    t.join(timeout=2)
    print(f"  Processed {len(processed)} items: {processed}")


if __name__ == "__main__":
    demo_req_rep()
    demo_pub_sub()
    demo_push_pull()
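The intro mentions DEALER-ROUTER for async routing, which the pipeline above does not implement. A minimal sketch of the identity-based routing involved, again single-process over inproc (endpoint and identity names are illustrative):

```python
import zmq

ctx = zmq.Context.instance()
router = ctx.socket(zmq.ROUTER)
router.bind("inproc://route-demo")

dealer = ctx.socket(zmq.DEALER)
dealer.setsockopt(zmq.IDENTITY, b"worker-1")  # stable identity, set before connect
dealer.connect("inproc://route-demo")

dealer.send(b"task-1")                 # DEALER has no send/recv lockstep
frames = router.recv_multipart()       # ROUTER prepends the sender identity:
                                       # [b"worker-1", b"task-1"]
router.send_multipart([frames[0], b"done"])  # first frame addresses the reply
ack = dealer.recv()                    # DEALER sees only the payload

dealer.close()
router.close()
```

This is the building block for async brokers: a ROUTER can hold many DEALER conversations at once and reply to each by identity, with none of the REQ-REP alternation rules.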

Compared with Redis pub-sub: Redis is broker-based (it requires a running Redis server) and adds persistence, message history, and consumer groups. ZeroMQ (pyzmq) is brokerless, with no server to run, which makes it ideal for in-process and LAN messaging where you control both ends and don't need persistence.

Compared with pika / RabbitMQ: RabbitMQ adds durable queues, dead-letter exchanges, routing keys, and an admin UI, but requires running a broker process. ZeroMQ is lighter and better suited to high-throughput pipelines, direct service-to-service messaging, and prototyping distributed architectures before adding a broker.

The Claude Skills 360 bundle includes pyzmq skill sets covering zmq.Context, configure_socket() with LINGER/RCVTIMEO/HWM, ReplyServer/RequestClient REQ-REP, Publisher/Subscriber PUB-SUB with topic routing, WorkQueue/Worker PUSH-PULL distribution, async_server()/async_request() with zmq.asyncio, MultiPoller multiplexed polling, send_json()/recv_json() JSON messaging, send_multipart()/recv_multipart() framing, and a threading-based demo. Start with the free tier to try ZeroMQ messaging code generation.
