Message queues decouple services, absorb traffic spikes, and make systems resilient to downstream failures. They also introduce complexity: ordering guarantees, at-least-once vs exactly-once delivery, dead letter handling, consumer group management. Claude Code handles messaging infrastructure well because these are patterns with established solutions — and it knows the sharp edges.
This guide covers Kafka and RabbitMQ with Claude Code: producers, consumers, dead letter queues, exactly-once processing, and event-driven patterns.
Kafka Fundamentals with Claude Code
CLAUDE.md for a Kafka-heavy project
## Messaging
- Kafka cluster: localhost:9092 (dev), kafka.internal:9092 (prod)
- Schema registry: http://localhost:8081
- Topics follow naming: {domain}.{entity}.{event} (e.g., orders.order.completed)
- Consumer group ID format: {service-name}-{topic}-consumer
- Always use schemas for new topics (Avro via schema registry)
- Dead letter topics: {original-topic}.dlq
- Retention: 7 days default, 30 days for compliance topics
Producer Setup
Create a Kafka producer for the orders service.
Messages should be schema-validated and delivered reliably.
Use exactly-once semantics for payment events.
import { Kafka, Partitioners, CompressionTypes } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
const kafka = new Kafka({
clientId: 'orders-service',
brokers: process.env.KAFKA_BROKERS?.split(',') ?? ['localhost:9092'],
ssl: process.env.NODE_ENV === 'production',
sasl: process.env.NODE_ENV === 'production' ? {
mechanism: 'plain',
username: process.env.KAFKA_USERNAME!,
password: process.env.KAFKA_PASSWORD!,
} : undefined,
});
const registry = new SchemaRegistry({
host: process.env.SCHEMA_REGISTRY_URL ?? 'http://localhost:8081',
});
// Standard producer — at-least-once delivery
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
// Compression is a per-send option in kafkajs, not a producer option
retry: {
initialRetryTime: 300,
retries: 10,
},
});
// Idempotent producer for payment events — exactly-once within a partition
const idempotentProducer = kafka.producer({
idempotent: true, // Prevents duplicate messages on retry
transactionalId: 'orders-payment-producer', // Enables transactions
maxInFlightRequests: 1, // kafkajs requires at most 1 in-flight request with idempotent: true
});
async function publishOrderCompleted(order: Order): Promise<void> {
const schemaId = await registry.getLatestSchemaId('orders.order.completed-value');
const encodedValue = await registry.encode(schemaId, {
orderId: order.id,
userId: order.userId,
totalCents: order.totalCents,
items: order.items,
completedAt: new Date().toISOString(),
});
await producer.send({
topic: 'orders.order.completed',
compression: CompressionTypes.GZIP, // Set per send in kafkajs
messages: [{
key: order.id, // Partition key — same order always goes to same partition
value: encodedValue,
headers: {
'event-type': 'order.completed',
'schema-version': '1',
'source-service': 'orders',
'correlation-id': order.correlationId,
},
}],
});
}
Using order.id as the partition key is critical for ordering: all events for the same order land on the same partition, so consumers see them in sequence.
Transactional Publish (Outbox Pattern)
The payment service needs to update the database AND publish a Kafka event atomically.
If the DB commit fails, the event must not be published.
If Kafka is down, the DB update should still succeed.
Claude Code implements the Outbox pattern:
async function processPayment(payment: Payment, db: Knex): Promise<void> {
// Step 1: Write to DB and outbox in one transaction
await db.transaction(async (trx) => {
await trx('payments').insert({
id: payment.id,
order_id: payment.orderId,
amount_cents: payment.amountCents,
status: 'completed',
});
// Write to outbox — will be published by separate worker
await trx('outbox_events').insert({
id: crypto.randomUUID(),
topic: 'payments.payment.completed',
partition_key: payment.orderId,
payload: JSON.stringify({ paymentId: payment.id, orderId: payment.orderId }),
created_at: new Date(),
published_at: null, // Null = not yet published
});
});
}
// Separate outbox worker — runs every 100ms
async function outboxWorker(): Promise<void> {
while (true) {
// Select, publish, and mark published inside one transaction so the
// row locks from forUpdate() actually hold while we work
const count = await db.transaction(async (trx) => {
const events = await trx('outbox_events')
.whereNull('published_at')
.orderBy('created_at')
.limit(100)
.forUpdate() // Lock rows to prevent duplicate publishing by concurrent workers
.skipLocked(); // Skip rows locked by other workers
for (const event of events) {
await producer.send({
topic: event.topic,
messages: [{ key: event.partition_key, value: event.payload }],
});
await trx('outbox_events').where('id', event.id).update({
published_at: new Date(),
});
}
return events.length;
});
if (count === 0) {
await sleep(100);
}
}
}
The DB and Kafka are consistent: if the app crashes between publishing and marking as published, the outbox worker republishes on restart. Consumers must be idempotent (deduplicate by event ID) since delivery is at-least-once.
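That dedup step can be sketched as a small guard around the handler. An in-memory `Set` stands in here for what would, in production, be a database table with a unique constraint on the event ID:

```typescript
// In-memory stand-in for a processed_events table with a unique
// constraint on event_id (the durable version survives restarts).
const processedEventIds = new Set<string>();

// Runs the handler only if this event ID has not been seen before.
// Returns true if the handler ran, false if it was a duplicate delivery.
async function handleOnce(
  eventId: string,
  handler: () => Promise<void>,
): Promise<boolean> {
  if (processedEventIds.has(eventId)) {
    return false; // At-least-once redelivery — skip the side effect
  }
  await handler();
  processedEventIds.add(eventId);
  return true;
}
```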
Consumer Groups
Create a consumer group for the notifications service.
It subscribes to orders.order.completed and sends a confirmation email.
Handle failures gracefully — failed messages should go to a dead letter queue after 3 retries.
const consumer = kafka.consumer({
groupId: 'notifications-service-orders-consumer',
retry: {
initialRetryTime: 1000,
retries: 3,
},
});
const dlqProducer = kafka.producer();
async function startConsumer(): Promise<void> {
await consumer.connect();
await dlqProducer.connect(); // kafkajs producers must be connected before send()
await consumer.subscribe({
topics: ['orders.order.completed'],
fromBeginning: false,
});
await consumer.run({
// Process one batch at a time — don't commit until all messages in batch succeed
eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
for (const message of batch.messages) {
try {
const event = await registry.decode(message.value!);
await processWithRetry(event, message);
resolveOffset(message.offset);
await heartbeat(); // Keep consumer alive during long processing
} catch (error) {
// Retries exhausted (or the message failed to decode) — send to DLQ
await dlqProducer.send({
topic: `orders.order.completed.dlq`,
messages: [{
key: message.key,
value: message.value,
headers: {
...message.headers,
'dlq-original-topic': 'orders.order.completed',
'dlq-original-partition': batch.partition.toString(),
'dlq-original-offset': message.offset,
'dlq-error': (error as Error).message,
'dlq-timestamp': Date.now().toString(),
},
}],
});
// Resolve offset so we don't reprocess — message is in DLQ
resolveOffset(message.offset);
}
}
await commitOffsetsIfNecessary();
},
});
}
Processing the Dead Letter Queue
Build a DLQ viewer and reprocessor.
Operations team needs to see failed messages, fix the issue, then replay them.
// DLQ admin API
app.get('/admin/dlq/:topic', async (req, res) => {
const { topic } = req.params;
const dlqTopic = `${topic}.dlq`;
// Read from DLQ without consuming (use admin to seek to timestamp)
const admin = kafka.admin();
await admin.connect(); // Admin client must be connected before fetching offsets
const offsets = await admin.fetchTopicOffsets(dlqTopic);
const messages = await readMessagesFromTopic(dlqTopic, offsets);
await admin.disconnect();
res.json(await Promise.all(messages.map(async (msg) => ({
offset: msg.offset,
key: msg.key?.toString(),
error: msg.headers?.['dlq-error']?.toString(),
originalOffset: msg.headers?.['dlq-original-offset']?.toString(),
timestamp: new Date(Number(msg.headers?.['dlq-timestamp'])).toISOString(),
payload: await registry.decode(msg.value!), // DLQ values are Avro-encoded, not JSON
}))));
});
app.post('/admin/dlq/:topic/replay', async (req, res) => {
const { topic } = req.params;
const { offsets } = req.body; // Specific offsets to replay
// Re-publish messages to original topic
const dlqMessages = await readDlqMessages(topic, offsets);
await producer.send({
topic,
messages: dlqMessages.map(msg => ({
key: msg.key,
value: msg.value,
headers: { ...msg.headers, 'replayed': 'true', 'replayed-at': Date.now().toString() },
})),
});
res.json({ replayed: dlqMessages.length });
});
RabbitMQ Patterns
This service uses RabbitMQ (not Kafka).
Set up a work queue for sending emails — multiple workers,
one email per worker, retry on failure.
import amqp from 'amqplib';
const QUEUE = 'email_queue';
const DLX = 'email_dlx'; // Dead letter exchange
const DLQ = 'email_dlq'; // Dead letter queue
async function setupRabbitMQ(): Promise<amqp.Channel> {
const connection = await amqp.connect(process.env.RABBITMQ_URL ?? 'amqp://localhost');
const channel = await connection.createChannel();
// Set up dead letter exchange first
await channel.assertExchange(DLX, 'direct', { durable: true });
await channel.assertQueue(DLQ, { durable: true });
await channel.bindQueue(DLQ, DLX, QUEUE);
// Main queue — messages go to DLX after 3 redeliveries
await channel.assertQueue(QUEUE, {
durable: true,
arguments: {
'x-dead-letter-exchange': DLX,
'x-dead-letter-routing-key': QUEUE,
'x-message-ttl': 86400000, // 24h TTL
},
});
// One message per worker at a time — fair dispatch
await channel.prefetch(1);
return channel;
}
// Publisher
async function queueEmail(emailData: EmailJob): Promise<void> {
// In production, reuse one long-lived channel — opening a connection per message is expensive
const channel = await setupRabbitMQ();
channel.sendToQueue(
QUEUE,
Buffer.from(JSON.stringify(emailData)),
{
persistent: true, // Survive broker restart
messageId: crypto.randomUUID(),
timestamp: Date.now(),
contentType: 'application/json',
headers: { 'retry-count': 0 },
},
);
}
// Consumer
async function startEmailWorker(): Promise<void> {
const channel = await setupRabbitMQ();
console.log('Email worker waiting for messages...');
channel.consume(QUEUE, async (msg) => {
if (!msg) return;
const retryCount = (msg.properties.headers?.['retry-count'] as number) ?? 0;
try {
const emailJob = JSON.parse(msg.content.toString()) as EmailJob;
await sendEmail(emailJob);
channel.ack(msg);
} catch (error) {
if (retryCount < 3) {
// Ack the original and re-publish a delayed copy with an incremented
// retry count (TTL queue trick). Nacking here would dead-letter the
// message immediately instead of retrying it.
channel.ack(msg);
await queueEmailWithDelay({
...JSON.parse(msg.content.toString()),
retryCount: retryCount + 1,
}, (retryCount + 1) * 5000);
} else {
// Max retries — send to DLQ for manual inspection
channel.nack(msg, false, false);
console.error('Message sent to DLQ after max retries', { messageId: msg.properties.messageId });
}
}
});
}
Monitoring Consumer Lag
Consumer lag is spiking during traffic peaks.
Add monitoring so we alert when lag exceeds 10,000 messages.
async function monitorConsumerLag(): Promise<void> {
const admin = kafka.admin();
await admin.connect();
setInterval(async () => {
const groups = await admin.listGroups();
for (const group of groups.groups) {
// fetchOffsets without a topic filter returns committed offsets
// for every topic the group consumes
const offsetsByTopic = await admin.fetchOffsets({ groupId: group.groupId });
let totalLag = 0;
for (const { topic, partitions } of offsetsByTopic) {
const endOffsets = await admin.fetchTopicOffsets(topic);
for (const partition of partitions) {
const endOffset = endOffsets.find(t => t.partition === partition.partition);
totalLag += Number(endOffset?.offset ?? 0) - Number(partition.offset);
}
}
// Export to Prometheus
consumerLagGauge.set(
{ group_id: group.groupId },
totalLag,
);
if (totalLag > 10_000) {
await alerting.send({
severity: 'warning',
message: `Consumer lag ${totalLag} for group ${group.groupId}`,
});
}
}
}, 30_000); // Check every 30 seconds
}
High consumer lag usually means one of three things: consumers are too slow (scale horizontally by adding consumers, up to the partition count), batch sizes are mis-tuned, or a downstream dependency is the bottleneck (like a database query that’s suddenly slow). Claude Code diagnoses which by comparing message production rate against consumer throughput metrics.
Choosing Between Kafka and RabbitMQ
Should this system use Kafka or RabbitMQ?
We need: background jobs, email sending, and an audit log for all state changes.
Claude Code’s recommendation pattern:
Use RabbitMQ when: you need traditional message queue semantics — tasks that should be processed once, routing with exchanges/bindings, and no need to replay or reread events. Email sending, background jobs, webhook delivery.
Use Kafka when: you need an event log — replayable history, fan-out to many consumers, time-travel queries, event sourcing, analytics pipelines alongside operational consumers. Audit logs, event streaming between microservices, change data capture.
Both: Kafka for the audit log and state-change events. RabbitMQ for email sending and jobs (or use Kafka for everything and keep it simple — Kafka can do both).
For the overall event-driven architecture connecting multiple services, see the microservices guide. For observability into message processing — tracking consumer lag, dead letter rates, and processing time — the observability guide covers Prometheus metrics setup. The Claude Skills 360 bundle includes messaging patterns skills for Kafka, RabbitMQ, and AWS SQS/SNS. Start with the free tier to try message queue pattern generation on your project.