Claude Code for Data Pipelines: ETL, dbt, and Orchestration Patterns — Claude Skills 360 Blog

Claude Code for Data Pipelines: ETL, dbt, and Orchestration Patterns

Published: December 4, 2026
Read time: 10 min read
By: Claude Skills 360

Data pipelines transform raw operational data into analytics-ready datasets. The modern data stack combines dbt for SQL transformations, Dagster or Airflow for orchestration, and Great Expectations or dbt tests for data quality. Claude Code generates dbt models with proper incremental strategies, Dagster asset graphs, data quality test suites, and the Python extraction code that lands data into your warehouse.

CLAUDE.md for Data Pipeline Projects

## Data Stack
- Warehouse: Snowflake (production) / DuckDB (local development)
- Transformation: dbt 1.8+ with dbt-core
- Orchestration: Dagster 1.x (asset-based, not op-based)
- Data quality: dbt tests (built-in + dbt-expectations) + Great Expectations for source checks
- Extraction: Python with SQLAlchemy (OLTP sources) + Fivetran/Airbyte (SaaS sources)
- Source schema: raw schema (landing zone) → staging → marts
- Incremental: default to incremental with unique_key; full refresh for small dimensions
- Testing: every model has not_null + unique tests on PK; relationships on FKs

Python Extraction (OLTP → Data Warehouse)

# pipelines/extractors/orders_extractor.py
import logging
from datetime import datetime
from sqlalchemy import create_engine, text
import pandas as pd
from typing import Iterator

logger = logging.getLogger(__name__)

class OrdersExtractor:
    """Extract orders from PostgreSQL OLTP database → Snowflake raw schema."""
    
    def __init__(self, source_url: str, target_url: str):
        self.source = create_engine(source_url)
        self.target = create_engine(target_url)
    
    def get_last_extracted_at(self) -> datetime | None:
        """Check watermark table for last successful extraction."""
        with self.target.connect() as conn:
            result = conn.execute(text("""
                SELECT max_extracted_at FROM raw.extraction_watermarks
                WHERE table_name = 'orders'
            """)).fetchone()
        return result[0] if result else None
    
    def extract_batch(self, since: datetime | None, batch_size: int = 10_000) -> Iterator[pd.DataFrame]:
        """Extract in batches to handle large datasets without OOM."""
        where_clause = "WHERE updated_at > :since" if since else ""
        query = f"""
            SELECT
                id,
                customer_id,
                status,
                total_cents,
                currency,
                created_at,
                updated_at,
                metadata::text AS metadata_json,
                NOW() AS extracted_at
            FROM orders
            {where_clause}
            ORDER BY updated_at
        """
        
        with self.source.connect() as conn:
            for chunk in pd.read_sql(
                text(query),
                conn,
                # Only bind :since when the WHERE clause actually references it
                params={"since": since} if since else None,
                chunksize=batch_size,
            ):
                yield chunk
    
    def load_batch(self, df: pd.DataFrame) -> int:
        """Upsert batch into raw Snowflake table."""
        # Load the batch into a staging table (replaced each batch), then MERGE into the target
        with self.target.begin() as conn:
            df.to_sql(
                "orders_staging",
                conn,
                schema="raw",
                if_exists="replace",
                index=False,
                method="multi",
            )
            
            result = conn.execute(text("""
                MERGE INTO raw.orders AS target
                USING raw.orders_staging AS source
                ON target.id = source.id
                WHEN MATCHED THEN UPDATE SET
                    status = source.status,
                    total_cents = source.total_cents,
                    updated_at = source.updated_at,
                    extracted_at = source.extracted_at
                WHEN NOT MATCHED THEN INSERT
                    (id, customer_id, status, total_cents, currency,
                     created_at, updated_at, metadata_json, extracted_at)
                    VALUES (source.id, source.customer_id, source.status,
                            source.total_cents, source.currency, source.created_at,
                            source.updated_at, source.metadata_json, source.extracted_at)
            """))
            
            return result.rowcount
    
    def run(self) -> dict:
        since = self.get_last_extracted_at()
        logger.info(f"Extracting orders since: {since or 'beginning'}")
        
        total_rows = 0
        max_updated_at = None
        
        for batch_df in self.extract_batch(since):
            rows = self.load_batch(batch_df)
            total_rows += rows
            batch_max = batch_df['updated_at'].max()
            if max_updated_at is None or batch_max > max_updated_at:
                max_updated_at = batch_max
        
        # Update watermark (Snowflake has no ON CONFLICT; upsert via MERGE)
        if max_updated_at:
            with self.target.begin() as conn:
                conn.execute(text("""
                    MERGE INTO raw.extraction_watermarks AS t
                    USING (SELECT 'orders' AS table_name, :max_ts AS max_extracted_at) AS s
                    ON t.table_name = s.table_name
                    WHEN MATCHED THEN UPDATE SET max_extracted_at = s.max_extracted_at
                    WHEN NOT MATCHED THEN INSERT (table_name, max_extracted_at)
                        VALUES (s.table_name, s.max_extracted_at)
                """), {"max_ts": max_updated_at})
        
        logger.info(f"Extracted {total_rows} order rows")
        return {"rows_extracted": total_rows, "max_updated_at": max_updated_at}

dbt Models

-- models/staging/stg_orders.sql — standardize raw data
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        cluster_by=['created_date'],
        on_schema_change='sync_all_columns'
    )
}}

WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
    {% if is_incremental() %}
        -- Only process rows updated since last run
        WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

cleaned AS (
    SELECT
        id                              AS order_id,
        customer_id,
        LOWER(status)                   AS status,
        total_cents,
        UPPER(currency)                 AS currency_code,
        total_cents::FLOAT / 100        AS total_amount,
        created_at::TIMESTAMP_NTZ       AS created_at,
        updated_at::TIMESTAMP_NTZ       AS updated_at,
        DATE(created_at)                AS created_date,
        TRY_PARSE_JSON(metadata_json)   AS metadata,
        extracted_at
    FROM source
    WHERE id IS NOT NULL
        AND total_cents >= 0
        AND status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')
)

SELECT * FROM cleaned

-- models/marts/orders/fct_orders.sql — fact table with business logic
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        cluster_by=['created_date']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
        WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
),

order_items AS (
    SELECT
        order_id,
        COUNT(*)                        AS item_count,
        SUM(quantity)                   AS total_quantity,
        COUNT(DISTINCT product_id)      AS distinct_products
    FROM {{ ref('stg_order_items') }}
    GROUP BY 1
),

final AS (
    SELECT
        o.order_id,
        o.customer_id,
        c.customer_segment,
        c.acquisition_channel,
        o.status,
        o.currency_code,
        o.total_amount,
        
        -- Derived metrics
        oi.item_count,
        oi.total_quantity,
        oi.distinct_products,
        
        -- Date dimensions
        o.created_date,
        DATE_TRUNC('week', o.created_date)    AS created_week,
        DATE_TRUNC('month', o.created_date)   AS created_month,
        
        -- Status flags
        o.status = 'delivered'                 AS is_delivered,
        o.status = 'cancelled'                 AS is_cancelled,
        o.status IN ('pending', 'processing')  AS is_active,
        
        o.created_at,
        o.updated_at
        
    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    LEFT JOIN order_items oi ON o.order_id = oi.order_id
)

SELECT * FROM final
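
The `order_items` rollup (COUNT(*), SUM, COUNT DISTINCT per order) can be sanity-checked in pandas against a small hypothetical sample before trusting the SQL on production data:

```python
import pandas as pd

# Hypothetical order line items: order 1 has two lines, order 2 has one
items = pd.DataFrame({
    "order_id": [1, 1, 2],
    "product_id": ["A", "B", "A"],
    "quantity": [2, 1, 5],
})

# Mirrors the order_items CTE: item_count, total_quantity, distinct_products
rollup = items.groupby("order_id").agg(
    item_count=("product_id", "size"),
    total_quantity=("quantity", "sum"),
    distinct_products=("product_id", "nunique"),
).reset_index()
print(rollup.to_dict("records"))
```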

dbt Tests

# models/staging/schema.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned and standardized orders from OLTP source"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      
      - name: status
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled']
      
      - name: total_amount
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
      
      - name: created_at
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: timestamp_ntz
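
The `expect_column_values_to_be_between` test reduces to a simple bound predicate; a stdlib sketch of the check it enforces, returning the violating rows the test would flag:

```python
def out_of_bounds(values: list[float], min_value: float = 0, max_value: float = 100_000) -> list[float]:
    """Return values outside [min_value, max_value] — what the dbt test would report."""
    return [v for v in values if not (min_value <= v <= max_value)]

print(out_of_bounds([19.99, -5.0, 250_000.0]))
```

A passing test means this list is empty for every row in the model; dbt reports the failing row count otherwise.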

Dagster Asset Graph

# pipelines/assets/orders_assets.py
from dagster import (
    asset, AssetIn, Output, MetadataValue,
    DailyPartitionsDefinition, BackfillPolicy,
)
import pandas as pd

from pipelines.extractors.orders_extractor import OrdersExtractor

daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")

@asset(
    partitions_def=daily_partitions,
    backfill_policy=BackfillPolicy.single_run(),
    group_name="raw",
    required_resource_keys={"postgres", "snowflake"},
)
def raw_orders(context) -> Output[pd.DataFrame]:
    """Extract orders for the given partition date."""
    partition_date = context.partition_key
    
    extractor = OrdersExtractor(
        source_url=context.resources.postgres.url,
        target_url=context.resources.snowflake.url,
    )
    
    # run_for_date: a partition-scoped variant of run() that filters on the partition date
    result = extractor.run_for_date(partition_date)
    
    return Output(
        result["dataframe"],
        metadata={
            "row_count": MetadataValue.int(result["rows"]),
            "partition_date": MetadataValue.text(partition_date),
        },
    )

@asset(
    ins={"raw_orders": AssetIn()},  # AssetIn already declares the upstream dependency
    group_name="staging",
    required_resource_keys={"dbt"},
)
def stg_orders(context, raw_orders: pd.DataFrame) -> Output[None]:
    """Run dbt stg_orders model via dbt-dagster integration."""
    dbt_output = context.resources.dbt.run(
        select="stg_orders",
        vars={"partition_date": context.partition_key},
    )
    row_count = dbt_output.result["results"][0]["adapter_response"]["rows_affected"]
    
    return Output(
        None,  # dbt manages the table
        metadata={"rows_affected": MetadataValue.int(row_count)},
    )
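
Daily partitions resolve to ISO date strings (`YYYY-MM-DD`), and a backfill simply enumerates the keys between the start date and the run date. A quick stdlib sketch (hypothetical helper, not Dagster's own API) of that enumeration:

```python
from datetime import date, timedelta

def partition_keys(start: date, end: date) -> list[str]:
    """Enumerate daily partition keys in Dagster's YYYY-MM-DD format, end-exclusive."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]

print(partition_keys(date(2025, 1, 1), date(2025, 1, 4)))
```

With `BackfillPolicy.single_run()`, Dagster hands the whole key range to one run instead of launching one run per key, which suits set-based extraction like the MERGE above.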

For petabyte-scale versions of these pipelines, see the Apache Spark guide on distributed DataFrame transformations. For advanced dbt patterns, including macros and cross-database compatibility, the dbt analytics guide covers model design in depth. The Claude Skills 360 bundle includes data pipeline skill sets covering dbt models, Dagster assets, and incremental extraction patterns. Start with the free tier to try ETL pipeline generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
