Data pipelines transform raw operational data into analytics-ready datasets. The modern data stack combines dbt for SQL transformations, Dagster or Airflow for orchestration, and Great Expectations or dbt tests for data quality. Claude Code generates dbt models with proper incremental strategies, Dagster asset graphs, data quality test suites, and the Python extraction code that lands data into your warehouse.
CLAUDE.md for Data Pipeline Projects
## Data Stack
- Warehouse: Snowflake (production) / DuckDB (local development)
- Transformation: dbt 1.8+ with dbt-core
- Orchestration: Dagster 1.x (asset-based, not op-based)
- Data quality: dbt tests (built-in + dbt-expectations) + Great Expectations for source checks
- Extraction: Python with SQLAlchemy (OLTP sources) + Fivetran/Airbyte (SaaS sources)
- Source schema: raw schema (landing zone) → staging → marts
- Incremental: default to incremental with unique_key; full refresh for small dimensions
- Testing: every model has not_null + unique tests on PK; relationships on FKs
Python Extraction (OLTP → Data Warehouse)
# pipelines/extractors/orders_extractor.py
import logging
from datetime import datetime
from sqlalchemy import create_engine, text
import pandas as pd
from typing import Iterator
logger = logging.getLogger(__name__)
class OrdersExtractor:
    """Extract orders from a PostgreSQL OLTP database into the Snowflake raw schema.

    Incremental extraction driven by a watermark table
    (raw.extraction_watermarks): only rows with ``updated_at`` newer than the
    last successful run are pulled, streamed in batches to bound memory, and
    upserted into ``raw.orders`` via a staging table + MERGE.
    """

    def __init__(self, source_url: str, target_url: str):
        self.source = create_engine(source_url)
        self.target = create_engine(target_url)

    def get_last_extracted_at(self) -> datetime | None:
        """Return the last-successful-extraction watermark for 'orders'.

        Returns None on the very first run. Note MAX() always yields exactly
        one row, so ``result[0]`` is NULL (None) when no watermark exists yet.
        """
        with self.target.connect() as conn:
            result = conn.execute(text("""
                SELECT max_extracted_at FROM raw.extraction_watermarks
                WHERE table_name = 'orders'
            """)).fetchone()
        return result[0] if result else None

    def extract_batch(self, since: datetime | None, batch_size: int = 10_000) -> Iterator[pd.DataFrame]:
        """Yield DataFrame chunks of orders updated after ``since``.

        When ``since`` is None (first run) the full table is extracted.
        Chunked reads keep memory bounded on large backfills.
        """
        where_clause = "WHERE updated_at > :since" if since else ""
        query = f"""
            SELECT
                id,
                customer_id,
                status,
                total_cents,
                currency,
                created_at,
                updated_at,
                metadata::text AS metadata_json,
                NOW() AS extracted_at
            FROM orders
            {where_clause}
            ORDER BY updated_at
        """
        # Only bind :since when the WHERE clause actually references it —
        # SQLAlchemy's text() raises on bind parameters the statement does
        # not define, which broke the first (full) extraction.
        params = {"since": since} if since else {}
        with self.source.connect() as conn:
            for chunk in pd.read_sql(
                text(query),
                conn,
                params=params,
                chunksize=batch_size,
            ):
                yield chunk

    def load_batch(self, df: pd.DataFrame) -> int:
        """Upsert one batch into raw.orders via a staging table and MERGE.

        Returns the number of rows affected by the MERGE.
        """
        # Column list shared by the staging load and the MERGE INSERT branch.
        columns = [
            "id", "customer_id", "status", "total_cents", "currency",
            "created_at", "updated_at", "metadata_json", "extracted_at",
        ]
        col_list = ", ".join(columns)
        src_list = ", ".join(f"source.{c}" for c in columns)
        # Create staging table, merge, then drop — all in one transaction.
        with self.target.begin() as conn:
            df.to_sql(
                "orders_staging",
                conn,
                schema="raw",
                if_exists="replace",
                index=False,
                method="multi",
            )
            # Snowflake MERGE requires an explicit INSERT (cols) VALUES (...)
            # clause; "WHEN NOT MATCHED THEN INSERT SELECT * FROM source" is
            # not valid Snowflake syntax.
            result = conn.execute(text(f"""
                MERGE INTO raw.orders AS target
                USING raw.orders_staging AS source
                ON target.id = source.id
                WHEN MATCHED THEN UPDATE SET
                    status = source.status,
                    total_cents = source.total_cents,
                    updated_at = source.updated_at,
                    extracted_at = source.extracted_at
                WHEN NOT MATCHED THEN INSERT ({col_list})
                    VALUES ({src_list})
            """))
            # Drop the staging table as the original comment promised;
            # if_exists="replace" would recreate it next run, but leaving it
            # behind clutters the raw schema between runs.
            conn.execute(text("DROP TABLE IF EXISTS raw.orders_staging"))
        return result.rowcount

    def run(self) -> dict:
        """Run one incremental extraction cycle and advance the watermark.

        Returns a summary dict: {"rows_extracted": int, "max_updated_at": ...}.
        """
        since = self.get_last_extracted_at()
        logger.info("Extracting orders since: %s", since or "beginning")
        total_rows = 0
        max_updated_at = None
        for batch_df in self.extract_batch(since):
            total_rows += self.load_batch(batch_df)
            batch_max = batch_df["updated_at"].max()
            if max_updated_at is None or batch_max > max_updated_at:
                max_updated_at = batch_max
        # Advance the watermark only after every batch has landed, so a
        # mid-run failure causes re-extraction rather than silent data loss.
        # Snowflake has no ON CONFLICT (that is Postgres syntax), so the
        # upsert is expressed as a MERGE.
        if max_updated_at:
            with self.target.begin() as conn:
                conn.execute(text("""
                    MERGE INTO raw.extraction_watermarks AS w
                    USING (SELECT 'orders' AS table_name, :max_ts AS max_extracted_at) AS s
                    ON w.table_name = s.table_name
                    WHEN MATCHED THEN UPDATE SET max_extracted_at = s.max_extracted_at
                    WHEN NOT MATCHED THEN INSERT (table_name, max_extracted_at)
                        VALUES (s.table_name, s.max_extracted_at)
                """), {"max_ts": max_updated_at})
        logger.info("Extracted %d order rows", total_rows)
        return {"rows_extracted": total_rows, "max_updated_at": max_updated_at}
