Data Engineering

Claude Code for Kafka Streams: Stream Processing, Stateful Joins, and Windowed Aggregations

Published: October 6, 2026
Read time: 9 min read
By: Claude Skills 360

Kafka Streams is a Java library for building stateful stream processing applications that run inside your own processes — no separate cluster to manage beyond Kafka itself. It handles exactly-once processing, state stores with RocksDB, and time-based windowing. Claude Code implements Kafka Streams topologies, explains the KStream vs KTable distinction, and builds the monitoring for processing lag and state store sizes.

CLAUDE.md for Kafka Streams Projects

## Kafka Streams Stack
- Library: kafka-streams 3.7.x embedded in Spring Boot 3.x
- State store: RocksDB (local) + changelog topic (Kafka-backed durability)
- Serdes: Jackson JSON serde for domain objects, String for keys
- Processing guarantee: exactly_once_v2 (requires Kafka 2.5+)
- Commit interval: 100ms (default; lower = more CPU, lower latency)
- Standby replicas: 1 (for fast failover during rebalance)
- Interactive queries: enabled via REST API for state store lookups
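The settings above map directly onto Streams configuration properties. A minimal sketch using raw property keys rather than the `StreamsConfig` constants; the application id `order-processor-app`, bootstrap address, and state directory are placeholder values:

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Mirrors the CLAUDE.md settings; all values here are illustrative.
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "order-processor-app");   // also the consumer group id
        props.put("bootstrap.servers", "localhost:9092");
        props.put("processing.guarantee", "exactly_once_v2"); // needs brokers on 2.5+
        props.put("commit.interval.ms", "100");               // lower latency, more CPU
        props.put("num.standby.replicas", "1");               // warm state for fast failover
        props.put("state.dir", "/var/lib/kafka-streams");     // local RocksDB location
        return props;
    }

    public static void main(String[] args) {
        streamsConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```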

Core Topology

// src/main/java/com/myapp/OrderStreamProcessor.java
@Configuration
public class OrderStreamProcessor {

    @Autowired
    private StreamsBuilder streamsBuilder;

    @Bean
    public KStream<String, OrderEvent> buildTopology() {
        // Input: raw orders with validation errors mixed in
        KStream<String, OrderEvent> rawOrders = streamsBuilder.stream(
            "order-events",
            Consumed.with(Serdes.String(), orderEventSerde())
        );

        // Branch: split valid vs invalid
        Map<String, KStream<String, OrderEvent>> branches = rawOrders.split(Named.as("branch-"))
            .branch((key, value) -> value.isValid(), Branched.as("valid"))
            .branch((key, value) -> !value.isValid(), Branched.as("invalid"))
            .noDefaultBranch();

        // Dead letter queue for invalid events. Note: split() prepends the
        // "branch-" prefix, so the map keys are "branch-valid" and "branch-invalid".
        branches.get("branch-invalid").to("order-events-dlq");

        KStream<String, OrderEvent> validOrders = branches.get("branch-valid");

        // Enrich with customer tier (from KTable)
        KTable<String, CustomerProfile> customerProfiles = streamsBuilder.table(
            "customer-profiles",
            Consumed.with(Serdes.String(), customerProfileSerde()),
            Materialized.as("customer-profiles-store")
        );

        // Stream-table join: enrich each order with customer profile
        KStream<String, EnrichedOrder> enrichedOrders = validOrders
            .selectKey((key, order) -> order.getCustomerId())  // Re-key by customerId (forces a repartition topic)
            .join(
                customerProfiles,
                (order, profile) -> new EnrichedOrder(order, profile),
                Joined.with(Serdes.String(), orderEventSerde(), customerProfileSerde())
            );

        // Write enriched orders to output topic
        enrichedOrders.to(
            "enriched-orders",
            Produced.with(Serdes.String(), enrichedOrderSerde())
        );

        return validOrders;
    }
}

Windowed Aggregations

// Count orders and sum revenue per customer per hour

KStream<String, EnrichedOrder> orders = streamsBuilder.stream("enriched-orders",
    Consumed.with(Serdes.String(), enrichedOrderSerde())
        .withTimestampExtractor(new OrderTimestampExtractor())  // Use event time, not wall clock
);

// Tumbling window: non-overlapping 1-hour buckets
TimeWindows hourlyWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1));

KTable<Windowed<String>, OrderHourlyStats> hourlyStats = orders
    .groupByKey()
    .windowedBy(hourlyWindow)
    .aggregate(
        OrderHourlyStats::empty,           // Initializer
        (customerId, order, stats) -> stats.add(order),  // Aggregator
        Materialized.<String, OrderHourlyStats, WindowStore<Bytes, byte[]>>as("hourly-order-stats")
            .withValueSerde(statsSerde())
            .withRetention(Duration.ofDays(7))  // How long to keep windows
    );
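
Tumbling windows are aligned to the Unix epoch, not to the first event: a record's window start is its timestamp rounded down to a multiple of the window size. A stdlib-only sketch of that bucketing rule (the same alignment `TimeWindows.ofSizeWithNoGrace` applies):

```java
public class TumblingWindowMath {
    // Start (epoch millis) of the tumbling window of size `sizeMs`
    // that contains `timestampMs`, for epoch-aligned windows.
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // An event ~16.7 min past the hour lands in the window
        // starting at the top of that hour
        System.out.println(windowStart(1_704_068_200_000L, hour));
    }
}
```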

// Sliding window: overlapping windows (every event creates/updates a window)
SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30));
// Use for: "orders in the last 30 minutes" updated on every new order

// Session window: activity-based, gaps close the window
SessionWindows sessionWindow = SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30));
// Use for: user session aggregation
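
Session windows have no fixed size: a window stays open while events keep arriving within the inactivity gap, and sessions merge when a late record bridges two of them. A stdlib-only sketch of the gap rule over a sorted list of event timestamps (merging of out-of-order arrivals, which Kafka Streams handles in its session store, is omitted):

```java
import java.util.ArrayList;
import java.util.List;

public class SessionGapSketch {
    // Group sorted event timestamps into sessions: a new session starts
    // whenever the gap since the previous event exceeds `gapMs`.
    public static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);          // gap exceeded: close this session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) result.add(current);
        return result;
    }
}
```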

// Suppress output until the window closes (no intermediate results downstream).
// untilWindowCloses needs a StrictBufferConfig: unbounded() never drops but
// buffers on the heap; maxBytes(n).shutDownWhenFull() caps memory instead.
KTable<Windowed<String>, OrderHourlyStats> finalStats = hourlyStats
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

finalStats.toStream()
    .map((windowedKey, stats) -> KeyValue.pair(
        windowedKey.key(),
        buildHourlyReport(windowedKey.window(), stats)
    ))
    .to("hourly-order-reports");

// Custom timestamp extractor — use event time from the message
public class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof EnrichedOrder order) {
            return order.getOccurredAt().toEpochMilli();
        }
        return partitionTime;  // Fallback: highest timestamp seen so far on this partition
    }
}

State Store Interactive Queries

// Query the local state store via REST (returns only data from the
// partitions assigned to this instance)

@RestController
@RequestMapping("/api/state")
public class StateQueryController {

    @Autowired
    private KafkaStreams streams;

    // Query a specific customer's current stats (KeyValueStore)
    @GetMapping("/customers/{customerId}/stats")
    public ResponseEntity<CustomerStats> getCustomerStats(@PathVariable String customerId) {
        ReadOnlyKeyValueStore<String, CustomerStats> store;
        try {
            store = streams.store(
                StoreQueryParameters.fromNameAndType("customer-stats", QueryableStoreTypes.keyValueStore())
            );
        } catch (InvalidStateStoreException ex) {
            return ResponseEntity.status(503).build();  // Store not ready (rebalancing)
        }

        CustomerStats stats = store.get(customerId);
        if (stats == null) return ResponseEntity.notFound().build();
        return ResponseEntity.ok(stats);
    }

    // Query all customers within a range
    @GetMapping("/customers/range")
    public List<CustomerStats> getRange(@RequestParam String from, @RequestParam String to) {
        ReadOnlyKeyValueStore<String, CustomerStats> store = streams.store(
            StoreQueryParameters.fromNameAndType("customer-stats", QueryableStoreTypes.keyValueStore())
        );
        List<CustomerStats> results = new ArrayList<>();
        // Range iterators hold open RocksDB resources -- always close them
        try (KeyValueIterator<String, CustomerStats> iter = store.range(from, to)) {
            iter.forEachRemaining(kv -> results.add(kv.value));
        }
        return results;
    }

    // For windowed stores (hourly aggregations)
    @GetMapping("/customers/{customerId}/hourly")
    public List<HourlyReport> getHourlyStats(
        @PathVariable String customerId,
        @RequestParam Instant from,
        @RequestParam Instant to
    ) {
        ReadOnlyWindowStore<String, OrderHourlyStats> store = streams.store(
            StoreQueryParameters.fromNameAndType("hourly-order-stats", QueryableStoreTypes.windowStore())
        );
        List<HourlyReport> results = new ArrayList<>();
        // Close the iterator to release the underlying store resources
        try (WindowStoreIterator<OrderHourlyStats> iter = store.fetch(customerId, from, to)) {
            iter.forEachRemaining(kv -> results.add(new HourlyReport(Instant.ofEpochMilli(kv.key), kv.value)));
        }
        return results;
    }
}
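
Each instance holds only the partitions assigned to it, so a query for a key owned by another instance must be forwarded. Kafka Streams exposes `streams.queryMetadataForKey(storeName, key, keySerializer)` to find the owning host; the forwarding step itself is plain routing logic. A simplified sketch with a stand-in partitioner (Kafka's real default partitioner uses a murmur2 hash, which `queryMetadataForKey` applies for you):

```java
import java.util.List;

public class QueryRouter {
    // Pick the host that owns `key`, given one host per partition.
    // String.hashCode() is a stand-in for illustration only; real code
    // should use the host:port returned by queryMetadataForKey.
    public static String hostFor(String key, List<String> hostsByPartition) {
        int partition = Math.floorMod(key.hashCode(), hostsByPartition.size());
        return hostsByPartition.get(partition);
    }
}
```

In the controller above, a 404 from the local store may simply mean "wrong instance": check the owning host first and proxy the request when it is not this one.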

Stream-Stream Join (Enrichment with Windowing)

// Join two streams within a time window
// Use case: match an order-placed event with its payment-confirmed event

KStream<String, OrderPlaced> orderPlaced = streamsBuilder.stream("order-placed",
    Consumed.with(Serdes.String(), orderPlacedSerde()));

KStream<String, PaymentConfirmed> paymentConfirmed = streamsBuilder.stream("payment-confirmed",
    Consumed.with(Serdes.String(), paymentConfirmedSerde()));

// Both events must arrive within 5 minutes of each other
JoinWindows joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5));

KStream<String, FulfilledOrder> fulfilled = orderPlaced.join(
    paymentConfirmed,
    (order, payment) -> new FulfilledOrder(order, payment),
    joinWindow,
    StreamJoined.with(Serdes.String(), orderPlacedSerde(), paymentConfirmedSerde())
);

// Left join: include orders even without payment (for investigation)
KStream<String, OrderWithPayment> allOrders = orderPlaced.leftJoin(
    paymentConfirmed,
    (order, payment) -> new OrderWithPayment(order, payment),  // payment may be null
    joinWindow,
    StreamJoined.with(Serdes.String(), orderPlacedSerde(), paymentConfirmedSerde())
);
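
A stream-stream join buffers both sides in windowed state stores and emits a result for every pair of same-key records whose timestamps fall within the configured time difference. The pairing rule itself is simple; a stdlib sketch:

```java
public class JoinWindowMath {
    // True when two records with the same key are close enough in event
    // time to join under JoinWindows.ofTimeDifferenceWithNoGrace(diff).
    // The window is symmetric: either side may arrive first.
    public static boolean joinable(long leftTs, long rightTs, long diffMs) {
        return Math.abs(leftTs - rightTs) <= diffMs;
    }
}
```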

Monitoring Processing Lag

# Prometheus scrape config for Kafka Streams JMX metrics
# Expose JMX via prometheus-jmx-exporter as a Java agent

# Key metrics to alert on:
# kafka.consumer:type=consumer-fetch-manager-metrics → records-lag-max
#   → How far behind this consumer is (in messages)
# kafka.streams:type=stream-task-metrics → process-latency-avg
#   → Average processing time per record
# kafka.streams:type=stream-state-metrics → block-cache-data-hit-ratio
#   → RocksDB block cache hit rate (low = disk reads = slow; DEBUG metrics level)

# Alert: processing lag growing
- alert: KafkaStreamsLagGrowing
  expr: |
    kafka_consumer_consumer_fetch_manager_metrics_records_lag_max{
      group="order-processor-app"
    } > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Kafka Streams consumer lag growing"

// Log the topology description for debugging
@PostConstruct
public void logTopology() {
    // KafkaStreams itself does not expose the topology; build it from the
    // StreamsBuilder and use Topology#describe() for the full processor wiring
    log.info("Kafka Streams topology:\n{}", streamsBuilder.build().describe());
    // Better: paste the describe() output into kafka-streams-viz for a diagram
}

// State store lag via AdminClient
public Map<String, Long> getConsumerLag() throws Exception {
    try (AdminClient admin = AdminClient.create(kafkaProps)) {
        Map<TopicPartition, OffsetAndMetadata> offsets = admin
            .listConsumerGroupOffsets("order-processor-app")
            .partitionsToOffsetAndMetadata().get();
        
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = admin
            .listOffsets(offsets.keySet().stream().collect(
                toMap(tp -> tp, tp -> OffsetSpec.latest())
            )).all().get();
        
        return offsets.entrySet().stream().collect(toMap(
            e -> e.getKey().toString(),
            e -> endOffsets.get(e.getKey()).offset() - e.getValue().offset()
        ));
    }
}

For the Kafka producer/consumer basics and topic configuration that feeds these streams, see the Kafka guide. For the dbt transformations that process the aggregated output from Kafka Streams into analytics models, the dbt guide covers downstream SQL modeling. The Claude Skills 360 bundle includes stream processing skill sets covering Kafka Streams topologies, windowed aggregations, and interactive queries. Start with the free tier to try stream processing topology generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
