pyzmq provides ZeroMQ bindings for Python — lightweight, brokerless messaging for distributed systems. pip install pyzmq. Context: import zmq; ctx = zmq.Context(). REQ-REP: sock = ctx.socket(zmq.REP); sock.bind("tcp://*:5555"). msg = sock.recv(); sock.send(reply). Client: sock = ctx.socket(zmq.REQ); sock.connect("tcp://localhost:5555"); sock.send(b"Hello"); reply = sock.recv(). PUB: pub = ctx.socket(zmq.PUB); pub.bind("tcp://*:5556"); pub.send_multipart([topic.encode(), msg]). SUB: sub = ctx.socket(zmq.SUB); sub.connect(...); sub.setsockopt(zmq.SUBSCRIBE, b"topic"). PUSH-PULL: push.send(work); result = pull.recv(). JSON: sock.send_json({"cmd":"ping"}); data = sock.recv_json(). Multipart: sock.send_multipart([id, b"", payload]). Poller: poller = zmq.Poller(); poller.register(sock, zmq.POLLIN); events = dict(poller.poll(timeout=1000)). LINGER: sock.setsockopt(zmq.LINGER, 0). HWM: sock.setsockopt(zmq.SNDHWM, 1000). RCVTIMEO: sock.setsockopt(zmq.RCVTIMEO, 5000). Async: import zmq.asyncio; ctx = zmq.asyncio.Context(); await sock.recv(). DEALER-ROUTER: async routing without blocking. Identity: sock.setsockopt(zmq.IDENTITY, b"worker-1"). msgpack: sock.send(msgpack.packb(data)); msgpack.unpackb(sock.recv()). ctx.term(). Claude Code generates pyzmq messaging servers, work queues, and pub-sub pipelines.
CLAUDE.md for pyzmq
## pyzmq Stack
- Version: pyzmq >= 26.0 | pip install pyzmq
- Context: ctx = zmq.Context() | async: zmq.asyncio.Context()
- REQ-REP: server bind tcp://*:port; client connect; send/recv pairs
- PUB-SUB: pub.send_multipart([topic_bytes, msg]); sub.setsockopt(SUBSCRIBE, b"topic")
- PUSH-PULL: push.send(work_bytes); pull.recv() — round-robin distribution
- JSON: sock.send_json(dict) | sock.recv_json() — auto-serializes
- Poller: zmq.Poller + poll(timeout_ms) — multiplex multiple sockets
pyzmq Messaging Pipeline
# app/messaging.py — pyzmq REQ/REP, PUB/SUB, PUSH/PULL, async, and poller
from __future__ import annotations
import asyncio
import json
import threading
import time
from typing import Any, Callable, Iterator
import zmq
import zmq.asyncio
# ─────────────────────────────────────────────────────────────────────────────
# 1. Context management
# ─────────────────────────────────────────────────────────────────────────────
def make_context(io_threads: int = 1) -> zmq.Context:
    """Return a fresh ZeroMQ context backed by *io_threads* I/O threads."""
    ctx = zmq.Context(io_threads)
    return ctx
def configure_socket(
    sock: zmq.Socket,
    linger: int = 0,
    rcvtimeo: int = -1,
    sndtimeo: int = -1,
    sndhwm: int = 0,
    rcvhwm: int = 0,
) -> zmq.Socket:
    """
    Apply common options to *sock* and return it.

    linger=0: the socket drops queued messages and closes immediately on
    ctx.term(). rcvtimeo/sndtimeo are milliseconds before recv/send raises
    EAGAIN (-1 = block forever). HWM values of 0 keep libzmq defaults.
    """
    sock.setsockopt(zmq.LINGER, linger)
    # Only override libzmq defaults when the caller supplied a real value.
    optional = (
        (zmq.RCVTIMEO, rcvtimeo, rcvtimeo >= 0),
        (zmq.SNDTIMEO, sndtimeo, sndtimeo >= 0),
        (zmq.SNDHWM, sndhwm, sndhwm > 0),
        (zmq.RCVHWM, rcvhwm, rcvhwm > 0),
    )
    for option, value, wanted in optional:
        if wanted:
            sock.setsockopt(option, value)
    return sock
