Claude Code for Snowflake: Data Warehouse SQL Patterns and Automation — Claude Skills 360 Blog
Blog / Data Engineering / Claude Code for Snowflake: Data Warehouse SQL Patterns and Automation
Data Engineering

Claude Code for Snowflake: Data Warehouse SQL Patterns and Automation

Published: November 1, 2026
Read time: 8 min read
By: Claude Skills 360

Snowflake separates storage and compute, scales warehouses in seconds, and charges only for what you run — but the power features are easy to miss. Streams and Tasks replace ad-hoc incremental logic with CDC-style change tracking. Dynamic data masking protects PII without duplicating tables. Time Travel enables zero-code point-in-time recovery. Snowpark brings Python and Pandas to data that never leaves Snowflake. Claude Code writes the SQL and Python that exploits these features correctly, with the cost controls that prevent runaway warehouse usage.

CLAUDE.md for Snowflake Projects

## Snowflake Stack
- Snowflake with Terraform for infrastructure (warehouses, databases, roles)
- dbt for SQL transformations; Snowpark Python for ML-adjacent ops
- Streams + Tasks for incremental processing pipelines
- Dynamic data masking on PII columns (email, SSN, phone)
- Virtual warehouses: XS for dev/test, S-M for production, multi-cluster for concurrency
- Cost control: auto-suspend=60s, auto-resume=true, query tag every session
- Time Travel: 7 days on raw schema, 1 day on aggregates

Virtual Warehouse and Resource Management

-- Warehouses: right-size for workload, auto-suspend to save cost
CREATE WAREHOUSE IF NOT EXISTS TRANSFORM_WH
  WAREHOUSE_SIZE = 'SMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE
  COMMENT = 'dbt transformations — auto-resumes on demand';

-- Multi-cluster warehouse for concurrent BI queries
CREATE WAREHOUSE IF NOT EXISTS ANALYTICS_WH
  WAREHOUSE_SIZE = 'MEDIUM'
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3
  SCALING_POLICY = 'ECONOMY'
  AUTO_SUSPEND = 120
  AUTO_RESUME = TRUE;

-- Resource monitor: alert and suspend if monthly credits exceed limit
CREATE RESOURCE MONITOR monthly_limit
  WITH CREDIT_QUOTA = 500
  TRIGGERS
    ON 80 PERCENT DO NOTIFY
    ON 100 PERCENT DO SUSPEND;

ALTER WAREHOUSE ANALYTICS_WH SET RESOURCE_MONITOR = monthly_limit;

-- Always tag your queries for cost attribution
ALTER SESSION SET QUERY_TAG = 'dbt:daily_run:orders_mart';

Streams and Tasks (Incremental CDC)

-- Stream: tracks inserts/updates/deletes on a table (CDC-style)
CREATE STREAM IF NOT EXISTS raw.orders_stream
  ON TABLE raw.orders
  APPEND_ONLY = FALSE;  -- Track all DML, not just inserts

-- Task: runs SQL on a schedule to consume the stream
CREATE TASK IF NOT EXISTS process_orders_task
  WAREHOUSE = TRANSFORM_WH
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream')  -- Skip if nothing to process
AS
  MERGE INTO staging.orders AS target
  USING (
    SELECT
      order_id,
      user_id,
      status,
      total_cents,
      created_at,
      METADATA$ACTION AS _action,
      METADATA$ISUPDATE AS _is_update
    FROM raw.orders_stream
  ) AS source
  ON target.order_id = source.order_id
  WHEN MATCHED AND source._action = 'DELETE' THEN DELETE
  WHEN MATCHED AND source._action = 'INSERT' THEN UPDATE SET
    target.status = source.status,
    target.total_cents = source.total_cents,
    target._updated_at = CURRENT_TIMESTAMP()
  WHEN NOT MATCHED AND source._action = 'INSERT' THEN INSERT (
    order_id, user_id, status, total_cents, created_at, _updated_at
  ) VALUES (
    source.order_id, source.user_id, source.status,
    source.total_cents, source.created_at, CURRENT_TIMESTAMP()
  );

