Databricks combines Spark compute, Delta Lake storage, and MLflow tracking into a unified lakehouse platform. Delta Lake adds ACID transactions to Parquet files — MERGE INTO enables upserts without full rewrites. Unity Catalog provides centralized governance across all data and AI assets. Auto Loader incrementally ingests new files from cloud storage with schema inference and evolution. MLflow Model Registry manages model versions from experiment to production. Feature Store ensures consistent feature computation between training and serving. Databricks Asset Bundles (DABs) deploy jobs, workflows, and notebooks via CI/CD. Claude Code generates Delta Lake schemas, Auto Loader pipelines, MLflow tracking code, Feature Store definitions, and DAB deployment configurations.
CLAUDE.md for Databricks Projects
## Databricks Stack
- Runtime: Databricks Runtime >= 15.0 (Spark 3.5, Python 3.11)
- Storage: Delta Lake — always use Delta format, not Parquet directly
- Catalog: Unity Catalog with three-level names: catalog.schema.table
- Ingestion: Auto Loader (cloudFiles) for incremental file ingestion
- Streaming: Structured Streaming with checkpointing to cloud storage
- MLOps: MLflow tracking + Model Registry (built into Databricks)
- CI/CD: Databricks Asset Bundles (databricks.yml) — not manual UI jobs
- Python: use dbutils for secrets, spark.sql() for DDL, Delta API for DML
Delta Lake Operations
# notebooks/delta_operations.py — ACID operations with Delta Lake
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable
# Reuse the active Spark session (Databricks notebooks provide one per cluster).
spark = SparkSession.builder.getOrCreate()
# Unity Catalog three-level identifier: catalog.schema.table.
CATALOG = "prod"
SCHEMA = "orders"
TABLE = f"{CATALOG}.{SCHEMA}.orders"
def create_orders_table() -> None:
    """Create the orders Delta table with schema enforcement.

    Delta Lake cannot partition directly on an expression such as
    DATE(created_at); instead, a generated column materializes the date on
    write and the table partitions on that column. The BOOLEAN DEFAULT
    requires the `allowColumnDefaults` table feature to be enabled.
    """
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {TABLE} (
            order_id STRING NOT NULL,
            customer_id STRING NOT NULL,
            amount DOUBLE,
            status STRING,
            created_at TIMESTAMP,
            updated_at TIMESTAMP,
            is_deleted BOOLEAN DEFAULT FALSE,
            -- Generated column: Delta derives it from created_at on every write.
            created_date DATE GENERATED ALWAYS AS (CAST(created_at AS DATE))
        )
        USING DELTA
        PARTITIONED BY (created_date)
        TBLPROPERTIES (
            'delta.feature.allowColumnDefaults' = 'supported',
            'delta.enableChangeDataFeed' = 'true',
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact' = 'true'
        )
    """)
def upsert_orders(new_data: DataFrame) -> None:
    """Idempotent upsert: refresh stale rows, insert rows never seen before."""
    target = DeltaTable.forName(spark, TABLE)
    merge_builder = target.alias("target").merge(
        new_data.alias("source"),
        "target.order_id = source.order_id",
    )
    # Only overwrite when the incoming row is strictly newer than the stored one.
    merge_builder = merge_builder.whenMatchedUpdate(
        condition="source.updated_at > target.updated_at",
        set={
            "amount": "source.amount",
            "status": "source.status",
            "updated_at": "source.updated_at",
        },
    )
    merge_builder.whenNotMatchedInsertAll().execute()
def soft_delete_orders(order_ids: list[str]) -> None:
    """Mark the given orders as deleted instead of physically removing rows."""
    matches_requested = F.col("order_id").isin(order_ids)
    orders_table = DeltaTable.forName(spark, TABLE)
    orders_table.update(
        condition=matches_requested,
        set={
            "is_deleted": F.lit(True),
            "updated_at": F.current_timestamp(),
        },
    )
def read_order_history(order_id: str) -> DataFrame:
    """Return every change-feed event for one order, oldest first.

    Relies on the table's Change Data Feed (delta.enableChangeDataFeed).
    """
    change_feed = (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .table(TABLE)
    )
    one_order = change_feed.filter(F.col("order_id") == order_id)
    return one_order.select(
        "order_id", "status", "amount", "_change_type", "_commit_timestamp"
    ).orderBy("_commit_timestamp")
def vacuum_table(retain_hours: int = 168) -> None:
    """Delete unreferenced data files older than the retention window.

    The 168-hour (7-day) default preserves a week of time travel.
    """
    vacuum_sql = f"VACUUM {TABLE} RETAIN {retain_hours} HOURS"
    spark.sql(vacuum_sql)
Auto Loader Streaming Ingestion
# notebooks/autoloader_ingestion.py — incremental file ingestion
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Explicit schema for raw order JSON files. The third StructField argument
# is `nullable` — order_id and customer_id are required fields.
ORDER_SCHEMA: StructType = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("amount", DoubleType(), True),
    StructField("status", StringType(), True),
    StructField("created_at", TimestampType(), True),
])
def start_autoloader_stream(
    source_path: str,       # e.g. s3://my-bucket/raw/orders/
    checkpoint_path: str,   # e.g. s3://my-bucket/checkpoints/orders/
    target_table: str,
) -> None:
    """Auto Loader batch-style run: ingest all available files, then stop.

    An explicit schema is supplied, so `cloudFiles.inferColumnTypes` is
    omitted (Auto Loader ignores it when a schema is provided); unexpected
    new fields are still captured via `addNewColumns` schema evolution,
    paired with `mergeSchema` on the Delta sink.
    """
    query = (
        spark.readStream
        .format("cloudFiles")  # Auto Loader source
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", checkpoint_path + "/schema")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")  # Handle new fields
        .schema(ORDER_SCHEMA)  # enforced base schema
        .load(source_path)
        # Ingestion lineage metadata.
        .withColumn("_ingested_at", F.current_timestamp())
        # input_file_name() is not supported on Unity Catalog clusters;
        # the _metadata column works on all cluster types.
        .withColumn("_source_file", F.col("_metadata.file_path"))
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")  # let evolved columns into the sink table
        .trigger(availableNow=True)  # process the backlog, then terminate
        .toTable(target_table)
    )
    query.awaitTermination()  # block until the availableNow run completes
def continuous_stream(source_path: str, checkpoint_path: str, target_table: str):
    """Continuously ingest new files in 5-minute micro-batches.

    Returns the active StreamingQuery WITHOUT blocking — the caller owns its
    lifecycle (query.stop() / query.awaitTermination()). The original
    signature declared `-> None` yet returned the query; the annotation was
    the bug, so it is removed rather than the return value.
    """
    query = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", checkpoint_path + "/schema")
        .load(source_path)
        .withColumn("processed_at", F.current_timestamp())
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime="5 minutes")
        .toTable(target_table)
    )
    return query  # Non-blocking — manage lifecycle separately
MLflow with Databricks Model Registry
# ml/train_and_register.py — end-to-end ML with Databricks MLflow
import numpy as np

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score
# Track all runs under a shared workspace experiment path.
mlflow.set_experiment("/Shared/order-churn-prediction")
# Unity Catalog three-level model name: catalog.schema.model.
MODEL_NAME = "prod.ml_models.order_churn_classifier"  # UC model registry path
def train_and_register(X_train, y_train, X_test, y_test, params: dict):
    """Train a churn classifier, log it to MLflow, register it in Unity Catalog.

    Args:
        X_train, y_train: training split (pandas DataFrame / array-like).
        X_test, y_test: held-out evaluation split.
        params: GradientBoostingClassifier hyperparameters.

    Returns:
        The registered model version number.

    Note: the original code called `roc_auc_score` without importing it
    (NameError at runtime); the import is added at the top of the file.
    """
    with mlflow.start_run() as run:
        mlflow.log_params(params)
        model = GradientBoostingClassifier(**params)

        # Cross-validated AUC, logged per fold plus summary statistics.
        cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring="roc_auc")
        for i, score in enumerate(cv_scores):
            mlflow.log_metric("cv_auc", score, step=i)
        mlflow.log_metric("cv_auc_mean", cv_scores.mean())
        mlflow.log_metric("cv_auc_std", cv_scores.std())

        # Fit the final model on the full training split.
        model.fit(X_train, y_train)

        # Held-out AUC — the quality gate used later by promote_to_production.
        test_auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
        mlflow.log_metric("test_auc", test_auc)

        # Signature enables input-schema validation at serving time.
        signature = mlflow.models.infer_signature(X_train, model.predict_proba(X_train))
        mlflow.sklearn.log_model(
            model,
            name="model",
            signature=signature,
            input_example=X_train.iloc[:5],
        )

        # Register this run's logged model under the Unity Catalog name.
        mv = mlflow.register_model(
            model_uri=f"runs:/{run.info.run_id}/model",
            name=MODEL_NAME,
        )
        print(f"Registered version {mv.version}: test_auc={test_auc:.4f}")
        return mv.version
def promote_to_production(version: int, min_auc: float = 0.85):
    """Assign the 'champion' alias to a model version that clears the AUC bar.

    Raises ValueError when the version's logged test_auc is below min_auc.
    """
    client = MlflowClient()
    mv = client.get_model_version(MODEL_NAME, str(version))
    source_run = client.get_run(mv.run_id)
    test_auc = float(source_run.data.metrics["test_auc"])
    if test_auc < min_auc:
        raise ValueError(f"AUC {test_auc:.4f} below threshold {min_auc}")
    # Unity Catalog registries use aliases rather than the legacy stage API.
    client.set_registered_model_alias(MODEL_NAME, "champion", str(version))
    print(f"Version {version} promoted to champion (AUC={test_auc:.4f})")
def load_champion_model():
    """Load whichever registered version currently holds the 'champion' alias."""
    champion_uri = f"models:/{MODEL_NAME}@champion"
    return mlflow.sklearn.load_model(champion_uri)
Databricks Asset Bundle (DAB) Config
# databricks.yml — deploy jobs and workflows via CI/CD
bundle:
  name: order-platform

variables:
  env:
    default: dev

targets:
  dev:
    mode: development
    default: true
    workspace:
      host: https://adb-xxx.azuredatabricks.net
  prod:
    mode: production
    workspace:
      host: https://adb-yyy.azuredatabricks.net

resources:
  jobs:
    order_ingestion:
      name: "Order Ingestion - ${var.env}"
      schedule:
        # Quartz fields: sec min hour day month dow — fires at the top of every hour.
        quartz_cron_expression: "0 0 * * * ?"
        timezone_id: "UTC"
      job_clusters:
        - job_cluster_key: main
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            node_type_id: Standard_DS3_v2
            num_workers: 4
      tasks:
        - task_key: autoloader
          job_cluster_key: main
          notebook_task:
            notebook_path: ./notebooks/autoloader_ingestion.py
            source: WORKSPACE
          # NOTE(review): Delta Lake is built into the Databricks Runtime;
          # the delta-spark PyPI pin is typically unnecessary on DBR clusters.
          libraries:
            - pypi:
                package: delta-spark==3.2.0

    daily_aggregations:
      name: "Daily Revenue Aggregations - ${var.env}"
      tasks:
        # Jobs cannot declare job-level depends_on; chain jobs with run_job_task.
        - task_key: run_ingestion
          run_job_task:
            job_id: ${resources.jobs.order_ingestion.id}
        - task_key: aggregate
          depends_on:
            - task_key: run_ingestion
          notebook_task:
            notebook_path: ./notebooks/daily_aggregations.py
For Apache Spark standalone clusters outside Databricks — when you need portability without managed services — see the Spark guide for standalone cluster job submission. For the dbt analytics-engineering layer that transforms data inside Delta Lake into business-ready models, the dbt advanced guide covers model testing and incremental strategies. The Claude Skills 360 bundle includes Databricks skill sets covering Delta Lake, Auto Loader, MLflow integration, and Asset Bundle deployment. Start with the free tier to try Databricks pipeline generation.