Claude Code for Apache Beam: Unified Batch and Streaming Pipelines — Claude Skills 360 Blog
Blog / Data / Claude Code for Apache Beam: Unified Batch and Streaming Pipelines
Data

Claude Code for Apache Beam: Unified Batch and Streaming Pipelines

Published: January 12, 2027
Read time: 9 min read
By: Claude Skills 360

Apache Beam abstracts batch and streaming data processing behind a portable pipeline model. The same Pipeline code runs locally, on Apache Flink, or on Google Dataflow — the runner is swappable. PCollection represents a distributed dataset. ParDo applies a DoFn to each element in parallel. Windowing groups streaming data into time buckets. Side inputs provide shared data to processing functions. The Beam Python SDK generates readable pipelines that the DirectRunner executes locally for testing and Dataflow executes at scale. Claude Code generates Beam pipelines, custom DoFns, windowing configurations, IO connectors, and the pipeline test harnesses for production data engineering workflows.

CLAUDE.md for Apache Beam Projects

## Apache Beam Stack
- SDK: apache-beam >= 2.59, apache-beam[gcp] for Dataflow/BigQuery/GCS
- Runner: DirectRunner (local testing), DataflowRunner (GCP), FlinkRunner (self-hosted)
- IO: beam.io.ReadFromText, beam.io.gcp.bigquery, beam.io.kafka
- Windowing: FixedWindows, SlidingWindows, SessionWindows for streaming
- Triggers: AfterWatermark + AfterCount for streaming output timing
- Testing: TestPipeline with assert_that + equal_to / has_items
- Typing: beam.typehints.with_input_types and with_output_types

Basic Pipeline Patterns

# pipelines/order_etl.py — batch ETL with Apache Beam
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
import json
from datetime import datetime
from dataclasses import dataclass


@dataclass
class Order:
    id: str
    customer_id: str
    amount: float
    status: str
    created_at: str


class ParseOrderJson(beam.DoFn):
    """Parse JSON strings into Order objects."""

    def process(self, element: str):
        try:
            data = json.loads(element)
            yield Order(
                id=data["id"],
                customer_id=data["customer_id"],
                amount=float(data["amount"]),
                status=data["status"],
                created_at=data["created_at"],
            )
        except (json.JSONDecodeError, KeyError) as e:
            # Tag bad records for dead-letter handling
            yield beam.pvalue.TaggedOutput("errors", {
                "raw": element,
                "error": str(e),
            })


class EnrichOrder(beam.DoFn):
    """Add derived fields to orders."""

    def process(self, order: Order):
        enriched = {
            "id": order.id,
            "customer_id": order.customer_id,
            "amount": order.amount,
            "status": order.status,
            "created_at": order.created_at,
            "is_high_value": order.amount > 1000,
            "processed_at": datetime.utcnow().isoformat(),
        }
        yield enriched


def run_order_etl(input_path: str, output_path: str, runner: str = "DirectRunner"):
    """Run batch order ETL pipeline."""

    options = PipelineOptions(runner=runner)

    with beam.Pipeline(options=options) as p:
        # Read raw JSON
        raw = p | "Read" >> beam.io.ReadFromText(input_path)

        # Parse with error handling
        parsed_result = (
            raw
            | "Parse" >> beam.ParDo(ParseOrderJson()).with_outputs("errors", main="orders")
        )

        orders = parsed_result.orders
        errors = parsed_result.errors

        # Write errors to dead letter
        (
            errors
            | "FormatErrors" >> beam.Map(json.dumps)
            | "WriteErrors" >> beam.io.WriteToText(f"{output_path}/errors")
        )

        # Process valid orders
        enriched = orders | "Enrich" >> beam.ParDo(EnrichOrder())

        # Branch: high-value orders get separate processing
        high_value = (
            enriched
            | "FilterHighValue" >> beam.Filter(lambda o: o["is_high_value"])
        )

        standard = (
            enriched
            | "FilterStandard" >> beam.Filter(lambda o: not o["is_high_value"])
        )

        # Write output
        (
            high_value
            | "FormatHighValue" >> beam.Map(json.dumps)
            | "WriteHighValue" >> beam.io.WriteToText(f"{output_path}/high_value")
        )

        (
            standard
            | "FormatStandard" >> beam.Map(json.dumps)
            | "WriteStandard" >> beam.io.WriteToText(f"{output_path}/standard")
        )

Aggregations

# pipelines/revenue_aggregation.py — groupBy and combine operations
import apache_beam as beam
from apache_beam.transforms.combinefn_lifecycle import CombineFn
from typing import Iterable


class SumAndCount(CombineFn):
    """Custom combiner: compute sum and count for average."""

    def create_accumulator(self):
        return (0.0, 0)  # (sum, count)

    def add_input(self, accumulator, input: float):
        total, count = accumulator
        return total + input, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return {
            "sum": total,
            "count": count,
            "average": total / count if count > 0 else 0,
        }


def revenue_by_customer(orders_pcollection):
    """Aggregate revenue metrics by customer."""

    return (
        orders_pcollection
        | "ExtractCustomerAmount" >> beam.Map(
            lambda o: (o["customer_id"], o["amount"])
        )
        | "GroupByCustomer" >> beam.CombinePerKey(SumAndCount())
        | "FormatResult" >> beam.Map(
            lambda kv: {
                "customer_id": kv[0],
                **kv[1],
            }
        )
    )


def daily_revenue(orders_pcollection):
    """Simple daily revenue totals using GroupByKey."""

    return (
        orders_pcollection
        | "ExtractDateAmount" >> beam.Map(
            lambda o: (o["created_at"][:10], o["amount"])  # YYYY-MM-DD
        )
        | "GroupByDate" >> beam.GroupByKey()
        | "SumByDate" >> beam.Map(
            lambda kv: {
                "date": kv[0],
                "total_revenue": sum(kv[1]),
                "order_count": len(list(kv[1])),
            }
        )
    )