-- Start the task
ALTER TASK process_orders_task RESUME;

Dynamic Data Masking (PII Protection)

-- Masking policy: show full email to admins, masked to analysts
CREATE MASKING POLICY IF NOT EXISTS email_mask
  AS (val STRING)
  RETURNS STRING ->
    CASE
      WHEN CURRENT_ROLE() IN ('ADMIN', 'DATA_ENGINEER') THEN val
      WHEN CURRENT_ROLE() = 'ANALYST' THEN REGEXP_REPLACE(val, '.+@', '***@')
      ELSE '****'  -- Hidden from all other roles
    END;

-- Phone masking: show last 4 digits only
CREATE MASKING POLICY IF NOT EXISTS phone_mask
  AS (val STRING)
  RETURNS STRING ->
    CASE
      WHEN CURRENT_ROLE() = 'ADMIN' THEN val
      ELSE CONCAT('***-***-', RIGHT(REGEXP_REPLACE(val, '[^0-9]', ''), 4))
    END;

-- Apply masking policies to columns
ALTER TABLE raw.users
  MODIFY COLUMN email SET MASKING POLICY email_mask;

ALTER TABLE raw.users
  MODIFY COLUMN phone SET MASKING POLICY phone_mask;

Time Travel and Fail-Safe

-- Query data as it was at a point in time
SELECT * FROM raw.orders
AT (TIMESTAMP => DATEADD('hour', -2, CURRENT_TIMESTAMP()));

-- Recover accidentally deleted rows: data from 1 hour ago
INSERT INTO raw.orders
SELECT * FROM raw.orders BEFORE (STATEMENT => '<statement_id>')
WHERE order_id NOT IN (SELECT order_id FROM raw.orders);

-- Clone table at a point in time (zero-copy — shares storage)
CREATE TABLE raw.orders_backup
  CLONE raw.orders
  AT (TIMESTAMP => DATEADD('day', -1, CURRENT_TIMESTAMP()));

-- Travel via offset (seconds before now)
SELECT COUNT(*) FROM raw.orders AT (OFFSET => -3600);

Snowpark Python

# snowpark/order_features.py — Python that runs inside Snowflake
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import FloatType

# Connect to Snowflake
session = Session.builder.configs({
    "account": "myorg-myaccount",
    "user": "svc_snowpark",
    "private_key_path": "/secrets/rsa_key.p8",
    "database": "PRODUCTION",
    "schema": "FEATURES",
    "warehouse": "TRANSFORM_WH",
}).create()

def compute_rfm_features(session: Session) -> None:
    """Compute RFM (Recency, Frequency, Monetary) features."""
    orders = session.table("raw.orders").filter(F.col("status") == "delivered")
    
    # All Snowpark transformations run as a single SQL query in Snowflake
    rfm = (
        orders
        .group_by("user_id")
        .agg(
            F.datediff("day", F.max("created_at"), F.current_timestamp()).alias("recency_days"),
            F.count("order_id").alias("frequency"),
            F.sum("total_cents").alias("monetary_cents"),
        )
        .with_column("monetary_usd", F.col("monetary_cents") / F.lit(100))
        .drop("monetary_cents")
    )
    
    # Write back to Snowflake (no data ever leaves)
    rfm.write.mode("overwrite").save_as_table("features.rfm_scores")
    print(f"Wrote {rfm.count()} RFM records")

compute_rfm_features(session)

For the dbt transformations that run on top of Snowflake tables, the dbt advanced guide covers dbt models, contracts, and semantic layer on Snowflake. For the data visualization dashboards that query Snowflake warehouses, the data visualization guide covers connected BI patterns. The Claude Skills 360 bundle includes Snowflake skill sets covering Streams/Tasks, dynamic masking, Time Travel, and Snowpark Python. Start with the free tier to try Snowflake pipeline generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.

Back to Blog

Get 360 skills free