Apache Beam abstracts batch and streaming data processing behind a portable pipeline model. The same Pipeline code runs locally, on Apache Flink, or on Google Dataflow — the runner is swappable. PCollection represents a distributed dataset. ParDo applies a DoFn to each element in parallel. Windowing groups streaming data into time buckets. Side inputs provide shared data to processing functions. The Beam Python SDK generates readable pipelines that the DirectRunner executes locally for testing and Dataflow executes at scale. Claude Code generates Beam pipelines, custom DoFns, windowing configurations, IO connectors, and the pipeline test harnesses for production data engineering workflows.
CLAUDE.md for Apache Beam Projects
## Apache Beam Stack
- SDK: apache-beam >= 2.59, apache-beam[gcp] for Dataflow/BigQuery/GCS
- Runner: DirectRunner (local testing), DataflowRunner (GCP), FlinkRunner (self-hosted)
- IO: beam.io.ReadFromText, beam.io.gcp.bigquery, beam.io.kafka
- Windowing: FixedWindows, SlidingWindows, Sessions for streaming
- Triggers: AfterWatermark + AfterCount for streaming output timing
- Testing: TestPipeline with assert_that + equal_to / is_empty / is_not_empty
- Typing: beam.typehints.with_input_types and with_output_types
Basic Pipeline Patterns
# pipelines/order_etl.py — batch ETL with Apache Beam
import json
from dataclasses import dataclass
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
from dataclasses import dataclass
@dataclass
class Order:
    """A single customer order parsed from one raw JSON input record."""

    # Unique order identifier (JSON field "id").
    id: str
    # Identifier of the customer who placed the order.
    customer_id: str
    # Order total; coerced to float by ParseOrderJson.
    amount: float
    # Order lifecycle state, e.g. "pending" — full value set not visible here.
    status: str
    # Creation timestamp kept as the original ISO-8601 string (not parsed).
    created_at: str
class ParseOrderJson(beam.DoFn):
    """Parse JSON strings into Order objects.

    Valid records are emitted on the main output; malformed records are
    routed to the "errors" tagged output for dead-letter handling so a
    single bad element never fails the whole bundle.
    """

    def process(self, element: str):
        try:
            data = json.loads(element)
            yield Order(
                id=data["id"],
                customer_id=data["customer_id"],
                amount=float(data["amount"]),
                status=data["status"],
                created_at=data["created_at"],
            )
        # ValueError/TypeError cover a non-numeric or null "amount":
        # float() raises those, and without catching them one bad value
        # would crash the pipeline instead of going to the dead letter.
        # (json.JSONDecodeError is a ValueError subclass; listed for clarity.)
        except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
            # Tag bad records for dead-letter handling
            yield beam.pvalue.TaggedOutput("errors", {
                "raw": element,
                "error": str(e),
            })
class EnrichOrder(beam.DoFn):
    """Add derived fields to orders.

    Emits a plain dict (not an Order) so downstream transforms can pass
    it straight to json.dumps.
    """

    def process(self, order: Order):
        yield {
            "id": order.id,
            "customer_id": order.customer_id,
            "amount": order.amount,
            "status": order.status,
            "created_at": order.created_at,
            # Business rule: orders above 1000 are flagged high value.
            "is_high_value": order.amount > 1000,
            # Use an aware UTC timestamp: datetime.utcnow() is deprecated
            # (Python 3.12+) and returns a naive datetime. The ISO string
            # now carries an explicit "+00:00" offset.
            "processed_at": datetime.now(timezone.utc).isoformat(),
        }
def run_order_etl(input_path: str, output_path: str, runner: str = "DirectRunner"):
    """Run batch order ETL pipeline.

    Reads newline-delimited JSON from *input_path*, dead-letters
    unparseable records, enriches valid orders, and writes high-value
    and standard orders to separate prefixes under *output_path*.
    """
    pipeline_options = PipelineOptions(runner=runner)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Read raw JSON lines.
        raw_lines = pipeline | "Read" >> beam.io.ReadFromText(input_path)

        # Parse, splitting failures onto the tagged "errors" output.
        parsed = raw_lines | "Parse" >> beam.ParDo(
            ParseOrderJson()
        ).with_outputs("errors", main="orders")

        # Dead-letter sink for records that failed to parse.
        _ = (
            parsed.errors
            | "FormatErrors" >> beam.Map(json.dumps)
            | "WriteErrors" >> beam.io.WriteToText(f"{output_path}/errors")
        )

        # Add derived fields to every valid order.
        enriched = parsed.orders | "Enrich" >> beam.ParDo(EnrichOrder())

        # Branch on is_high_value and write each branch to its own prefix.
        # Labels reproduce the originals exactly: Filter/Format/Write +
        # HighValue or Standard.
        branches = {
            "HighValue": ("high_value", True),
            "Standard": ("standard", False),
        }
        for label, (prefix, wanted) in branches.items():
            _ = (
                enriched
                | f"Filter{label}" >> beam.Filter(
                    # Bind `wanted` as a default to avoid late-binding closure.
                    lambda o, wanted=wanted: bool(o["is_high_value"]) == wanted
                )
                | f"Format{label}" >> beam.Map(json.dumps)
                | f"Write{label}" >> beam.io.WriteToText(f"{output_path}/{prefix}")
            )
