Claude Code for ClickHouse: Real-Time Analytics at Scale — Claude Skills 360 Blog
Data Engineering

Claude Code for ClickHouse: Real-Time Analytics at Scale

Published: November 4, 2026
Read time: 8 min read
By: Claude Skills 360

ClickHouse is a columnar database built for analytics: it ingests millions of rows per second and scans billions of rows to return aggregations in milliseconds. Its secret is column-oriented storage with aggressive compression and vectorized (SIMD) query execution: only the columns a query needs are read from disk, and they are read compressed. ClickHouse powers the real-time dashboards, funnel analytics, and ad-tech platforms that can't tolerate the latency of a PostgreSQL GROUP BY over large tables. Claude Code writes ClickHouse schemas, materialized views, time-series queries, and the ingestion patterns that keep data fresh.

CLAUDE.md for ClickHouse Projects

## Analytics Stack
- ClickHouse 24.x (latest stable) — self-hosted or ClickHouse Cloud
- Table engine: MergeTree family (ReplicatedMergeTree for HA)
- Ingestion: Kafka engine or HTTP bulk insert via Python clickhouse-connect
- Partitioning: toYYYYMM(event_time) for most time-series tables
- Order by: default to (tenant_id, event_type, user_id, event_time); ORDER BY defines both the sort key and the sparse primary index
- Materialized views: pre-aggregate on insert; avoid real-time rollups in SELECT
- TTL: set on all raw event tables; raw = 30 days, aggregated = 3 years
- No JOINs on large tables — denormalize or use dictionaries
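
The last rule above, dictionaries instead of JOINs, can be sketched as follows. The `tenant_dict` dictionary and its `tenant_names` source table are hypothetical names, not from this article:

```sql
-- Sketch: replace a JOIN against a small lookup table with a dictionary.
CREATE DICTIONARY tenant_dict (
  tenant_id UInt32,
  name      String
)
PRIMARY KEY tenant_id
SOURCE(CLICKHOUSE(TABLE 'tenant_names'))
LIFETIME(MIN 300 MAX 600)  -- Refresh the in-memory copy every 5-10 minutes
LAYOUT(HASHED());

-- dictGet is an O(1) in-memory lookup, no JOIN required
SELECT dictGet('tenant_dict', 'name', tenant_id) AS tenant, count()
FROM events
GROUP BY tenant;
```

Dictionaries keep the hot lookup data in memory on every node, which is why they scale where a JOIN against a distributed right-hand table would not.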

Schema Design

-- Raw events table with MergeTree
CREATE TABLE events (
  tenant_id     UInt32,
  event_id      UUID DEFAULT generateUUIDv4(),
  event_type    LowCardinality(String),  -- LowCardinality for repeated strings
  user_id       UInt64,
  session_id    UInt64,
  properties    Map(String, String),     -- Flexible event properties
  event_time    DateTime64(3, 'UTC'),
  ingested_at   DateTime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)  -- Monthly partition pruning; avoid high-cardinality keys like tenant_id
ORDER BY (tenant_id, event_type, user_id, event_time)
TTL event_time + INTERVAL 30 DAY DELETE  -- Auto-expire old data
SETTINGS index_granularity = 8192;

-- Pre-aggregated hourly metrics (updated by materialized view on insert)
CREATE TABLE events_hourly (
  tenant_id   UInt32,
  event_type  LowCardinality(String),
  hour        DateTime,
  count       UInt64,
  unique_users AggregateFunction(uniq, UInt64),  -- HyperLogLog state
  p50_duration AggregateFunction(quantile(0.5), Float32)  -- Approximate median state
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (tenant_id, event_type, hour)
TTL hour + INTERVAL 3 YEAR DELETE;

Materialized Views

-- Materialized view: triggers on every insert to events, aggregates to hourly
CREATE MATERIALIZED VIEW events_hourly_mv
TO events_hourly
AS
SELECT
  tenant_id,
  event_type,
  toStartOfHour(event_time) AS hour,
  count()                  AS count,
  uniqState(user_id)       AS unique_users,  -- Saves HLL state, not final value
  quantileState(0.5)(toFloat32(properties['duration_ms'])) AS p50_duration
FROM events
GROUP BY tenant_id, event_type, hour;

-- Query the aggregated table (merge intermediate states)
SELECT
  event_type,
  hour,
  sum(count) AS total_events,
  uniqMerge(unique_users) AS unique_users  -- Merge HLL states from multiple rows
FROM events_hourly
WHERE tenant_id = 42
  AND hour >= now() - INTERVAL 7 DAY
GROUP BY event_type, hour
ORDER BY hour, event_type;

Time-Series Queries

-- Funnel analysis: what % of users complete each step?
WITH funnel AS (
  SELECT
    user_id,
    windowFunnel(3600)(  -- 1-hour conversion window
      event_time,
      event_type = 'page_view',
      event_type = 'add_to_cart',
      event_type = 'checkout_start',
      event_type = 'purchase'
    ) AS step
  FROM events
  WHERE tenant_id = 42
    AND event_time >= today() - 30
  GROUP BY user_id
)
SELECT
  step,
  count() AS users,
  round(100.0 * count() / sum(count()) OVER (), 1) AS pct  -- Share of users whose deepest step is this one
FROM funnel
WHERE step > 0  -- step 0 = matched none of the funnel steps
GROUP BY step
ORDER BY step;

-- Retention cohorts
SELECT
  toDate(first_event) AS cohort_date,
  dateDiff('day', first_event, activity_date) AS day_offset,
  count(DISTINCT user_id) AS retained
FROM (
  SELECT
    user_id,
    min(event_time) OVER (PARTITION BY user_id) AS first_event,
    toDate(event_time) AS activity_date
  FROM events
  WHERE event_type = 'session_start' AND tenant_id = 42
)
GROUP BY cohort_date, day_offset
ORDER BY cohort_date, day_offset;

-- Per-minute request rate with a 5-minute moving average (window frame)
-- (ASOF JOIN matches a single closest row, so it cannot average over a range)
SELECT
  minute,
  rps,
  avg(rps) OVER (
    ORDER BY minute ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
  ) AS moving_avg_5min
FROM (
  SELECT toStartOfMinute(event_time) AS minute, count() AS rps
  FROM events
  WHERE event_time >= now() - INTERVAL 1 HOUR
  GROUP BY minute
)
ORDER BY minute;  -- ROWS frame assumes minutes are contiguous
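
A caveat on per-minute series: minutes with no events produce no rows, which skews any frame-based average. ClickHouse's `ORDER BY ... WITH FILL` can densify the series first; a sketch against the same `events` table:

```sql
-- Fill missing minutes; filled rows get column defaults, so rps becomes 0
SELECT toStartOfMinute(event_time) AS minute, count() AS rps
FROM events
WHERE event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute WITH FILL STEP INTERVAL 1 MINUTE;
```

Running the window frame over the densified result gives a moving average that correctly treats quiet minutes as zero rather than skipping them.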

Python Client: Bulk Ingestion

# ingestion/clickhouse_ingestor.py
import os
from datetime import datetime, timezone

import clickhouse_connect

client = clickhouse_connect.get_client(
    host='clickhouse.example.com',
    port=8443,
    username='default',
    password=os.environ['CLICKHOUSE_PASSWORD'],
    secure=True,
)

def ingest_events(events: list[dict]) -> int:
    """Bulk insert events into ClickHouse. Returns count inserted."""
    if not events:
        return 0
    
    rows = [
        (
            event["tenant_id"],
            event["event_type"],
            event["user_id"],
            event.get("session_id", 0),
            event.get("properties", {}),  # Dict maps to Map(String, String)
            datetime.fromisoformat(event["event_time"]).replace(tzinfo=timezone.utc),
        )
        for event in events
    ]
    
    # clickhouse_connect handles batch insert efficiently
    client.insert(
        'events',
        data=rows,
        column_names=['tenant_id', 'event_type', 'user_id', 'session_id', 'properties', 'event_time'],
        settings={'async_insert': 1, 'wait_for_async_insert': 0},  # Async for high throughput
    )
    
    return len(rows)
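
Async inserts exist to batch many small writes into fewer parts, but part counts are still worth watching: the classic MergeTree failure mode is "too many parts" from frequent tiny inserts. A diagnostic query against the standard `system.parts` table:

```sql
-- Watch for part buildup per partition on the events table
SELECT
  partition,
  count() AS active_parts,
  sum(rows) AS total_rows,
  formatReadableSize(sum(bytes_on_disk)) AS size
FROM system.parts
WHERE database = currentDatabase()
  AND table = 'events'
  AND active
GROUP BY partition
ORDER BY partition DESC;
```

If `active_parts` climbs into the hundreds for a hot partition, increase client-side batching or lean harder on async inserts before background merges fall behind.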

def get_event_counts(tenant_id: int, hours: int = 24) -> list[dict]:
    """Query hourly event counts from pre-aggregated table."""
    result = client.query(
        """
        SELECT 
          event_type,
          hour,
          sum(count) AS total,
          uniqMerge(unique_users) AS uniq_users
        FROM events_hourly
        WHERE tenant_id = %(tenant_id)s
          AND hour >= now() - INTERVAL %(hours)s HOUR
        GROUP BY event_type, hour
        ORDER BY hour DESC, total DESC
        """,
        parameters={'tenant_id': tenant_id, 'hours': hours},
    )
    return list(result.named_results())  # named_results() yields dicts lazily
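
The CLAUDE.md above also lists the Kafka engine as an ingestion path; a sketch of the usual pattern, where the broker addresses, topic, and consumer group names are hypothetical:

```sql
-- Kafka engine table: a streaming consumer, not storage; rows are read once
CREATE TABLE events_kafka (
  tenant_id  UInt32,
  event_type String,
  user_id    UInt64,
  session_id UInt64,
  properties Map(String, String),
  event_time DateTime64(3, 'UTC')
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:9092,kafka2:9092',
         kafka_topic_list = 'events',
         kafka_group_name = 'clickhouse-events',
         kafka_format = 'JSONEachRow';

-- Materialized view moves rows from Kafka into the MergeTree table on arrival
CREATE MATERIALIZED VIEW events_kafka_mv TO events AS
SELECT tenant_id, event_type, user_id, session_id, properties, event_time
FROM events_kafka;
```

The materialized view is what actually persists the stream: without it, the Kafka table consumes nothing.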

For the Kafka event streaming pipeline that feeds ClickHouse with real-time events, the Kafka Streams guide covers the producer patterns upstream of ClickHouse ingestion. For batch workloads that complement ClickHouse's real-time analytics, the Apache Spark guide covers offline large-scale aggregations. The Claude Skills 360 bundle includes ClickHouse skill sets covering MergeTree schema design, materialized views, funnel/cohort queries, and Python ingestion. Start with the free tier to try ClickHouse analytics query generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
