
Claude Code for Apache Flink: Real-Time Analytics, CEP, and Stateful Processing

Published: October 9, 2026
Read time: 9 min read
By: Claude Skills 360

Apache Flink processes data streams with exactly-once semantics, event-time processing, and low latency at high throughput. Unlike Kafka Streams (which runs inside your application), Flink is a distributed system you deploy separately; it scales independently of your application and handles terabytes of state with the RocksDB state backend. Claude Code implements Flink jobs, explains watermark strategies for late events, and builds CEP patterns for fraud detection.

## Flink Stack
- Version: 1.19.x
- Language: Java 17 (Flink's primary API) or Python Table API (PyFlink)
- Connector: Kafka source/sink (flink-connector-kafka)
- State Backend: RocksDB (production), HashMap (testing)
- Checkpointing: every 30s, exactly-once, min 10s between checkpoints
- Watermark strategy: BoundedOutOfOrderness with 5-second tolerance
- Serialization: Avro with Schema Registry for production; JSON for dev
- Parallelism: set at job level, override per operator for bottlenecks
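The last bullet deserves a concrete shape. A minimal sketch of a per-operator override, reusing the `orders` stream and `EnrichOrderFunction` from the job in the next section (names are this article's examples, not a fixed API):

```java
// Sketch: job-level default parallelism with a per-operator override.
// Enrichment does lookups/I/O, so it often needs more slots than the rest.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);  // default for every operator in this job

DataStream<EnrichedOrder> enriched = orders
    .map(new EnrichOrderFunction())
    .setParallelism(12)  // override just this operator at the bottleneck
    .uid("enrich-orders")
    .name("Enrich Orders");
```

Note that changing an operator's parallelism changes how keyed state is redistributed on restore, which is one more reason to pin stable `uid()`s.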

## Kafka Source + Basic Processing

// src/main/java/com/myapp/OrderProcessor.java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class OrderProcessor {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing for fault tolerance
        env.enableCheckpointing(30_000);  // Every 30 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);
        env.getCheckpointConfig().setCheckpointTimeout(60_000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // RocksDB state backend for large state
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints");

        // Kafka source
        KafkaSource<OrderEvent> source = KafkaSource.<OrderEvent>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("order-events")
            .setGroupId("order-processor")
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new JsonDeserializationSchema<>(OrderEvent.class))
            .build();

        // Watermark strategy: tolerate 5-second late events
        WatermarkStrategy<OrderEvent> watermarkStrategy = WatermarkStrategy
            .<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getOccurredAt().toEpochMilli())
            .withIdleness(Duration.ofMinutes(1));  // Handle idle partitions

        DataStream<OrderEvent> orders = env
            .fromSource(source, watermarkStrategy, "Kafka Orders Source")
            .uid("orders-source")  // Stable UID for restoring state across job restarts
            .name("Orders Source");

        // Process: filter, enrich, route
        DataStream<EnrichedOrder> enriched = orders
            .filter(order -> order.getTotalCents() > 0)
            .uid("filter-empty")
            .map(new EnrichOrderFunction())
            .uid("enrich-orders")
            .name("Enrich Orders");

        // Kafka sink
        KafkaSink<EnrichedOrder> sink = KafkaSink.<EnrichedOrder>builder()
            .setBootstrapServers("kafka:9092")
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                    .setTopic("enriched-orders")
                    .setValueSerializationSchema(new JsonSerializationSchema<>())
                    .build()
            )
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("order-processor")  // required for EXACTLY_ONCE
            .build();

        enriched.sinkTo(sink).uid("enriched-sink");

        env.execute("Order Processor");
    }
}
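The watermark strategy above is easier to trust once you can do the arithmetic by hand: a bounded-out-of-orderness watermark trails the highest event timestamp seen so far by the tolerance (minus 1 ms, as Flink does internally), and an event is late once its timestamp is at or below that watermark. A plain-Java sketch of that rule, no Flink required (`WatermarkDemo` is hypothetical, written only to mirror `BoundedOutOfOrdernessWatermarks`):

```java
import java.time.Duration;

// Hypothetical demo class mirroring the 5-second bounded-out-of-orderness
// strategy configured in the job above.
public class WatermarkDemo {
    static final long TOLERANCE_MS = Duration.ofSeconds(5).toMillis();

    private long maxTimestamp = Long.MIN_VALUE;

    // Called per event: the watermark is driven by the max timestamp seen.
    public void onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
    }

    // Watermark trails the max timestamp by the tolerance, minus 1 ms.
    public long currentWatermark() {
        return maxTimestamp - TOLERANCE_MS - 1;
    }

    // An event is late once its timestamp is not ahead of the watermark.
    public boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        WatermarkDemo wm = new WatermarkDemo();
        wm.onEvent(10_000);  // event at t = 10s
        wm.onEvent(12_000);  // event at t = 12s -> watermark = 12s - 5s - 1ms
        System.out.println(wm.currentWatermark());  // 6999
        System.out.println(wm.isLate(6_500));       // true: behind the watermark
        System.out.println(wm.isLate(8_000));       // false: within tolerance
    }
}
```

This is also why `withIdleness` matters: an idle partition never advances `maxTimestamp`, so without it the overall watermark (the minimum across partitions) would stall.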

## Windowed Aggregations

// Tumbling and sliding windows with Flink

// Tumbling window: sum revenue per product per hour (non-overlapping)
enriched
    .keyBy(order -> order.getProductId())
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(
        new RevenueAggregator(),      // Incremental: efficient, doesn't store all events
        new RevenueWindowFunction()   // Post-process: add window metadata
    )
    .addSink(revenueSink)
    .uid("revenue-hourly-sink");

// Incremental aggregator — runs per element (memory efficient)
public class RevenueAggregator
    implements AggregateFunction<EnrichedOrder, RevenueSummary, RevenueSummary> {

    @Override
    public RevenueSummary createAccumulator() {
        return new RevenueSummary();
    }

    @Override
    public RevenueSummary add(EnrichedOrder order, RevenueSummary acc) {
        acc.totalCents += order.getTotalCents();
        acc.orderCount++;
        return acc;
    }

    @Override
    public RevenueSummary getResult(RevenueSummary acc) {
        return acc;
    }

    @Override
    public RevenueSummary merge(RevenueSummary a, RevenueSummary b) {
        a.totalCents += b.totalCents;
        a.orderCount += b.orderCount;
        return a;
    }
}

// Window function — runs once per window close (has window metadata)
public class RevenueWindowFunction
    extends ProcessWindowFunction<RevenueSummary, HourlyRevenue, String, TimeWindow> {

    @Override
    public void process(
        String productId,
        Context context,
        Iterable<RevenueSummary> summaries,
        Collector<HourlyRevenue> out
    ) {
        RevenueSummary summary = summaries.iterator().next();
        out.collect(new HourlyRevenue(
            productId,
            summary.totalCents,
            summary.orderCount,
            Instant.ofEpochMilli(context.window().getStart()),
            Instant.ofEpochMilli(context.window().getEnd())
        ));
    }
}
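The comment at the top of this section also promises sliding windows, so here is the sliding counterpart, a sketch reusing this article's `RevenueAggregator` and `RevenueWindowFunction` (the `revenueSink` name is the article's placeholder):

```java
// Sliding window: revenue over the last hour, refreshed every 5 minutes.
// Each event is assigned to 12 overlapping windows (60 / 5), so window state
// grows accordingly — incremental aggregation matters even more here.
enriched
    .keyBy(order -> order.getProductId())
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .aggregate(new RevenueAggregator(), new RevenueWindowFunction())
    .addSink(revenueSink)
    .uid("revenue-sliding-sink");
```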

## Complex Event Processing (CEP)

// Fraud detection: flag if 3+ large orders from same customer within 10 minutes

Pattern<OrderEvent, ?> fraudPattern = Pattern
    .<OrderEvent>begin("first_large_order")
        .where(new SimpleCondition<OrderEvent>() {
            @Override
            public boolean filter(OrderEvent event) {
                return event.getTotalCents() > 100_000;  // > $1000
            }
        })
    .followedBy("second_large_order")  // relaxed contiguity: small orders may occur in between
        .where(new SimpleCondition<OrderEvent>() {
            @Override
            public boolean filter(OrderEvent event) {
                return event.getTotalCents() > 100_000;
            }
        })
    .followedBy("third_large_order")
        .where(new SimpleCondition<OrderEvent>() {
            @Override
            public boolean filter(OrderEvent event) {
                return event.getTotalCents() > 100_000;
            }
        })
    .within(Time.minutes(10));

PatternStream<OrderEvent> patternStream = CEP.pattern(
    orders.keyBy(OrderEvent::getCustomerId),
    fraudPattern
);

DataStream<FraudAlert> alerts = patternStream.process(
    new PatternProcessFunction<OrderEvent, FraudAlert>() {
        @Override
        public void processMatch(
            Map<String, List<OrderEvent>> match,
            Context ctx,
            Collector<FraudAlert> out
        ) {
            List<OrderEvent> thirdOrders = match.get("third_large_order");
            out.collect(FraudAlert.builder()
                .customerId(thirdOrders.get(0).getCustomerId())
                .orderIds(match.values().stream()
                    .flatMap(List::stream)
                    .map(OrderEvent::getOrderId)
                    .collect(toList()))
                .detectedAt(Instant.now())
                .build());
        }
    }
);

alerts.addSink(fraudAlertSink);
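When the same condition repeats, CEP's `times` quantifier is more compact than chaining three named steps. A sketch of the equivalent pattern (`SimpleCondition.of` is available in recent Flink releases; with older versions, keep the anonymous-class form shown above):

```java
// Same detection with a quantifier: three occurrences of the large-order
// condition within 10 minutes. Looping patterns use relaxed contiguity by
// default; call .consecutive() if strictly back-to-back events are required.
Pattern<OrderEvent, ?> fraudPattern = Pattern
    .<OrderEvent>begin("large_orders")
    .where(SimpleCondition.of(event -> event.getTotalCents() > 100_000))
    .times(3)
    .within(Time.minutes(10));
```

With this form, `processMatch` receives all three events under the single `"large_orders"` key as one list, which simplifies collecting the order IDs.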

## Late Event Handling

// Handle events that arrive after the watermark passed

OutputTag<OrderEvent> lateEventTag = new OutputTag<OrderEvent>("late-events") {};

SingleOutputStreamOperator<HourlyRevenue> mainStream = orders
    .keyBy(order -> order.getProductId())
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(10))  // Wait extra 10min before finalizing
    .sideOutputLateData(lateEventTag)   // Capture events too late for allowedLateness
    .aggregate(new RevenueAggregator(), new RevenueWindowFunction());

// Events later than allowedLateness land on the side output; route them to an audit sink
DataStream<OrderEvent> lateOrders = mainStream.getSideOutput(lateEventTag);

lateOrders
    .map(event -> new LateOrderNotification(event.getOrderId(), event.getOccurredAt()))
    .addSink(lateEventAuditSink);

## Table API

// Table API: SQL-like processing on streams
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Register Kafka source as a table
tableEnv.executeSql("""
    CREATE TABLE orders (
        order_id STRING,
        customer_id STRING,
        product_id STRING,
        total_cents BIGINT,
        occurred_at TIMESTAMP(3),
        WATERMARK FOR occurred_at AS occurred_at - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'order-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""");

// Tumbling window aggregation in SQL
tableEnv.executeSql("""
    INSERT INTO hourly_revenue
    SELECT
        product_id,
        TUMBLE_START(occurred_at, INTERVAL '1' HOUR) AS window_start,
        SUM(total_cents) AS total_revenue_cents,
        COUNT(*) AS order_count
    FROM orders
    GROUP BY TUMBLE(occurred_at, INTERVAL '1' HOUR), product_id
""");
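The `INSERT` above assumes a `hourly_revenue` sink table exists. One plausible definition, sketched here under stated assumptions: the `GROUP BY` aggregation produces an updating result, so an upsert-capable connector is needed; the `upsert-kafka` connector and `hourly-revenue` topic name are illustrative choices, not from the article:

```java
// Sink table for the hourly aggregation. upsert-kafka requires a primary key
// and separate key/value formats; the append-only 'kafka' connector would
// reject this query's updating output.
tableEnv.executeSql("""
    CREATE TABLE hourly_revenue (
        product_id STRING,
        window_start TIMESTAMP(3),
        total_revenue_cents BIGINT,
        order_count BIGINT,
        PRIMARY KEY (product_id, window_start) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'hourly-revenue',
        'properties.bootstrap.servers' = 'kafka:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""");
```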

For the Kafka topics that feed Flink jobs, see the Kafka guide. For Kafka Streams as an alternative to Flink when processing complexity is lower, the Kafka Streams guide covers stateful stream processing within your application. The Claude Skills 360 bundle includes stream processing skill sets covering Flink DataStream API, CEP patterns, and SQL streaming. Start with the free tier to try Flink topology generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