Aggregations
# pipelines/revenue_aggregation.py — groupBy and combine operations
import apache_beam as beam
from typing import Iterable


class SumAndCount(beam.CombineFn):
    """Custom combiner: compute sum and count for average.

    Subclasses ``beam.CombineFn`` — the public API. The previous import
    path ``apache_beam.transforms.combinefn_lifecycle`` is not a public
    module and does not export ``CombineFn``, so importing it fails.
    """

    def create_accumulator(self):
        # Accumulator is a (running_sum, element_count) pair.
        return (0.0, 0)

    def add_input(self, accumulator, value: float):
        # `value` renamed from `input` to avoid shadowing the builtin.
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators: Iterable[tuple]):
        # Materialize first: `accumulators` may be a one-shot iterable,
        # and zip(*[]) would fail to unpack for an empty collection.
        accs = list(accumulators)
        if not accs:
            return self.create_accumulator()
        totals, counts = zip(*accs)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return {
            "sum": total,
            "count": count,
            # Guard the division for empty accumulators.
            "average": total / count if count > 0 else 0,
        }
def revenue_by_customer(orders_pcollection):
    """Aggregate revenue metrics (sum, count, average) per customer."""

    def to_customer_amount(order):
        # Key each order by customer for the per-key combine.
        return (order["customer_id"], order["amount"])

    def to_result_row(kv):
        customer_id, metrics = kv
        # Flatten the combiner's metrics dict alongside the key.
        return {"customer_id": customer_id, **metrics}

    return (
        orders_pcollection
        | "ExtractCustomerAmount" >> beam.Map(to_customer_amount)
        | "GroupByCustomer" >> beam.CombinePerKey(SumAndCount())
        | "FormatResult" >> beam.Map(to_result_row)
    )
def daily_revenue(orders_pcollection):
    """Simple daily revenue totals using GroupByKey.

    Keys each order by the YYYY-MM-DD prefix of ``created_at`` and emits
    one row per day with the revenue total and order count.
    """

    def summarize(kv):
        date, grouped_amounts = kv
        # Materialize the grouped values exactly once. The original code
        # iterated them twice (sum(), then len(list())); after GroupByKey
        # the values may be a lazily re-read iterable, so the second pass
        # is at best a wasteful re-read and at worst sees an exhausted
        # iterator and reports order_count == 0.
        amounts = list(grouped_amounts)
        return {
            "date": date,
            "total_revenue": sum(amounts),
            "order_count": len(amounts),
        }

    return (
        orders_pcollection
        | "ExtractDateAmount" >> beam.Map(
            lambda o: (o["created_at"][:10], o["amount"])  # YYYY-MM-DD
        )
        | "GroupByDate" >> beam.GroupByKey()
        | "SumByDate" >> beam.Map(summarize)
    )
Streaming with Windowing
# pipelines/streaming_metrics.py — real-time metrics over Pub/Sub
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterCount, AccumulationMode
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import json
def run_streaming_metrics(
    input_subscription: str,  # projects/my-project/subscriptions/orders-sub
    output_topic: str,
    runner: str = "DataflowRunner",
):
    """Streaming revenue metrics with 1-minute tumbling windows.

    Reads JSON orders from a Pub/Sub subscription, windows them on
    event time taken from "created_at", and publishes one revenue total
    per window to *output_topic*.
    """
    # Local imports: this module's header imports only PipelineOptions,
    # StandardOptions, and json, so without these two lines both
    # GoogleCloudOptions (used below) and datetime (used in the
    # timestamp lambda) would raise NameError at runtime.
    from datetime import datetime
    from apache_beam.options.pipeline_options import GoogleCloudOptions

    options = PipelineOptions(runner=runner, streaming=True)
    if runner == "DataflowRunner":
        gcp_options = options.view_as(GoogleCloudOptions)
        gcp_options.project = "my-project"
        gcp_options.region = "us-central1"
        gcp_options.temp_location = "gs://my-bucket/temp"

    with beam.Pipeline(options=options) as p:
        orders = (
            p
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
            | "DecodeBytes" >> beam.Map(lambda b: b.decode("utf-8"))
            | "ParseJson" >> beam.Map(json.loads)
        )

        # Stamp each element with event time from the payload (not
        # processing time) so windows reflect when the order happened.
        # NOTE(review): datetime.fromisoformat rejects a trailing "Z"
        # before Python 3.11 — confirm the producer's timestamp format.
        timestamped = orders | "AddTimestamps" >> beam.Map(
            lambda o: beam.window.TimestampedValue(
                o,
                # Parse ISO timestamp to Unix seconds
                int(datetime.fromisoformat(o["created_at"]).timestamp()),
            )
        )

        # 1-minute fixed (tumbling) windows with late-data handling.
        windowed = (
            timestamped
            | "Window" >> beam.WindowInto(
                window.FixedWindows(60),  # 60-second windows
                trigger=AfterWatermark(
                    late=AfterCount(1),  # Emit once for late data
                ),
                allowed_lateness=300,  # Accept data up to 5 min late
                accumulation_mode=AccumulationMode.ACCUMULATING,
            )
        )

        # Revenue per window: constant key so CombinePerKey sums every
        # amount that falls in the window.
        revenue = (
            windowed
            | "ExtractAmount" >> beam.Map(lambda o: ("revenue", o["amount"]))
            | "SumRevenue" >> beam.CombinePerKey(sum)
        )

        # Emit one JSON metric message per window to Pub/Sub.
        _ = (
            revenue
            | "Format" >> beam.Map(
                lambda kv: json.dumps({"metric": kv[0], "value": kv[1]}).encode()
            )
            | "WriteToPubSub" >> beam.io.WriteToPubSub(output_topic)
        )
