Data pipelines have specific failure modes: schema drift from upstream sources, duplicate records from retry logic, slowly changing dimensions, and orchestration dependencies that fail silently. Claude Code is effective for data engineering because it understands these patterns and generates pipeline code that handles them correctly — not just the happy path.
This guide covers data engineering with Claude Code: ETL pipelines in Python, dbt models, Airflow DAGs, data warehouse queries, and pipeline debugging.
Setting Up Claude Code for Data Engineering
Data stack context is critical:
# Data Engineering Context
## Stack
- Warehouse: Snowflake (via sqlalchemy + snowflake-sqlalchemy)
- Transformation: dbt Core 1.7
- Orchestration: Airflow 2.8 (Astronomer Cosmos for dbt)
- Ingestion: custom Python + Singer taps
- Language: Python 3.12
## Data Conventions
- Source tables: raw.{source}.{table_name} (immutable, append-only)
- Staging: stg_{source}__{table_name} (cleaned, renamed)
- Intermediate: int_{description} (joined, aggregated)
- Marts: {domain}/{entity}_{metric_type} (e.g., sales/orders_daily)
- Surrogate keys: dbt_utils.generate_surrogate_key on natural keys
- All timestamps: UTC, stored as TIMESTAMP_TZ
## Never
- Modify raw tables
- Use SELECT * in any model (always explicit columns)
- Hardcode dates in models — use dbt variables or macros
See the CLAUDE.md guide for complete configuration.
ETL Pipeline Development
Extraction with Change Data Capture
Write a Python extractor for a PostgreSQL source.
Pull only records changed since the last run.
Handle: new rows (inserted_at > last_run),
updates (updated_at > last_run),
soft deletes (deleted_at IS NOT NULL AND deleted_at > last_run).
from datetime import datetime, timezone
from typing import Iterator
import psycopg2
from psycopg2.extras import RealDictCursor
def extract_changed_records(
conn_string: str,
table: str,
last_run: datetime,
batch_size: int = 10_000,
) -> Iterator[list[dict]]:
"""
Yields batches of records changed since last_run.
Handles inserts, updates, and soft deletes.
"""
query = """
SELECT *
FROM {table}
WHERE
inserted_at > %(last_run)s
OR updated_at > %(last_run)s
OR (deleted_at IS NOT NULL AND deleted_at > %(last_run)s)
ORDER BY GREATEST(
COALESCE(inserted_at, '1970-01-01'),
COALESCE(updated_at, '1970-01-01'),
COALESCE(deleted_at, '1970-01-01')
) ASC
""".format(table=table) # table name not user-controlled, safe to format
with psycopg2.connect(conn_string) as conn:
        # Named (server-side) cursor so fetchmany streams batches
        # instead of loading the full result set into client memory
        with conn.cursor(name='extract_changes', cursor_factory=RealDictCursor) as cur:
cur.execute(query, {'last_run': last_run})
while True:
batch = cur.fetchmany(batch_size)
if not batch:
break
yield [dict(row) for row in batch]
def run_extraction(table: str, state_path: str) -> int:
"""Returns count of extracted records."""
# Load last watermark
last_run = load_watermark(state_path) or datetime(1970, 1, 1, tzinfo=timezone.utc)
new_watermark = datetime.now(timezone.utc)
total = 0
for batch in extract_changed_records(SOURCE_CONN, table, last_run):
        # NATURAL_KEYS maps table name -> natural key columns (defined elsewhere)
        load_to_warehouse(batch, table, natural_keys=NATURAL_KEYS[table])
total += len(batch)
# Only save watermark after successful load
save_watermark(state_path, new_watermark)
return total
The watermark save happens only after the successful load — if the load fails, the next run will re-extract from the previous watermark. This ensures at-least-once delivery (no records skipped), with idempotent upsert in the load step handling duplicates.
Idempotent Loading
Write the load step that handles duplicates correctly.
Source has natural keys. Re-running shouldn't create duplicates.
We're loading to Snowflake.
import pandas as pd
from datetime import datetime

def load_to_warehouse(records: list[dict], table: str, natural_keys: list[str]) -> None:
    """
    Upsert records to Snowflake.
    Uses MERGE to handle duplicates idempotently.
    """
    # Stage the batch in a timestamped table first
    stage_table = f"raw.stage.{table}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    engine = get_snowflake_engine()
    with engine.begin() as conn:
        # Write batch to the staging table
        pd.DataFrame(records).to_sql(
            stage_table.split('.')[-1],
            conn,
            schema='raw.stage',
            if_exists='replace',
            index=False,
        )
        # Snowflake MERGE has no UPDATE SET * / INSERT (*) shorthand,
        # so build explicit column lists from the batch
        columns = list(records[0].keys())
        natural_key_join = ' AND '.join(
            f't.{k} = s.{k}' for k in natural_keys
        )
        update_set = ', '.join(f't.{c} = s.{c}' for c in columns)
        insert_cols = ', '.join(columns)
        insert_vals = ', '.join(f's.{c}' for c in columns)
        # MERGE from stage into target
        conn.execute(f"""
            MERGE INTO raw.{table} t
            USING {stage_table} s
            ON {natural_key_join}
            WHEN MATCHED AND s._extracted_at > t._extracted_at THEN
                UPDATE SET {update_set}
            WHEN NOT MATCHED THEN
                INSERT ({insert_cols})
                VALUES ({insert_vals})
        """)
        # Cleanup staging table
        conn.execute(f"DROP TABLE IF EXISTS {stage_table}")
MERGE handles both insert (new record) and update (existing record with newer data). The s._extracted_at > t._extracted_at condition prevents overwriting newer records with older extracts on out-of-order delivery.
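The idempotency logic is easy to unit-test without a warehouse. A pure-Python model of the same MERGE semantics (the function and its names are illustrative, not part of the loader):

```python
def merge_records(
    target: dict, batch: list[dict], natural_keys: list[str]
) -> dict:
    """Upsert batch into target (keyed by natural key), keeping the newest extract."""
    for row in batch:
        key = tuple(row[k] for k in natural_keys)
        existing = target.get(key)
        # Insert new keys; update only when this extract is strictly newer,
        # mirroring WHEN MATCHED AND s._extracted_at > t._extracted_at
        if existing is None or row["_extracted_at"] > existing["_extracted_at"]:
            target[key] = row
    return target
```

Replaying the same batch is a no-op, and an older extract never overwrites newer data; those are the two properties the MERGE condition guarantees.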
dbt Models
Staging Models
Write a staging model for the raw orders table.
Clean: rename columns, standardize types,
add surrogate key, add metadata columns.
Our source has inconsistent timestamps (some NULL, some zero).
-- models/staging/stg_postgres__orders.sql
with source as (
select * from {{ source('postgres', 'orders') }}
),
renamed as (
select
-- Keys
id::varchar as order_id,
user_id::varchar as user_id,
-- Attributes
status,
lower(trim(currency)) as currency, -- Normalize upstream inconsistencies
-- Amounts (source stores as float, normalize to integer cents)
round(total_amount * 100)::integer as total_cents,
-- Timestamps (handle NULL and zero values from source)
case
when created_at = '1970-01-01 00:00:00' then null
else created_at::timestamp_tz
end as created_at,
nullif(updated_at, '1970-01-01 00:00:00')::timestamp_tz as updated_at,
deleted_at::timestamp_tz as deleted_at,
-- Metadata
        _extracted_at
from source
)
select
{{ dbt_utils.generate_surrogate_key(['order_id']) }} as order_sk,
*,
current_timestamp() as _dbt_updated_at
from renamed
The case when created_at = '1970-01-01 00:00:00' pattern handles Unix epoch zero values that some systems use for “no date”. nullif is cleaner for the update case. Claude generates these defensive patterns for real-world messy source data.
Incremental Models
Build an incremental model for daily order aggregates.
Only process new/updated orders since last run.
Handle late-arriving data: if an order from 3 days ago
gets updated, update the 3-day-old aggregate.
-- models/marts/sales/orders_daily.sql
{{ config(
materialized='incremental',
unique_key='order_date',
incremental_strategy='merge',
on_schema_change='append_new_columns'
) }}
with orders as (
select * from {{ ref('stg_postgres__orders') }}
where deleted_at is null
    {% if is_incremental() %}
    -- Re-process every order_date touched in the last 3 days, so a late
    -- update re-aggregates the whole day rather than a partial slice of it
    and date(created_at) in (
        select distinct date(created_at)
        from {{ ref('stg_postgres__orders') }}
        where date(coalesce(updated_at, created_at)) >= dateadd(day, -3, current_date())
    )
    {% endif %}
),
daily_agg as (
select
date(created_at) as order_date,
count(*) as order_count,
count(distinct user_id) as unique_customers,
sum(total_cents) as revenue_cents,
avg(total_cents) as avg_order_value_cents
from orders
group by 1
)
select * from daily_agg
The -3 days lookback window catches orders with updated_at later than created_at (payments confirmed next day, refunds applied later). Without this, late-arriving updates would be silently missed.
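The window logic can be sketched in plain Python to show which order dates a given run re-processes (the function and field names here are illustrative):

```python
from datetime import date, timedelta

def dates_to_reprocess(
    orders: list[dict], run_date: date, lookback_days: int = 3
) -> set[date]:
    """Order dates whose rows were created or updated inside the lookback window."""
    cutoff = run_date - timedelta(days=lookback_days)
    return {
        o["created_at"]  # the order's original date is what gets rebuilt
        for o in orders
        if (o.get("updated_at") or o["created_at"]) >= cutoff
    }
```

An order created weeks ago but refunded yesterday lands inside the window, so its original order_date gets rebuilt; rows untouched for longer than the lookback are skipped.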
dbt Tests
Add data quality tests for the orders staging model.
Test: order_id is unique and not null, status has valid values,
total_cents is positive, created_at is not in the future.
# models/staging/schema.yml
version: 2
models:
- name: stg_postgres__orders
description: "Cleaned and renamed orders from PostgreSQL source"
columns:
- name: order_id
description: "Natural key from source system"
tests:
- unique
- not_null
- name: order_sk
tests:
- unique
- not_null
- name: status
tests:
- not_null
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded']
- name: total_cents
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "total_cents >= 0"
name: total_cents_non_negative
- name: created_at
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "created_at <= current_timestamp()"
name: created_at_not_in_future
Custom dbt Macros
Write a dbt macro that generates a slowly changing dimension (SCD Type 2).
Given a source table, create snapshot logic with:
valid_from, valid_to, is_current flags.
Rather than a hand-rolled macro, Claude points out that dbt has native snapshot support:
-- snapshots/snap_users.sql
{% snapshot snap_users %}
{{
config(
target_schema='snapshots',
unique_key='user_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True,
)
}}
select * from {{ source('postgres', 'users') }}
{% endsnapshot %}
Claude configures invalidate_hard_deletes=True (marks records as deleted when source row disappears, rather than leaving stale current rows) and explains the timestamp vs check strategy tradeoff.
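Conceptually, a timestamp-strategy snapshot performs this bookkeeping on every run. A simplified Python sketch of the SCD Type 2 transition for a single key (dbt's actual snapshot internals differ):

```python
def snapshot_row(history: list[dict], new_row: dict) -> list[dict]:
    """Apply one SCD Type 2 update, keyed on user_id, tracked by updated_at."""
    current = next(
        (r for r in history if r["user_id"] == new_row["user_id"] and r["is_current"]),
        None,
    )
    if current is not None:
        if new_row["updated_at"] <= current["updated_at"]:
            return history  # unchanged or stale: history stays as-is
        # Close out the old version...
        current["valid_to"] = new_row["updated_at"]
        current["is_current"] = False
    # ...and open the new one
    history.append({**new_row, "valid_from": new_row["updated_at"],
                    "valid_to": None, "is_current": True})
    return history
```

Each change closes the previous row's valid_to and opens a new current row, which is exactly the valid_from / valid_to / is_current shape the prompt asked for.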
Airflow DAG Authoring
Write an Airflow DAG that:
1. Extracts from PostgreSQL (parametrized table list)
2. Validates record counts
3. Runs dbt transformations
4. Sends Slack notification on success or failure
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
TABLES_TO_EXTRACT = ['orders', 'users', 'products', 'payments']
@dag(
dag_id='daily_etl',
start_date=datetime(2026, 1, 1),
schedule='0 2 * * *', # 2am UTC daily
catchup=False,
default_args={
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': False,
},
tags=['etl', 'daily'],
)
def daily_etl():
@task
def extract_table(table: str) -> dict:
count = run_extraction(table, state_path=f'/tmp/{table}.state')
return {'table': table, 'count': count}
@task
def validate_counts(results: list[dict]) -> None:
for r in results:
if r['count'] == 0:
raise ValueError(f"Zero records extracted for {r['table']} -- possible source issue")
    @task(trigger_rule='all_done')
    def notify(results: list[dict], dag_run=None) -> None:
        # dag_run.state is still 'running' while this task executes,
        # so derive success/failure from the other task instances
        failed = any(
            ti.state in ('failed', 'upstream_failed')
            for ti in dag_run.get_task_instances()
        )
        state = 'FAILED' if failed else 'succeeded'
        total = sum(r['count'] for r in results)
        SlackWebhookHook(slack_webhook_conn_id='slack_alerts').send(
            text=f"Daily ETL {state}: {total:,} records extracted at {datetime.now():%Y-%m-%d %H:%M}"
        )
# Fan-out extractions (parallel)
extract_results = extract_table.expand(table=TABLES_TO_EXTRACT)
validate = validate_counts(results=extract_results)
notification = notify(results=extract_results)
    # dbt runs after extraction; Cosmos renders the dbt project as a task
    # group (project and profile paths here are illustrative)
    dbt_task_group = DbtTaskGroup(
        group_id='dbt_transformations',
        project_config=ProjectConfig('/usr/local/airflow/dbt'),
        profile_config=ProfileConfig(
            profile_name='analytics',
            target_name='prod',
            profiles_yml_filepath='/usr/local/airflow/dbt/profiles.yml',
        ),
    )
    validate >> dbt_task_group >> notification
dag = daily_etl()
trigger_rule='all_done' on the notification task means it runs whether the DAG succeeded or failed. expand() creates parallel extract tasks from the list — TaskFlow API fan-out.
Pipeline Debugging
My dbt model run failed with:
"SQL execution failed. Statement: [model.my_project.int_order_items]
Error: Division by zero"
Claude identifies: the model has an expression like revenue / order_count without a null/zero check. It adds:
-- Before:
revenue_cents / order_count as avg_order_value
-- After:
case
    when order_count > 0 then revenue_cents / order_count
    else null
end as avg_order_value
For Airflow scheduling errors, task failures, and sensor timeouts, Claude reads the error log and task context to identify: upstream dependency failures, XCom serialization issues, connection pool exhaustion, and operator configuration problems.
Data Engineering with Claude Code
Data engineering is a high-leverage domain for Claude Code because:
- SQL complexity — window functions, CTEs, and incremental logic are tedious to write correctly
- Defensive patterns — handling NULLs, zero dates, and late-arriving data requires knowledge of common upstream data quality issues
- Configuration — Airflow, dbt, and Snowflake each have configuration options that affect correctness
For database optimization patterns that apply to warehouse queries, see the database guide. For testing data pipelines, the testing guide covers assertion patterns that work for data validation. The Claude Skills 360 bundle includes data engineering skill sets for dbt, Airflow, and warehouse SQL. Start with the free tier to try the ETL and dbt patterns.