Redpanda is Kafka-compatible event streaming without ZooKeeper — lower latency, single binary, no JVM. KafkaJS connects identically: new Kafka({ clientId: "my-app", brokers: ["localhost:9092"] }). Redpanda Cloud: brokers: [process.env.REDPANDA_BROKERS!], ssl: true, sasl: { mechanism: "scram-sha-256", username, password }. Schema Registry URL: REDPANDA_SCHEMA_REGISTRY_URL — same API as Confluent Schema Registry. rpk topic create orders --partitions 6 --replicas 1 creates a topic. rpk topic consume orders --offset start reads from the beginning. rpk group list lists consumer groups; rpk group describe my-group shows lag. Redpanda Connect (formerly Benthos): input: kafka_franz, pipeline: processors, output: kafka_franz or output: http_client — pipelines configured in YAML, no code needed for transforms. Redpanda Wasm transforms: rpk transform build && rpk transform deploy --input-topic=raw-events --output-topic=clean-events runs a Wasm function inline in the broker. GET /v1/brokers and GET /v1/topics from the Redpanda Admin API. Redpanda Console: web UI for topics, consumer groups, schema registry, and connect — ships in redpandadata/console Docker image. Self-hosted Docker Compose: redpandadata/redpanda:latest with --mode dev-container. Claude Code generates Redpanda KafkaJS producers/consumers, Schema Registry integrations, Redpanda Connect pipelines, and rpk automation scripts.
CLAUDE.md for Redpanda
## Redpanda Stack
- Client: kafkajs >= 2.x (100% Kafka-compatible — no Redpanda-specific lib needed)
- Local: docker run redpandadata/redpanda:latest --mode dev-container --kafka-addr localhost:9092
- Cloud: brokers from Redpanda Cloud console + ssl: true + sasl: { mechanism: "scram-sha-256" }
- Schema Registry: same API as Confluent — REDPANDA_SCHEMA_REGISTRY_URL env var
- Admin API: GET /v1/topics, GET /v1/brokers (port 9644 default on self-hosted)
- rpk: rpk topic create/consume/produce, rpk group list/describe — standard CLI for ops
Redpanda Client (KafkaJS)
// lib/redpanda/client.ts — KafkaJS client for Redpanda
import { Kafka, type KafkaConfig, logLevel } from "kafkajs"
/**
 * Builds the KafkaJS config from environment variables.
 *
 * Uses SSL + SASL/SCRAM (Redpanda Cloud style) when REDPANDA_SASL_USERNAME
 * is set; otherwise a plaintext self-hosted config. Brokers default to
 * localhost:9092; clientId defaults to "my-app".
 *
 * @throws Error when a SASL username is configured without a password —
 *   previously `undefined` was forwarded via a non-null assertion and only
 *   surfaced later as an opaque broker authentication failure.
 */
function buildConfig(): KafkaConfig {
  const brokers = (process.env.REDPANDA_BROKERS ?? "localhost:9092").split(",")
  const base: KafkaConfig = {
    clientId: process.env.SERVICE_NAME ?? "my-app",
    brokers,
    logLevel: logLevel.WARN,
  }
  const username = process.env.REDPANDA_SASL_USERNAME
  if (!username) {
    // Self-hosted (no auth)
    return base
  }
  const password = process.env.REDPANDA_SASL_PASSWORD
  if (!password) {
    // Fail fast with a clear message instead of sending undefined credentials.
    throw new Error("REDPANDA_SASL_USERNAME is set but REDPANDA_SASL_PASSWORD is missing")
  }
  // Redpanda Cloud requires SSL + SASL
  return {
    ...base,
    ssl: true,
    sasl: {
      mechanism: "scram-sha-256",
      username,
      password,
    },
  }
}
// Shared Kafka client for this process, configured from the environment.
export const redpanda = new Kafka(buildConfig())
// Shared admin client — callers are responsible for connect()/disconnect().
export const admin = redpanda.admin()
Topic Management
// lib/redpanda/topics.ts — topic creation and admin operations
import { admin } from "./client"
// Declarative description of a topic for ensureTopics.
export type TopicSpec = {
  name: string
  numPartitions?: number     // default 3 (applied in ensureTopics)
  replicationFactor?: number // default 1 (applied in ensureTopics)
  retentionMs?: number       // maps to topic config "retention.ms"
  cleanupPolicy?: "delete" | "compact" | "compact,delete" // maps to "cleanup.policy"
}
/**
 * Creates any of the given topics that do not already exist.
 * Defaults: 3 partitions, replication factor 1. Optional retention and
 * cleanup-policy values are forwarded as topic config entries.
 * Always disconnects the admin client, even on error.
 */
