ClickHouse processes billions of rows per second on a single server by storing data in columnar format, using vectorized execution, and compressing aggressively. Its MergeTree family of table engines handles OLAP workloads that would crush PostgreSQL. Materialized views pre-compute aggregations at insert time. The Kafka engine connects topics directly to ClickHouse tables. Claude Code generates ClickHouse schemas, materialized views, funnel/cohort SQL, ingestion pipelines, and the Python client code for analytics applications.
# CLAUDE.md for ClickHouse Projects
## Analytics Stack
- ClickHouse 24.x (single-node or 3-node sharded cluster)
- Client: clickhouse-connect (Python) or @clickhouse/client (Node.js)
- Ingestion: Kafka → Kafka engine → MV → target table (recommended),
  or Python batch inserts with async_insert=1 (simpler)
- Primary key: low-cardinality first (tenant, date), then query discriminator
- Deduplication: ReplacingMergeTree for upsert semantics; FINAL keyword in queries
- Aggregation: AggregatingMergeTree + aggregate function states (uniqState, sumState)
- Partitioning: by month for time-series, by tenant for multi-tenant
- TTL: data retention at table level (DELETE or MOVE to cold tier)
## Schema Design
```sql
-- events table: main MergeTree for raw events
CREATE TABLE events
(
    -- Partition and order keys
    tenant_id LowCardinality(String),
    event_date Date,
    event_time DateTime64(3, 'UTC'),
    -- Event identity
    event_id UUID DEFAULT generateUUIDv4(),
    session_id UUID,
    user_id Nullable(UUID),
    anonymous_id String,
    -- Event data
    event_type LowCardinality(String),
    page_url String,
    page_title String,
    referrer String,
    -- Device/browser context
    device_type LowCardinality(String), -- mobile, tablet, desktop
    browser LowCardinality(String),
    os LowCardinality(String),
    country LowCardinality(FixedString(2)),
    -- Flexible properties
    properties Map(String, String),
    -- Ingestion metadata
    received_at DateTime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(event_date))
ORDER BY (tenant_id, event_date, event_type, event_time, event_id)
TTL event_time + INTERVAL 90 DAY DELETE
SETTINGS index_granularity = 8192, storage_policy = 'tiered';
```
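The `PARTITION BY (tenant_id, toYYYYMM(event_date))` expression creates one partition per (tenant, month) pair, so tenant cardinality multiplies the partition count. A minimal Python mirror of `toYYYYMM` (hypothetical helpers, just for reasoning about partition counts):

```python
from datetime import date

def to_yyyymm(d: date) -> int:
    """Mirror of ClickHouse toYYYYMM: maps 2026-10-05 to 202610."""
    return d.year * 100 + d.month

def partition_count(tenants: int, live_months: int) -> int:
    """Partitions created by PARTITION BY (tenant_id, toYYYYMM(event_date))."""
    return tenants * live_months
```

With 200 tenants and a 90-day TTL (roughly 4 live months), that is about 800 partitions; ClickHouse generally performs best when the total partition count stays in the low thousands, so very high tenant counts may call for month-only partitioning instead.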
```sql
-- Product metrics: ReplacingMergeTree for upserts
CREATE TABLE order_metrics
(
    tenant_id LowCardinality(String),
    order_id String,
    user_id UUID,
    status LowCardinality(String),
    total_cents UInt64,
    item_count UInt16,
    created_date Date,
    created_at DateTime64(3, 'UTC'),
    updated_at DateTime64(3, 'UTC'), -- Used as version
    -- Enriched fields
    user_segment LowCardinality(String),
    acquisition_channel LowCardinality(String)
)
ENGINE = ReplacingMergeTree(updated_at) -- Dedup by updated_at as version
PARTITION BY (tenant_id, toYYYYMM(created_date))
ORDER BY (tenant_id, created_date, order_id);
```
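ReplacingMergeTree deduplication is eventual: duplicate rows coexist until a background merge runs (or the query uses FINAL), and the row with the highest version column wins. A small Python sketch of the merge rule, assuming `(tenant_id, created_date, order_id)` as the sorting key:

```python
def replacing_merge(rows: list[dict]) -> list[dict]:
    """Keep one row per ORDER BY key, preferring the highest updated_at,
    as a ReplacingMergeTree(updated_at) merge would."""
    latest: dict[tuple, dict] = {}
    for row in rows:
        key = (row["tenant_id"], row["created_date"], row["order_id"])
        if key not in latest or row["updated_at"] > latest[key]["updated_at"]:
            latest[key] = row
    return list(latest.values())
```

Because this only happens at merge time, analytical queries that must see exactly one row per order should either add FINAL or aggregate with `argMax(status, updated_at)`.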
## Materialized Views for Pre-Aggregation
```sql
-- Target table must exist before the MV that writes into it
CREATE TABLE daily_event_counts
(
    tenant_id LowCardinality(String),
    event_date Date,
    event_type LowCardinality(String),
    device_type LowCardinality(String),
    country LowCardinality(FixedString(2)),
    event_count UInt64,
    unique_users UInt64,
    sessions UInt64
)
ENGINE = SummingMergeTree()
ORDER BY (tenant_id, event_date, event_type, device_type, country);

-- Pre-aggregate daily event counts at insert time
CREATE MATERIALIZED VIEW daily_event_counts_mv
TO daily_event_counts
AS SELECT
    tenant_id,
    event_date,
    event_type,
    device_type,
    country,
    count() AS event_count,
    uniqExact(user_id) AS unique_users,
    uniqExact(session_id) AS sessions
FROM events
GROUP BY tenant_id, event_date, event_type, device_type, country;

-- Query pre-aggregated (100-1000x faster than scanning events)
SELECT
    event_date,
    event_type,
    sum(event_count) AS total_events,
    -- Summed uniques are an upper bound: a user active in several
    -- slices or insert batches is counted more than once. For exact
    -- DAU use AggregatingMergeTree with uniqState/uniqMerge.
    sum(unique_users) AS dau
FROM daily_event_counts
WHERE tenant_id = 'acme' AND event_date >= today() - 30
GROUP BY 1, 2
ORDER BY 1, 3 DESC;
```
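A caveat on pre-aggregated unique counts: summing per-slice distinct counts overcounts any user active under more than one (event_type, device_type, country) combination in a day, which is exactly the problem `uniqState`/`uniqMerge` states avoid. A small Python illustration of the gap, using hypothetical per-slice user sets:

```python
# Users active per dimension slice on one day (hypothetical data)
slices = {
    ("page_view", "mobile"): {"u1", "u2", "u3"},
    ("page_view", "desktop"): {"u2", "u4"},
    ("checkout", "mobile"): {"u1"},
}

# Summing per-slice counts double-counts u1 and u2
summed = sum(len(users) for users in slices.values())

# Merging the underlying sets (what uniqMerge does) is exact
exact = len(set().union(*slices.values()))
```

Here `summed` is 6 while the true daily-active count `exact` is 4; the gap grows with cross-slice activity.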
## Funnel Analysis
```sql
-- Multi-step funnel: signup → first_order → second_order
-- windowFunnel matches steps in order within a time window
SELECT
    tenant_id,
    event_date,
    countIf(funnel_step >= 1) AS step1_signup,
    countIf(funnel_step >= 2) AS step2_first_order,
    countIf(funnel_step >= 3) AS step3_second_order,
    round(countIf(funnel_step >= 2) / countIf(funnel_step >= 1) * 100, 1) AS signup_to_first_pct,
    round(countIf(funnel_step >= 3) / countIf(funnel_step >= 2) * 100, 1) AS first_to_second_pct
FROM (
    SELECT
        tenant_id,
        toDate(min(event_time)) AS event_date,
        user_id,
        -- 'strict_increase' requires a strictly later timestamp for each
        -- step, so step 3 needs a second, distinct order_completed event
        windowFunnel(7 * 86400, 'strict_increase')( -- 7-day window
            event_time,
            event_type = 'user_signed_up',
            event_type = 'order_completed',
            event_type = 'order_completed'
        ) AS funnel_step
    FROM events
    WHERE tenant_id = 'acme'
      AND event_date BETWEEN '2026-10-01' AND '2026-11-30'
      AND event_type IN ('user_signed_up', 'order_completed')
    GROUP BY tenant_id, user_id
)
GROUP BY tenant_id, event_date
ORDER BY event_date;
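The core of windowFunnel can be sketched in Python: scan a user's events in time order and advance to the next step whenever its condition matches within the window anchored at the step-1 match. This is a simplified model (a hypothetical helper, ignoring ClickHouse's mode flags and multi-chain search):

```python
def window_funnel(window_s: int, events: list[tuple[int, str]],
                  steps: list[str]) -> int:
    """Return the deepest funnel step reached, given (timestamp, event_type)
    pairs. The window is anchored at the first step-1 match."""
    level, start = 0, None
    for ts, etype in sorted(events):
        if level == 0 and etype == steps[0]:
            level, start = 1, ts            # funnel entry
        elif 0 < level < len(steps) and etype == steps[level] \
                and ts - start <= window_s:
            level += 1                      # next step within window
    return level
```

Note how listing `order_completed` twice means two separate order events are needed to reach level 3, which mirrors what the SQL above computes per user.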
## Cohort Retention Analysis
```sql
-- Weekly cohort retention
WITH cohort_users AS (
    SELECT
        user_id,
        toMonday(min(event_time)) AS cohort_week
    FROM events
    WHERE tenant_id = 'acme'
      AND event_type = 'user_signed_up'
      AND event_date >= '2026-01-01'
    GROUP BY user_id
),
cohort_sizes AS (
    -- Size from signups, so every cohort has a correct denominator
    -- even when the activity events below exclude the signup itself
    SELECT cohort_week, count() AS cohort_size
    FROM cohort_users
    GROUP BY cohort_week
),
weekly_activity AS (
    SELECT DISTINCT
        user_id,
        toMonday(event_time) AS activity_week
    FROM events
    WHERE tenant_id = 'acme'
      AND event_date >= '2026-01-01' -- bound the scan to the cohort range
      AND event_type IN ('order_completed', 'product_viewed', 'cart_updated')
),
cohort_activity AS (
    SELECT
        c.cohort_week,
        dateDiff('week', c.cohort_week, a.activity_week) AS weeks_since_signup,
        count(DISTINCT c.user_id) AS active_users
    FROM cohort_users c
    JOIN weekly_activity a ON c.user_id = a.user_id
        AND a.activity_week >= c.cohort_week
    GROUP BY c.cohort_week, weeks_since_signup
)
SELECT
    a.cohort_week,
    a.weeks_since_signup,
    a.active_users,
    s.cohort_size,
    round(a.active_users / s.cohort_size * 100, 1) AS retention_pct
FROM cohort_activity a
JOIN cohort_sizes s ON a.cohort_week = s.cohort_week
ORDER BY a.cohort_week, a.weeks_since_signup;
```
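The retention arithmetic can be checked in miniature in Python (hypothetical data; a single cohort, with weeks as integers rather than dates):

```python
def retention(signed_up: set[str],
              activity: dict[int, set[str]]) -> dict[int, float]:
    """Retention % per weeks-since-signup for one cohort.
    signed_up: users who signed up in the cohort week.
    activity: weeks-since-signup -> users active that week."""
    return {
        week: round(len(users & signed_up) / len(signed_up) * 100, 1)
        for week, users in activity.items()
    }
```

With four signups and 4/2/1 of them active in weeks 0/1/2, this yields 100%, 50%, and 25%, the same shape the SQL produces per cohort row.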
## Kafka Engine Integration
```sql
-- Kafka → ClickHouse pipeline (no separate consumer needed)
CREATE TABLE events_kafka_queue
(
    tenant_id String,
    event_time String, -- Will be parsed
    event_type String,
    user_id String,
    properties String -- JSON
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'analytics.events',
    kafka_group_name = 'clickhouse-consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,
    kafka_max_block_size = 65536;

-- MV moves data from Kafka queue → main events table
CREATE MATERIALIZED VIEW events_kafka_mv TO events AS
SELECT
    tenant_id,
    parseDateTimeBestEffort(event_time) AS event_time,
    toDate(parseDateTimeBestEffort(event_time)) AS event_date,
    event_type,
    accurateCastOrNull(user_id, 'UUID') AS user_id,
    JSONExtractString(properties, 'page_url') AS page_url
    -- ... other field mappings
FROM events_kafka_queue;
```
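`kafka_format = 'JSONEachRow'` expects one JSON object per line in each Kafka message. A producer-side sketch of that framing (a hypothetical helper; the actual produce call depends on your Kafka client library):

```python
import json

def to_json_each_row(events: list[dict]) -> bytes:
    """Serialize events as newline-delimited JSON, the framing the
    Kafka engine parses with kafka_format = 'JSONEachRow'."""
    return b"\n".join(
        json.dumps(event, separators=(",", ":")).encode("utf-8")
        for event in events
    )
```

Batching several events per Kafka message this way keeps message counts down and lets `kafka_max_block_size` control how much the engine ingests per poll.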
## Python Client
```python
# analytics/client.py
import os
from functools import lru_cache

import clickhouse_connect
import pandas as pd


@lru_cache(maxsize=1)
def get_client():
    return clickhouse_connect.get_client(
        host='clickhouse',
        port=8123,
        username='analytics',
        password=os.environ['CLICKHOUSE_PASSWORD'],
        database='analytics',
        settings={
            'async_insert': 1,
            'wait_for_async_insert': 0,
        },
    )


def insert_events(events: list[dict]) -> None:
    """High-throughput insert with async_insert."""
    if not events:
        return
    # clickhouse-connect expects rows as sequences, not dicts
    columns = list(events[0].keys())
    rows = [[event[col] for col in columns] for event in events]
    get_client().insert('events', rows, column_names=columns)


def query_daily_stats(tenant_id: str, days: int = 30) -> pd.DataFrame:
    """Query pre-aggregated daily stats."""
    return get_client().query_df(
        """
        SELECT
            event_date,
            sum(event_count) AS events,
            sum(unique_users) AS dau
        FROM daily_event_counts
        WHERE tenant_id = {tenant:String}
          AND event_date >= today() - {days:Int32}
        GROUP BY event_date
        ORDER BY event_date
        """,
        parameters={'tenant': tenant_id, 'days': days},
    )
```
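For client-side batching on top of async_insert, a small buffered wrapper cuts round trips when events arrive one at a time (a sketch, assuming a clickhouse-connect-style client; the flush threshold is an arbitrary choice):

```python
class BufferedInserter:
    """Accumulate rows and flush them to ClickHouse in batches."""

    def __init__(self, client, table: str, columns: list[str],
                 flush_at: int = 10_000):
        self.client, self.table, self.columns = client, table, columns
        self.flush_at, self.buffer = flush_at, []

    def add(self, event: dict) -> None:
        self.buffer.append([event[col] for col in self.columns])
        if len(self.buffer) >= self.flush_at:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.client.insert(self.table, self.buffer,
                               column_names=self.columns)
            self.buffer = []
```

Call `flush()` on shutdown (or from a timer) so the tail of the buffer is never lost; server-side async_insert then coalesces these batches further.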
For the producer side of the pipeline that feeds the Kafka engine, see the Apache Kafka consumer guide, which also covers exactly-once patterns. The ClickHouse guide covers MergeTree engine fundamentals, materialized views, and HNSW indexes. The Claude Skills 360 bundle includes ClickHouse skill sets covering funnel analysis, cohort SQL, and Kafka engine integration; start with the free tier to try ClickHouse schema generation.