Streaming with Windowing

# pipelines/streaming_metrics.py — real-time metrics over Pub/Sub
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterCount, AccumulationMode
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import json


def run_streaming_metrics(
    input_subscription: str,   # projects/my-project/subscriptions/orders-sub
    output_topic: str,
    runner: str = "DataflowRunner",
):
    """Streaming revenue metrics with 1-minute tumbling windows."""

    options = PipelineOptions(runner=runner, streaming=True)
    if runner == "DataflowRunner":
        gcp_options = options.view_as(GoogleCloudOptions)
        gcp_options.project = "my-project"
        gcp_options.region = "us-central1"
        gcp_options.temp_location = "gs://my-bucket/temp"

    with beam.Pipeline(options=options) as p:
        orders = (
            p
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | "DecodeBytes" >> beam.Map(lambda b: b.decode("utf-8"))
            | "ParseJson" >> beam.Map(json.loads)
        )

        # Add timestamps from event data (not processing time)
        timestamped = orders | "AddTimestamps" >> beam.Map(
            lambda o: beam.window.TimestampedValue(
                o,
                # Parse ISO timestamp to Unix seconds
                int(datetime.fromisoformat(o["created_at"]).timestamp())
            )
        )

        # 1-minute fixed windows
        windowed = (
            timestamped
            | "Window" >> beam.WindowInto(
                window.FixedWindows(60),   # 60-second windows
                trigger=AfterWatermark(
                    late=AfterCount(1),    # Emit once for late data
                ),
                allowed_lateness=300,      # Accept data up to 5 min late
                accumulation_mode=AccumulationMode.ACCUMULATING,
            )
        )

        # Revenue per window
        revenue = (
            windowed
            | "ExtractAmount" >> beam.Map(lambda o: ("revenue", o["amount"]))
            | "SumRevenue" >> beam.CombinePerKey(sum)
        )

        # Emit results to Pub/Sub
        (
            revenue
            | "Format" >> beam.Map(lambda kv: json.dumps({"metric": kv[0], "value": kv[1]}).encode())
            | "WriteToPubSub" >> beam.io.WriteToPubSub(output_topic)
        )

Side Inputs

# pipelines/side_inputs.py — broadcast lookup data alongside elements
import apache_beam as beam


class EnrichWithCustomerData(beam.DoFn):
    """Enrich orders with a customer lookup table (side input)."""

    def process(self, order: dict, customer_lookup: dict):
        customer = customer_lookup.get(order["customer_id"], {})
        yield {
            **order,
            "customer_name": customer.get("name", "Unknown"),
            "customer_tier": customer.get("tier", "standard"),
            "customer_country": customer.get("country", "US"),
        }


def enrich_orders_with_customers(
    orders_pcollection,
    customers_pcollection,
):
    """Join orders with customer data using side inputs."""

    # Convert customer PCollection to a dict singleton view
    customer_dict = (
        customers_pcollection
        | "ToCustomerKV" >> beam.Map(lambda c: (c["id"], c))
        | "GroupCustomers" >> beam.combiners.ToDict()
    )

    return (
        orders_pcollection
        | "Enrich" >> beam.ParDo(
            EnrichWithCustomerData(),
            customer_lookup=beam.pvalue.AsSingleton(customer_dict),
        )
    )

Pipeline Testing

# tests/test_pipelines.py — unit testing with TestPipeline
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to, is_empty
import json


def test_parse_order_json():
    valid_json = json.dumps({
        "id": "ord-1",
        "customer_id": "cust-1",
        "amount": 99.99,
        "status": "pending",
        "created_at": "2026-01-15T10:00:00Z",
    })

    invalid_json = "not json"

    with TestPipeline() as p:
        result = (
            p
            | beam.Create([valid_json, invalid_json])
            | beam.ParDo(ParseOrderJson()).with_outputs("errors", main="orders")
        )

        assert_that(
            result.orders,
            equal_to([Order(
                id="ord-1",
                customer_id="cust-1",
                amount=99.99,
                status="pending",
                created_at="2026-01-15T10:00:00Z",
            )]),
            label="ValidOrders",
        )

        assert_that(
            result.errors,
            beam.testing.util.has_items(
                beam.testing.util.matches_all([
                    lambda e: "not json" in e["raw"],
                ])
            ),
            label="InvalidOrders",
        )


def test_revenue_aggregation():
    orders = [
        {"customer_id": "c1", "amount": 100.0},
        {"customer_id": "c1", "amount": 200.0},
        {"customer_id": "c2", "amount": 50.0},
    ]

    with TestPipeline() as p:
        result = (
            p
            | beam.Create(orders)
            | revenue_by_customer  # Reuse the transform
        )

        def check_results(results):
            result_dict = {r["customer_id"]: r for r in results}
            assert result_dict["c1"]["sum"] == 300.0
            assert result_dict["c1"]["count"] == 2
            assert result_dict["c2"]["sum"] == 50.0

        assert_that(result, check_results)

For the Apache Flink native streaming alternative with lower latency and stateful processing, see the Flink guide for streaming state and event time processing. For the Apache Spark batch processing alternative that dominates large-scale data lake workloads, the Spark guide covers DataFrames, SQL, and MLlib. The Claude Skills 360 bundle includes Apache Beam skill sets covering pipeline design, windowing, and Dataflow deployment. Start with the free tier to try Beam pipeline generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.

Back to Blog

Get 360 skills free