export async function ensureTopics(topics: TopicSpec[]): Promise<void> {
  await admin.connect()
  try {
    const known = new Set(await admin.listTopics())
    const missing = topics.filter((spec) => !known.has(spec.name))
    if (missing.length === 0) return
    const definitions = missing.map((spec) => {
      const configEntries: Array<{ name: string; value: string }> = []
      if (spec.retentionMs) {
        configEntries.push({ name: "retention.ms", value: String(spec.retentionMs) })
      }
      if (spec.cleanupPolicy) {
        configEntries.push({ name: "cleanup.policy", value: spec.cleanupPolicy })
      }
      return {
        topic: spec.name,
        numPartitions: spec.numPartitions ?? 3,
        replicationFactor: spec.replicationFactor ?? 1,
        configEntries,
      }
    })
    await admin.createTopics({ topics: definitions })
    console.log(`[Redpanda] Created topics: ${missing.map((spec) => spec.name).join(", ")}`)
  } finally {
    await admin.disconnect()
  }
}
export async function getConsumerGroupLag(groupId: string): Promise<
Array<{ topic: string; partition: number; lag: number }>
> {
await admin.connect()
try {
const offsets = await admin.fetchOffsets({ groupId, resolveOffsets: true })
const lags: Array<{ topic: string; partition: number; lag: number }> = []
for (const t of offsets) {
const topicOffsets = await admin.fetchTopicOffsets(t.topic)
for (const p of t.partitions) {
const latest = topicOffsets.find((o) => o.partition === p.partition)
if (latest) {
lags.push({
topic: t.topic,
partition: p.partition,
lag: parseInt(latest.offset) - parseInt(p.offset),
})
}
}
}
return lags
} finally {
await admin.disconnect()
}
}
Typed Producer and Consumer
// lib/redpanda/events.ts — typed event bus on Redpanda
import { redpanda } from "./client"
import type { Producer, Consumer, EachMessagePayload } from "kafkajs"
import { randomUUID } from "crypto"
// ── Event types ────────────────────────────────────────────────────────────
// Envelope shared by all application events: unique id, literal-string
// discriminator, ISO-8601 timestamp, and a typed payload.
export type BaseEvent<T extends string, P> = {
  eventId: string    // UUID, assigned by publishEvent
  eventType: T       // discriminant used for topic routing and handler dispatch
  occurredAt: string // ISO-8601 timestamp, assigned by publishEvent
  payload: P
}
// Emitted when a customer places an order.
export type OrderPlaced = BaseEvent<"OrderPlaced", {
  orderId: string
  userId: string
  amount: number
  currency: string
  items: Array<{ productId: string; quantity: number; price: number }>
}>
// Emitted when a new user registers.
export type UserSignedUp = BaseEvent<"UserSignedUp", {
  userId: string
  email: string
  plan: "free" | "pro" | "enterprise"
}>
// Discriminated union of every event the bus carries.
export type AppEvent = OrderPlaced | UserSignedUp
// ── Topic routing ──────────────────────────────────────────────────────────
// Maps each eventType to the Redpanda topic it is published to.
const TOPIC_MAP: Record<AppEvent["eventType"], string> = {
  OrderPlaced: "orders",
  UserSignedUp: "users",
}
// ── Producer ───────────────────────────────────────────────────────────────
// Lazily created, process-wide idempotent producer (null until first use).
let producer: Producer | null = null
/**
 * Returns the shared producer, creating and connecting it on first call.
 * Shutdown hooks (SIGTERM/SIGINT) are registered once alongside creation.
 *
 * @throws whatever connect() throws; the cache is reset in that case.
 */
async function getProducer(): Promise<Producer> {
  if (!producer) {
    producer = redpanda.producer({ idempotent: true })
    try {
      await producer.connect()
    } catch (err) {
      // Bug fix: the original cached the producer before connect() resolved,
      // so a failed connection left a permanently broken instance that every
      // later call reused. Reset the cache so the next call can retry.
      producer = null
      throw err
    }
    process.once("SIGTERM", () => producer?.disconnect().catch(console.error))
    process.once("SIGINT", () => producer?.disconnect().catch(console.error))
  }
  return producer
}
/**
 * Publishes an application event: stamps a fresh eventId and occurredAt,
 * serializes to JSON, and routes to the topic mapped from its eventType.
 */
export async function publishEvent<E extends AppEvent>(
  event: Omit<E, "eventId" | "occurredAt">,
): Promise<void> {
  const activeProducer = await getProducer()
  const enriched = {
    ...event,
    eventId: randomUUID(),
    occurredAt: new Date().toISOString(),
  }
  const message = {
    key: `${event.eventType}-${randomUUID()}`,
    value: JSON.stringify(enriched),
    headers: { eventType: event.eventType, contentType: "application/json" },
  }
  await activeProducer.send({ topic: TOPIC_MAP[event.eventType], messages: [message] })
}
// ── Consumer ───────────────────────────────────────────────────────────────
// Async handler for one concrete event type from the AppEvent union.
export type EventHandler<E extends AppEvent> = (event: E) => Promise<void>
/**
 * Starts a consumer in the given group, subscribed to the given topics,
 * dispatching each JSON message to the handler registered for its
 * eventType. Handler failures are logged, never rethrown, so consumption
 * continues (offsets auto-commit regardless).
 *
 * @returns an async function that stops and disconnects the consumer
 */