# ─────────────────────────────────────────────────────────────────────────────
# 2. REQ-REP (request-reply)
# ─────────────────────────────────────────────────────────────────────────────
class ReplyServer:
    """
    REP socket server — receives JSON requests and sends JSON replies.

    The REP state machine requires strict recv/send alternation, so a
    handler exception is caught and answered with an {"error": ...} reply
    instead of killing the loop and leaving the socket wedged in the
    "must send" state.

    Usage:
        def handle(msg: dict) -> dict:
            return {"pong": msg.get("ping")}
        server = ReplyServer("tcp://*:5555", handle)
        server.run()  # blocks
    """
    def __init__(
        self,
        address: str,
        handler: Callable[[dict], dict],
        ctx: zmq.Context | None = None,
    ):
        self._address = address
        self._handler = handler
        self._ctx = ctx or make_context()
        self._running = False
    def run(self, max_messages: int | None = None) -> None:
        """Serve until stop() is called or *max_messages* replies were sent."""
        sock = configure_socket(self._ctx.socket(zmq.REP))
        sock.bind(self._address)
        self._running = True
        count = 0
        try:
            while self._running:
                # `is not None` so max_messages=0 means "serve nothing",
                # not "serve forever".
                if max_messages is not None and count >= max_messages:
                    break
                try:
                    msg = sock.recv_json(flags=zmq.NOBLOCK)
                except zmq.Again:
                    # Nothing pending — back off briefly instead of spinning.
                    time.sleep(0.01)
                    continue
                try:
                    reply = self._handler(msg)
                except Exception as exc:  # keep the REP state machine valid
                    reply = {"error": str(exc)}
                sock.send_json(reply)
                count += 1
        finally:
            sock.close()
    def stop(self):
        """Ask run() to exit after its current iteration."""
        self._running = False
class RequestClient:
    """
    REQ socket client — sends a JSON request and waits for the reply.

    On a receive timeout the REQ socket is stuck "awaiting reply" and every
    later send would fail with EFSM; request() recovers by recreating the
    socket before re-raising zmq.Again, so the client stays usable.

    Usage:
        with RequestClient("tcp://localhost:5555") as client:
            reply = client.request({"ping": "hello"})
    """
    def __init__(self, address: str, timeout: int = 5000, ctx: zmq.Context | None = None):
        self._address = address
        self._timeout = timeout  # recv timeout, milliseconds
        self._ctx = ctx or make_context()
        self._sock: zmq.Socket | None = None
    def _connect(self) -> None:
        """(Re)create and connect the REQ socket."""
        self._sock = configure_socket(
            self._ctx.socket(zmq.REQ), rcvtimeo=self._timeout
        )
        self._sock.connect(self._address)
    def __enter__(self) -> "RequestClient":
        self._connect()
        return self
    def __exit__(self, *_):
        if self._sock:
            self._sock.close()
    def request(self, data: dict) -> dict:
        """Send *data* as JSON and return the parsed JSON reply.

        Raises zmq.Again when no reply arrives within the timeout; the
        socket is rebuilt first so the next request() can proceed.
        """
        self._sock.send_json(data)
        try:
            return self._sock.recv_json()
        except zmq.Again:
            # A REQ socket cannot send again until a reply arrives —
            # discard the wedged socket and rebuild it, then re-raise.
            self._sock.close(linger=0)
            self._connect()
            raise
# ─────────────────────────────────────────────────────────────────────────────
# 3. PUB-SUB (publish-subscribe)
# ─────────────────────────────────────────────────────────────────────────────
class Publisher:
    """
    PUB socket — broadcasts topic-prefixed messages to every subscriber.

    Usage:
        with Publisher("tcp://*:5556") as pub:
            pub.publish("metrics", {"cpu": 42.1})
    """
    def __init__(self, address: str, ctx: zmq.Context | None = None):
        self._ctx = ctx or make_context()
        self._sock = configure_socket(self._ctx.socket(zmq.PUB))
        self._sock.bind(address)
        time.sleep(0.05)  # allow subscribers to connect
    def __enter__(self) -> "Publisher":
        return self
    def __exit__(self, *_):
        self._sock.close()
    def publish(self, topic: str, data: Any) -> None:
        """JSON-encode *data* and send it as a [topic, payload] multipart."""
        frames = [topic.encode(), json.dumps(data).encode()]
        self._sock.send_multipart(frames)
    def publish_raw(self, topic: bytes, data: bytes) -> None:
        """Send already-encoded topic and payload frames as-is."""
        self._sock.send_multipart([topic, data])
class Subscriber:
    """
    SUB socket — receives messages whose topic matches a subscription.

    Usage:
        with Subscriber("tcp://localhost:5556", topics=["metrics"]) as sub:
            for topic, msg in sub.messages(10):
                print(topic, msg)
    """
    def __init__(
        self,
        address: str,
        topics: list[str] | None = None,
        timeout: int = 1000,
        ctx: zmq.Context | None = None,
    ):
        self._ctx = ctx or make_context()
        self._sock = configure_socket(
            self._ctx.socket(zmq.SUB), rcvtimeo=timeout
        )
        self._sock.connect(address)
        # No topics given means subscribe to everything (empty prefix).
        subscriptions = topics or [""]
        for name in subscriptions:
            self._sock.setsockopt(zmq.SUBSCRIBE, name.encode())
    def __enter__(self) -> "Subscriber":
        return self
    def __exit__(self, *_):
        self._sock.close()
    def recv(self) -> tuple[str, Any] | None:
        """Return (topic, parsed_json) for one message, or None on timeout."""
        try:
            parts = self._sock.recv_multipart()
        except zmq.Again:
            return None
        return parts[0].decode(), json.loads(parts[1])
    def messages(self, count: int | None = None) -> Iterator[tuple[str, Any]]:
        """Yield up to *count* (topic, msg) pairs; loop forever when None."""
        delivered = 0
        while count is None or delivered < count:
            item = self.recv()
            if item is None:
                continue  # timed out — keep waiting
            yield item
            delivered += 1