Side Inputs
# pipelines/side_inputs.py — broadcast lookup data alongside elements
import apache_beam as beam
class EnrichWithCustomerData(beam.DoFn):
    """Enrich orders with a customer lookup table supplied as a side input.

    Unknown customer ids fall back to placeholder values rather than
    dropping the order.
    """

    def process(self, order: dict, customer_lookup: dict):
        record = customer_lookup.get(order["customer_id"], {})
        enriched = dict(order)
        enriched["customer_name"] = record.get("name", "Unknown")
        enriched["customer_tier"] = record.get("tier", "standard")
        enriched["customer_country"] = record.get("country", "US")
        yield enriched
def enrich_orders_with_customers(
    orders_pcollection,
    customers_pcollection,
):
    """Join orders with customer data using side inputs."""
    # Collapse the customer PCollection into one {id: record} dict and
    # broadcast it to every worker as a singleton side input — a
    # shuffle-free alternative to a key-based join.
    lookup_view = beam.pvalue.AsSingleton(
        customers_pcollection
        | "ToCustomerKV" >> beam.Map(lambda c: (c["id"], c))
        | "GroupCustomers" >> beam.combiners.ToDict()
    )
    return orders_pcollection | "Enrich" >> beam.ParDo(
        EnrichWithCustomerData(),
        customer_lookup=lookup_view,
    )
Pipeline Testing
# tests/test_pipelines.py — unit testing with TestPipeline
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to, is_empty
import json
def test_parse_order_json():
    """Valid JSON becomes an Order; invalid JSON goes to the errors tag."""
    valid_json = json.dumps({
        "id": "ord-1",
        "customer_id": "cust-1",
        "amount": 99.99,
        "status": "pending",
        "created_at": "2026-01-15T10:00:00Z",
    })
    invalid_json = "not json"
    with TestPipeline() as p:
        result = (
            p
            | beam.Create([valid_json, invalid_json])
            | beam.ParDo(ParseOrderJson()).with_outputs("errors", main="orders")
        )
        assert_that(
            result.orders,
            equal_to([Order(
                id="ord-1",
                customer_id="cust-1",
                amount=99.99,
                status="pending",
                created_at="2026-01-15T10:00:00Z",
            )]),
            label="ValidOrders",
        )
        # Compare only the "raw" field: the exact error message text is
        # an implementation detail of json.loads. The original assertion
        # used beam.testing.util.has_items, which is not importable that
        # way (the testing submodule is not an attribute of `beam`, and
        # matches_all does not accept bare lambdas), so it raised instead
        # of testing anything.
        assert_that(
            result.errors | "ExtractRaw" >> beam.Map(lambda e: e["raw"]),
            equal_to([invalid_json]),
            label="InvalidOrders",
        )
def test_revenue_aggregation():
    """revenue_by_customer sums and counts amounts per customer."""
    orders = [
        {"customer_id": "c1", "amount": 100.0},
        {"customer_id": "c1", "amount": 200.0},
        {"customer_id": "c2", "amount": 50.0},
    ]

    def check_results(results):
        # assert_that invokes this matcher with the materialized output.
        result_dict = {r["customer_id"]: r for r in results}
        assert result_dict["c1"]["sum"] == 300.0
        assert result_dict["c1"]["count"] == 2
        assert result_dict["c2"]["sum"] == 50.0

    with TestPipeline() as p:
        # revenue_by_customer is a plain function, not a PTransform, so
        # it cannot be applied with `|` (that raises TypeError) — call it
        # on the PCollection instead.
        result = revenue_by_customer(p | beam.Create(orders))
        assert_that(result, check_results)
For the Apache Flink native streaming alternative with lower latency and stateful processing, see the Flink guide for streaming state and event time processing. For the Apache Spark batch processing alternative that dominates large-scale data lake workloads, the Spark guide covers DataFrames, SQL, and MLlib. The Claude Skills 360 bundle includes Apache Beam skill sets covering pipeline design, windowing, and Dataflow deployment. Start with the free tier to try Beam pipeline generation.