Apache Flink processes data streams with exactly-once semantics, event-time processing, and low latency at high throughput. Unlike Kafka Streams (which runs inside your application), Flink is a distributed system you deploy separately: it scales independently of your application and handles terabytes of state with the RocksDB state backend. Claude Code implements Flink jobs, explains watermark strategies for late events, and builds CEP patterns for fraud detection.
CLAUDE.md for Flink Projects
## Flink Stack
- Version: 1.19.x
- Language: Java 17 (Flink's primary API) or Python Table API (PyFlink)
- Connector: Kafka source/sink (flink-connector-kafka)
- State Backend: RocksDB (production), HashMap (testing)
- Checkpointing: every 30s, exactly-once, min 10s between checkpoints
- Watermark strategy: BoundedOutOfOrderness with 5-second tolerance
- Serialization: Avro with Schema Registry for production; JSON for dev
- Parallelism: set at job level, override per operator for bottlenecks
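Most of these settings appear in the job below; parallelism is the exception, so here is a minimal sketch of a job-level default with a per-operator override (the operator and parallelism values are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // Job-level default applied to every operator
// env.setStateBackend(new HashMapStateBackend()); // Swap in for local testing

orders
    .map(new EnrichOrderFunction())
    .setParallelism(8) // Override one hot operator instead of scaling the whole job
    .uid("enrich-orders")
    .name("Enrich Orders");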
Kafka Source + Basic Processing
// src/main/java/com/myapp/OrderProcessor.java
public class OrderProcessor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing for fault tolerance
env.enableCheckpointing(30_000); // Every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);
env.getCheckpointConfig().setCheckpointTimeout(60_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// RocksDB state backend for large state
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints");
// Kafka source
KafkaSource<OrderEvent> source = KafkaSource.<OrderEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("order-events")
.setGroupId("order-processor")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new JsonDeserializationSchema<>(OrderEvent.class))
.build();
// Watermark strategy: tolerate 5-second late events
WatermarkStrategy<OrderEvent> watermarkStrategy = WatermarkStrategy
.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getOccurredAt().toEpochMilli())
.withIdleness(Duration.ofMinutes(1)); // Handle idle partitions
DataStream<OrderEvent> orders = env
.fromSource(source, watermarkStrategy, "Kafka Orders Source")
.uid("orders-source") // Stable UID for restoring state across job restarts
.name("Orders Source");
// Process: filter out invalid orders, then enrich
DataStream<EnrichedOrder> enriched = orders
.filter(order -> order.getTotalCents() > 0)
.uid("filter-empty")
.map(new EnrichOrderFunction())
.uid("enrich-orders")
.name("Enrich Orders");
// Kafka sink
KafkaSink<EnrichedOrder> sink = KafkaSink.<EnrichedOrder>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("enriched-orders")
.setValueSerializationSchema(new JsonSerializationSchema<>())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
enriched.sinkTo(sink).uid("enriched-sink");
env.execute("Order Processor");
}
}
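The job references an OrderEvent POJO and an EnrichOrderFunction that aren't shown. Minimal sketches, assuming the field names used above (your schema and enrichment logic will differ):

// Flink POJO: public no-arg constructor plus getters/setters for the POJO serializer
public class OrderEvent {
    private String orderId;
    private String customerId;
    private String productId;
    private long totalCents;
    private Instant occurredAt;

    public OrderEvent() {}

    public String getOrderId() { return orderId; }
    public String getCustomerId() { return customerId; }
    public String getProductId() { return productId; }
    public long getTotalCents() { return totalCents; }
    public Instant getOccurredAt() { return occurredAt; }
    // Setters omitted for brevity; Flink's POJO serializer needs them (or public fields)
}

// Hypothetical enrichment: a RichMapFunction so open() can initialize a lookup client
public class EnrichOrderFunction extends RichMapFunction<OrderEvent, EnrichedOrder> {
    @Override
    public EnrichedOrder map(OrderEvent order) {
        // Illustrative only: attach whatever context your domain needs
        return EnrichedOrder.from(order); // hypothetical factory method
    }
}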
Windowed Aggregations
// Tumbling and sliding windows with Flink
// Tumbling window: sum revenue per product per hour (non-overlapping)
enriched
.keyBy(order -> order.getProductId())
.window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(
        new RevenueAggregator(),    // Incremental: efficient, doesn't store all events
        new RevenueWindowFunction() // Post-process: adds window metadata
    )
.addSink(revenueSink)
.uid("revenue-hourly-sink");
// Incremental aggregator — runs per element (memory efficient)
public class RevenueAggregator
implements AggregateFunction<EnrichedOrder, RevenueSummary, RevenueSummary> {
@Override
public RevenueSummary createAccumulator() {
return new RevenueSummary();
}
@Override
public RevenueSummary add(EnrichedOrder order, RevenueSummary acc) {
acc.totalCents += order.getTotalCents();
acc.orderCount++;
return acc;
}
@Override
public RevenueSummary getResult(RevenueSummary acc) {
return acc;
}
@Override
public RevenueSummary merge(RevenueSummary a, RevenueSummary b) {
a.totalCents += b.totalCents;
a.orderCount += b.orderCount;
return a;
}
}
// Window function — runs once per window close (has window metadata)
public class RevenueWindowFunction
    extends ProcessWindowFunction<RevenueSummary, HourlyRevenue, String, TimeWindow> {
@Override
public void process(
String productId,
Context context,
Iterable<RevenueSummary> summaries,
Collector<HourlyRevenue> out
) {
RevenueSummary summary = summaries.iterator().next();
out.collect(new HourlyRevenue(
productId,
summary.totalCents,
summary.orderCount,
Instant.ofEpochMilli(context.window().getStart()),
Instant.ofEpochMilli(context.window().getEnd())
));
}
}
Complex Event Processing (CEP)
// Fraud detection: flag a customer who places 3 large orders within 10 minutes
Pattern<OrderEvent, ?> fraudPattern = Pattern
    .<OrderEvent>begin("large_orders")
    .where(new SimpleCondition<OrderEvent>() {
        @Override
        public boolean filter(OrderEvent event) {
            return event.getTotalCents() > 100_000; // more than $1,000
        }
    })
    .times(3) // Relaxed contiguity: small orders in between don't break the match
    .within(Time.minutes(10));
PatternStream<OrderEvent> patternStream = CEP.pattern(
orders.keyBy(OrderEvent::getCustomerId),
fraudPattern
);
DataStream<FraudAlert> alerts = patternStream.process(
new PatternProcessFunction<OrderEvent, FraudAlert>() {
@Override
public void processMatch(
Map<String, List<OrderEvent>> match,
Context ctx,
Collector<FraudAlert> out
) {
List<OrderEvent> largeOrders = match.get("large_orders");
out.collect(FraudAlert.builder()
    .customerId(largeOrders.get(0).getCustomerId())
    .orderIds(largeOrders.stream()
        .map(OrderEvent::getOrderId)
        .collect(toList()))
.detectedAt(Instant.now())
.build());
}
}
);
alerts.addSink(fraudAlertSink);
Late Event Handling
// Handle events that arrive after the watermark passed
OutputTag<EnrichedOrder> lateEventTag = new OutputTag<>("late-events"){};
SingleOutputStreamOperator<HourlyRevenue> mainStream = enriched
    .keyBy(order -> order.getProductId())
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(10)) // Re-fire for events up to 10min past the watermark
    .sideOutputLateData(lateEventTag)  // Capture events too late even for allowedLateness
    .aggregate(new RevenueAggregator(), new RevenueWindowFunction());
// Events beyond allowedLateness land in the side output; audit them separately
DataStream<EnrichedOrder> lateOrders = mainStream.getSideOutput(lateEventTag);
lateOrders
.map(event -> new LateOrderNotification(event.getOrderId(), event.getOccurredAt()))
.addSink(lateEventAuditSink);
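Note that allowedLateness does not delay the first result: the window fires at the watermark, then fires again for each late event, emitting a refined HourlyRevenue for the same window. Downstream storage should therefore upsert rather than append. A minimal sketch, assuming HourlyRevenue exposes getProductId() and getWindowStart() and that an upsert-capable sink exists (both are assumptions):

// Late re-firings produce multiple results for the same (product, window) pair.
// Key by that pair so the latest refinement overwrites earlier ones downstream.
mainStream
    .keyBy(rev -> rev.getProductId() + "|" + rev.getWindowStart())
    .addSink(upsertRevenueSink) // hypothetical upsert sink (e.g., JDBC with ON CONFLICT)
    .uid("hourly-revenue-upsert-sink");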
Flink SQL / Table API
// Table API: SQL-like processing on streams
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Register Kafka source as a table
tableEnv.executeSql("""
CREATE TABLE orders (
order_id STRING,
customer_id STRING,
product_id STRING,
total_cents BIGINT,
occurred_at TIMESTAMP(3),
WATERMARK FOR occurred_at AS occurred_at - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'enriched-orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""");
// Tumbling window aggregation in SQL
tableEnv.executeSql("""
INSERT INTO hourly_revenue
SELECT
product_id,
TUMBLE_START(occurred_at, INTERVAL '1' HOUR) AS window_start,
SUM(total_cents) AS total_revenue_cents,
COUNT(*) AS order_count
FROM orders
GROUP BY TUMBLE(occurred_at, INTERVAL '1' HOUR), product_id
""");
For the Kafka topics that feed Flink jobs, see the Kafka guide. When processing complexity is lower, the Kafka Streams guide covers stateful stream processing inside your application as an alternative to deploying Flink. The Claude Skills 360 bundle includes stream processing skill sets covering the Flink DataStream API, CEP patterns, and SQL streaming. Start with the free tier to try Flink topology generation.