Claude Code for ClickHouse Analytics: Real-Time OLAP and High-Throughput Ingestion — Claude Skills 360 Blog
Blog / Data / Claude Code for ClickHouse Analytics: Real-Time OLAP and High-Throughput Ingestion
Data

Claude Code for ClickHouse Analytics: Real-Time OLAP and High-Throughput Ingestion

Published: December 8, 2026
Read time: 10 min read
By: Claude Skills 360

ClickHouse processes billions of rows per second on a single server by storing data in columnar format, using vectorized execution, and compressing aggressively. Its MergeTree family of table engines handles OLAP workloads that would crush PostgreSQL. Materialized views pre-compute aggregations at insert time. The Kafka engine connects topics directly to ClickHouse tables. Claude Code generates ClickHouse schemas, materialized views, funnel/cohort SQL, ingestion pipelines, and the Python client code for analytics applications.

CLAUDE.md for ClickHouse Projects

## Analytics Stack
- ClickHouse 24.x (single-node or 3-node sharded cluster)
- Client: clickhouse-connect (Python) or @clickhouse/client (Node.js)
- Ingestion: Kafka → Kafka engine → MV → target table (recommended)
         OR: Python batch inserts with async_insert=1 (simpler)
- Primary key: low-cardinality first (tenant, date), then query discriminator
- Deduplication: ReplacingMergeTree for upsert semantics, final keyword in queries
- Aggregation: AggregatingMergeTree + Aggregate functions (uniqState, sumState)
- Partitioning: by month for time-series, by tenant for multi-tenant
- TTL: data retention at table level (DELETE or MOVE to cold tier)

Schema Design

-- events table: main MergeTree for raw events
CREATE TABLE events
(
    -- Partition and order keys
    tenant_id       LowCardinality(String),
    event_date      Date,
    event_time      DateTime64(3, 'UTC'),
    
    -- Event identity
    event_id        UUID DEFAULT generateUUIDv4(),
    session_id      UUID,
    user_id         Nullable(UUID),
    anonymous_id    String,
    
    -- Event data
    event_type      LowCardinality(String),
    page_url        String,
    page_title      String,
    referrer        String,
    
    -- Device/browser context
    device_type     LowCardinality(String),  -- mobile, tablet, desktop
    browser         LowCardinality(String),
    os              LowCardinality(String),
    country         LowCardinality(FixedString(2)),
    
    -- Flexible properties
    properties      Map(String, String),
    
    -- Ingestion metadata
    received_at     DateTime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(event_date))
ORDER BY (tenant_id, event_date, event_type, event_time, event_id)
TTL event_time + INTERVAL 90 DAY DELETE
SETTINGS index_granularity = 8192, storage_policy = 'tiered';

-- Product metrics: ReplacingMergeTree for upserts
CREATE TABLE order_metrics
(
    tenant_id       LowCardinality(String),
    order_id        String,
    user_id         UUID,
    status          LowCardinality(String),
    total_cents     UInt64,
    item_count      UInt16,
    created_date    Date,
    created_at      DateTime64(3, 'UTC'),
    updated_at      DateTime64(3, 'UTC'),  -- Used as version
    
    -- Enriched fields
    user_segment    LowCardinality(String),
    acquisition_channel LowCardinality(String)
)
ENGINE = ReplacingMergeTree(updated_at)  -- Dedup by updated_at as version
PARTITION BY (tenant_id, toYYYYMM(created_date))
ORDER BY (tenant_id, created_date, order_id);

Materialized Views for Pre-Aggregation

-- Pre-aggregate daily event counts at insert time
CREATE MATERIALIZED VIEW daily_event_counts_mv
TO daily_event_counts
AS SELECT
    tenant_id,
    event_date,
    event_type,
    device_type,
    country,
    count()             AS event_count,
    uniqExact(user_id)  AS unique_users,
    uniqExact(session_id) AS sessions
FROM events
GROUP BY tenant_id, event_date, event_type, device_type, country;

CREATE TABLE daily_event_counts
(
    tenant_id       LowCardinality(String),
    event_date      Date,
    event_type      LowCardinality(String),
    device_type     LowCardinality(String),
    country         LowCardinality(FixedString(2)),
    event_count     UInt64,
    unique_users    UInt64,
    sessions        UInt64
)
ENGINE = SummingMergeTree()
ORDER BY (tenant_id, event_date, event_type, device_type, country);

-- Query pre-aggregated (100-1000x faster than scanning events)
SELECT
    event_date,
    event_type,
    sum(event_count)    AS total_events,
    sum(unique_users)   AS dau
FROM daily_event_counts
WHERE tenant_id = 'acme' AND event_date >= today() - 30
GROUP BY 1, 2
ORDER BY 1, 3 DESC;

Funnel Analysis

