Claude Code for Data Engineering: ETL Pipelines, dbt, and Data Workflows — Claude Skills 360 Blog


Published: May 23, 2026
Read time: 9 min read
By: Claude Skills 360

Data pipelines have specific failure modes: schema drift from upstream sources, duplicate records from retry logic, slowly changing dimensions, and orchestration dependencies that fail silently. Claude Code is effective for data engineering because it understands these patterns and generates pipeline code that handles them correctly — not just the happy path.

This guide covers data engineering with Claude Code: ETL pipelines in Python, dbt models, Airflow DAGs, data warehouse queries, and pipeline debugging.

Setting Up Claude Code for Data Engineering

Data stack context is critical:

# Data Engineering Context

## Stack
- Warehouse: Snowflake (via sqlalchemy + snowflake-sqlalchemy)
- Transformation: dbt Core 1.7
- Orchestration: Airflow 2.8 (Astronomer Cosmos for dbt)
- Ingestion: custom Python + Singer taps
- Language: Python 3.12

## Data Conventions
- Source tables: raw.{source}.{table_name} (immutable, append-only)
- Staging: stg_{source}__{table_name} (cleaned, renamed)
- Intermediate: int_{description} (joined, aggregated)
- Marts: {domain}/{entity}_{metric_type} (e.g., sales/orders_daily)
- Surrogate keys: dbt_utils.generate_surrogate_key on natural keys
- All timestamps: UTC, stored as TIMESTAMP_TZ

## Never
- Modify raw tables
- Use SELECT * in any model (always explicit columns)
- Hardcode dates in models — use dbt variables or macros

See the CLAUDE.md guide for complete configuration.

ETL Pipeline Development

Extraction with Change Data Capture

Write a Python extractor for a PostgreSQL source.
Pull only records changed since the last run.
Handle: new rows (inserted_at > last_run), 
updates (updated_at > last_run), 
soft deletes (deleted_at IS NOT NULL AND deleted_at > last_run).
from datetime import datetime, timezone
from typing import Iterator
import psycopg2
from psycopg2.extras import RealDictCursor

def extract_changed_records(
    conn_string: str,
    table: str,
    last_run: datetime,
    batch_size: int = 10_000,
) -> Iterator[list[dict]]:
    """
    Yields batches of records changed since last_run.
    Handles inserts, updates, and soft deletes.
    """
    query = """
        SELECT *
        FROM {table}
        WHERE 
            inserted_at > %(last_run)s
            OR updated_at > %(last_run)s
            OR (deleted_at IS NOT NULL AND deleted_at > %(last_run)s)
        ORDER BY GREATEST(
            COALESCE(inserted_at, '1970-01-01'),
            COALESCE(updated_at, '1970-01-01'),
            COALESCE(deleted_at, '1970-01-01')
        ) ASC
    """.format(table=table)  # table name not user-controlled, safe to format
    
    with psycopg2.connect(conn_string) as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, {'last_run': last_run})
            
            while True:
                batch = cur.fetchmany(batch_size)
                if not batch:
                    break
                yield [dict(row) for row in batch]

def run_extraction(table: str, state_path: str) -> int:
    """Returns count of extracted records."""
    # Load last watermark
    last_run = load_watermark(state_path) or datetime(1970, 1, 1, tzinfo=timezone.utc)
    new_watermark = datetime.now(timezone.utc)
    
    total = 0
    for batch in extract_changed_records(SOURCE_CONN, table, last_run):
        load_to_warehouse(batch, table)
        total += len(batch)
    
    # Only save watermark after successful load
    save_watermark(state_path, new_watermark)
    
    return total

The watermark save happens only after the successful load — if the load fails, the next run will re-extract from the previous watermark. This ensures at-least-once delivery (no records skipped), with idempotent upsert in the load step handling duplicates.

Idempotent Loading

Write the load step that handles duplicates correctly.
Source has natural keys. Re-running shouldn't create duplicates.
We're loading to Snowflake.
import pandas as pd
from datetime import datetime

def load_to_warehouse(records: list[dict], table: str, natural_keys: list[str]) -> None:
    """
    Upsert records to Snowflake via a staging table.
    Uses MERGE to handle duplicates idempotently.
    """
    if not records:
        return

    columns = list(records[0].keys())
    stage_table = f"{table}_stage_{datetime.now():%Y%m%d_%H%M%S}"

    engine = get_snowflake_engine()

    with engine.begin() as conn:
        # Write batch to a staging table in the stage schema
        pd.DataFrame(records).to_sql(
            stage_table,
            conn,
            schema='stage',
            if_exists='replace',
            index=False,
        )

        # Snowflake MERGE needs explicit column lists
        # (SET * / INSERT (*) are not supported)
        natural_key_join = ' AND '.join(f't.{k} = s.{k}' for k in natural_keys)
        update_set = ', '.join(
            f't.{c} = s.{c}' for c in columns if c not in natural_keys
        )
        insert_cols = ', '.join(columns)
        insert_vals = ', '.join(f's.{c}' for c in columns)

        conn.execute(f"""
            MERGE INTO raw.{table} t
            USING stage.{stage_table} s
            ON {natural_key_join}
            WHEN MATCHED AND s._extracted_at > t._extracted_at THEN
                UPDATE SET {update_set}
            WHEN NOT MATCHED THEN
                INSERT ({insert_cols})
                VALUES ({insert_vals})
        """)

        # Cleanup staging table
        conn.execute(f"DROP TABLE IF EXISTS stage.{stage_table}")

MERGE handles both insert (new record) and update (existing record with newer data). The s._extracted_at > t._extracted_at condition prevents overwriting newer records with older extracts on out-of-order delivery.
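The same newer-wins upsert semantics can be mirrored in plain Python, which is handy for unit-testing the idempotency logic without a warehouse connection (a sketch, not part of the pipeline itself):

```python
def upsert_batch(
    target: dict[tuple, dict],
    records: list[dict],
    natural_keys: list[str],
) -> dict[tuple, dict]:
    """
    In-memory equivalent of the MERGE: insert new records, update existing
    ones only when the incoming extract is newer than what is stored.
    """
    for record in records:
        key = tuple(record[k] for k in natural_keys)
        existing = target.get(key)
        if existing is None or record['_extracted_at'] > existing['_extracted_at']:
            target[key] = record
    return target
```

Re-running the same batch leaves the target unchanged, and an older out-of-order extract never overwrites a newer row, exactly the two properties the MERGE condition guarantees.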

dbt Models

Staging Models

Write a staging model for the raw orders table.
Clean: rename columns, standardize types, 
add surrogate key, add metadata columns.
Our source has inconsistent timestamps (some NULL, some zero).
-- models/staging/stg_postgres__orders.sql
with source as (
    select * from {{ source('postgres', 'orders') }}
),

renamed as (
    select
        -- Keys
        id::varchar as order_id,
        user_id::varchar as user_id,
        
        -- Attributes
        status,
        lower(trim(currency)) as currency,  -- Normalize upstream inconsistencies
        
        -- Amounts (source stores as float, normalize to integer cents)
        round(total_amount * 100)::integer as total_cents,
        
        -- Timestamps (handle NULL and zero values from source)
        case 
            when created_at = '1970-01-01 00:00:00' then null
            else created_at::timestamp_tz
        end as created_at,
        
        nullif(updated_at, '1970-01-01 00:00:00')::timestamp_tz as updated_at,
        deleted_at::timestamp_tz as deleted_at,
        
        -- Metadata
        _extracted_at
        
    from source
)

select
    {{ dbt_utils.generate_surrogate_key(['order_id']) }} as order_sk,
    *,
    current_timestamp() as _dbt_updated_at
from renamed

The case expression handles Unix epoch zero values ('1970-01-01 00:00:00') that some systems use for “no date”; nullif expresses the same guard more concisely for updated_at. Claude generates these defensive patterns because real-world source data is messy.

Incremental Models

Build an incremental model for daily order aggregates.
Only process new/updated orders since last run.
Handle late-arriving data: if an order from 3 days ago
gets updated, update the 3-day-old aggregate.
-- models/marts/sales/orders_daily.sql
{{ config(
    materialized='incremental',
    unique_key='order_date',
    incremental_strategy='merge',
    on_schema_change='append_new_columns'
) }}

with orders as (
    select * from {{ ref('stg_postgres__orders') }}
    where deleted_at is null

    {% if is_incremental() %}
        -- Rebuild every date touched in the last 3 days: filtering on the
        -- changed orders alone would merge partial aggregates, so select
        -- the affected dates and recompute them in full
        and date(created_at) in (
            select distinct date(created_at)
            from {{ ref('stg_postgres__orders') }}
            where date(coalesce(updated_at, created_at)) >= dateadd(day, -3, current_date())
        )
    {% endif %}
),

daily_agg as (
    select
        date(created_at) as order_date,
        count(*) as order_count,
        count(distinct user_id) as unique_customers,
        sum(total_cents) as revenue_cents,
        avg(total_cents) as avg_order_value_cents
    from orders
    group by 1
)

select * from daily_agg

The -3 days lookback window catches orders with updated_at later than created_at (payments confirmed next day, refunds applied later). Without this, late-arriving updates would be silently missed.
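The lookback logic can also be expressed in Python when planning a backfill: given the recently touched records, compute which daily partitions need rebuilding (an illustrative helper, not part of the dbt model):

```python
from datetime import date, timedelta

def affected_dates(
    orders: list[dict],
    run_date: date,
    lookback_days: int = 3,
) -> set[date]:
    """Dates whose daily aggregates must be recomputed this run."""
    cutoff = run_date - timedelta(days=lookback_days)
    return {
        o['created_at'].date()
        for o in orders
        # An order counts as "touched" if it was created or updated recently
        if (o.get('updated_at') or o['created_at']).date() >= cutoff
    }
```

An order created three days ago but refunded today lands its creation date in the rebuild set, which is exactly the late-arrival case the lookback exists for.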

dbt Tests

Add data quality tests for the orders staging model.
Test: order_id is unique and not null, status has valid values,
total_cents is positive, created_at is not in the future.
# models/staging/schema.yml
version: 2

models:
  - name: stg_postgres__orders
    description: "Cleaned and renamed orders from PostgreSQL source"
    
    columns:
      - name: order_id
        description: "Natural key from source system"
        tests:
          - unique
          - not_null
      
      - name: order_sk
        tests:
          - unique
          - not_null
      
      - name: status
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded']
      
      - name: total_cents
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
              name: total_cents_non_negative
      
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp()"
              name: created_at_not_in_future
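The same rules can also run as a pre-load guard in Python, catching bad batches before they reach the warehouse (illustrative; the dbt tests above remain the source of truth):

```python
from datetime import datetime, timezone

VALID_STATUSES = {'pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded'}

def validate_orders(records: list[dict]) -> list[str]:
    """Return a list of data-quality violations (empty list means clean)."""
    errors = []
    seen_ids = set()
    now = datetime.now(timezone.utc)
    for i, r in enumerate(records):
        # order_id: not null and unique
        if not r.get('order_id'):
            errors.append(f"row {i}: order_id missing")
        elif r['order_id'] in seen_ids:
            errors.append(f"row {i}: duplicate order_id {r['order_id']}")
        else:
            seen_ids.add(r['order_id'])
        # status: accepted values only
        if r.get('status') not in VALID_STATUSES:
            errors.append(f"row {i}: invalid status {r.get('status')!r}")
        # total_cents: present and non-negative
        if r.get('total_cents') is None or r['total_cents'] < 0:
            errors.append(f"row {i}: negative or missing total_cents")
        # created_at: not in the future
        if r.get('created_at') and r['created_at'] > now:
            errors.append(f"row {i}: created_at in the future")
    return errors
```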

Custom dbt Macros

Write a dbt macro that generates a slowly changing dimension (SCD Type 2).
Given a source table, create snapshot logic with:
valid_from, valid_to, is_current flags.

Rather than writing a custom macro, Claude points out that dbt has native snapshot support:

-- snapshots/snap_users.sql
{% snapshot snap_users %}

{{
    config(
        target_schema='snapshots',
        unique_key='user_id',
        strategy='timestamp',
        updated_at='updated_at',
        invalidate_hard_deletes=True,
    )
}}

select * from {{ source('postgres', 'users') }}

{% endsnapshot %}

Claude configures invalidate_hard_deletes=True (marks records as deleted when source row disappears, rather than leaving stale current rows) and explains the timestamp vs check strategy tradeoff.
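What a timestamp-strategy snapshot does under the hood can be sketched in Python: close out changed or deleted current rows and append new versions with fresh validity windows (a simplified illustration, not dbt's actual implementation):

```python
from datetime import datetime

def apply_scd2(
    history: list[dict],
    source_rows: list[dict],
    key: str,
    now: datetime,
) -> list[dict]:
    """
    Apply one source snapshot to an SCD Type 2 history table.
    Each history row carries valid_from, valid_to, and is_current.
    """
    source_by_key = {r[key]: r for r in source_rows}
    for row in history:
        if not row['is_current']:
            continue
        src = source_by_key.get(row[key])
        changed = src is not None and src['updated_at'] > row['updated_at']
        deleted = src is None  # invalidate_hard_deletes behavior
        if changed or deleted:
            row['valid_to'] = now
            row['is_current'] = False
    # Append a new current version for every key without one
    current_keys = {r[key] for r in history if r['is_current']}
    for k, src in source_by_key.items():
        if k not in current_keys:
            history.append({**src, 'valid_from': now, 'valid_to': None, 'is_current': True})
    return history
```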

Airflow DAG Authoring

Write an Airflow DAG that:
1. Extracts from PostgreSQL (parametrized table list)
2. Validates record counts
3. Runs dbt transformations
4. Sends Slack notification on success or failure
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig

TABLES_TO_EXTRACT = ['orders', 'users', 'products', 'payments']

@dag(
    dag_id='daily_etl',
    start_date=datetime(2026, 1, 1),
    schedule='0 2 * * *',  # 2am UTC daily
    catchup=False,
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'email_on_failure': False,
    },
    tags=['etl', 'daily'],
)
def daily_etl():
    
    @task
    def extract_table(table: str) -> dict:
        count = run_extraction(table, state_path=f'/tmp/{table}.state')
        return {'table': table, 'count': count}
    
    @task
    def validate_counts(results: list[dict]) -> None:
        for r in results:
            if r['count'] == 0:
                raise ValueError(f"Zero records extracted for {r['table']} -- possible source issue")
    
    @task(trigger_rule='all_done')
    def notify(results: list[dict], **context) -> None:
        # dag_run.state is still 'running' at this point, so check for
        # failed task instances instead
        dag_run = context['dag_run']
        failed = dag_run.get_task_instances(state=['failed'])
        state = 'FAILED' if failed else 'succeeded'
        
        total = sum(r['count'] for r in results)
        
        SlackWebhookHook(slack_webhook_conn_id='slack_alerts').send(
            text=f"Daily ETL {state}: {total:,} records extracted at {datetime.now():%Y-%m-%d %H:%M}"
        )
    
    # Cosmos renders the dbt project as a task group (paths illustrative)
    dbt_task_group = DbtTaskGroup(
        group_id='dbt_transform',
        project_config=ProjectConfig('/usr/local/airflow/dbt'),
        profile_config=ProfileConfig(
            profile_name='analytics',
            target_name='prod',
            profiles_yml_filepath='/usr/local/airflow/dbt/profiles.yml',
        ),
    )
    
    # Fan-out extractions (parallel)
    extract_results = extract_table.expand(table=TABLES_TO_EXTRACT)
    validate = validate_counts(results=extract_results)
    notification = notify(results=extract_results)
    
    # dbt runs after validation; notification runs last, regardless of outcome
    validate >> dbt_task_group >> notification

dag = daily_etl()

trigger_rule='all_done' on the notification task means it runs whether the upstream tasks succeeded or failed. expand() creates one parallel extract task per table in the list (dynamic task mapping via the TaskFlow API).

Pipeline Debugging

My dbt model run failed with:
"SQL execution failed. Statement: [model.my_project.int_order_items]
Error: Division by zero"

Claude identifies: the model has an expression like revenue / order_count without a null/zero check. It adds:

-- Before:
revenue_cents / order_count as avg_order_value

-- After:
case 
    when order_count > 0 then revenue_cents / order_count
    else null
end as avg_order_value

For Airflow scheduling errors, task failures, and sensor timeouts, Claude reads the error log and task context to identify: upstream dependency failures, XCom serialization issues, connection pool exhaustion, and operator configuration problems.

Data Engineering with Claude Code

Data engineering gets outsized value from Claude Code because:

  1. SQL complexity — window functions, CTEs, and incremental logic are tedious to write correctly
  2. Defensive patterns — handling NULLs, zero-dates, and late-arriving data requires knowledge of common upstream data quality issues
  3. Configuration — Airflow, dbt, and Snowflake each have configuration options that affect correctness

For database optimization patterns that apply to warehouse queries, see the database guide. For testing data pipelines, the testing guide covers assertion patterns that work for data validation. The Claude Skills 360 bundle includes data engineering skill sets for dbt, Airflow, and warehouse SQL. Start with the free tier to try the ETL and dbt patterns.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
