Kafka is a distributed commit log used as an event bus, message queue, and stream processing platform. Getting it right requires understanding consumer group rebalancing, offset management, exactly-once semantics, and partition assignment. Kafka Streams enables stateful stream processing — windowed aggregations, joins, and enrichment — without a separate cluster. Claude Code generates consumer implementations, Kafka Streams topologies, Schema Registry integration, dead letter queue patterns, and the monitoring setup for consumer lag alerting.
## CLAUDE.md for Kafka Projects

```markdown
## Kafka Stack
- Broker: Confluent Platform 7.x or Apache Kafka 3.x
- Client: confluent-kafka-python (librdkafka binding) or kafka-python for simpler cases
- Schema: Confluent Schema Registry with Avro (mandatory for production topics)
- Streams: Kafka Streams (Java) or Faust (Python) for stateful processing
- Serialization: Avro (structured), JSON-schema (web-facing), Protobuf (gRPC bridge)
- Exactly-once: enable.idempotence=true + transactional.id for producers
- Consumer: auto.offset.reset=earliest, enable.auto.commit=false (manual commit after processing)
- DLQ: every consumer writes failed messages to {topic}-dlq
- Observability: consumer_lag metric via kafka-lag-exporter → Prometheus
```
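The observability line assumes kafka-lag-exporter feeding Prometheus for production alerting. For a quick ad-hoc look at the same lag numbers, a throwaway script can compare the group's committed offsets against each partition's high watermark. A minimal sketch, where the broker address, topic name, and partition count are placeholders:

```python
# scripts/check_lag.py — ad-hoc consumer lag check (illustrative only;
# production alerting should come from kafka-lag-exporter → Prometheus)
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'group.id': 'order-processor-v2',
    'enable.auto.commit': False,
})

# Placeholder topic and partition count — adjust to the real topic layout
partitions = [TopicPartition('orders.created', p) for p in range(6)]

for tp in consumer.committed(partitions, timeout=10):
    low, high = consumer.get_watermark_offsets(tp, timeout=10)
    committed = tp.offset if tp.offset >= 0 else low  # no commit yet → start of log
    print(f"{tp.topic}[{tp.partition}] lag={high - committed}")

consumer.close()
```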
## Consumer with Manual Offset Management

```python
# consumers/order_consumer.py
from confluent_kafka import Consumer, KafkaException, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from datetime import datetime, timezone
import logging
import signal

logger = logging.getLogger(__name__)


class OrderConsumer:
    def __init__(self, config: dict, schema_registry_url: str):
        self.consumer = Consumer({
            'bootstrap.servers': config['bootstrap_servers'],
            'group.id': 'order-processor-v2',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,      # Manual commit for at-least-once
            'max.poll.interval.ms': 300_000,  # 5 minutes — time to process a batch
            'session.timeout.ms': 30_000,
            'heartbeat.interval.ms': 10_000,
        })
        # Schema Registry for Avro deserialization
        sr_client = SchemaRegistryClient({'url': schema_registry_url})
        self.deserializer = AvroDeserializer(sr_client)
        self._running = True
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)

    def _handle_shutdown(self, *_):
        logger.info("Shutdown signal received")
        self._running = False

    def process_batch(self, messages: list) -> list:
        """Process a batch of messages. Returns the messages that were handled,
        including those routed to the DLQ."""
        processed = []
        for msg in messages:
            try:
                ctx = SerializationContext(msg.topic(), MessageField.VALUE)
                order = self.deserializer(msg.value(), ctx)
                self._process_order(order)
                processed.append(msg)
            except Exception as e:
                logger.error(f"Failed to process message {msg.offset()}: {e}")
                self._send_to_dlq(msg, str(e))
                # Still mark as processed — DLQ handles retry
                processed.append(msg)
        return processed

    def _process_order(self, order: dict) -> None:
        """Business logic: idempotent by order_id.
        `db` is the application's persistence layer (not shown here)."""
        # Check if already processed (idempotency key)
        if db.order_processed(order['order_id']):
            return
        db.process_order(order)
        logger.info(f"Processed order {order['order_id']}")

    def _send_to_dlq(self, msg, error_reason: str) -> None:
        """Send failed message to dead letter queue."""
        from .dlq_producer import dlq_producer
        dlq_producer.send({
            'original_topic': msg.topic(),
            'original_partition': msg.partition(),
            'original_offset': msg.offset(),
            'error': error_reason,
            'payload': msg.value(),
            'failed_at': datetime.now(timezone.utc).isoformat(),
        })

    def run(self, topics: list[str]) -> None:
        self.consumer.subscribe(topics, on_assign=self._on_assign)
        try:
            while self._running:
                # Poll for up to 100 messages with 1s timeout
                messages = self.consumer.consume(num_messages=100, timeout=1.0)
                if not messages:
                    continue
                # Filter errors
                valid = []
                for msg in messages:
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            continue  # End of partition — normal
                        raise KafkaException(msg.error())
                    valid.append(msg)
                if not valid:
                    continue
                # Process batch
                processed = self.process_batch(valid)
                # Commit only after successful processing (at-least-once)
                if processed:
                    self.consumer.commit(asynchronous=False)
        finally:
            logger.info("Closing consumer")
            self.consumer.close()

    def _on_assign(self, consumer, partitions):
        """Called when partitions are assigned — log for observability."""
        logger.info(f"Partitions assigned: {[p.partition for p in partitions]}")
```
## Exactly-Once Producer with Transactions

```python
# producers/transactional_producer.py
import logging

from confluent_kafka import Producer, KafkaException

logger = logging.getLogger(__name__)


class TransactionalOrderProducer:
    def __init__(self, config: dict):
        self.producer = Producer({
            'bootstrap.servers': config['bootstrap_servers'],
            'enable.idempotence': True,               # Broker dedups producer retries
            'transactional.id': 'order-producer-01',  # Must be unique per producer instance
            'acks': 'all',                            # Wait for all ISR replicas
            'retries': 2147483647,                    # Retry indefinitely on transient errors
            'max.in.flight.requests.per.connection': 5,  # Safe with idempotence
        })
        self.producer.init_transactions()

    def send_order_events(self, order_id: str, events: list[dict]) -> None:
        """Send multiple events atomically — all or nothing."""
        try:
            self.producer.begin_transaction()
            for event in events:
                self.producer.produce(
                    topic=f"orders.{event['type']}",
                    key=order_id.encode(),
                    value=serialize_avro(event),  # application Avro helper (not shown)
                    on_delivery=self._delivery_report,
                )
            self.producer.flush()  # Wait for outstanding produce() calls before committing
            self.producer.commit_transaction()
        except KafkaException as e:
            self.producer.abort_transaction()
            raise RuntimeError(f"Transaction aborted: {e}") from e

    def _delivery_report(self, err, msg) -> None:
        if err:
            logger.error(f"Message delivery failed: {err}")
        else:
            logger.debug(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
```
## Kafka Streams Topology (Java)

```java
// streams/OrderEnrichmentTopology.java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

import java.time.Duration;
import java.util.Map;

public class OrderEnrichmentTopology {

    public static Topology build(StreamsConfig config) {
        StreamsBuilder builder = new StreamsBuilder();

        // Source stream: raw order events
        KStream<String, Order> orders = builder.stream(
            "orders.created",
            Consumed.with(Serdes.String(), OrderSerde.serde())
        );

        // KTable: customer profiles (changelog topic → materialized state)
        KTable<String, Customer> customers = builder.table(
            "customers.profiles",
            Consumed.with(Serdes.String(), CustomerSerde.serde()),
            Materialized.as("customers-store")
        );

        // Enrich orders with customer data via stream-table join
        KStream<String, EnrichedOrder> enriched = orders.join(
            customers,
            (order, customer) -> new EnrichedOrder(order, customer),
            Joined.with(Serdes.String(), OrderSerde.serde(), CustomerSerde.serde())
        );

        // Windowed aggregation: revenue per customer per hour
        enriched
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .aggregate(
                RevenueAccumulator::new,
                (customerId, order, acc) -> acc.add(order.getAmountCents()),
                Materialized.<String, RevenueAccumulator, WindowStore<Bytes, byte[]>>as("hourly-revenue-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(RevenueAccumulatorSerde.serde())  // custom serde, defined like OrderSerde
            )
            .toStream()
            .to("orders.hourly-revenue",
                Produced.with(
                    WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofHours(1).toMillis()),
                    RevenueAccumulatorSerde.serde()));

        // Branch: route by order value
        Map<String, KStream<String, EnrichedOrder>> branches = enriched.split(Named.as("order-router-"))
            .branch((key, order) -> order.getAmountCents() > 100_000, Branched.as("high-value"))
            .branch((key, order) -> order.getAmountCents() <= 100_000, Branched.as("standard"))
            .noDefaultBranch();

        branches.get("order-router-high-value")
            .to("orders.high-value", Produced.with(Serdes.String(), EnrichedOrderSerde.serde()));
        branches.get("order-router-standard")
            .to("orders.standard", Produced.with(Serdes.String(), EnrichedOrderSerde.serde()));

        return builder.build();
    }
}
```
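CLAUDE.md lists Faust as the Python option for stateful processing. For teams not running Java, a rough Faust sketch of the same hourly-revenue aggregation might look like the following; the broker URL, topic name, and the simplified `Order` record are assumptions, and a real deployment would need an Avro codec since the production topics use Schema Registry (the sketch assumes JSON records for brevity):

```python
# streams/revenue_agg.py — rough Faust sketch of the hourly-revenue aggregation
import faust


class Order(faust.Record):
    order_id: str
    customer_id: str
    amount_cents: int


app = faust.App('order-revenue', broker='kafka://localhost:9092')  # placeholder broker
orders_topic = app.topic('orders.created', value_type=Order)       # placeholder topic

# Tumbling one-hour windows, keyed by customer_id
hourly_revenue = app.Table('hourly-revenue', default=int).tumbling(3600.0)


@app.agent(orders_topic)
async def aggregate_revenue(orders):
    async for order in orders.group_by(Order.customer_id):
        hourly_revenue[order.customer_id] += order.amount_cents
```

Run it with `faust -A streams.revenue_agg worker`; Faust keeps the table in RocksDB and a changelog topic, mirroring the Kafka Streams state store above.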
## Schema Registry + Avro

```json
# schemas/order_created.avsc
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.myapp.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount_cents", "type": "long"},
    {"name": "status", "type": {"type": "enum", "name": "OrderStatus",
      "symbols": ["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED"]}},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "items", "type": {"type": "array", "items": {
      "type": "record", "name": "OrderItem",
      "fields": [
        {"name": "product_id", "type": "string"},
        {"name": "quantity", "type": "int"},
        {"name": "unit_price_cents", "type": "long"}
      ]
    }}}
  ]
}
```

```python
# Register schema and serialize
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.schema_registry.avro import AvroSerializer

sr_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})

with open('schemas/order_created.avsc') as f:
    schema_str = f.read()

serializer = AvroSerializer(
    sr_client,
    schema_str,
    conf={'auto.register.schemas': False},  # never auto-register from app code; register via dev/CI
)
```
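With auto-registration disabled, something still has to register schemas and block incompatible changes. A small CI-style sketch using the Schema Registry client; the registry URL is a placeholder and the subject name assumes the default TopicNameStrategy of `<topic>-value`:

```python
# scripts/register_schema.py — illustrative CI step for schema registration
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

sr_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})  # placeholder URL
subject = 'orders.created-value'  # TopicNameStrategy: <topic>-value

with open('schemas/order_created.avsc') as f:
    schema = Schema(f.read(), schema_type='AVRO')

# Fail the build if the new schema breaks compatibility with the latest registered version
if not sr_client.test_compatibility(subject, schema):
    raise SystemExit(f"Schema for {subject} is not compatible with the latest version")

schema_id = sr_client.register_schema(subject, schema)
print(f"Registered {subject} as schema id {schema_id}")
```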
The Apache Kafka Streams guide goes deeper on the stateful processing that complements this consumer setup, covering stateful topologies and stream-table joins. For more complex streaming pipelines, see the Apache Flink guide for the Flink alternative. The Claude Skills 360 bundle includes Kafka skill sets covering consumer groups, exactly-once semantics, and Streams topologies. Start with the free tier to try Kafka consumer generation.