-- Multi-step funnel: signup → first_order → second_order
-- windowFunnel aggregates steps in order within a time window
SELECT
    tenant_id,
    event_date,
    countIf(funnel_step >= 1)   AS step1_signup,
    countIf(funnel_step >= 2)   AS step2_first_order,
    countIf(funnel_step >= 3)   AS step3_second_order,
    
    round(countIf(funnel_step >= 2) / countIf(funnel_step >= 1) * 100, 1) AS signup_to_first_pct,
    round(countIf(funnel_step >= 3) / countIf(funnel_step >= 2) * 100, 1) AS first_to_second_pct
    
FROM (
    SELECT
        tenant_id,
        toDate(min(event_time))  AS event_date,
        user_id,
        windowFunnel(7 * 86400)(  -- 7-day window
            event_time,
            event_type = 'user_signed_up',
            event_type = 'order_completed',
            event_type = 'order_completed' AND event_id != first_order_id  -- 2nd order
        ) AS funnel_step
    FROM events
    WHERE tenant_id = 'acme'
      AND event_date BETWEEN '2026-10-01' AND '2026-11-30'
      AND event_type IN ('user_signed_up', 'order_completed')
    GROUP BY tenant_id, user_id
)
GROUP BY tenant_id, event_date
ORDER BY event_date;

Cohort Retention Analysis

-- Weekly cohort retention
WITH cohort_users AS (
    SELECT
        user_id,
        toMonday(min(event_time)) AS cohort_week
    FROM events
    WHERE tenant_id = 'acme'
      AND event_type = 'user_signed_up'
      AND event_date >= '2026-01-01'
    GROUP BY user_id
),

weekly_activity AS (
    SELECT DISTINCT
        e.user_id,
        toMonday(e.event_time) AS activity_week
    FROM events e
    WHERE tenant_id = 'acme'
      AND e.event_type IN ('order_completed', 'product_viewed', 'cart_updated')
),

cohort_activity AS (
    SELECT
        c.cohort_week,
        dateDiff('week', c.cohort_week, a.activity_week) AS weeks_since_signup,
        count(DISTINCT c.user_id) AS active_users
    FROM cohort_users c
    JOIN weekly_activity a ON c.user_id = a.user_id
        AND a.activity_week >= c.cohort_week
    GROUP BY c.cohort_week, weeks_since_signup
)

SELECT
    cohort_week,
    weeks_since_signup,
    active_users,
    first_value(active_users) OVER (PARTITION BY cohort_week ORDER BY weeks_since_signup) AS cohort_size,
    round(active_users / first_value(active_users) OVER (PARTITION BY cohort_week ORDER BY weeks_since_signup) * 100, 1) AS retention_pct
FROM cohort_activity
ORDER BY cohort_week, weeks_since_signup;

Kafka Engine Integration

-- Kafka → ClickHouse pipeline (no separate consumer needed)
CREATE TABLE events_kafka_queue
(
    tenant_id   String,
    event_time  String,  -- Will be parsed
    event_type  String,
    user_id     String,
    properties  String   -- JSON
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'analytics.events',
    kafka_group_name = 'clickhouse-consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,
    kafka_max_block_size = 65536;

-- MV moves data from Kafka queue → main events table
CREATE MATERIALIZED VIEW events_kafka_mv TO events AS
SELECT
    tenant_id,
    parseDateTimeBestEffort(event_time) AS event_time,
    toDate(event_time)                  AS event_date,
    event_type,
    accurateCastOrNull(user_id, 'UUID') AS user_id,
    JSONExtractString(properties, 'page_url') AS page_url,
    -- ... other field mappings
FROM events_kafka_queue;

Python Client

# analytics/client.py
import clickhouse_connect
import pandas as pd
from functools import lru_cache

@lru_cache(maxsize=1)
def get_client():
    return clickhouse_connect.get_client(
        host='clickhouse',
        port=8123,
        username='analytics',
        password=SECRET_CLICKHOUSE_PASSWORD,
        database='analytics',
        settings={
            'async_insert': 1,
            'wait_for_async_insert': 0,
        },
    )

def insert_events(events: list[dict]) -> None:
    """High-throughput insert with async_insert."""
    client = get_client()
    client.insert('events', events, column_names=list(events[0].keys()))

def query_daily_stats(tenant_id: str, days: int = 30) -> pd.DataFrame:
    """Query pre-aggregated daily stats."""
    result = get_client().query_df("""
        SELECT
            event_date,
            sum(event_count) AS events,
            sum(unique_users) AS dau
        FROM daily_event_counts
        WHERE tenant_id = {tenant:String}
          AND event_date >= today() - {days:Int32}
        GROUP BY event_date
        ORDER BY event_date
    """, parameters={'tenant': tenant_id, 'days': days})
    return result

For the Kafka ingestion pipeline that feeds the Kafka engine, see the Apache Kafka consumer guide for producer and exactly-once patterns. For the existing ClickHouse guide covering materialized views and HNSW indexes, the ClickHouse guide covers MergeTree engine fundamentals. The Claude Skills 360 bundle includes ClickHouse skill sets covering funnel analysis, cohort SQL, and Kafka engine integration. Start with the free tier to try ClickHouse schema generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.

Back to Blog

Get 360 skills free