Apache Kafka is the backbone of event-driven architectures. With kafkajs, `new Kafka({ clientId, brokers })` initializes the client, and `kafka.producer({ idempotent: true, transactionalId: "tx-1" })` creates an idempotent, transaction-capable producer. Call `await producer.connect()`, then publish with `await producer.send({ topic, messages: [{ key, value: JSON.stringify(payload), headers }] })`. Consumer: `kafka.consumer({ groupId })`, `await consumer.connect()`, `await consumer.subscribe({ topics, fromBeginning: false })`, then `await consumer.run({ eachMessage: async ({ topic, partition, message }) => { ... } })`. `consumer.seek({ topic, partition, offset: "0" })` rewinds a partition (seek takes an absolute numeric offset string, not a symbolic name). Manual commit: `consumer.run({ autoCommit: false, eachBatch: async ({ batch, commitOffsetsIfNecessary }) => { ... ; await commitOffsetsIfNecessary() } })`. Dead letter queue: catch errors in `eachMessage` and publish the failure to `${topic}-dlq`. Schema Registry: `@confluentinc/schemaregistry` provides Avro serializers for encoding payloads against registered schemas. SSL/SASL: `ssl: true`, `sasl: { mechanism: "scram-sha-256", username, password }`. Confluent Cloud: point `brokers` at the BOOTSTRAP_SERVERS value from the Confluent dashboard. `kafka.admin()` handles topic creation and metadata. Claude Code generates KafkaJS producers, consumers, Schema Registry integrations, and Kafka event streaming pipelines.
# CLAUDE.md for Kafka (KafkaJS)
## Kafka Stack
- Version: kafkajs >= 2.x
- Init: const kafka = new Kafka({ clientId: "my-app", brokers: process.env.KAFKA_BROKERS!.split(",") })
- Producer: const producer = kafka.producer({ idempotent: true }); await producer.connect()
- Send: await producer.send({ topic: "orders", messages: [{ key: orderId, value: JSON.stringify(order) }] })
- Consumer: const consumer = kafka.consumer({ groupId: "order-processor" }); await consumer.connect(); await consumer.subscribe({ topics: ["orders"] })
- Run: await consumer.run({ eachMessage: async ({ message }) => { const order = JSON.parse(message.value!.toString()) } })
- Disconnect: await producer.disconnect(); await consumer.disconnect()
- DLQ: catch errors in eachMessage handler, publish to `${topic}-dlq`, commit original offset
## Kafka Client Setup

```ts
// lib/kafka/client.ts — KafkaJS client configuration
import { Kafka, type KafkaConfig, logLevel } from "kafkajs"

function buildKafkaConfig(): KafkaConfig {
  const brokers = (process.env.KAFKA_BROKERS ?? "localhost:9092").split(",")

  // Confluent Cloud or any SASL/SSL-secured cluster
  if (process.env.KAFKA_SASL_USERNAME) {
    return {
      clientId: process.env.KAFKA_CLIENT_ID ?? "my-app",
      brokers,
      ssl: true,
      sasl: {
        mechanism: "plain",
        username: process.env.KAFKA_SASL_USERNAME,
        password: process.env.KAFKA_SASL_PASSWORD!,
      },
      logLevel: logLevel.WARN,
    }
  }

  return {
    clientId: process.env.KAFKA_CLIENT_ID ?? "my-app",
    brokers,
    logLevel: logLevel.WARN,
  }
}

export const kafka = new Kafka(buildKafkaConfig())
export const adminClient = kafka.admin()
```
## Typed Producer

```ts
// lib/kafka/producer.ts — type-safe Kafka producer with retry-safe publishing
import { kafka } from "./client"
import { Partitioners, type Producer, type Message } from "kafkajs"

let producer: Producer | null = null

export async function getProducer(): Promise<Producer> {
  if (!producer) {
    producer = kafka.producer({
      idempotent: true, // broker deduplicates retried writes per partition
      allowAutoTopicCreation: false,
      createPartitioner: Partitioners.DefaultPartitioner,
    })
    await producer.connect()

    // Graceful shutdown
    const shutdown = async () => {
      await producer?.disconnect()
      producer = null
    }
    process.once("SIGTERM", shutdown)
    process.once("SIGINT", shutdown)
  }
  return producer
}

export type KafkaEvent<T> = {
  eventType: string
  eventId: string
  occurredAt: string // ISO 8601
  payload: T
  metadata?: Record<string, string>
}

/** Publish a typed event */
export async function publishEvent<T>(
  topic: string,
  event: KafkaEvent<T>,
  key?: string,
): Promise<void> {
  const p = await getProducer()
  await p.send({
    topic,
    messages: [{
      key: key ?? event.eventId,
      value: JSON.stringify(event),
      headers: {
        eventType: event.eventType,
        contentType: "application/json",
        ...event.metadata,
      },
    }],
  })
}

/** Publish a batch of events in a single request */
export async function publishBatch<T>(
  topic: string,
  events: Array<KafkaEvent<T>>,
): Promise<void> {
  const p = await getProducer()
  const messages: Message[] = events.map((e) => ({
    key: e.eventId,
    value: JSON.stringify(e),
    headers: { eventType: e.eventType },
  }))
  await p.send({ topic, messages })
}
```
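A usage sketch for the producer module above. `makeEvent` is a hypothetical convenience helper, not part of the file; it repeats the `KafkaEvent` envelope type so the snippet stands alone, and the `OrderCreated` payload shape is illustrative.

```typescript
// Build a typed event envelope with generated id and timestamp.
import { randomUUID } from "node:crypto"

export type KafkaEvent<T> = {
  eventType: string
  eventId: string
  occurredAt: string // ISO 8601
  payload: T
  metadata?: Record<string, string>
}

export function makeEvent<T>(
  eventType: string,
  payload: T,
  metadata?: Record<string, string>,
): KafkaEvent<T> {
  return {
    eventType,
    eventId: randomUUID(),
    occurredAt: new Date().toISOString(),
    payload,
    metadata,
  }
}

// Usage with the producer module above (hypothetical order payload):
// await publishEvent("orders", makeEvent("OrderCreated", { orderId: "o-1", amountCents: 4999 }))
```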
## Consumer with DLQ

```ts
// lib/kafka/consumer.ts — Kafka consumer with dead letter queue
import { kafka } from "./client"
import { publishEvent } from "./producer"
import type { EachMessagePayload } from "kafkajs"
import { randomUUID } from "crypto"

export type MessageHandler<T> = (
  event: T,
  meta: { topic: string; partition: number; offset: string },
) => Promise<void>

export type ConsumerOptions = {
  groupId: string
  topics: string[]
  fromBeginning?: boolean
  maxRetries?: number
}

export async function startConsumer<T>(
  options: ConsumerOptions,
  handler: MessageHandler<T>,
): Promise<() => Promise<void>> {
  const { groupId, topics, fromBeginning = false, maxRetries = 3 } = options
  const consumer = kafka.consumer({ groupId })
  await consumer.connect()
  await consumer.subscribe({ topics, fromBeginning })

  await consumer.run({
    autoCommit: true,
    eachMessage: async (payload: EachMessagePayload) => {
      const { topic, partition, message } = payload
      const raw = message.value?.toString()
      if (!raw) return

      let parsed: T
      try {
        parsed = JSON.parse(raw) as T
      } catch {
        console.error(`[Kafka] Parse error on ${topic}:`, raw.slice(0, 200))
        return
      }

      // Retry with exponential backoff before giving up
      let lastError: Error | undefined
      for (let attempt = 0; attempt < maxRetries; attempt++) {
        try {
          await handler(parsed, { topic, partition, offset: message.offset })
          return
        } catch (err) {
          lastError = err instanceof Error ? err : new Error(String(err))
          if (attempt < maxRetries - 1) {
            await new Promise((r) => setTimeout(r, 100 * 2 ** attempt))
          }
        }
      }

      // Send to DLQ after retries are exhausted; autoCommit then advances the offset
      console.error(`[Kafka] DLQ: ${topic} after ${maxRetries} retries`, lastError?.message)
      await publishEvent(`${topic}-dlq`, {
        eventType: "DeadLetter",
        eventId: randomUUID(),
        occurredAt: new Date().toISOString(),
        payload: { originalTopic: topic, originalPayload: parsed, error: lastError?.message },
      })
    },
  })

  const stop = async () => {
    await consumer.stop()
    await consumer.disconnect()
  }
  process.once("SIGTERM", stop)
  process.once("SIGINT", stop)
  return stop
}
```
## Topic Management

```ts
// lib/kafka/admin.ts — create and configure topics
import { adminClient } from "./client"

export type TopicConfig = {
  name: string
  numPartitions?: number
  replicationFactor?: number
  retentionMs?: number
}

export async function ensureTopic(config: TopicConfig): Promise<void> {
  await adminClient.connect()
  try {
    const existing = await adminClient.listTopics()
    if (existing.includes(config.name)) return
    await adminClient.createTopics({
      topics: [{
        topic: config.name,
        numPartitions: config.numPartitions ?? 3,
        replicationFactor: config.replicationFactor ?? 1,
        configEntries: config.retentionMs
          ? [{ name: "retention.ms", value: String(config.retentionMs) }]
          : [],
      }],
    })
  } finally {
    await adminClient.disconnect() // always release the admin connection
  }
}

// Setup calls at app start:
// await ensureTopic({ name: "orders", numPartitions: 6, retentionMs: 7 * 86400000 })
// await ensureTopic({ name: "orders-dlq", numPartitions: 1, retentionMs: 30 * 86400000 })
// await ensureTopic({ name: "user-events", numPartitions: 3, retentionMs: 90 * 86400000 })
```
For a serverless, HTTP-based Kafka-compatible API that works in edge runtimes and serverless functions without persistent TCP connections, see the Upstash guide: Upstash Kafka wraps Apache Kafka with an HTTP SDK, whereas KafkaJS's long-lived TCP connections suit long-running Node.js services. For a lighter-weight, cloud-native messaging system, see the NATS guide: NATS offers JetStream for persistence, lower operational complexity than Kafka, and sub-millisecond latency for request-reply patterns, making it excellent for service mesh communication, while Kafka remains the standard for durable event streaming with strong ordering guarantees and replay. The Claude Skills 360 bundle includes Kafka skill sets covering producers, consumers, DLQ patterns, and Confluent Cloud. Start with the free tier to try event streaming generation.