Claude Code for Kafka and Message Queues: Async Patterns That Scale — Claude Skills 360 Blog

Claude Code for Kafka and Message Queues: Async Patterns That Scale

Published: May 27, 2026
Read time: 9 min
By: Claude Skills 360

Message queues decouple services, absorb traffic spikes, and make systems resilient to downstream failures. They also introduce complexity: ordering guarantees, at-least-once vs exactly-once delivery, dead letter handling, consumer group management. Claude Code handles messaging infrastructure well because these are patterns with established solutions — and it knows the sharp edges.

This guide covers Kafka and RabbitMQ with Claude Code: producers, consumers, dead letter queues, exactly-once processing, and event-driven patterns.

Kafka Fundamentals with Claude Code

CLAUDE.md for a Kafka-heavy project

## Messaging
- Kafka cluster: localhost:9092 (dev), kafka.internal:9092 (prod)
- Schema registry: http://localhost:8081
- Topics follow naming: {domain}.{entity}.{event} (e.g., orders.order.completed)
- Consumer group ID format: {service-name}-{topic}-consumer
- Always use schemas for new topics (Avro via schema registry)
- Dead letter topics: {original-topic}.dlq
- Retention: 7 days default, 30 days for compliance topics
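
A convention like this lends itself to a quick guard before topic creation. A minimal sketch — the function name and regex are illustrative assumptions, not part of the project's tooling:

```typescript
// Hypothetical check for the {domain}.{entity}.{event} convention, with an
// optional .dlq suffix for dead letter topics. Regex and name are assumptions.
const TOPIC_PATTERN = /^[a-z0-9-]+\.[a-z0-9-]+\.[a-z0-9-]+(\.dlq)?$/;

function isValidTopicName(topic: string): boolean {
  return TOPIC_PATTERN.test(topic);
}
```

Encoding the convention as code means Claude Code (and CI) can reject misnamed topics instead of relying on review.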

Producer Setup

Create a Kafka producer for the orders service.
Messages should be schema-validated and delivered reliably.
Use exactly-once semantics for payment events.
import { Kafka, Partitioners, CompressionTypes } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const kafka = new Kafka({
  clientId: 'orders-service',
  brokers: process.env.KAFKA_BROKERS?.split(',') ?? ['localhost:9092'],
  ssl: process.env.NODE_ENV === 'production',
  sasl: process.env.NODE_ENV === 'production' ? {
    mechanism: 'plain',
    username: process.env.KAFKA_USERNAME!,
    password: process.env.KAFKA_PASSWORD!,
  } : undefined,
});

const registry = new SchemaRegistry({
  host: process.env.SCHEMA_REGISTRY_URL ?? 'http://localhost:8081',
});

// Standard producer — at-least-once delivery
const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
  retry: {
    initialRetryTime: 300,
    retries: 10,
  },
});

// Idempotent producer for payment events — exactly-once within a partition
const idempotentProducer = kafka.producer({
  idempotent: true,   // Prevents duplicate messages on retry
  transactionalId: 'orders-payment-producer',  // Enables transactions
  maxInFlightRequests: 1,  // kafkajs allows at most 1 in-flight request when idempotent
});

async function publishOrderCompleted(order: Order): Promise<void> {
  const schemaId = await registry.getLatestSchemaId('orders.order.completed-value');
  const encodedValue = await registry.encode(schemaId, {
    orderId: order.id,
    userId: order.userId,
    totalCents: order.totalCents,
    items: order.items,
    completedAt: new Date().toISOString(),
  });

  await producer.send({
    topic: 'orders.order.completed',
    compression: CompressionTypes.GZIP,  // Compression is a send-time option in kafkajs
    messages: [{
      key: order.id,         // Partition key — same order always goes to same partition
      value: encodedValue,
      headers: {
        'event-type': 'order.completed',
        'schema-version': '1',
        'source-service': 'orders',
        'correlation-id': order.correlationId,
      },
    }],
  });
}

Using order.id as the partition key is critical for ordering: all events for the same order land on the same partition, so consumers see them in sequence.
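
The invariant can be sketched in isolation. kafkajs's default partitioner actually uses a murmur2 hash; the md5 below is only to illustrate that a fixed key deterministically maps to one partition:

```typescript
import { createHash } from 'node:crypto';

// Illustrative only — kafkajs uses murmur2 internally. The point is the
// invariant: hashing is deterministic, so one key always lands on one partition.
function partitionFor(key: string, numPartitions: number): number {
  const digest = createHash('md5').update(key).digest();
  return digest.readUInt32BE(0) % numPartitions;
}
```

The flip side of this invariant: if the partition count changes, keys remap, so topics that rely on ordering should be created with enough partitions up front.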

Transactional Publish (Outbox Pattern)

The payment service needs to update the database AND publish a Kafka event atomically.
If the DB commit fails, the event must not be published.
If Kafka is down, the DB update should still succeed.

Claude Code implements the Outbox pattern:

async function processPayment(payment: Payment, db: Transaction): Promise<void> {
  // Step 1: Write to DB and outbox in one transaction
  await db.transaction(async (trx) => {
    await trx('payments').insert({
      id: payment.id,
      order_id: payment.orderId,
      amount_cents: payment.amountCents,
      status: 'completed',
    });

    // Write to outbox — will be published by separate worker
    await trx('outbox_events').insert({
      id: crypto.randomUUID(),
      topic: 'payments.payment.completed',
      partition_key: payment.orderId,
      payload: JSON.stringify({ paymentId: payment.id, orderId: payment.orderId }),
      created_at: new Date(),
      published_at: null,  // Null = not yet published
    });
  });
}

// Separate outbox worker — polls every 100ms when idle
async function outboxWorker(): Promise<void> {
  while (true) {
    // SELECT ... FOR UPDATE locks only hold inside a transaction, so the
    // select, publish, and mark-published steps must share one
    const publishedCount = await db.transaction(async (trx) => {
      const events = await trx('outbox_events')
        .whereNull('published_at')  // .where('published_at', null) never matches NULL in SQL
        .orderBy('created_at')
        .limit(100)
        .forUpdate()   // Lock rows to prevent duplicate publishing by concurrent workers
        .skipLocked(); // Skip rows locked by other workers

      for (const event of events) {
        await producer.send({
          topic: event.topic,
          messages: [{ key: event.partition_key, value: event.payload }],
        });

        await trx('outbox_events').where('id', event.id).update({
          published_at: new Date(),
        });
      }

      return events.length;
    });

    if (publishedCount === 0) {
      await sleep(100);
    }
  }
}

The DB and Kafka stay eventually consistent: if the worker crashes between publishing an event and marking its row as published, it republishes that event on restart. Consumers must therefore be idempotent (deduplicate by event ID), since delivery is at-least-once.
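
Consumer-side deduplication can be sketched like this, assuming each event carries a unique ID. The in-memory Set stands in for what would be durable storage in a real service (e.g. a processed_events table with a unique constraint); all names here are illustrative:

```typescript
// Sketch of idempotent handling under at-least-once delivery. The Set is a
// stand-in for durable storage; the names are assumptions.
const processed = new Set<string>();

async function handleOnce(eventId: string, handler: () => Promise<void>): Promise<boolean> {
  if (processed.has(eventId)) return false; // Duplicate delivery — skip
  await handler();
  processed.add(eventId); // Mark only after the handler succeeds
  return true;
}
```

Marking after the handler succeeds means a crash mid-handler leads to a redelivery and a retry, never a silently dropped event.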

Consumer Groups

Create a consumer group for the notifications service.
It subscribes to orders.order.completed and sends a confirmation email.
Handle failures gracefully — failed messages should go to a dead letter queue after 3 retries.
const consumer = kafka.consumer({
  groupId: 'notifications-service-orders-consumer',
  retry: {
    initialRetryTime: 1000,
    retries: 3,
  },
});

const dlqProducer = kafka.producer();

async function startConsumer(): Promise<void> {
  await dlqProducer.connect();  // Producers must be connected before send()
  await consumer.connect();
  await consumer.subscribe({
    topics: ['orders.order.completed'],
    fromBeginning: false,
  });

  await consumer.run({
    // Process one batch at a time — don't commit until all messages in batch succeed
    eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
      for (const message of batch.messages) {
        try {
          const event = await registry.decode(message.value!);
          await processWithRetry(event, message);
          
          resolveOffset(message.offset);
          await heartbeat(); // Keep consumer alive during long processing
          
        } catch (error) {
          // Max retries exceeded — send to DLQ
          await dlqProducer.send({
            topic: `orders.order.completed.dlq`,
            messages: [{
              key: message.key,
              value: message.value,
              headers: {
                ...message.headers,
                'dlq-original-topic': 'orders.order.completed',
                'dlq-original-partition': batch.partition.toString(),
                'dlq-original-offset': message.offset,
                'dlq-error': (error as Error).message,
                'dlq-timestamp': Date.now().toString(),
              },
            }],
          });
          
          // Resolve offset so we don't reprocess — message is in DLQ
          resolveOffset(message.offset);
        }
      }
      
      await commitOffsetsIfNecessary();
    },
  });
}

Processing the Dead Letter Queue

Build a DLQ viewer and reprocessor.
Operations team needs to see failed messages, fix the issue, then replay them.
// DLQ admin API
app.get('/admin/dlq/:topic', async (req, res) => {
  const { topic } = req.params;
  const dlqTopic = `${topic}.dlq`;
  
  // Read from DLQ without consuming (use admin to seek to timestamp)
  const admin = kafka.admin();
  const offsets = await admin.fetchTopicOffsets(dlqTopic);
  const messages = await readMessagesFromTopic(dlqTopic, offsets);
  
  res.json(await Promise.all(messages.map(async (msg) => ({
    offset: msg.offset,
    key: msg.key?.toString(),
    error: msg.headers?.['dlq-error']?.toString(),
    originalOffset: msg.headers?.['dlq-original-offset']?.toString(),
    timestamp: new Date(Number(msg.headers?.['dlq-timestamp']?.toString())).toISOString(),
    payload: await registry.decode(msg.value!),  // DLQ values are Avro-encoded, not JSON
  }))));
});

app.post('/admin/dlq/:topic/replay', async (req, res) => {
  const { topic } = req.params;
  const { offsets } = req.body; // Specific offsets to replay
  
  // Re-publish messages to original topic
  const dlqMessages = await readDlqMessages(topic, offsets);
  await producer.send({
    topic,
    messages: dlqMessages.map(msg => ({
      key: msg.key,
      value: msg.value,
      headers: { ...msg.headers, 'replayed': 'true', 'replayed-at': Date.now().toString() },
    })),
  });
  
  res.json({ replayed: dlqMessages.length });
});

RabbitMQ Patterns

This service uses RabbitMQ (not Kafka). 
Set up a work queue for sending emails — multiple workers, 
one email per worker, retry on failure.
import amqp from 'amqplib';

const QUEUE = 'email_queue';
const DLX = 'email_dlx';      // Dead letter exchange
const DLQ = 'email_dlq';      // Dead letter queue

async function setupRabbitMQ(): Promise<amqp.Channel> {
  const connection = await amqp.connect(process.env.RABBITMQ_URL ?? 'amqp://localhost');
  const channel = await connection.createChannel();
  
  // Set up dead letter exchange first
  await channel.assertExchange(DLX, 'direct', { durable: true });
  await channel.assertQueue(DLQ, { durable: true });
  await channel.bindQueue(DLQ, DLX, QUEUE);
  
  // Main queue — messages go to DLX after 3 redeliveries
  await channel.assertQueue(QUEUE, {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': DLX,
      'x-dead-letter-routing-key': QUEUE,
      'x-message-ttl': 86400000,  // 24h TTL
    },
  });
  
  // One message per worker at a time — fair dispatch
  await channel.prefetch(1);
  
  return channel;
}

// Publisher — for brevity this dials a new connection per message; in
// production, create the connection and channel once and reuse them
async function queueEmail(emailData: EmailJob): Promise<void> {
  const channel = await setupRabbitMQ();
  
  channel.sendToQueue(
    QUEUE,
    Buffer.from(JSON.stringify(emailData)),
    {
      persistent: true,           // Survive broker restart
      messageId: crypto.randomUUID(),
      timestamp: Date.now(),
      contentType: 'application/json',
      headers: { 'retry-count': 0 },
    },
  );
}

// Consumer
async function startEmailWorker(): Promise<void> {
  const channel = await setupRabbitMQ();
  
  console.log('Email worker waiting for messages...');
  
  channel.consume(QUEUE, async (msg) => {
    if (!msg) return;
    
    const retryCount = (msg.properties.headers?.['retry-count'] as number) ?? 0;
    
    try {
      const emailJob = JSON.parse(msg.content.toString()) as EmailJob;
      await sendEmail(emailJob);
      channel.ack(msg);
      
    } catch (error) {
      if (retryCount < 3) {
        // Ack the original and re-publish with a backoff delay. Nacking here
        // as well would also route a copy to the DLX, creating duplicates.
        // queueEmailWithDelay is expected to set the incremented
        // 'retry-count' header on the re-published message.
        channel.ack(msg);
        await queueEmailWithDelay({
          ...JSON.parse(msg.content.toString()),
          retryCount: retryCount + 1,
        }, (retryCount + 1) * 5000);
        
      } else {
        // Max retries — nack without requeue so the message routes to the DLX
        channel.nack(msg, false, false);
        console.error('Message sent to DLQ after max retries', { messageId: msg.properties.messageId });
      }
    }
  });
}
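
The `queueEmailWithDelay` helper above is assumed rather than shown. The TTL-queue trick it relies on can be sketched as a consumer-less wait queue whose expired messages dead-letter back into the main queue — the names, signature, and per-delay-tier layout below are illustrative assumptions:

```typescript
// Structural stand-in for the parts of amqplib's Channel used here,
// so the sketch stays self-contained.
type ChannelLike = {
  assertQueue(queue: string, opts: object): Promise<unknown>;
  sendToQueue(queue: string, content: Buffer, opts?: object): boolean;
};

// Arguments for a wait queue: hold messages for delayMs, then dead-letter
// them through the default exchange back to the main queue by name.
function delayQueueArguments(mainQueue: string, delayMs: number) {
  return {
    'x-message-ttl': delayMs,
    'x-dead-letter-exchange': '',
    'x-dead-letter-routing-key': mainQueue,
  };
}

async function publishWithDelay(
  channel: ChannelLike,
  mainQueue: string,
  payload: object,
  delayMs: number,
): Promise<void> {
  const waitQueue = `${mainQueue}.wait.${delayMs}`; // One wait queue per delay tier
  await channel.assertQueue(waitQueue, {
    durable: true,
    arguments: delayQueueArguments(mainQueue, delayMs),
  });
  channel.sendToQueue(waitQueue, Buffer.from(JSON.stringify(payload)), { persistent: true });
}
```

One wait queue per delay tier is the usual shape of this trick, because RabbitMQ only expires messages from the head of a queue — mixing TTLs in one queue stalls shorter delays behind longer ones.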

Monitoring Consumer Lag

Consumer lag is spiking during traffic peaks.
Add monitoring so we alert when lag exceeds 10,000 messages.
async function monitorConsumerLag(): Promise<void> {
  const admin = kafka.admin();
  await admin.connect();  // Admin client must connect before fetching offsets
  
  setInterval(async () => {
    const groups = await admin.listGroups();
    
    for (const group of groups.groups) {
      const offsets = await admin.fetchOffsets({ groupId: group.groupId });
      const topicOffsets = await admin.fetchTopicOffsets(/* relevant topic */);
      
      let totalLag = 0;
      for (const partition of offsets[0].partitions) {
        const endOffset = topicOffsets.find(t => t.partition === partition.partition);
        const lag = Number(endOffset?.offset ?? 0) - Number(partition.offset);
        totalLag += lag;
      }
      
      // Export to Prometheus
      consumerLagGauge.set(
        { group_id: group.groupId },
        totalLag,
      );
      
      if (totalLag > 10_000) {
        await alerting.send({
          severity: 'warning',
          message: `Consumer lag ${totalLag} for group ${group.groupId}`,
        });
      }
    }
  }, 30_000); // Check every 30 seconds
}

High consumer lag usually means one of three things: consumers are too slow (scale horizontally, up to the partition count), messages are too large (a batching issue), or a downstream dependency is the bottleneck (like a database query that's suddenly slow). Claude Code diagnoses which by comparing the message production rate against consumer throughput metrics.
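
That comparison reduces to a rough triage function, assuming you can sample produce and consume rates (messages/sec) from your metrics backend — the names and categories below are illustrative:

```typescript
// Rough lag triage: a backlog with consumers outpacing producers clears on
// its own; a sustained rate deficit needs more consumers (up to the
// partition count) or a faster downstream path.
function diagnoseLag(produceRate: number, consumeRate: number, lag: number): string {
  if (consumeRate < produceRate) return 'scale-out';  // Falling further behind
  if (lag > 0) return 'recovering';                   // Catching up on a spike
  return 'healthy';
}
```

In practice you would sample both rates over a window (e.g. 5 minutes) rather than a single instant, so a momentary dip doesn't trigger scaling.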

Choosing Between Kafka and RabbitMQ

Should this system use Kafka or RabbitMQ?
We need: background jobs, email sending, and an audit log for all state changes.

Claude Code’s recommendation pattern:

Use RabbitMQ when: you need traditional message queue semantics — tasks that should be processed once, routing with exchanges/bindings, and no need to replay or reread events. Email sending, background jobs, webhook delivery.

Use Kafka when: you need an event log — replayable history, fan-out to many consumers, time-travel queries, event sourcing, analytics pipelines alongside operational consumers. Audit logs, event streaming between microservices, change data capture.

Both: Kafka for the audit log and state-change events. RabbitMQ for email sending and jobs (or use Kafka for everything and keep it simple — Kafka can do both).

For the overall event-driven architecture connecting multiple services, see the microservices guide. For observability into message processing — tracking consumer lag, dead letter rates, and processing time — the observability guide covers Prometheus metrics setup. The Claude Skills 360 bundle includes messaging patterns skills for Kafka, RabbitMQ, and AWS SQS/SNS. Start with the free tier to try message queue pattern generation on your project.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
