Kafka Streams is a Java library for building stateful stream processing applications that run inside your own processes — no separate cluster to manage beyond Kafka itself. It handles exactly-once processing, state stores with RocksDB, and time-based windowing. Claude Code implements Kafka Streams topologies, explains the KStream vs KTable distinction, and builds the monitoring for processing lag and state store sizes.
## CLAUDE.md for Kafka Streams Projects

```markdown
## Kafka Streams Stack
- Library: kafka-streams 3.7.x embedded in Spring Boot 3.x
- State store: RocksDB (local) + changelog topic (Kafka-backed durability)
- Serdes: Jackson JSON serde for domain objects, String for keys
- Processing guarantee: exactly_once_v2 (requires brokers 2.5+)
- Commit interval: 100ms (default under EOS; lower = more CPU, lower latency)
- Standby replicas: 1 (for fast failover during rebalance)
- Interactive queries: enabled via REST API for state store lookups
```
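The settings above map directly onto `StreamsConfig` properties. A minimal sketch of wiring them up in Spring Boot, assuming an application id of `order-processor-app` and a local broker (both illustrative):

```java
// Hypothetical Spring configuration mirroring the CLAUDE.md settings above.
// Application id and bootstrap servers are assumptions for illustration.
@Configuration
@EnableKafkaStreams
public class OrderStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration streamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
```

Spring Kafka builds and starts the `KafkaStreams` instance from this bean, so topology `@Bean` methods like the one below only have to describe the processing graph.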
## Core Topology
```java
// src/main/java/com/myapp/OrderStreamProcessor.java
@Configuration
public class OrderStreamProcessor {

    @Autowired
    private StreamsBuilder streamsBuilder;

    @Bean
    public KStream<String, OrderEvent> buildTopology() {
        // Input: raw orders with validation errors mixed in
        KStream<String, OrderEvent> rawOrders = streamsBuilder.stream(
            "order-events",
            Consumed.with(Serdes.String(), orderEventSerde())
        );

        // Branch: split valid vs invalid
        // Named.as("branch-") prefixes each branch name, so the map keys
        // are "branch-valid" and "branch-invalid"
        Map<String, KStream<String, OrderEvent>> branches = rawOrders.split(Named.as("branch-"))
            .branch((key, value) -> value.isValid(), Branched.as("valid"))
            .branch((key, value) -> !value.isValid(), Branched.as("invalid"))
            .noDefaultBranch();

        // Dead letter queue for invalid events (explicit serdes so we don't
        // depend on the default value serde being set)
        branches.get("branch-invalid").to(
            "order-events-dlq",
            Produced.with(Serdes.String(), orderEventSerde())
        );
        KStream<String, OrderEvent> validOrders = branches.get("branch-valid");

        // Enrich with customer tier (from KTable)
        KTable<String, CustomerProfile> customerProfiles = streamsBuilder.table(
            "customer-profiles",
            Consumed.with(Serdes.String(), customerProfileSerde()),
            Materialized.as("customer-profiles-store")
        );

        // Stream-table join: enrich each order with customer profile
        KStream<String, EnrichedOrder> enrichedOrders = validOrders
            .selectKey((key, order) -> order.getCustomerId()) // re-key by customerId
            .join(
                customerProfiles,
                (order, profile) -> new EnrichedOrder(order, profile),
                Joined.with(Serdes.String(), orderEventSerde(), customerProfileSerde())
            );

        // Write enriched orders to output topic
        enrichedOrders.to(
            "enriched-orders",
            Produced.with(Serdes.String(), enrichedOrderSerde())
        );
        return validOrders;
    }
}
```
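The topology references serde factory methods (`orderEventSerde()` and friends) without showing them. One plausible implementation, per the stack's "Jackson JSON serde" choice, is Spring Kafka's `JsonSerde`; the package name is an assumption:

```java
// Hypothetical serde factory backing orderEventSerde() in the topology above.
// Uses spring-kafka's Jackson-based JsonSerde; "com.myapp" is an assumed package.
private Serde<OrderEvent> orderEventSerde() {
    JsonSerde<OrderEvent> serde = new JsonSerde<>(OrderEvent.class);
    // Deserializer refuses unknown classes unless their package is trusted
    serde.configure(
        Map.of(JsonDeserializer.TRUSTED_PACKAGES, "com.myapp"),
        false // isKey = false: this serde is used for values
    );
    return serde;
}
```

The other serdes (`customerProfileSerde()`, `enrichedOrderSerde()`, `statsSerde()`) follow the same pattern with their respective classes.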
## Windowed Aggregations
```java
// Count orders and sum revenue per customer per hour
KStream<String, EnrichedOrder> orders = streamsBuilder.stream("enriched-orders",
    Consumed.with(Serdes.String(), enrichedOrderSerde())
        .withTimestampExtractor(new OrderTimestampExtractor()) // use event time, not wall clock
);

// Tumbling window: non-overlapping 1-hour buckets
TimeWindows hourlyWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1));

KTable<Windowed<String>, OrderHourlyStats> hourlyStats = orders
    .groupByKey()
    .windowedBy(hourlyWindow)
    .aggregate(
        OrderHourlyStats::empty,                        // initializer
        (customerId, order, stats) -> stats.add(order), // aggregator
        Materialized.<String, OrderHourlyStats, WindowStore<Bytes, byte[]>>as("hourly-order-stats")
            .withValueSerde(statsSerde())
            .withRetention(Duration.ofDays(7))          // how long to keep closed windows
    );

// Sliding window: overlapping windows (every event creates/updates a window)
SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30));
// Use for: "orders in the last 30 minutes" updated on every new order

// Session window: activity-based, inactivity gaps close the window
SessionWindows sessionWindow = SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30));
// Use for: user session aggregation

// Suppress output until the window closes (avoid emitting intermediate results)
KTable<Windowed<String>, OrderHourlyStats> finalStats = hourlyStats
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

finalStats.toStream()
    .map((windowedKey, stats) -> KeyValue.pair(
        windowedKey.key(),
        buildHourlyReport(windowedKey.window(), stats)
    ))
    .to("hourly-order-reports");

// Custom timestamp extractor — use event time from the message
public class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if (record.value() instanceof EnrichedOrder order) {
            return order.getOccurredAt().toEpochMilli();
        }
        return partitionTime; // fall back to the highest timestamp seen on this partition
    }
}
```
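Windowed logic like this is best verified with `TopologyTestDriver` from kafka-streams-test-utils, which advances stream time deterministically. A sketch, assuming the topology above plus illustrative helpers (`orderAt(...)`, the serializer/deserializer factories, and `props` holding the streams config):

```java
// Sketch: unit-testing the hourly aggregation with kafka-streams-test-utils.
// orderAt(), enrichedOrderSerializer(), hourlyReportDeserializer(), and props
// are assumed helpers; topic names match the snippets above.
StreamsBuilder builder = new StreamsBuilder();
// ... build the windowed topology exactly as above ...

try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, EnrichedOrder> input = driver.createInputTopic(
        "enriched-orders", new StringSerializer(), enrichedOrderSerializer());

    // Two orders inside the 10:00 hour, then one past the boundary
    input.pipeInput("cust-1", orderAt("2024-05-01T10:05:00Z"));
    input.pipeInput("cust-1", orderAt("2024-05-01T10:45:00Z"));
    input.pipeInput("cust-1", orderAt("2024-05-01T11:01:00Z")); // advances stream time past 11:00

    TestOutputTopic<String, HourlyReport> output = driver.createOutputTopic(
        "hourly-order-reports", new StringDeserializer(), hourlyReportDeserializer());

    // With suppress(untilWindowCloses), only the finalized 10:00-11:00 bucket
    // has been emitted; the 11:00 bucket is still open and buffered.
}
```

Note that `suppress` holds results until stream time (driven by event timestamps, thanks to the extractor) passes the window end, which is why the third record is needed to flush the first window.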
## State Store Interactive Queries
```java
// Query local state store (for this partition's data) via REST
@RestController
@RequestMapping("/api/state")
public class StateQueryController {

    @Autowired
    private KafkaStreams streams;

    // Query a specific customer's current stats (KeyValueStore)
    @GetMapping("/customers/{customerId}/stats")
    public ResponseEntity<CustomerStats> getCustomerStats(@PathVariable String customerId) {
        ReadOnlyKeyValueStore<String, CustomerStats> store;
        try {
            store = streams.store(
                StoreQueryParameters.fromNameAndType("customer-stats", QueryableStoreTypes.keyValueStore())
            );
        } catch (InvalidStateStoreException ex) {
            return ResponseEntity.status(503).build(); // store not ready (rebalancing)
        }
        CustomerStats stats = store.get(customerId);
        if (stats == null) return ResponseEntity.notFound().build();
        return ResponseEntity.ok(stats);
    }

    // Query all customers within a key range
    @GetMapping("/customers/range")
    public List<CustomerStats> getRange(@RequestParam String from, @RequestParam String to) {
        ReadOnlyKeyValueStore<String, CustomerStats> store = streams.store(
            StoreQueryParameters.fromNameAndType("customer-stats", QueryableStoreTypes.keyValueStore())
        );
        List<CustomerStats> results = new ArrayList<>();
        // Iterators hold RocksDB resources; close them when done
        try (KeyValueIterator<String, CustomerStats> iter = store.range(from, to)) {
            iter.forEachRemaining(kv -> results.add(kv.value));
        }
        return results;
    }

    // For windowed stores (hourly aggregations)
    @GetMapping("/customers/{customerId}/hourly")
    public List<HourlyReport> getHourlyStats(
        @PathVariable String customerId,
        @RequestParam Instant from,
        @RequestParam Instant to
    ) {
        ReadOnlyWindowStore<String, OrderHourlyStats> store = streams.store(
            StoreQueryParameters.fromNameAndType("hourly-order-stats", QueryableStoreTypes.windowStore())
        );
        List<HourlyReport> results = new ArrayList<>();
        try (WindowStoreIterator<OrderHourlyStats> iter = store.fetch(customerId, from, to)) {
            // kv.key is the window start timestamp in epoch millis
            iter.forEachRemaining(kv -> results.add(new HourlyReport(Instant.ofEpochMilli(kv.key), kv.value)));
        }
        return results;
    }
}
```
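These endpoints only see the partitions assigned to this instance; a query for a key owned by another instance returns null. `KafkaStreams.queryMetadataForKey()` tells you which instance hosts a key so the request can be forwarded. A sketch, where `thisHost` (a `HostInfo` for the local instance) and the forwarding `restTemplate` call are assumptions:

```java
// Sketch: route a key lookup to the instance that owns the key's partition.
// queryMetadataForKey() and KeyQueryMetadata are the real Kafka Streams API;
// thisHost and the forwarding URL shape are illustrative assumptions.
@GetMapping("/customers/{customerId}/stats-routed")
public ResponseEntity<CustomerStats> getStatsRouted(@PathVariable String customerId) {
    KeyQueryMetadata metadata = streams.queryMetadataForKey(
        "customer-stats", customerId, new StringSerializer());
    if (KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
        return ResponseEntity.status(503).build(); // metadata unknown during rebalance
    }
    HostInfo active = metadata.activeHost();
    if (active.equals(thisHost)) {
        return getCustomerStats(customerId); // serve from the local store
    }
    // Forward to the owning instance's equivalent endpoint
    return restTemplate.getForEntity(
        "http://" + active.host() + ":" + active.port()
            + "/api/state/customers/" + customerId + "/stats",
        CustomerStats.class);
}
```

With standby replicas enabled (as in the stack above), `metadata.standbyHosts()` can also serve slightly stale reads during failover.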
## Stream-Stream Join (Enrichment with Windowing)
```java
// Join two streams within a time window
// Use case: match an order-placed event with its payment-confirmed event
KStream<String, OrderPlaced> orderPlaced = streamsBuilder.stream("order-placed",
    Consumed.with(Serdes.String(), orderPlacedSerde()));
KStream<String, PaymentConfirmed> paymentConfirmed = streamsBuilder.stream("payment-confirmed",
    Consumed.with(Serdes.String(), paymentConfirmedSerde()));

// Both events must arrive within 5 minutes of each other
JoinWindows joinWindow = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5));

KStream<String, FulfilledOrder> fulfilled = orderPlaced.join(
    paymentConfirmed,
    (order, payment) -> new FulfilledOrder(order, payment),
    joinWindow,
    StreamJoined.with(Serdes.String(), orderPlacedSerde(), paymentConfirmedSerde())
);

// Left join: include orders even without payment (for investigation)
KStream<String, OrderWithPayment> allOrders = orderPlaced.leftJoin(
    paymentConfirmed,
    (order, payment) -> new OrderWithPayment(order, payment), // payment may be null
    joinWindow,
    StreamJoined.with(Serdes.String(), orderPlacedSerde(), paymentConfirmedSerde())
);
```
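The left join is most useful when the unmatched results are routed somewhere actionable. Since Kafka Streams 3.1, a left join emits the null-right result only after the join window closes, so a null payment downstream reliably means no confirmation arrived in time. A sketch, where the `unpaid-orders` topic and `orderWithPaymentSerde()` are assumptions:

```java
// Sketch: route orders whose payment never arrived within the join window
// to a dedicated topic for investigation. Topic name and serde are assumed.
allOrders
    .filter((orderId, orderWithPayment) -> orderWithPayment.getPayment() == null)
    .to("unpaid-orders", Produced.with(Serdes.String(), orderWithPaymentSerde()));
```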
## Monitoring Processing Lag
```yaml
# Prometheus scrape config for Kafka Streams JMX metrics
# Expose JMX via prometheus-jmx-exporter as a Java agent
#
# Key metrics to alert on:
#   kafka.consumer / consumer-fetch-manager-metrics / records-lag-max
#     → How far behind this consumer is (in messages)
#   kafka.streams / task-level-metrics / process-latency-avg
#     → Average processing time per record
#   kafka.streams / state-store-metrics / rocksdb-block-cache-hit-ratio
#     → State store hit rate (low = disk reads = slow)

# Alert: processing lag growing
- alert: KafkaStreamsLagGrowing
  expr: |
    kafka_consumer_consumer_fetch_manager_metrics_records_lag_max{
      group="order-processor"
    } > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Kafka Streams consumer lag growing"
```
```java
// Log topology description for debugging.
// Note: KafkaStreams#metadataForAllStreamsClients() returns instance metadata,
// not the topology — use Topology#describe() via the Spring factory bean instead.
@Autowired
private StreamsBuilderFactoryBean factoryBean;

@EventListener(ApplicationReadyEvent.class) // topology is available once the factory bean has started
public void logTopology() {
    log.info("Kafka Streams topology:\n{}", factoryBean.getTopology().describe());
    // Better: paste this output into kafka-streams-viz for a visual topology
}

// Consumer lag via AdminClient: log-end offset minus committed offset, per partition
public Map<String, Long> getConsumerLag() throws Exception {
    try (AdminClient admin = AdminClient.create(kafkaProps)) {
        Map<TopicPartition, OffsetAndMetadata> offsets = admin
            .listConsumerGroupOffsets("order-processor-app")
            .partitionsToOffsetAndMetadata().get();
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = admin
            .listOffsets(offsets.keySet().stream().collect(
                toMap(tp -> tp, tp -> OffsetSpec.latest()) // static import: Collectors.toMap
            )).all().get();
        return offsets.entrySet().stream().collect(toMap(
            e -> e.getKey().toString(),
            e -> endOffsets.get(e.getKey()).offset() - e.getValue().offset()
        ));
    }
}
```
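The lag arithmetic at the end of `getConsumerLag` (log-end offset minus committed offset) is worth isolating into a pure helper so it can be unit tested without an AdminClient; the class and method names here are illustrative, and the result is floored at zero to absorb races between the two offset lookups:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper: lag per partition = log-end offset minus committed offset,
// clamped to zero in case the end-offset snapshot is slightly older than the commit.
public class LagMath {

    public static Map<String, Long> lag(Map<String, Long> committed, Map<String, Long> endOffsets) {
        Map<String, Long> out = new HashMap<>();
        committed.forEach((partition, committedOffset) ->
            out.put(partition, Math.max(0L,
                endOffsets.getOrDefault(partition, committedOffset) - committedOffset)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> lag = lag(
            Map.of("orders-0", 100L, "orders-1", 250L),  // committed offsets
            Map.of("orders-0", 150L, "orders-1", 250L)); // log-end offsets
        System.out.println(lag.get("orders-0")); // 50
        System.out.println(lag.get("orders-1")); // 0
    }
}
```

Feeding the two maps from `partitionsToOffsetAndMetadata()` and `listOffsets(...)` through a helper like this keeps the AdminClient plumbing separate from the arithmetic you actually alert on.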
For the Kafka producer/consumer basics and topic configuration that feeds these streams, see the Kafka guide. For the dbt transformations that process the aggregated output from Kafka Streams into analytics models, the dbt guide covers downstream SQL modeling. The Claude Skills 360 bundle includes stream processing skill sets covering Kafka Streams topologies, windowed aggregations, and interactive queries. Start with the free tier to try stream processing topology generation.