
Claude Code for Databricks: Unified Lakehouse with Delta Lake and MLflow

Published: January 13, 2027
Read time: 10 min read
By: Claude Skills 360

Databricks combines Spark compute, Delta Lake storage, and MLflow tracking into a unified lakehouse platform. Delta Lake adds ACID transactions on top of Parquet files, and MERGE INTO enables upserts without full table rewrites. Unity Catalog provides centralized governance across all data and AI assets. Auto Loader incrementally ingests new files from cloud storage with schema inference and evolution. MLflow Model Registry manages model versions from experiment to production. Feature Store ensures consistent feature computation between training and serving. Databricks Asset Bundles (DABs) deploy jobs, workflows, and notebooks via CI/CD. Claude Code generates Delta Lake schemas, Auto Loader pipelines, MLflow tracking code, Feature Store definitions, and DAB deployment configurations.

CLAUDE.md for Databricks Projects

## Databricks Stack
- Runtime: Databricks Runtime >= 15.0 (Spark 3.5, Python 3.11)
- Storage: Delta Lake — always use Delta format, not Parquet directly
- Catalog: Unity Catalog with three-level names: catalog.schema.table
- Ingestion: Auto Loader (cloudFiles) for incremental file ingestion
- Streaming: Structured Streaming with checkpointing to cloud storage
- MLOps: MLflow tracking + Model Registry (built into Databricks)
- CI/CD: Databricks Asset Bundles (databricks.yml) — not manual UI jobs
- Python: use dbutils for secrets, spark.sql() for DDL, Delta API for DML
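The three-level naming rule above can be enforced before any DDL runs. A minimal sketch (the `uc_table_name` helper is hypothetical, not part of any Databricks SDK, and the identifier check is simplified):

```python
# Sketch: validate catalog.schema.table names before issuing DDL.
def uc_table_name(catalog: str, schema: str, table: str) -> str:
    """Build a three-level Unity Catalog name, rejecting invalid identifiers."""
    for part in (catalog, schema, table):
        # Simplified UC identifier check: letters, digits, underscores only
        if not part or not part.replace("_", "").isalnum():
            raise ValueError(f"invalid Unity Catalog identifier: {part!r}")
    return f"{catalog}.{schema}.{table}"
```

For example, `uc_table_name("prod", "orders", "orders")` returns `"prod.orders.orders"`, while a name containing spaces raises a `ValueError` before any SQL is sent to the cluster.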

Delta Lake Operations

# notebooks/delta_operations.py — ACID operations with Delta Lake
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

CATALOG = "prod"
SCHEMA = "orders"
TABLE = f"{CATALOG}.{SCHEMA}.orders"


def create_orders_table() -> None:
    """Create Delta table with schema enforcement."""

    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {TABLE} (
            order_id     STRING NOT NULL,
            customer_id  STRING NOT NULL,
            amount       DOUBLE,
            status       STRING,
            created_at   TIMESTAMP,
            updated_at   TIMESTAMP,
            is_deleted   BOOLEAN DEFAULT FALSE,
            created_date DATE GENERATED ALWAYS AS (CAST(created_at AS DATE))
        )
        USING DELTA
        PARTITIONED BY (created_date)  -- partition on a generated column, not an expression
        TBLPROPERTIES (
            'delta.feature.allowColumnDefaults' = 'supported',
            'delta.enableChangeDataFeed' = 'true',
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact' = 'true'
        )
    """)


def upsert_orders(new_data: DataFrame) -> None:
    """Merge new/updated orders — idempotent upsert."""

    delta_table = DeltaTable.forName(spark, TABLE)

    (
        delta_table.alias("target")
        .merge(
            new_data.alias("source"),
            "target.order_id = source.order_id"
        )
        .whenMatchedUpdate(
            condition="source.updated_at > target.updated_at",
            set={
                "amount": "source.amount",
                "status": "source.status",
                "updated_at": "source.updated_at",
            }
        )
        .whenNotMatchedInsertAll()
        .execute()
    )


def soft_delete_orders(order_ids: list[str]) -> None:
    """Soft delete by setting is_deleted flag."""
    delta_table = DeltaTable.forName(spark, TABLE)

    (
        delta_table.update(
            condition=F.col("order_id").isin(order_ids),
            set={"is_deleted": F.lit(True), "updated_at": F.current_timestamp()}
        )
    )


def read_order_history(order_id: str) -> DataFrame:
    """Read the change history of an order via the Delta change data feed."""
    return (
        spark.read
        .format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .table(TABLE)
        .filter(F.col("order_id") == order_id)
        .select("order_id", "status", "amount", "_change_type", "_commit_timestamp")
        .orderBy("_commit_timestamp")
    )


def vacuum_table(retain_hours: int = 168) -> None:
    """Remove old data files — keep 7 days for time travel."""
    spark.sql(f"VACUUM {TABLE} RETAIN {retain_hours} HOURS")
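The 168-hour default above is Delta's 7-day retention floor: vacuuming below it breaks time travel and is blocked unless `spark.databricks.delta.retentionDurationCheck.enabled` is disabled. A small guard (the `retention_hours` helper is hypothetical; the floor and the config flag are real Delta behavior) makes the intent explicit:

```python
# Sketch: convert a retention window in days to the hours VACUUM expects.
DELTA_DEFAULT_RETENTION_HOURS = 168  # 7 days, Delta's safety floor

def retention_hours(days: int) -> int:
    """Refuse retention windows below the 7-day default; shorter windows
    require disabling spark.databricks.delta.retentionDurationCheck.enabled."""
    hours = days * 24
    if hours < DELTA_DEFAULT_RETENTION_HOURS:
        raise ValueError(f"{days} days is below Delta's 7-day retention floor")
    return hours
```

Calling `vacuum_table(retention_hours(30))` keeps 30 days of history, while `retention_hours(1)` fails fast instead of silently shortening the time-travel window.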

Auto Loader Streaming Ingestion

# notebooks/autoloader_ingestion.py — incremental file ingestion
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.getOrCreate()


ORDER_SCHEMA = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("amount", DoubleType(), True),
    StructField("status", StringType(), True),
    StructField("created_at", TimestampType(), True),
])


def start_autoloader_stream(
    source_path: str,          # s3://my-bucket/raw/orders/
    checkpoint_path: str,      # s3://my-bucket/checkpoints/orders/
    target_table: str,
) -> None:
    """Auto Loader: incrementally ingest new files as they arrive."""

    query = (
        spark.readStream
        .format("cloudFiles")                           # Auto Loader format
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", checkpoint_path + "/schema")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")  # Handle new fields
        # Explicit schema; cloudFiles.inferColumnTypes applies only when no schema is set
        .schema(ORDER_SCHEMA)
        .load(source_path)
        # Add ingestion metadata (_metadata.file_path replaces deprecated input_file_name)
        .withColumn("_ingested_at", F.current_timestamp())
        .withColumn("_source_file", F.col("_metadata.file_path"))
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)     # Process all available files, then stop
        .toTable(target_table)
    )

    query.awaitTermination()


def continuous_stream(source_path: str, checkpoint_path: str, target_table: str) -> "StreamingQuery":
    """Continuous streaming with micro-batch trigger."""

    query = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", checkpoint_path + "/schema")
        .load(source_path)
        .withColumn("processed_at", F.current_timestamp())
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime="5 minutes")
        .toTable(target_table)
    )

    return query  # Non-blocking — manage lifecycle separately
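Both streams derive the Auto Loader schema location from the checkpoint path. A tiny convention helper (hypothetical, just a naming sketch) keeps the two locations consistent and distinct, so schema tracking survives a checkpoint reset:

```python
# Sketch: derive checkpoint and schema-tracking locations from one base URI.
def stream_paths(base_uri: str, stream_name: str) -> dict[str, str]:
    """Return checkpoint and Auto Loader schema locations for a named stream."""
    base = base_uri.rstrip("/")
    checkpoint = f"{base}/checkpoints/{stream_name}"
    return {"checkpoint": checkpoint, "schema": f"{checkpoint}/schema"}
```

For example, `stream_paths("s3://my-bucket", "orders")` yields `s3://my-bucket/checkpoints/orders` for the checkpoint and `.../orders/schema` for schema tracking.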

MLflow with Databricks Model Registry

# ml/train_and_register.py — end-to-end ML with Databricks MLflow
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score

# Register models in Unity Catalog (not the legacy workspace registry)
mlflow.set_registry_uri("databricks-uc")
mlflow.set_experiment("/Shared/order-churn-prediction")

MODEL_NAME = "prod.ml_models.order_churn_classifier"   # UC model registry path


def train_and_register(X_train, y_train, X_test, y_test, params: dict):
    """Train model, log to MLflow, register in Unity Catalog."""

    with mlflow.start_run() as run:
        mlflow.log_params(params)

        model = GradientBoostingClassifier(**params)

        # Log training metrics per fold
        cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring="roc_auc")
        for i, score in enumerate(cv_scores):
            mlflow.log_metric("cv_auc", score, step=i)
        mlflow.log_metric("cv_auc_mean", cv_scores.mean())
        mlflow.log_metric("cv_auc_std", cv_scores.std())

        # Fit final model
        model.fit(X_train, y_train)

        # Test metrics
        test_auc = roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
        mlflow.log_metric("test_auc", test_auc)

        # Log with signature for schema validation
        signature = mlflow.models.infer_signature(X_train, model.predict_proba(X_train))
        mlflow.sklearn.log_model(
            model,
            artifact_path="model",
            signature=signature,
            input_example=X_train.iloc[:5],
        )

        # Register in Unity Catalog
        mv = mlflow.register_model(
            model_uri=f"runs:/{run.info.run_id}/model",
            name=MODEL_NAME,
        )

        print(f"Registered version {mv.version}: test_auc={test_auc:.4f}")
        return mv.version


def promote_to_production(version: int, min_auc: float = 0.85):
    """Promote model version if it meets quality threshold."""
    client = MlflowClient()

    model_version = client.get_model_version(MODEL_NAME, str(version))
    run = client.get_run(model_version.run_id)
    test_auc = float(run.data.metrics["test_auc"])

    if test_auc < min_auc:
        raise ValueError(f"AUC {test_auc:.4f} below threshold {min_auc}")

    # In Unity Catalog, use aliases instead of stages
    client.set_registered_model_alias(MODEL_NAME, "champion", str(version))
    print(f"Version {version} promoted to champion (AUC={test_auc:.4f})")


def load_champion_model():
    """Load the current champion model for inference."""
    model = mlflow.sklearn.load_model(f"models:/{MODEL_NAME}@champion")
    return model
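The intro mentions Feature Store but the sections above don't show it. As a sketch only (it assumes the `databricks-feature-engineering` package and a Unity Catalog workspace, runs only on a Databricks cluster, and the table name and column choices are hypothetical), a feature table definition might look like:

```python
# ml/feature_store.py — sketch: per-customer features in a UC feature table
from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql import functions as F

fe = FeatureEngineeringClient()

FEATURE_TABLE = "prod.ml_features.customer_order_features"  # hypothetical name


def build_customer_features(orders_df):
    """Aggregate order history into per-customer features."""
    return (
        orders_df
        .filter(~F.col("is_deleted"))
        .groupBy("customer_id")
        .agg(
            F.count("order_id").alias("order_count"),
            F.sum("amount").alias("lifetime_value"),
            F.max("created_at").alias("last_order_at"),
        )
    )


def publish_features(features_df) -> None:
    """Create the feature table; primary_keys enable point lookups at serving."""
    fe.create_table(
        name=FEATURE_TABLE,
        primary_keys=["customer_id"],
        df=features_df,
        description="Per-customer order aggregates for the churn model",
    )
```

Training code can then reference these features through feature lookups so the same computation serves both training and online inference.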

Databricks Asset Bundle (DAB) Config

# databricks.yml — deploy jobs and workflows via CI/CD
bundle:
  name: order-platform

variables:
  env:
    default: dev

targets:
  dev:
    mode: development
    default: true
    workspace:
      host: https://adb-xxx.azuredatabricks.net

  prod:
    mode: production
    workspace:
      host: https://adb-yyy.azuredatabricks.net

resources:
  jobs:
    order_ingestion:
      name: "Order Ingestion - ${var.env}"
      schedule:
        quartz_cron_expression: "0 0 * * * ?"  # Every hour
        timezone_id: "UTC"
      job_clusters:
        - job_cluster_key: main
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            node_type_id: Standard_DS3_v2
            num_workers: 4
      tasks:
        - task_key: autoloader
          job_cluster_key: main
          notebook_task:
            notebook_path: ./notebooks/autoloader_ingestion.py
            source: WORKSPACE
          libraries:
            - pypi:
                package: delta-spark==3.2.0

    daily_aggregations:
      name: "Daily Revenue Aggregations - ${var.env}"
      # Jobs have no cross-job depends_on; schedule after ingestion completes,
      # or chain both via run_job_task in a parent job
      schedule:
        quartz_cron_expression: "0 30 0 * * ?"  # Daily at 00:30 UTC
        timezone_id: "UTC"
      tasks:
        - task_key: aggregate
          notebook_task:
            notebook_path: ./notebooks/daily_aggregations.py
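The bundle above is validated and deployed with the Databricks CLI. A typical flow, assuming workspace authentication is already configured (job and target names are the ones from this example config):

```shell
# Check bundle syntax and resolve variables for the dev target
databricks bundle validate -t dev

# Deploy notebooks, jobs, and clusters to the dev workspace
databricks bundle deploy -t dev

# Trigger the ingestion job once to smoke-test the deployment
databricks bundle run order_ingestion -t dev

# Promote the same bundle definition to production
databricks bundle deploy -t prod
```

Running `deploy` in a CI pipeline (dev on every merge, prod on tagged releases) replaces hand-edited UI jobs with reviewable config.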

For Apache Spark standalone clusters outside Databricks (when you need portability without managed services), see the Spark guide for standalone cluster job submission. For the analytics engineering layer that transforms Delta Lake data into business-ready models, the dbt advanced guide covers model testing and incremental strategies. The Claude Skills 360 bundle includes Databricks skill sets covering Delta Lake, Auto Loader, MLflow integration, and Asset Bundle deployment. Start with the free tier to try Databricks pipeline generation.
