dbt (data build tool) brings software engineering practices to analytics: version control, testing, documentation, and modular code. Claude Code accelerates dbt development significantly — generating model SQL, writing comprehensive tests, creating macros, and maintaining documentation that stays in sync with model changes.
This guide covers dbt development with Claude Code: model organization, incremental strategies, testing, macros, and CI/CD.
dbt Project Setup
CLAUDE.md for dbt Projects
## dbt Configuration
- dbt Core 1.8+ with BigQuery adapter (dbt-bigquery)
- Project: analytics
- Profiles: ~/.dbt/profiles.yml (dev: analytics_dev dataset, prod: analytics)
- Model layers: staging → intermediate → marts
- Database conventions:
- staging: `stg_{source}_{entity}` — one-to-one source transformations, no joins
- intermediate: `int_{entity}_{transformation}` — cross-source joins, business logic
- marts: `{domain}_{entity}` — final tables consumed by BI tools and analysts
- Test requirements: all primary keys unique+not_null, all FK relationships tested
- Incremental strategy: merge on primary key for event tables, delete+insert for dimension rebuilds
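These layer conventions can be encoded in `dbt_project.yml` so materializations follow the layer automatically. A minimal sketch, assuming the project name `analytics` from the configuration above (the materialization choices are illustrative defaults — in-model `config()` blocks, such as the incremental models below, override them):

```yaml
# dbt_project.yml (excerpt)
models:
  analytics:
    staging:
      +materialized: view
    intermediate:
      +materialized: view
    marts:
      +materialized: table
```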
Model Organization
Create the staging models for our e-commerce data.
Sources: raw.orders, raw.order_items, raw.products, raw.customers.
Apply standard typing and naming, no business logic yet.
-- models/staging/ecommerce/stg_ecommerce__orders.sql
-- Staging layer: rename columns, cast types, no business logic
with source as (
select * from {{ source('ecommerce', 'orders') }}
),
renamed as (
select
-- Primary key
order_id,
-- Foreign keys
customer_id,
shipping_address_id,
-- Timestamps (cast to consistent timezone)
cast(created_at as timestamp) as created_at,
cast(updated_at as timestamp) as updated_at,
cast(shipped_at as timestamp) as shipped_at,
cast(delivered_at as timestamp) as delivered_at,
-- Dimensions
lower(trim(status)) as status,
lower(trim(channel)) as acquisition_channel,
-- Measures (keep in cents to avoid float precision)
subtotal_cents,
shipping_cents,
tax_cents,
discount_cents,
total_cents,
-- Metadata
_ingested_at
from source
where
-- Remove test orders
customer_id not in (select customer_id from {{ ref('seed_test_customers') }})
)
select * from renamed
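The other staging models follow the same pattern. A sketch of `stg_ecommerce__order_items`, which the intermediate layer references — the column names here are assumptions, so adjust them to the actual source schema:

```sql
-- models/staging/ecommerce/stg_ecommerce__order_items.sql
with source as (
    select * from {{ source('ecommerce', 'order_items') }}
),

renamed as (
    select
        order_item_id,
        order_id,
        product_id,
        quantity,
        -- Keep money in cents, matching the orders staging model
        unit_price_cents,
        _ingested_at
    from source
)

select * from renamed
```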
-- models/staging/ecommerce/_stg_ecommerce__sources.yml
version: 2
sources:
- name: ecommerce
database: raw_data
schema: ecommerce
description: "Raw e-commerce operational data from PostgreSQL via Fivetran"
freshness:
warn_after: {count: 24, period: hour}
error_after: {count: 48, period: hour}
loaded_at_field: _ingested_at
tables:
- name: orders
description: "Customer orders"
columns:
- name: order_id
description: "Unique order identifier"
tests:
- unique
- not_null
- name: status
tests:
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded']
Intermediate Models — Business Logic
Build an intermediate model that calculates order-level metrics:
order value, item count, days to ship, and whether the order had a discount.
-- models/intermediate/ecommerce/int_orders__enriched.sql
with orders as (
select * from {{ ref('stg_ecommerce__orders') }}
),
order_items as (
select
order_id,
count(*) as line_item_count,
count(distinct product_id) as unique_product_count,
sum(quantity) as total_quantity
from {{ ref('stg_ecommerce__order_items') }}
group by order_id
),
final as (
select
-- Order identifiers
o.order_id,
o.customer_id,
o.status,
o.acquisition_channel,
o.created_at,
o.shipped_at,
o.delivered_at,
-- Order value
round(o.total_cents / 100.0, 2) as total_amount,
round(o.subtotal_cents / 100.0, 2) as subtotal_amount,
round(o.discount_cents / 100.0, 2) as discount_amount,
o.discount_cents > 0 as had_discount,
-- Item counts
oi.line_item_count,
oi.unique_product_count,
oi.total_quantity,
-- Ship time in hours (null if not yet shipped)
timestamp_diff(o.shipped_at, o.created_at, hour) as hours_to_ship,
-- Delivery time in days (null if not yet delivered)
timestamp_diff(o.delivered_at, o.created_at, day) as days_to_deliver,
-- Order cohort (for retention analysis)
date_trunc(o.created_at, month) as order_month
from orders o
left join order_items oi using (order_id)
)
select * from final
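The `orders` mart that the test suite later in this guide targets can then be a thin select over the intermediate model — a sketch, assuming no additional business logic is needed at the mart layer:

```sql
-- models/marts/ecommerce/orders.sql
{{ config(materialized='table') }}

select * from {{ ref('int_orders__enriched') }}
```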
Incremental Models for Event Data
We have a page_events table with 500M+ rows.
Build an incremental model that only processes new events.
-- models/intermediate/analytics/int_page_events__sessions.sql
-- Incremental: only process events from last 3 days on each run
-- (overlap handles late-arriving events)
{{
config(
materialized='incremental',
unique_key='session_id',
incremental_strategy='merge',
partition_by={
"field": "session_date",
"data_type": "date",
"granularity": "day"
},
cluster_by=['user_id', 'session_date'],
on_schema_change='append_new_columns'
)
}}
with events as (
select * from {{ ref('stg_analytics__page_events') }}
{% if is_incremental() %}
-- Lookback window for late-arriving events
-- (max(session_date) is a DATE, so compare on the event date,
-- which also enables partition pruning)
where date(event_timestamp) >= date_sub(
  (select max(session_date) from {{ this }}),
  interval 3 day
)
{% endif %}
),
-- Session segmentation: new session if > 30 min gap
session_boundaries as (
select
*,
case
when timestamp_diff(
event_timestamp,
lag(event_timestamp) over (partition by user_id order by event_timestamp),
minute
) > 30
or lag(event_timestamp) over (partition by user_id order by event_timestamp) is null
then 1
else 0
end as is_new_session
from events
),
session_numbered as (
  select
    *,
    -- Running count of session starts gives each session an ordinal per user
    sum(is_new_session) over (
      partition by user_id
      order by event_timestamp
      rows between unbounded preceding and current row
    ) as session_number
  from session_boundaries
),
session_labeled as (
  select
    *,
    -- Generate a stable session ID from user + session start time.
    -- Window functions cannot be nested, so the ordinal is computed
    -- in the previous CTE first.
    concat(
      cast(user_id as string), '-',
      format_timestamp('%Y%m%d%H%M%S',
        min(event_timestamp) over (partition by user_id, session_number)
      )
    ) as session_id,
    -- Date the session by its start, so each session_id maps to one date
    date(min(event_timestamp) over (partition by user_id, session_number)) as session_date
  from session_numbered
),
session_metrics as (
select
session_id,
user_id,
session_date,
min(event_timestamp) as session_start,
max(event_timestamp) as session_end,
timestamp_diff(max(event_timestamp), min(event_timestamp), second) as session_duration_seconds,
count(*) as page_views,
count(distinct page_url) as unique_pages,
-- Window functions cannot be mixed into a grouped select;
-- ordered array_agg gives the first/last page within each group instead
array_agg(page_url order by event_timestamp limit 1)[offset(0)] as landing_page,
array_agg(page_url order by event_timestamp desc limit 1)[offset(0)] as exit_page,
logical_or(event_type = 'conversion') as had_conversion,
countif(event_type = 'conversion') as conversion_count
from session_labeled
group by session_id, user_id, session_date
)
select * from session_metrics
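One operational caveat: because `is_incremental()` skips already-processed history, any change to the sessionization logic applies only to new rows. Rebuilding the table from scratch requires a full refresh:

```shell
# Rebuild the incremental model from scratch (drops and recreates the table)
dbt run --select int_page_events__sessions --full-refresh
```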
Testing Strategy
Write a comprehensive test suite for the orders mart.
Cover data quality, business logic, and SLA assertions.
# models/marts/ecommerce/_marts__orders.yml
version: 2
models:
- name: orders
description: "Order-level fact table — source of truth for all order metrics"
tests:
# Model-level tests
- dbt_utils.expression_is_true:
expression: "total_amount >= 0"
name: orders_no_negative_totals
- dbt_utils.expression_is_true:
expression: "subtotal_amount <= total_amount + 0.01"
name: orders_subtotal_not_greater_than_total
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('customers')
field: customer_id
- name: status
tests:
- not_null
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded']
- name: total_amount
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 100000 # Flag anomalously large orders
- name: hours_to_ship
tests:
- dbt_utils.accepted_range:
min_value: 0
max_value: 720 # 30 days max — flag if longer
where: "status in ('shipped', 'delivered')"
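The `dbt_utils.*` tests above come from the dbt-utils package, which is declared in `packages.yml` and installed with `dbt deps`:

```yaml
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.1.0", "<2.0.0"]
```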
Custom generic test for business rules:
-- tests/generic/assert_no_shipped_orders_missing_tracking.sql
{% test assert_no_shipped_orders_missing_tracking(model) %}
select *
from {{ model }}
where status = 'shipped'
and tracking_number is null
{% endtest %}
-- Applied in schema.yml:
-- - name: orders
-- tests:
-- - assert_no_shipped_orders_missing_tracking
Macros for DRY SQL
We calculate "active customer" status in 5 different models.
Extract it into a reusable macro.
-- macros/ecommerce/is_active_customer.sql
{% macro is_active_customer(order_date_column, lookback_days=90) %}
(
{{ order_date_column }} >= date_sub(current_date(), interval {{ lookback_days }} day)
and {{ order_date_column }} <= current_date()
)
{% endmacro %}
-- Usage in any model:
-- where {{ is_active_customer('last_order_date') }}
-- where {{ is_active_customer('last_order_date', lookback_days=180) }}
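With the default lookback, the macro compiles to plain SQL inline at each call site:

```sql
-- {{ is_active_customer('last_order_date') }} compiles to:
(
    last_order_date >= date_sub(current_date(), interval 90 day)
    and last_order_date <= current_date()
)
```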
Macro for generating pivot columns:
-- macros/pivot_channels.sql
{% macro get_acquisition_channels() %}
{% set query %}
select distinct acquisition_channel
from {{ ref('stg_ecommerce__orders') }}
order by 1
{% endset %}
{% set results = run_query(query) %}
{% if execute %}
{{ return(results.columns[0].values()) }}
{% else %}
{{ return([]) }}
{% endif %}
{% endmacro %}
-- Usage in a revenue-by-channel pivot:
select
order_month,
{% for channel in get_acquisition_channels() %}
sum(case when acquisition_channel = '{{ channel }}' then total_amount else 0 end) as revenue_{{ channel | replace('-', '_') }}
{% if not loop.last %},{% endif %}
{% endfor %}
from {{ ref('int_orders__enriched') }}
group by order_month
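The `execute` guard matters here: during dbt's parse phase, `run_query` results are unavailable, so the macro returns an empty list and the loop compiles to nothing; at run time it expands to one column per channel. Assuming hypothetical channels `email` and `paid-search`, the loop portion compiles to:

```sql
sum(case when acquisition_channel = 'email' then total_amount else 0 end) as revenue_email,
sum(case when acquisition_channel = 'paid-search' then total_amount else 0 end) as revenue_paid_search
```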
dbt Cloud CI
Set up CI that runs tests on modified models only
and fails the PR if any test breaks.
# .github/workflows/dbt-ci.yml
name: dbt CI
on:
pull_request:
paths:
- 'models/**'
- 'tests/**'
- 'macros/**'
- 'dbt_project.yml'
jobs:
dbt-check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
  with:
    fetch-depth: 0  # full history so origin/<base_ref> exists for the git diff below
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dbt
run: pip install dbt-bigquery==1.8.0  # dbt_utils is a dbt package, installed later via `dbt deps`
- name: Configure BigQuery credentials
run: |
echo '${{ secrets.GCP_SA_KEY }}' > /tmp/gcp-key.json
- name: Write dbt profiles
run: |
mkdir -p ~/.dbt
cat > ~/.dbt/profiles.yml <<EOF
analytics:
target: ci
outputs:
ci:
type: bigquery
method: service-account
project: mycompany-data
dataset: analytics_ci_${{ github.run_id }}
keyfile: /tmp/gcp-key.json
threads: 4
EOF
- name: Install dbt dependencies
run: dbt deps
- name: Get changed models
  id: changed
  run: |
    # Find models changed in this PR; append + to each so downstream
    # models are selected too (a single trailing + would only attach
    # to the last model in the list)
    CHANGED=$(git diff --name-only origin/${{ github.base_ref }} HEAD | \
      grep '^models/.*\.sql$' | \
      sed 's|models/||;s|\.sql$|+|;s|/|.|g' | \
      tr '\n' ' ')
    echo "models=$CHANGED" >> $GITHUB_OUTPUT
- name: Run dbt for changed models + downstream
  if: steps.changed.outputs.models != ''
  run: |
    # Build changed models and all downstream dependencies
    dbt run --select ${{ steps.changed.outputs.models }} --target ci
- name: Run tests for changed models
  if: steps.changed.outputs.models != ''
  run: |
    dbt test --select ${{ steps.changed.outputs.models }} --target ci
- name: Cleanup CI dataset
if: always()
run: |
bq rm -r --force mycompany-data:analytics_ci_${{ github.run_id }}
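As the project grows, the git-diff parsing above can be replaced with dbt's state-based selection, assuming a production `manifest.json` is archived somewhere the CI job can download it (for example, from the last production run's artifacts):

```shell
# Compare against the production manifest and build only modified models + downstream
dbt run  --select state:modified+ --state ./prod-artifacts --target ci
dbt test --select state:modified+ --state ./prod-artifacts --target ci
```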
For the data pipeline foundations including Kafka for event streaming into your warehouse, see the Kafka guide. For orchestrating dbt runs with Python-based workflows, the data engineering guide covers Airflow and Prefect integration patterns. The Claude Skills 360 bundle includes dbt skill sets for model templates, testing patterns, and Git-backed CI workflows. Start with the free tier to try dbt model generation.