# ─────────────────────────────────────────────────────────────────────────────
# 4. PUSH-PULL (work distribution)
# ─────────────────────────────────────────────────────────────────────────────
class WorkQueue:
    """
    PUSH socket — hands JSON work items to PULL workers round-robin.

    Usage:
        with WorkQueue("tcp://*:5557") as q:
            for item in items:
                q.push(item)
            q.push_sentinel(n_workers)  # tell workers to stop
    """
    # Marker item telling a Worker to exit its run() loop.
    SENTINEL = {"__stop__": True}
    def __init__(self, address: str, ctx: zmq.Context | None = None):
        self._ctx = ctx or make_context()
        self._sock = configure_socket(self._ctx.socket(zmq.PUSH))
        self._sock.bind(address)
    def __enter__(self) -> "WorkQueue":
        return self
    def __exit__(self, *_):
        self._sock.close()
    def push(self, item: Any) -> None:
        """Enqueue one JSON-serializable work item."""
        self._sock.send_json(item)
    def push_sentinel(self, n: int = 1) -> None:
        """Enqueue *n* stop markers — one per worker that should exit."""
        for _ in range(n):
            self.push(self.SENTINEL)
class Worker:
    """
    PULL socket — pulls JSON work items and runs a callback on each.

    Usage:
        def process(item: dict) -> None:
            print("Processing:", item)
        with Worker("tcp://localhost:5557", process) as w:
            w.run()
    """
    def __init__(
        self,
        address: str,
        fn: Callable[[dict], None],
        timeout: int = 1000,
        ctx: zmq.Context | None = None,
    ):
        self._ctx = ctx or make_context()
        self._sock = configure_socket(
            self._ctx.socket(zmq.PULL), rcvtimeo=timeout
        )
        self._sock.connect(address)
        self._fn = fn
    def __enter__(self) -> "Worker":
        return self
    def __exit__(self, *_):
        self._sock.close()
    def run(self) -> None:
        """Process items until the stop sentinel arrives."""
        while True:
            try:
                item = self._sock.recv_json()
            except zmq.Again:
                continue  # recv timed out — poll again
            if item == WorkQueue.SENTINEL:
                return
            self._fn(item)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Async REQ-REP with zmq.asyncio
# ─────────────────────────────────────────────────────────────────────────────
async def async_server(
    address: str,
    handler: Callable[[dict], dict],
    max_messages: int | None = None,
) -> None:
    """
    Async REP server with zmq.asyncio.

    Receives JSON requests and replies with handler(msg). Handler errors are
    answered with {"error": ...} so the REP recv/send alternation is never
    violated. Serves forever unless *max_messages* is given.

    Run with: asyncio.run(async_server("tcp://*:5558", handler))
    """
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.REP)
    sock.bind(address)
    count = 0
    try:
        while max_messages is None or count < max_messages:
            msg = await sock.recv_json()
            try:
                reply = handler(msg)
            except Exception as exc:  # keep the REP state machine valid
                reply = {"error": str(exc)}
            await sock.send_json(reply)
            count += 1
    finally:
        # linger=0 drops queued frames so ctx.term() cannot block
        # (matches async_request's cleanup).
        sock.close(linger=0)
        ctx.term()
async def async_request(address: str, data: dict, timeout: float = 5.0) -> dict:
    """Send one JSON request over a fresh REQ socket and await the reply.

    Raises asyncio.TimeoutError when no reply arrives within *timeout*
    seconds; the socket and context are always cleaned up.
    """
    ctx = zmq.asyncio.Context()
    sock = ctx.socket(zmq.REQ)
    sock.connect(address)
    try:
        await sock.send_json(data)
        return await asyncio.wait_for(sock.recv_json(), timeout=timeout)
    finally:
        sock.close(linger=0)
        ctx.term()
