Apache Spark processes petabyte-scale datasets across hundreds of nodes, but the same code runs locally on a laptop. PySpark gives Python access to Spark’s distributed DataFrame API: read Parquet from S3, join multi-billion-row tables, compute windowed aggregations, and write results back — all with a familiar pandas-like syntax that Spark optimizes under the hood. Claude Code writes PySpark transformations, Spark SQL queries, Structured Streaming pipelines, and the tuning hints that prevent shuffle spills and out-of-memory errors in production.
CLAUDE.md for Spark Projects
## Spark Stack
- PySpark 3.5.x with Delta Lake 3.x for ACID tables
- Cluster: Databricks (managed) or EMR Serverless
- Storage: S3 with Parquet/Delta format; never use CSV in production
- Optimization: broadcast joins for <10MB tables; repartition before wide shuffles
- Caching: only cache DataFrames used >2x; call `.cache()` then `.count()` to materialize
- Schema: always define explicit schemas for reads — inference is slow
- Type hints: use PySpark type stubs for IDE support
SparkSession and Schema Definition
# spark_jobs/utils.py
from typing import Optional

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    ArrayType, DecimalType, IntegerType, LongType,
    StringType, StructField, StructType, TimestampType,
)
def create_spark_session(app_name: str, extra_configs: Optional[dict] = None) -> SparkSession:
    """Create (or reuse) a SparkSession configured for Delta Lake.

    Args:
        app_name: Name shown in the Spark UI / cluster manager.
        extra_configs: Optional extra ``spark.*`` config key/value pairs
            applied on top of the defaults below. (The original signature
            used a mutable default ``{}``, which is shared across calls —
            ``None`` is the correct sentinel.)

    Returns:
        A SparkSession. ``getOrCreate`` reuses any active session, in which
        case some configs may not take effect.
    """
    builder = (
        SparkSession.builder
        .appName(app_name)
        # Delta Lake integration: SQL extension + catalog implementation.
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.adaptive.enabled", "true")  # AQE: auto optimize joins/shuffles
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        .config("spark.sql.shuffle.partitions", "200")
    )
    for key, value in (extra_configs or {}).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
# Always define schemas explicitly — never infer from data.
# Built with StructType.add() chaining; equivalent to the list-of-StructField form.
ORDER_SCHEMA = (
    StructType()
    .add("order_id", StringType(), nullable=False)
    .add("user_id", StringType(), nullable=False)
    .add("status", StringType(), nullable=True)
    .add("total_cents", LongType(), nullable=False)
    .add("created_at", TimestampType(), nullable=False)
    .add(
        "items",
        ArrayType(
            # One line item nested inside an order.
            StructType()
            .add("product_id", StringType(), nullable=False)
            .add("quantity", IntegerType(), nullable=False)
            .add("price_cents", LongType(), nullable=False)
        ),
        nullable=True,
    )
)
Core Transformations
# spark_jobs/orders_pipeline.py
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
def clean_orders(df: DataFrame) -> DataFrame:
    """Clean and normalize raw orders data.

    Drops rows with a null order_id or non-positive total, normalizes the
    status string to lowercase/trimmed, derives a date column and a USD
    amount, and removes Spark's corrupt-record column.
    """
    # Both validity checks in a single predicate (equivalent to chained filters).
    is_valid = F.col("order_id").isNotNull() & (F.col("total_cents") > 0)
    cleaned = df.filter(is_valid)
    cleaned = cleaned.withColumn("status", F.lower(F.trim(F.col("status"))))
    cleaned = cleaned.withColumn("created_date", F.to_date(F.col("created_at")))
    cleaned = cleaned.withColumn("total_usd", F.col("total_cents") / 100.0)
    return cleaned.drop("_corrupt_record")
def enrich_with_user_data(orders_df: DataFrame, users_df: DataFrame) -> DataFrame:
    """
    Left-join user attributes onto orders.

    The users side is broadcast to every executor (F.broadcast hint), which
    avoids an expensive shuffle join — only appropriate while the users
    projection stays small (<10MB per the project convention).
    """
    user_attrs = users_df.select("user_id", "email", "country", "tier")
    return orders_df.join(F.broadcast(user_attrs), on="user_id", how="left")
def compute_user_metrics(df: DataFrame) -> DataFrame:
    """
    Per-user lifetime and cohort metrics using window functions.

    Adds, per order row:
      - order_number: 1-based chronological rank within the user's history
      - is_first_order: True for the user's earliest order
      - cumulative_spend_cents: running total up to and including this order
      - monthly_order_count: user's order count in the same calendar month
      - days_since_prev_order: gap to the previous order (null for the first)
    """
    # Chronological window over all of one user's orders.
    lifetime_win = Window.partitionBy("user_id").orderBy("created_at")
    # Explicit running-total frame: every preceding row through the current one.
    running_win = lifetime_win.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    # Unordered window over one user's orders within the same calendar month.
    month_win = Window.partitionBy("user_id", F.date_trunc("month", F.col("created_at")))

    prev_created_at = F.lag("created_at", 1).over(lifetime_win)

    result = df.withColumn("order_number", F.row_number().over(lifetime_win))
    result = result.withColumn("is_first_order", F.col("order_number") == 1)
    result = result.withColumn("cumulative_spend_cents", F.sum("total_cents").over(running_win))
    result = result.withColumn("monthly_order_count", F.count("order_id").over(month_win))
    result = result.withColumn("days_since_prev_order", F.datediff(F.col("created_at"), prev_created_at))
    return result