dbt Models
-- models/staging/stg_orders.sql — standardize raw data
-- Incremental merge keyed on order_id, clustered by created_date so
-- date-filtered queries prune micro-partitions. on_schema_change keeps the
-- model's columns in step with upstream additions/removals in raw.orders.
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        cluster_by=['created_date'],
        on_schema_change='sync_all_columns'
    )
}}

WITH source AS (
    SELECT * FROM {{ source('raw', 'orders') }}
    {% if is_incremental() %}
    -- Only process rows updated since last run.
    -- NOTE(review): strict ">" can skip rows that share the exact MAX
    -- updated_at across runs — confirm timestamp granularity makes this safe.
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

cleaned AS (
    SELECT
        id AS order_id,
        customer_id,
        LOWER(status) AS status,
        total_cents,
        UPPER(currency) AS currency_code,
        -- Cents → whole currency units for reporting.
        total_cents::FLOAT / 100 AS total_amount,
        created_at::TIMESTAMP_NTZ AS created_at,
        updated_at::TIMESTAMP_NTZ AS updated_at,
        DATE(created_at) AS created_date,
        -- TRY_PARSE_JSON returns NULL (rather than erroring) on invalid JSON.
        TRY_PARSE_JSON(metadata_json) AS metadata,
        extracted_at
    FROM source
    -- NOTE(review): rows failing these filters are silently dropped here,
    -- not quarantined — confirm that is the intended data-quality policy.
    WHERE id IS NOT NULL
      AND total_cents >= 0
      AND status IN ('pending', 'processing', 'shipped', 'delivered', 'cancelled')
)

SELECT * FROM cleaned
-- models/marts/orders/fct_orders.sql — fact table with business logic
-- One row per order, enriched with customer attributes and item aggregates.
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge',
        cluster_by=['created_date']
    )
}}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
    -- NOTE(review): only the orders CTE is filtered incrementally — if
    -- order_items rows change without the parent order's updated_at moving,
    -- the item aggregates below go stale until a full refresh. Confirm.
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
),

customers AS (
    SELECT * FROM {{ ref('dim_customers') }}
),

-- Per-order item aggregates.
order_items AS (
    SELECT
        order_id,
        COUNT(*) AS item_count,
        SUM(quantity) AS total_quantity,
        COUNT(DISTINCT product_id) AS distinct_products
    FROM {{ ref('stg_order_items') }}
    GROUP BY 1
),