# ─────────────────────────────────────────────────────────────────────────────
# 6. Poller — multiplex sockets
# ─────────────────────────────────────────────────────────────────────────────
class MultiPoller:
    """
    Thin wrapper around zmq.Poller for watching several sockets at once.

    Usage:
        poller = MultiPoller()
        poller.add(sock_a, zmq.POLLIN)
        poller.add(sock_b, zmq.POLLIN)
        ready = poller.poll(timeout=1000)
        if sock_a in ready:
            msg = sock_a.recv()
    """
    def __init__(self):
        self._poller = zmq.Poller()
        self._sockets: list[zmq.Socket] = []
    def add(self, sock: zmq.Socket, events: int = zmq.POLLIN) -> None:
        """Start watching *sock* for *events*."""
        self._poller.register(sock, events)
        self._sockets.append(sock)
    def poll(self, timeout: int = 1000) -> dict[zmq.Socket, int]:
        """Wait up to *timeout* ms; return {socket: event_mask} for activity."""
        events = self._poller.poll(timeout)
        return dict(events)
    def remove(self, sock: zmq.Socket) -> None:
        """Stop watching *sock*."""
        self._poller.unregister(sock)
        self._sockets.remove(sock)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
def demo_req_rep():
    """REQ-REP demo: a threaded echo server answering three requests."""
    print("=== REQ-REP ===")
    ctx = make_context()
    results: list[dict] = []
    def _serve():
        def echo(msg: dict) -> dict:
            return {"echo": msg.get("data"), "ok": True}
        ReplyServer("tcp://*:5599", echo, ctx=ctx).run(max_messages=3)
    thread = threading.Thread(target=_serve, daemon=True)
    thread.start()
    time.sleep(0.1)  # give the server a moment to bind
    with RequestClient("tcp://localhost:5599", ctx=ctx) as client:
        for i in range(3):
            reply = client.request({"data": f"message-{i}"})
            results.append(reply)
            print(f" Reply: {reply}")
def demo_pub_sub():
    """PUB-SUB demo: one publisher, one threaded subscriber on 'metrics'."""
    print("\n=== PUB-SUB ===")
    ctx = make_context()
    received: list[Any] = []
    def _listen():
        with Subscriber("tcp://localhost:5600", topics=["metrics"], timeout=500, ctx=ctx) as sub:
            for pair in sub.messages(3):
                received.append(pair)
    thread = threading.Thread(target=_listen, daemon=True)
    thread.start()
    time.sleep(0.1)  # let the subscriber connect before publishing
    with Publisher("tcp://*:5600", ctx=ctx) as pub:
        for i in range(3):
            pub.publish("metrics", {"cpu": i * 10, "mem": 60})
            time.sleep(0.05)
    thread.join(timeout=2)
    for topic, msg in received:
        print(f" [{topic}] {msg}")
def demo_push_pull():
    """PUSH-PULL demo: queue four tasks and drain them with one worker."""
    print("\n=== PUSH-PULL ===")
    ctx = make_context()
    processed: list[Any] = []
    def _consume():
        def handle(item: dict) -> None:
            processed.append(item)
        with Worker("tcp://localhost:5601", handle, ctx=ctx) as worker:
            worker.run()
    thread = threading.Thread(target=_consume, daemon=True)
    thread.start()
    time.sleep(0.1)  # let the worker connect before pushing
    with WorkQueue("tcp://*:5601", ctx=ctx) as queue:
        for i in range(4):
            queue.push({"task": i, "payload": f"data-{i}"})
        queue.push_sentinel(1)
    thread.join(timeout=2)
    print(f" Processed {len(processed)} items: {processed}")
if __name__ == "__main__":
    # Each demo creates and manages its own context; the trailing
    # make_context()/ctx.term() pair that used to live here terminated a
    # brand-new, unused context and cleaned up nothing, so it was removed.
    demo_req_rep()
    demo_pub_sub()
    demo_push_pull()
For the redis pub-sub alternative — Redis pub-sub is broker-based (requires a running Redis server) and adds persistence, message history, and consumer groups; ZeroMQ (pyzmq) is brokerless — no server to run — making it ideal for in-process and LAN messaging where you control both ends and don't need persistence. For the pika / RabbitMQ alternative — RabbitMQ with pika adds durable queues, dead-letter exchanges, routing keys, and an admin UI but requires running a broker process; ZeroMQ is lighter and better for high-throughput pipelines, direct service-to-service messaging, and prototyping distributed architectures before adding a broker. The Claude Skills 360 bundle includes pyzmq skill sets covering zmq.Context, configure_socket() with LINGER/RCVTIMEO/HWM, ReplyServer/RequestClient REQ-REP, Publisher/Subscriber PUB-SUB with topic routing, WorkQueue/Worker PUSH-PULL distribution, async_server()/async_request() with zmq.asyncio, MultiPoller multiplexed polling, send_json()/recv_json() JSON messaging, send_multipart()/recv_multipart() framing, and a threading-based multi-process demo. Start with the free tier to try ZeroMQ messaging code generation.