
Claude Code for dbt: Analytics Engineering, Testing, and Data Modeling

Published: September 7, 2026
Read time: 9 min
By: Claude Skills 360

dbt (data build tool) brings software engineering practices to analytics: version control, testing, documentation, and modular code. Claude Code accelerates dbt development significantly — generating model SQL, writing comprehensive tests, creating macros, and maintaining documentation that stays in sync with model changes.

This guide covers dbt development with Claude Code: model organization, incremental strategies, testing, macros, and CI/CD.

dbt Project Setup

CLAUDE.md for dbt Projects

## dbt Configuration
- dbt Core 1.8+ with BigQuery adapter (dbt-bigquery)
- Project: analytics
- Profiles: ~/.dbt/profiles.yml (dev: analytics_dev dataset, prod: analytics)
- Model layers: staging → intermediate → marts
- Database conventions:
  - staging: `stg_{source}_{entity}` — one-to-one source transformations, no joins
  - intermediate: `int_{entity}_{transformation}` — cross-source joins, business logic
  - marts: `{domain}_{entity}` — final tables consumed by BI tools and analysts
- Test requirements: all primary keys unique+not_null, all FK relationships tested
- Incremental strategy: merge on primary key for event tables, delete+insert for dimension rebuilds
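These conventions are usually mirrored in dbt_project.yml so each layer gets a sensible default materialization without per-model config. A sketch, assuming the project name from CLAUDE.md; the schema names are illustrative:

```yaml
# dbt_project.yml (excerpt) — per-layer defaults matching the conventions above
models:
  analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: view
    marts:
      +materialized: table
```

Individual models (such as incremental event models) override these defaults in their own config blocks.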

Model Organization

Create the staging models for our e-commerce data.
Sources: raw.orders, raw.order_items, raw.products, raw.customers.
Apply standard typing and naming, no business logic yet.
-- models/staging/ecommerce/stg_ecommerce__orders.sql
-- Staging layer: rename columns, cast types, no business logic

with source as (
    select * from {{ source('ecommerce', 'orders') }}
),

renamed as (
    select
        -- Primary key
        order_id,

        -- Foreign keys
        customer_id,
        shipping_address_id,

        -- Timestamps (cast to consistent timezone)
        cast(created_at as timestamp) as created_at,
        cast(updated_at as timestamp) as updated_at,
        cast(shipped_at as timestamp) as shipped_at,
        cast(delivered_at as timestamp) as delivered_at,

        -- Dimensions
        lower(trim(status)) as status,
        lower(trim(channel)) as acquisition_channel,

        -- Measures (keep in cents to avoid float precision)
        subtotal_cents,
        shipping_cents,
        tax_cents,
        discount_cents,
        total_cents,

        -- Metadata
        _ingested_at

    from source
    where
        -- Remove test orders; exclude NULLs from the subquery so NOT IN
        -- doesn't silently match zero rows
        customer_id not in (
            select customer_id
            from {{ ref('seed_test_customers') }}
            where customer_id is not null
        )
)

select * from renamed
-- models/staging/ecommerce/_stg_ecommerce__sources.yml
version: 2

sources:
  - name: ecommerce
    database: raw_data
    schema: ecommerce
    description: "Raw e-commerce operational data from PostgreSQL via Fivetran"
    freshness:
      warn_after: {count: 24, period: hour}
      error_after: {count: 48, period: hour}
    loaded_at_field: _ingested_at

    tables:
      - name: orders
        description: "Customer orders"
        columns:
          - name: order_id
            description: "Unique order identifier"
            tests:
              - unique
              - not_null
          - name: status
            tests:
              - accepted_values:
                  values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded']

Intermediate Models — Business Logic

Build an intermediate model that calculates order-level metrics:
order value, item count, days to ship, and whether the order had a discount.
-- models/intermediate/ecommerce/int_orders__enriched.sql

with orders as (
    select * from {{ ref('stg_ecommerce__orders') }}
),

order_items as (
    select
        order_id,
        count(*) as line_item_count,
        count(distinct product_id) as unique_product_count,
        sum(quantity) as total_quantity
    from {{ ref('stg_ecommerce__order_items') }}
    group by order_id
),

final as (
    select
        -- Order identifiers
        o.order_id,
        o.customer_id,
        o.status,
        o.acquisition_channel,
        o.created_at,
        o.shipped_at,
        o.delivered_at,

        -- Order value
        round(o.total_cents / 100.0, 2) as total_amount,
        round(o.subtotal_cents / 100.0, 2) as subtotal_amount,
        round(o.discount_cents / 100.0, 2) as discount_amount,
        o.discount_cents > 0 as had_discount,

        -- Item counts
        oi.line_item_count,
        oi.unique_product_count,
        oi.total_quantity,

        -- Ship time in hours (null if not yet shipped)
        timestamp_diff(o.shipped_at, o.created_at, hour) as hours_to_ship,

        -- Delivery time in days (null if not yet delivered)
        timestamp_diff(o.delivered_at, o.created_at, day) as days_to_deliver,

        -- Order cohort (for retention analysis)
        -- date_trunc requires a DATE in BigQuery; created_at is a TIMESTAMP
        date_trunc(date(o.created_at), month) as order_month

    from orders o
    left join order_items oi using (order_id)
)

select * from final

Incremental Models for Event Data

We have a page_events table with 500M+ rows.
Build an incremental model that only processes new events.
-- models/intermediate/analytics/int_page_events__sessions.sql
-- Incremental: only process events from last 3 days on each run
-- (overlap handles late-arriving events)

{{
    config(
        materialized='incremental',
        unique_key='session_id',
        incremental_strategy='merge',
        partition_by={
            "field": "session_date",
            "data_type": "date",
            "granularity": "day"
        },
        cluster_by=['user_id', 'session_date'],
        on_schema_change='append_new_columns'
    )
}}

with events as (
    select * from {{ ref('stg_analytics__page_events') }}

    {% if is_incremental() %}
        -- Lookback window for late-arriving events
        where event_timestamp >= timestamp_sub(
            -- max(session_date) is a DATE; cast back to TIMESTAMP for timestamp_sub
            (select timestamp(max(session_date)) from {{ this }}),
            interval 3 day
        )
    {% endif %}
),

-- Session segmentation: new session if > 30 min gap
session_boundaries as (
    select
        *,
        case
            when timestamp_diff(
                event_timestamp,
                lag(event_timestamp) over (partition by user_id order by event_timestamp),
                minute
            ) > 30
            or lag(event_timestamp) over (partition by user_id order by event_timestamp) is null
            then 1
            else 0
        end as is_new_session
    from events
),

-- Window functions can't be nested, so number the sessions first,
-- then derive a stable ID from each session's first event
session_numbered as (
    select
        *,
        -- Running total of session starts = per-user session index
        sum(is_new_session) over (
            partition by user_id
            order by event_timestamp
        ) as session_index
    from session_boundaries
),

session_labeled as (
    select
        *,
        -- Generate session ID from user + session start time
        concat(
            user_id, '-',
            format_timestamp(
                '%Y%m%d%H%M%S',
                min(event_timestamp) over (partition by user_id, session_index)
            )
        ) as session_id,
        date(event_timestamp) as session_date
    from session_numbered
),

session_metrics as (
    select
        session_id,
        user_id,
        -- Date the session by its first event: a session can span midnight,
        -- and a per-row date would split it into two output rows
        date(min(event_timestamp)) as session_date,
        min(event_timestamp) as session_start,
        max(event_timestamp) as session_end,
        timestamp_diff(max(event_timestamp), min(event_timestamp), second) as session_duration_seconds,
        count(*) as page_views,
        count(distinct page_url) as unique_pages,
        -- Window functions can't be mixed with GROUP BY; use ordered
        -- array_agg to pick the first and last page instead
        array_agg(page_url order by event_timestamp asc limit 1)[offset(0)] as landing_page,
        array_agg(page_url order by event_timestamp desc limit 1)[offset(0)] as exit_page,
        logical_or(event_type = 'conversion') as had_conversion,
        countif(event_type = 'conversion') as conversion_count
    from session_labeled
    group by session_id, user_id
)

select * from session_metrics
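Per the project's test requirements in CLAUDE.md, the merge key should itself be covered by tests; otherwise a duplicate session_id silently corrupts the incremental merge. A sketch of the accompanying schema file — the file name and the accepted_range test are assumptions:

```yaml
# models/intermediate/analytics/_int_analytics__models.yml
version: 2

models:
  - name: int_page_events__sessions
    columns:
      - name: session_id
        description: "Merge key for the incremental strategy"
        tests:
          - unique
          - not_null
      - name: session_duration_seconds
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
```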

Testing Strategy

Write a comprehensive test suite for the orders mart.
Cover data quality, business logic, and SLA assertions.
# models/marts/ecommerce/_marts__orders.yml
version: 2

models:
  - name: orders
    description: "Order-level fact table — source of truth for all order metrics"
    
    tests:
      # Model-level tests
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
          name: orders_no_negative_totals
      - dbt_utils.expression_is_true:
          expression: "subtotal_amount <= total_amount + 0.01"
          name: orders_subtotal_not_greater_than_total

    columns:
      - name: order_id
        tests:
          - unique
          - not_null
          
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('customers')
              field: customer_id
              
      - name: status
        tests:
          - not_null
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded']
              
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000  # Flag anomalously large orders
              
      - name: hours_to_ship
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 720  # 30 days max — flag if longer
              where: "status in ('shipped', 'delivered')"
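The dbt_utils tests above only resolve if the package is declared in packages.yml and installed with `dbt deps`; a minimal sketch, where the version range is an assumption:

```yaml
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.1.0", "<2.0.0"]
```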

Custom generic test for business rules:

-- tests/generic/assert_no_shipped_orders_missing_tracking.sql
{% test assert_no_shipped_orders_missing_tracking(model) %}

select *
from {{ model }}
where status = 'shipped'
  and tracking_number is null

{% endtest %}

-- Applied in schema.yml:
-- - name: orders
--   tests:
--     - assert_no_shipped_orders_missing_tracking

Macros for DRY SQL

We calculate "active customer" status in 5 different models.
Extract it into a reusable macro.
-- macros/ecommerce/is_active_customer.sql
{% macro is_active_customer(order_date_column, lookback_days=90) %}

    (
        {{ order_date_column }} >= date_sub(current_date(), interval {{ lookback_days }} day)
        and {{ order_date_column }} <= current_date()
    )

{% endmacro %}

-- Usage in any model:
-- where {{ is_active_customer('last_order_date') }}
-- where {{ is_active_customer('last_order_date', lookback_days=180) }}

Macro for generating pivot columns:

-- macros/pivot_channels.sql
{% macro get_acquisition_channels() %}
    {% set query %}
        select distinct acquisition_channel
        from {{ ref('stg_ecommerce__orders') }}
        order by 1
    {% endset %}
    
    {% set results = run_query(query) %}
    {% if execute %}
        {{ return(results.columns[0].values()) }}
    {% else %}
        {{ return([]) }}
    {% endif %}
{% endmacro %}

-- Usage in a revenue-by-channel pivot:
select
    order_month,
    {% for channel in get_acquisition_channels() %}
    sum(case when acquisition_channel = '{{ channel }}' then total_amount else 0 end) as revenue_{{ channel | replace('-', '_') }}
    {% if not loop.last %},{% endif %}
    {% endfor %}
from {{ ref('int_orders__enriched') }}
group by order_month

dbt Cloud CI

Set up CI that runs tests on modified models only
and fails the PR if any test breaks.
# .github/workflows/dbt-ci.yml
name: dbt CI

on:
  pull_request:
    paths:
      - 'models/**'
      - 'tests/**'
      - 'macros/**'
      - 'dbt_project.yml'

jobs:
  dbt-check:
    runs-on: ubuntu-latest
    
    steps:
      - uses: actions/checkout@v4
        with:
          # Full history so origin/<base branch> exists for the git diff below
          fetch-depth: 0
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          
      - name: Install dbt
        # dbt_utils is a dbt package (installed below via dbt deps), not a pip package
        run: pip install dbt-bigquery==1.8.0
        
      - name: Configure BigQuery credentials
        run: |
          echo '${{ secrets.GCP_SA_KEY }}' > /tmp/gcp-key.json
          
      - name: Write dbt profiles
        run: |
          mkdir -p ~/.dbt
          cat > ~/.dbt/profiles.yml <<EOF
          analytics:
            target: ci
            outputs:
              ci:
                type: bigquery
                method: service-account
                project: mycompany-data
                dataset: analytics_ci_${{ github.run_id }}
                keyfile: /tmp/gcp-key.json
                threads: 4
          EOF
          
      - name: Install dbt dependencies
        run: dbt deps
        
      - name: Get changed models
        id: changed
        run: |
          # Find model files changed in this PR, convert each path to a
          # dbt fqn selector, and append + to include downstream dependencies
          CHANGED=$(git diff --name-only origin/${{ github.base_ref }} HEAD | \
            { grep '^models/.*\.sql$' || true; } | \
            sed 's|^models/||;s|\.sql$|+|;s|/|.|g' | \
            tr '\n' ' ')
          echo "models=$CHANGED" >> $GITHUB_OUTPUT

      - name: Run dbt for changed models + downstream
        if: steps.changed.outputs.models != ''
        run: |
          # Build changed models and all downstream dependencies
          dbt run --select ${{ steps.changed.outputs.models }} --target ci

      - name: Run tests for changed models
        if: steps.changed.outputs.models != ''
        run: |
          dbt test --select ${{ steps.changed.outputs.models }} --target ci
          
      - name: Cleanup CI dataset
        if: always()
        run: |
          # Authenticate the preinstalled bq CLI before dropping the dataset
          gcloud auth activate-service-account --key-file=/tmp/gcp-key.json
          bq rm -r --force mycompany-data:analytics_ci_${{ github.run_id }}
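If CI has access to a copy of the production manifest, dbt's native state comparison can replace the git-diff parsing above entirely. A selectors.yml sketch — the selector name and artifacts path are assumptions, invoked with `dbt build --selector ci_changed --state path/to/prod-artifacts`:

```yaml
# selectors.yml — select modified models plus everything downstream
selectors:
  - name: ci_changed
    description: "Models changed relative to the production manifest, with children"
    definition: "state:modified+"
```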

For the data pipeline foundations including Kafka for event streaming into your warehouse, see the Kafka guide. For orchestrating dbt runs with Python-based workflows, the data engineering guide covers Airflow and Prefect integration patterns. The Claude Skills 360 bundle includes dbt skill sets for model templates, testing patterns, and Git-backed CI workflows. Start with the free tier to try dbt model generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