export async function startEventConsumer(
  groupId: string,
  topics: string[],
  handlers: Partial<{ [K in AppEvent["eventType"]]: EventHandler<Extract<AppEvent, { eventType: K }>> }>,
): Promise<() => Promise<void>> {
  const consumer: Consumer = redpanda.consumer({ groupId })
  await consumer.connect()
  await consumer.subscribe({ topics, fromBeginning: false })
  await consumer.run({
    autoCommit: true,
    eachMessage: async ({ message }: EachMessagePayload) => {
      const raw = message.value?.toString()
      if (!raw) return
      // NOTE(review): the parsed JSON is trusted as AppEvent without schema
      // validation — malformed producers would reach handlers unchecked.
      const event = JSON.parse(raw) as AppEvent
      const handler = handlers[event.eventType] as EventHandler<AppEvent> | undefined
      if (!handler) return
      try {
        await handler(event)
      } catch (err) {
        console.error(`[Redpanda] Handler error for ${event.eventType}:`, (err as Error).message)
      }
    },
  })
  return async () => {
    await consumer.stop()
    await consumer.disconnect()
  }
}
Redpanda Connect Pipeline
# redpanda-connect.yaml — no-code data pipeline (Benthos-compatible)
# Reads from Redpanda, transforms, and writes to HTTP endpoint
# Consume raw events from Redpanda over TLS with SCRAM auth.
input:
  kafka_franz:
    seed_brokers: ["${REDPANDA_BROKERS}"]
    topics: ["raw-events"]
    consumer_group: connect-pipeline
    tls:
      enabled: true
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${REDPANDA_SASL_USERNAME}"
        password: "${REDPANDA_SASL_PASSWORD}"
pipeline:
  processors:
    # Bloblang mapping: copy the message, stamp processing time, and
    # convert payload.amount from major units to cents (x100).
    - mapping: |
        root = this
        root.processedAt = now()
        root.payload.amount = root.payload.amount.number() * 100 # cents
    # Log (rather than silently drop) any processor error for this message.
    - catch:
        - log:
            level: ERROR
            message: 'Processing error: ${! error() }'
# fallback: try the primary topic first; if that write fails, route the
# message to the dead-letter topic instead.
output:
  fallback:
    - kafka_franz:
        seed_brokers: ["${REDPANDA_BROKERS}"]
        topic: processed-events
        tls: { enabled: true }
        sasl:
          - mechanism: SCRAM-SHA-256
            username: "${REDPANDA_SASL_USERNAME}"
            password: "${REDPANDA_SASL_PASSWORD}"
    - kafka_franz:
        seed_brokers: ["${REDPANDA_BROKERS}"]
        topic: dead-letter-queue
        tls: { enabled: true }
        sasl:
          - mechanism: SCRAM-SHA-256
            username: "${REDPANDA_SASL_USERNAME}"
            password: "${REDPANDA_SASL_PASSWORD}"
Docker Compose
# docker-compose.yml — Redpanda self-hosted dev setup
services:
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:v23.3.21
    command:
      - redpanda
      - start
      - --mode dev-container       # dev-friendly defaults; not for production
      - --smp 1                    # single core
      - --memory 512M
      - --reserve-memory 0M
      - --node-id 0
      # Two listeners: PLAINTEXT for in-network containers, OUTSIDE for the host.
      - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
      - --schema-registry-addr 0.0.0.0:8081
      - --pandaproxy-addr 0.0.0.0:8082
      - --rpc-addr 0.0.0.0:33145
      - --advertise-rpc-addr redpanda:33145
    ports:
      - "9092:9092" # Kafka API (external)
      - "8081:8081" # Schema Registry
      - "8082:8082" # Pandaproxy (HTTP API)
      - "9644:9644" # Admin API
  # Web UI for topics, consumer groups, schema registry, and connect.
  console:
    image: docker.redpanda.com/redpandadata/console:v2.5.2
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
    volumes:
      - ./redpanda-console-config.yaml:/tmp/config.yml:ro
    ports:
      - "8080:8080"
    depends_on: [redpanda]
Consider the Apache Kafka alternative when you need the full Kafka ecosystem with the broadest tooling support (Kafka Streams, the Kafka Connect ecosystem, ksqlDB, MirrorMaker 2, and extensive managed options like Confluent Cloud and MSK) — Kafka is the industry standard with maximum operator familiarity, while Redpanda is a drop-in replacement that eliminates ZooKeeper/KRaft complexity, achieves lower tail latency, and runs as a single binary ideal for smaller teams or self-hosting. Consider the NATS JetStream alternative when you need a lighter-weight cloud-native messaging system with sub-millisecond latency for request-reply patterns, a smaller operational footprint, and JetStream for optional persistence — NATS is excellent for service-mesh RPC and fan-out, while Redpanda excels at durable high-throughput event streaming with consumer-group semantics. The Claude Skills 360 bundle includes Redpanda skill sets covering KafkaJS integration, topic management, and Redpanda Connect pipelines. Start with the free tier to try generating event streaming code.