
Claude Code for Apache Kafka: Consumer Groups, Exactly-Once, and Stream Processing

Published: November 27, 2026
Read time: 10 min
By: Claude Skills 360

Kafka is a distributed commit log used as an event bus, message queue, and stream processing platform. Getting it right requires understanding consumer group rebalancing, offset management, exactly-once semantics, and partition assignment. Kafka Streams enables stateful stream processing — windowed aggregations, joins, and enrichment — without a separate cluster. Claude Code generates consumer implementations, Kafka Streams topologies, Schema Registry integration, dead letter queue patterns, and the monitoring setup for consumer lag alerting.

CLAUDE.md for Kafka Projects

## Kafka Stack
- Broker: Confluent Platform 7.x or Apache Kafka 3.x
- Client: confluent-kafka-python (librdkafka binding) or kafka-python for simpler cases
- Schema: Confluent Schema Registry with Avro (mandatory for production topics)
- Streams: Kafka Streams (Java) or Faust (Python) for stateful processing
- Serialization: Avro (structured), JSON-schema (web-facing), Protobuf (gRPC bridge)
- Exactly-once: enable.idempotence=true + transactional.id for producers
- Consumer: auto.offset.reset=earliest, enable.auto.commit=false (manual commit after processing)
- DLQ: every consumer writes failed messages to {topic}-dlq
- Observability: consumer_lag metric via kafka-lag-exporter → Prometheus
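
The consumer_lag metric above normally comes from kafka-lag-exporter, but the same number can be spot-checked from a client: per-partition lag is the high watermark minus the committed offset. A minimal sketch of that arithmetic, with a helper that queries a live confluent-kafka-python Consumer (the function names are illustrative; the confluent_kafka calls are only reached at runtime, so the pure part is testable offline):

```python
def lag_per_partition(high_watermarks: dict, committed: dict) -> dict:
    """Both dicts map partition -> offset. A partition with no committed
    offset (None) is treated as fully lagged from offset 0."""
    return {
        p: hw - (committed.get(p) or 0)
        for p, hw in high_watermarks.items()
    }

def fetch_lag(consumer, topic: str) -> dict:
    """Query a live confluent_kafka Consumer for lag on its assignment.
    Assumes the consumer is subscribed and partitions are assigned."""
    assignment = [tp for tp in consumer.assignment() if tp.topic == topic]
    # committed() returns TopicPartitions; offset is negative if never committed
    committed = {tp.partition: (tp.offset if tp.offset >= 0 else None)
                 for tp in consumer.committed(assignment)}
    highs = {}
    for tp in assignment:
        _low, high = consumer.get_watermark_offsets(tp, timeout=5.0)
        highs[tp.partition] = high
    return lag_per_partition(highs, committed)
```

Alert when the summed lag grows faster than the consumer drains it, not on any single spike.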

Consumer with Manual Offset Management

# consumers/order_consumer.py
from confluent_kafka import Consumer, KafkaException, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from datetime import datetime
import logging
import signal

# `db` below is the application's persistence layer (defined elsewhere)

logger = logging.getLogger(__name__)

class OrderConsumer:
    def __init__(self, config: dict, schema_registry_url: str):
        self.consumer = Consumer({
            'bootstrap.servers': config['bootstrap_servers'],
            'group.id': 'order-processor-v2',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,         # Manual commit for at-least-once
            'max.poll.interval.ms': 300_000,     # 5 minutes — time to process a batch
            'session.timeout.ms': 30_000,
            'heartbeat.interval.ms': 10_000,
        })
        
        # Schema Registry for Avro deserialization
        sr_client = SchemaRegistryClient({'url': schema_registry_url})
        self.deserializer = AvroDeserializer(sr_client)
        
        self._running = True
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        signal.signal(signal.SIGINT, self._handle_shutdown)
    
    def _handle_shutdown(self, *_):
        logger.info("Shutdown signal received")
        self._running = False
    
    def process_batch(self, messages: list) -> list:
        """Process a batch of messages. Returns list of successfully processed message offsets."""
        processed = []
        
        for msg in messages:
            try:
                order = self.deserializer(msg.value(), None)
                self._process_order(order)
                processed.append(msg)
            except Exception as e:
                logger.error(f"Failed to process message {msg.offset()}: {e}")
                self._send_to_dlq(msg, str(e))
                # Still mark as processed — DLQ handles retry
                processed.append(msg)
        
        return processed
    
    def _process_order(self, order: dict) -> None:
        """Business logic: idempotent by order_id."""
        # Check if already processed (idempotency key)
        if db.order_processed(order['order_id']):
            return
        
        db.process_order(order)
        logger.info(f"Processed order {order['order_id']}")
    
    def _send_to_dlq(self, msg, error_reason: str) -> None:
        """Send failed message to dead letter queue."""
        from .dlq_producer import dlq_producer
        dlq_producer.send({
            'original_topic': msg.topic(),
            'original_partition': msg.partition(),
            'original_offset': msg.offset(),
            'error': error_reason,
            'payload': msg.value(),
            'failed_at': datetime.utcnow().isoformat(),
        })
    
    def run(self, topics: list[str]) -> None:
        self.consumer.subscribe(topics, on_assign=self._on_assign)
        
        try:
            while self._running:
                # Poll for up to 100 messages with 1s timeout
                messages = self.consumer.consume(num_messages=100, timeout=1.0)
                
                if not messages:
                    continue
                
                # Filter errors
                valid = []
                for msg in messages:
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            continue  # End of partition — normal
                        raise KafkaException(msg.error())
                    valid.append(msg)
                
                if not valid:
                    continue
                
                # Process batch
                processed = self.process_batch(valid)
                
                # Commit only after successful processing (at-least-once)
                if processed:
                    self.consumer.commit(asynchronous=False)
                    
        finally:
            logger.info("Closing consumer")
            self.consumer.close()
    
    def _on_assign(self, consumer, partitions):
        """Called when partitions are assigned — log for observability."""
        logger.info(f"Partitions assigned: {[p.partition for p in partitions]}")

Exactly-Once Producer with Transactions

# producers/transactional_producer.py
from confluent_kafka import Producer, KafkaException
import logging

logger = logging.getLogger(__name__)

# serialize_avro: the application's Avro serialization helper (see the Schema Registry section below)

class TransactionalOrderProducer:
    def __init__(self, config: dict):
        self.producer = Producer({
            'bootstrap.servers': config['bootstrap_servers'],
            'enable.idempotence': True,           # Broker dedupes producer retries
            'transactional.id': 'order-producer-01',  # Must be unique per producer instance
            'acks': 'all',                         # Wait for all ISR replicas
            'retries': 2147483647,                 # Retry indefinitely on transient errors
            'max.in.flight.requests.per.connection': 5,  # Safe with idempotence
        })
        self.producer.init_transactions()
    
    def send_order_events(self, order_id: str, events: list[dict]) -> None:
        """Send multiple events atomically — all or nothing."""
        try:
            self.producer.begin_transaction()
            
            for event in events:
                self.producer.produce(
                    topic=f"orders.{event['type']}",
                    key=order_id.encode(),
                    value=serialize_avro(event),
                    on_delivery=self._delivery_report,
                )
            
            # commit_transaction() flushes outstanding messages before committing
            self.producer.commit_transaction()
        except KafkaException as e:
            self.producer.abort_transaction()
            raise RuntimeError(f"Transaction aborted: {e}") from e
    
    def _delivery_report(self, err, msg) -> None:
        if err:
            logger.error(f"Message delivery failed: {err}")
        else:
            logger.debug(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
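
Transactions matter most in a consume-transform-produce loop, where the input consumer's offsets are committed inside the producer's transaction via send_offsets_to_transaction, so output records and input offsets become visible atomically or not at all. A sketch of that loop (the enrich step and function names are illustrative; the consumer needs enable.auto.commit=False, and downstream readers need isolation.level=read_committed):

```python
def enrich(event: dict) -> dict:
    """Illustrative pure transform: tag the event as processed."""
    return {**event, 'processed': True}

def run_eos_loop(consumer, producer, out_topic: str, deserialize, serialize):
    """Exactly-once consume-transform-produce with confluent_kafka clients.
    The producer must be transactional (init_transactions already called)."""
    while True:
        msgs = consumer.consume(num_messages=100, timeout=1.0)
        batch = [m for m in msgs if m.error() is None]
        if not batch:
            continue
        producer.begin_transaction()
        try:
            for msg in batch:
                event = enrich(deserialize(msg.value()))
                producer.produce(out_topic, key=msg.key(), value=serialize(event))
            # Commit the input offsets inside the transaction: outputs and
            # offsets are committed together, or both are aborted.
            producer.send_offsets_to_transaction(
                consumer.position(consumer.assignment()),
                consumer.consumer_group_metadata(),
            )
            producer.commit_transaction()
        except Exception:
            producer.abort_transaction()
            raise
```

On abort, the consumer re-reads the batch from the last committed offsets and the aborted outputs stay invisible to read_committed consumers, which is what makes the pipeline exactly-once end to end.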

Kafka Streams Topology (Java)

// streams/OrderEnrichmentTopology.java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import java.time.Duration;
import java.util.Map;

public class OrderEnrichmentTopology {
    
    public static Topology build(StreamsConfig config) {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Source stream: raw order events
        KStream<String, Order> orders = builder.stream(
            "orders.created",
            Consumed.with(Serdes.String(), OrderSerde.serde())
        );
        
        // KTable: customer profiles (changelog topic → materialized state)
        KTable<String, Customer> customers = builder.table(
            "customers.profiles",
            Consumed.with(Serdes.String(), CustomerSerde.serde()),
            Materialized.as("customers-store")
        );
        
        // Enrich orders with customer data via stream-table join
        KStream<String, EnrichedOrder> enriched = orders.join(
            customers,
            (order, customer) -> new EnrichedOrder(order, customer),
            Joined.with(Serdes.String(), OrderSerde.serde(), CustomerSerde.serde())
        );
        
        // Windowed aggregation: revenue per customer per hour
        enriched
            .groupByKey(Grouped.with(Serdes.String(), EnrichedOrderSerde.serde()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
            .aggregate(
                () -> 0L,
                (customerId, order, total) -> total + order.getAmountCents(),
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("hourly-revenue-store")
                    .withValueSerde(Serdes.Long())
            )
            .toStream()
            .to("orders.hourly-revenue", Produced.with(
                WindowedSerdes.timeWindowedSerdeFrom(String.class, Duration.ofHours(1).toMillis()),
                Serdes.Long()));
        
        // Branch: route by order value
        Map<String, KStream<String, EnrichedOrder>> branches = enriched.split(Named.as("order-router-"))
            .branch((key, order) -> order.getAmountCents() > 100_000, Branched.as("high-value"))
            .branch((key, order) -> order.getAmountCents() <= 100_000, Branched.as("standard"))
            .noDefaultBranch();
        
        branches.get("order-router-high-value")
            .to("orders.high-value", Produced.with(Serdes.String(), EnrichedOrderSerde.serde()));
        
        branches.get("order-router-standard")
            .to("orders.standard", Produced.with(Serdes.String(), EnrichedOrderSerde.serde()));
        
        return builder.build();
    }
}
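
The tumbling-window aggregation above buckets each event by its window start: windows are epoch-aligned, so an event at timestamp ts falls into the window beginning at ts - (ts % window_size). A plain-Python sketch of that bucketing, useful for reasoning about what the hourly-revenue-store holds (names are illustrative):

```python
WINDOW_MS = 3_600_000  # 1 hour, matching TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1))

def window_start(timestamp_ms: int, window_ms: int = WINDOW_MS) -> int:
    """Tumbling windows are aligned to the epoch: each event falls in
    exactly one window, beginning at ts - (ts % size)."""
    return timestamp_ms - (timestamp_ms % window_ms)

def hourly_revenue(events: list) -> dict:
    """Sum amount_cents per (customer_id, window start), the same shape
    as the windowed store in the topology above."""
    totals = {}
    for e in events:
        key = (e['customer_id'], window_start(e['timestamp_ms']))
        totals[key] = totals.get(key, 0) + e['amount_cents']
    return totals
```

With no grace period, late events whose timestamp falls in an already-closed window are dropped; use ofSizeAndGrace when late data matters.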

Schema Registry + Avro

# schemas/order_created.avsc
{
  "type": "record",
  "name": "OrderCreated",
  "namespace": "com.myapp.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "amount_cents", "type": "long"},
    {"name": "status", "type": {"type": "enum", "name": "OrderStatus",
        "symbols": ["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED"]}},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "items", "type": {"type": "array", "items": {
        "type": "record", "name": "OrderItem",
        "fields": [
            {"name": "product_id", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "unit_price_cents", "type": "long"}
        ]
    }}}
  ]
}
# Load the schema and build a serializer (registration itself happens in CI)
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
from confluent_kafka.schema_registry.avro import AvroSerializer

sr_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})

with open('schemas/order_created.avsc') as f:
    schema_str = f.read()

serializer = AvroSerializer(sr_client, schema_str,
    conf={'auto.register.schemas': False})  # Never auto-register in prod; CI registers schemas explicitly
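
Wiring the serializer into a producer takes a SerializationContext so the subject name ({topic}-value by default) resolves correctly. A sketch assuming the serializer built above; the validation helper and function names are illustrative, and the confluent_kafka import is deferred so only the produce path needs the library:

```python
REQUIRED_FIELDS = ('order_id', 'customer_id', 'amount_cents',
                   'status', 'created_at', 'items')

def validate_order(order: dict) -> dict:
    """Fail fast before serialization: a missing-field error here is
    easier to trace than an Avro encoding error at produce time."""
    missing = [f for f in REQUIRED_FIELDS if f not in order]
    if missing:
        raise ValueError(f"order missing fields: {missing}")
    return order

def produce_order(producer, serializer, topic: str, order: dict) -> None:
    """`producer` is a confluent_kafka.Producer, `serializer` the
    AvroSerializer built above."""
    from confluent_kafka.serialization import SerializationContext, MessageField

    value = serializer(validate_order(order),
                       SerializationContext(topic, MessageField.VALUE))
    producer.produce(topic, key=order['order_id'].encode(), value=value)
```

Keying by order_id keeps all events for one order on the same partition, which preserves their relative ordering for consumers.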

The Apache Kafka Streams guide covers stateful topologies and stream-table joins in more depth, complementing the consumer setup here. For more complex streaming pipelines, see the Apache Flink guide on the Flink alternative. The Claude Skills 360 bundle includes Kafka skill sets covering consumer groups, exactly-once semantics, and Streams topologies; start with the free tier to try Kafka consumer generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