final AS (
    SELECT
        o.order_id,
        o.customer_id,
        c.customer_segment,
        c.acquisition_channel,
        o.status,
        o.currency_code,
        o.total_amount,
        -- Derived metrics (NULL when an order has no items, due to LEFT JOIN)
        oi.item_count,
        oi.total_quantity,
        oi.distinct_products,
        -- Date dimensions
        o.created_date,
        DATE_TRUNC('week', o.created_date) AS created_week,
        DATE_TRUNC('month', o.created_date) AS created_month,
        -- Status flags (Snowflake allows boolean expressions as columns)
        o.status = 'delivered' AS is_delivered,
        o.status = 'cancelled' AS is_cancelled,
        o.status IN ('pending', 'processing') AS is_active,
        o.created_at,
        o.updated_at
    FROM orders o
    -- LEFT JOINs preserve orders even when the customer dimension or the
    -- item aggregates have no matching row.
    LEFT JOIN customers c ON o.customer_id = c.customer_id
    LEFT JOIN order_items oi ON o.order_id = oi.order_id
)

SELECT * FROM final
dbt Tests
# models/staging/schema.yml
# Tests enforce the project convention: every model gets not_null + unique
# on its PK and relationships tests on FKs (see CLAUDE.md).
version: 2

models:
  - name: stg_orders
    description: "Cleaned and standardized orders from OLTP source"
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: customer_id
        tests:
          - not_null
          # Every order must reference an existing customer.
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: status
        tests:
          - not_null
          # Must stay in sync with the status filter in stg_orders.sql.
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled']
      - name: total_amount
        tests:
          - not_null
          # Sanity bound on order value; adjust max_value if legitimate
          # orders above 100k appear.
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
      - name: created_at
        tests:
          - not_null
          # Guards the TIMESTAMP_NTZ cast performed in the staging model.
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: timestamp_ntz
Dagster Asset Graph
# pipelines/assets/orders_assets.py
from dagster import (
asset, AssetIn, Output, MetadataValue,
DailyPartitionsDefinition, BackfillPolicy,
)
import pandas as pd
daily_partitions = DailyPartitionsDefinition(start_date="2025-01-01")
@asset(
    partitions_def=daily_partitions,
    # single_run collapses a backfill into one materialization instead of
    # one run per partition.
    backfill_policy=BackfillPolicy.single_run(),
    group_name="raw",
)
def raw_orders(context) -> Output[pd.DataFrame]:
    """Extract orders for the given partition date.

    Lands one day of orders via OrdersExtractor and attaches row-count
    metadata for observability in the Dagster UI.
    """
    partition_date = context.partition_key
    # NOTE(review): context.resources.postgres/snowflake are accessed but no
    # required_resource_keys (or resource parameters) are declared on the
    # asset — confirm resources are wired up elsewhere.
    extractor = OrdersExtractor(
        source_url=context.resources.postgres.url,
        target_url=context.resources.snowflake.url,
    )
    # NOTE(review): run_for_date is not defined on the OrdersExtractor shown
    # earlier in this file (only run()) — confirm it exists.
    result = extractor.run_for_date(partition_date)
    return Output(
        result["dataframe"],
        metadata={
            "row_count": MetadataValue.int(result["rows"]),
            "partition_date": MetadataValue.text(partition_date),
        },
    )
@asset(
    ins={"raw_orders": AssetIn()},
    group_name="staging",
    # NOTE(review): deps= duplicates the dependency already expressed by
    # ins= above — Dagster uses deps for non-loaded upstreams; confirm one
    # of the two should be removed.
    deps=["raw_orders"],
)
def stg_orders(context, raw_orders: pd.DataFrame) -> Output[pd.DataFrame]:
    """Run dbt stg_orders model via dbt-dagster integration.

    The raw_orders DataFrame parameter is unused here — dbt reads the landed
    table directly; the input only establishes lineage/ordering.
    """
    # NOTE(review): context.partition_key is read but the asset declares no
    # partitions_def (upstream raw_orders is daily-partitioned) — confirm.
    dbt_output = context.resources.dbt.run(
        select="stg_orders",
        vars={"partition_date": context.partition_key},
    )
    row_count = dbt_output.result["results"][0]["adapter_response"]["rows_affected"]
    return Output(
        None,  # dbt manages the table
        metadata={"rows_affected": MetadataValue.int(row_count)},
    )
For the Apache Spark processing that handles petabyte-scale versions of these pipelines, see the Apache Spark guide for distributed DataFrame transformations. For advanced dbt patterns, including macros and cross-database compatibility, the dbt analytics guide covers model design and macro patterns. The Claude Skills 360 bundle includes data pipeline skill sets covering dbt models, Dagster assets, and incremental extraction patterns. Start with the free tier to try ETL pipeline generation.