def aggregate_daily_revenue(df: DataFrame) -> DataFrame:
    """Daily revenue summary by country and product tier (delivered orders only)."""
    delivered = df.filter(F.col("status") == "delivered")
    grouping_cols = [
        F.to_date("created_at").alias("date"),
        "country",
        "tier",
    ]
    metrics = [
        F.count("order_id").alias("order_count"),
        F.sum("total_cents").alias("revenue_cents"),
        F.countDistinct("user_id").alias("unique_customers"),
        F.avg("total_cents").alias("avg_order_cents"),
        # Approximate median — an exact percentile would force a full sort.
        F.percentile_approx("total_cents", 0.5).alias("median_order_cents"),
    ]
    return delivered.groupBy(*grouping_cols).agg(*metrics).orderBy("date", "country")
Spark SQL
# spark_jobs/sql_queries.py — SQL for complex logic
def run_cohort_analysis(spark: SparkSession, orders_df: DataFrame) -> DataFrame:
    """Cohort retention analysis in Spark SQL.

    Registers the orders DataFrame as a temp view and returns, per cohort
    month (month of a user's first delivered order) and month offset, the
    number of distinct active users.

    Fix: the original computed months_since_cohort as DATEDIFF(...) / 30,
    which yields fractional offsets (e.g. 31/30 ≈ 1.03 for a 31-day month)
    and fragments the GROUP BY into spurious buckets. MONTHS_BETWEEN on two
    month-truncated dates is an exact whole number, so CAST ... AS INT gives
    clean integer month offsets.
    """
    orders_df.createOrReplaceTempView("orders")
    return spark.sql("""
        WITH cohorts AS (
            SELECT
                user_id,
                DATE_TRUNC('month', MIN(created_at)) AS cohort_month
            FROM orders
            WHERE status = 'delivered'
            GROUP BY user_id
        ),
        user_activity AS (
            SELECT
                o.user_id,
                c.cohort_month,
                DATE_TRUNC('month', o.created_at) AS activity_month,
                CAST(MONTHS_BETWEEN(
                    DATE_TRUNC('month', o.created_at),
                    c.cohort_month
                ) AS INT) AS months_since_cohort
            FROM orders o
            JOIN cohorts c USING (user_id)
        )
        SELECT
            cohort_month,
            months_since_cohort,
            COUNT(DISTINCT user_id) AS retained_users
        FROM user_activity
        GROUP BY cohort_month, months_since_cohort
        ORDER BY cohort_month, months_since_cohort
    """)
Structured Streaming
# spark_jobs/streaming.py — real-time pipeline from Kafka
from pyspark.sql.streaming import StreamingQuery
def start_order_stream(
    spark: SparkSession,
    bootstrap_servers: str = "kafka:9092",
    topic: str = "orders",
    checkpoint_path: str = "s3://my-bucket/checkpoints/revenue/",
    output_path: str = "s3://my-bucket/delta/realtime_revenue/",
) -> StreamingQuery:
    """Start the real-time revenue pipeline: Kafka -> parse -> window -> Delta.

    Args:
        spark: Active SparkSession (must have the Kafka and Delta packages).
        bootstrap_servers: Kafka bootstrap servers (default preserves the
            original hard-coded value).
        topic: Kafka topic to subscribe to.
        checkpoint_path: Streaming checkpoint location (offsets + state).
        output_path: Delta table path for windowed revenue output.

    Returns:
        The running StreamingQuery handle.

    Fix: the Delta streaming file sink supports only "append" and "complete"
    output modes — the original outputMode("update") fails when the query
    starts (and no MERGE/foreachBatch upsert was actually implemented).
    With the watermark below, "append" emits each window exactly once,
    after it is finalized.
    """
    # Read from Kafka
    raw_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .load()
    )
    # Parse the JSON payload against the explicit schema.
    order_stream = (
        raw_stream
        .select(
            F.from_json(
                F.col("value").cast("string"),
                ORDER_SCHEMA
            ).alias("data")
        )
        .select("data.*")
        .withWatermark("created_at", "10 minutes")  # Late data tolerance
    )
    # 5-minute tumbling window aggregation of delivered-order revenue.
    windowed_revenue = (
        order_stream
        .filter(F.col("status") == "delivered")
        .groupBy(
            F.window("created_at", "5 minutes"),
            "country",
        )
        .agg(F.sum("total_cents").alias("revenue_cents"))
    )
    # Append finalized windows to the Delta table.
    return (
        windowed_revenue.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .option("path", output_path)
        .trigger(processingTime="1 minute")
        .start()
    )
For the dbt analytics layer that runs on top of Spark-processed Delta tables, the dbt advanced guide covers dbt models that read from external Spark sources. For the data engineering infrastructure that orchestrates Spark jobs, the Airflow/Prefect guide covers DAG-based Spark job submission. The Claude Skills 360 bundle includes Apache Spark skill sets covering PySpark transformations, window functions, Structured Streaming, and performance tuning. Start with the free tier to try PySpark pipeline generation.