
Claude Code for TimescaleDB: Time-Series Data with PostgreSQL

Published: November 11, 2026
Read time: 7 min
By: Claude Skills 360

TimescaleDB is a PostgreSQL extension for time-series data. It adds hypertables (tables automatically partitioned by time), continuous aggregates (pre-computed rollups that refresh incrementally), and compression policies that typically cut storage by 60-95%, depending on the data. Because it runs inside PostgreSQL, your existing pg drivers, ORMs, and SQL tooling keep working. Claude Code can write TimescaleDB hypertable definitions, time_bucket aggregations, continuous aggregate refresh policies, and the retention and compression policies that automate time-series lifecycle management.

CLAUDE.md for TimescaleDB Projects

## Time-Series Stack
- TimescaleDB 2.x on PostgreSQL 16
- All time-series tables: CREATE TABLE → convert with create_hypertable()
- chunk_time_interval: 1 day for high-frequency data, 1 week for lower-rate
- Continuous aggregates: pre-compute hourly/daily rollups, refreshed incrementally by a background policy
- Compression: enable on chunks older than 7 days (60-95% size reduction)
- Retention: drop raw data older than 90 days; keep aggregates 3 years
- Index on (device_id, time DESC) — most queries filter by device then time

Hypertable Schema

-- Enable TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Metrics table: regular CREATE TABLE first
CREATE TABLE device_metrics (
  time        TIMESTAMPTZ NOT NULL,
  device_id   TEXT NOT NULL,  -- Text IDs such as 'device-123', as used in the queries
  metric_name TEXT NOT NULL,
  value       DOUBLE PRECISION NOT NULL,
  tags        JSONB DEFAULT '{}'
);

-- Convert to hypertable (auto-partitioned by time)
SELECT create_hypertable(
  'device_metrics',
  by_range('time', INTERVAL '1 day')  -- One chunk per day
);

-- Index: most queries are (device_id, time range)
CREATE INDEX ON device_metrics (device_id, time DESC);
-- GIN for JSONB tag filtering
CREATE INDEX ON device_metrics USING GIN (tags);

-- IoT sensor readings
CREATE TABLE sensor_readings (
  time        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  sensor_id   TEXT NOT NULL,
  temperature REAL,
  humidity    REAL,
  pressure    REAL
);

SELECT create_hypertable('sensor_readings', by_range('time'));
SELECT add_dimension('sensor_readings', by_hash('sensor_id', 4));  -- Also partition by sensor
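
To make the partitioning above more concrete, here is a small Python sketch of how a timestamp could map to a one-day chunk and a hash partition. It is a conceptual model only, not TimescaleDB's implementation: `chunk_range` and `hash_partition` are hypothetical helpers, chunk boundaries are assumed to be aligned to the Unix epoch, and CRC32 stands in for TimescaleDB's internal hash function.

```python
from datetime import datetime, timedelta, timezone
import zlib

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def chunk_range(ts: datetime, interval: timedelta = timedelta(days=1)) -> tuple[datetime, datetime]:
    """Return the [start, end) time range of the chunk that would hold ts.

    Assumption: chunks are aligned to multiples of the interval since the epoch.
    """
    # Floor division of two timedeltas gives the number of whole intervals elapsed.
    index = (ts - EPOCH) // interval
    start = EPOCH + index * interval
    return start, start + interval


def hash_partition(key: str, partitions: int = 4) -> int:
    """Pick a space partition for a key.

    CRC32 is a stand-in here; the real hash function and range mapping differ.
    """
    return zlib.crc32(key.encode("utf-8")) % partitions


reading_time = datetime(2026, 11, 11, 14, 30, tzinfo=timezone.utc)
start, end = chunk_range(reading_time)
print(start.isoformat(), "->", end.isoformat())
# prints "2026-11-11T00:00:00+00:00 -> 2026-11-12T00:00:00+00:00"
print("partition:", hash_partition("sensor-42"))
```

The point of the sketch is that a time-bounded query only needs the chunks whose ranges overlap the requested window, which is why the `WHERE time > ...` filters in the queries that follow matter so much.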

Time-Series Queries

-- time_bucket: group by time interval (the core function)
SELECT
  time_bucket('1 hour', time) AS hour,
  device_id,
  avg(value) FILTER (WHERE metric_name = 'cpu_usage') AS avg_cpu,
  max(value) FILTER (WHERE metric_name = 'memory_mb') AS max_memory,
  count(*) AS sample_count
FROM device_metrics
WHERE time > NOW() - INTERVAL '24 hours'
  AND device_id = 'device-123'
GROUP BY hour, device_id
ORDER BY hour;

-- Gap filling for sparse data: emit every 5-minute bucket, filling empty ones
SELECT
  time_bucket_gapfill('5 minutes', time) AS bucket,
  sensor_id,
  locf(avg(temperature)) AS temp,  -- Last observation carried forward
  interpolate(avg(humidity)) AS humidity  -- Linear interpolation
FROM sensor_readings
WHERE time BETWEEN NOW() - INTERVAL '6 hours' AND NOW()
  AND sensor_id = 'sensor-42'
GROUP BY bucket, sensor_id
ORDER BY bucket;

-- Percentiles per hourly bucket over the last 7 days
SELECT
  time_bucket('1 hour', time) AS hour,
  percentile_cont(0.50) WITHIN GROUP (ORDER BY value) AS p50,
  percentile_cont(0.95) WITHIN GROUP (ORDER BY value) AS p95,
  percentile_cont(0.99) WITHIN GROUP (ORDER BY value) AS p99
FROM device_metrics
WHERE metric_name = 'response_time_ms'
  AND time > NOW() - INTERVAL '7 days'
GROUP BY hour
ORDER BY hour;
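
For readers who want to see what the bucketing and gap-filling functions above compute, the following Python sketch re-implements the ideas in plain code. It is an illustration under stated assumptions, not the extension's implementation: the function names mirror the SQL ones for readability, buckets are aligned to 2000-01-03 UTC (the documented default origin of time_bucket for non-month intervals), and the gap-filling helpers work on a list that already has one slot per bucket.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumed bucket origin: time_bucket's documented default for non-month intervals.
ORIGIN = datetime(2000, 1, 3, tzinfo=timezone.utc)


def time_bucket(width: timedelta, ts: datetime) -> datetime:
    """Floor ts to the start of the bucket that contains it."""
    return ORIGIN + ((ts - ORIGIN) // width) * width


def locf(values: list[Optional[float]]) -> list[Optional[float]]:
    """Last observation carried forward; leading gaps stay None."""
    filled: list[Optional[float]] = []
    last: Optional[float] = None
    for value in values:
        if value is not None:
            last = value
        filled.append(last)
    return filled


def interpolate(values: list[Optional[float]]) -> list[Optional[float]]:
    """Linear interpolation between known neighbours; edge gaps stay None.

    Assumes the slots are evenly spaced, as gap-filled buckets are.
    """
    filled = list(values)
    known = [i for i, value in enumerate(values) if value is not None]
    for left, right in zip(known, known[1:]):
        step = (values[right] - values[left]) / (right - left)
        for i in range(left + 1, right):
            filled[i] = values[left] + step * (i - left)
    return filled


ts = datetime(2026, 11, 11, 14, 37, tzinfo=timezone.utc)
print(time_bucket(timedelta(hours=1), ts).isoformat())  # prints "2026-11-11T14:00:00+00:00"
print(locf([20.0, None, None, 23.0]))                   # prints [20.0, 20.0, 20.0, 23.0]
print(interpolate([40.0, None, None, 46.0]))            # prints [40.0, 42.0, 44.0, 46.0]
```

Carrying the last value forward suits readings that hold steady between samples, such as a thermostat setpoint, while interpolation suits values that drift continuously between samples.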

Continuous Aggregates

-- Continuous aggregate: automatically updated rollup
-- Created once, refreshed incrementally on new data
CREATE MATERIALIZED VIEW hourly_device_metrics
WITH (timescaledb.continuous) AS
SELECT
  time_bucket('1 hour', time) AS hour,
  device_id,
  metric_name,
  avg(value)  AS avg_value,
  max(value)  AS max_value,
  min(value)  AS min_value,
  count(*)    AS sample_count
FROM device_metrics
GROUP BY hour, device_id, metric_name;

-- Refresh policy: keep up-to-date automatically
SELECT add_continuous_aggregate_policy(
  'hourly_device_metrics',
  start_offset  => INTERVAL '3 hours',  -- Don't refresh data older than 3h (stable)
  end_offset    => INTERVAL '1 hour',   -- Don't include last 1h (may still change)
  schedule_interval => INTERVAL '1 hour'
);

-- Query the continuous aggregate (much faster than querying raw table)
SELECT hour, avg_value, max_value
FROM hourly_device_metrics
WHERE device_id = 'device-123'
  AND hour > NOW() - INTERVAL '7 days'
ORDER BY hour DESC;
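
The refresh policy's offsets are easier to reason about with a worked example. The sketch below is a simplified model of which hourly buckets a single policy run would cover: the helper names are hypothetical, the window is assumed to be measured back from the time the job runs, and only buckets lying entirely inside the window are counted. The real job also tracks which regions actually changed, which this sketch ignores.

```python
from datetime import datetime, timedelta, timezone

# Assumed bucket origin: time_bucket's documented default for non-month intervals.
ORIGIN = datetime(2000, 1, 3, tzinfo=timezone.utc)


def refresh_window(now: datetime, start_offset: timedelta, end_offset: timedelta) -> tuple[datetime, datetime]:
    """Window covered by one policy run: [now - start_offset, now - end_offset)."""
    return now - start_offset, now - end_offset


def full_buckets(window_start: datetime, window_end: datetime, width: timedelta) -> list[datetime]:
    """Start times of the buckets that fit entirely inside the window."""
    # Round window_start up to the next bucket boundary (ceiling via negated floor).
    first = ORIGIN - ((ORIGIN - window_start) // width) * width
    buckets = []
    start = first
    while start + width <= window_end:
        buckets.append(start)
        start += width
    return buckets


now = datetime(2026, 11, 11, 14, 20, tzinfo=timezone.utc)
start, end = refresh_window(now, timedelta(hours=3), timedelta(hours=1))
for bucket in full_buckets(start, end, timedelta(hours=1)):
    print(bucket.isoformat())  # prints "2026-11-11T12:00:00+00:00"
```

With a 3-hour start offset and a 1-hour end offset, a run at 14:20 covers 11:20 to 13:20, so under this model only the 12:00 bucket is complete. Rows that arrive more than three hours late fall outside every future window, which is worth keeping in mind when choosing `start_offset`.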

Compression and Retention Policies

-- Compression: enable and configure
ALTER TABLE device_metrics SET (
  timescaledb.compress,
  timescaledb.compress_orderby = 'time DESC',
  timescaledb.compress_segmentby = 'device_id,metric_name'  -- Compress per device+metric
);

-- Compress chunks older than 7 days
SELECT add_compression_policy('device_metrics', INTERVAL '7 days');

-- Retention: drop raw data older than 90 days, keep aggregates
SELECT add_retention_policy('device_metrics', INTERVAL '90 days');
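-- No retention policy on the continuous aggregate: rollups are kept indefinitely.
-- To cap them at 3 years instead:
--   SELECT add_retention_policy('hourly_device_metrics', INTERVAL '3 years');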

-- Check compression stats
SELECT
  chunk_name,
  pg_size_pretty(before_compression_total_bytes) AS before,
  pg_size_pretty(after_compression_total_bytes) AS after,
  -- Cast to numeric: PostgreSQL has no round(double precision, integer)
  round(100.0 * (1 - after_compression_total_bytes::numeric / before_compression_total_bytes), 1) AS reduction_pct
FROM chunk_compression_stats('device_metrics')
ORDER BY chunk_name;
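
Taken together, the two policies above define a simple lifecycle for each chunk. The following Python sketch models that lifecycle with the same 7-day and 90-day thresholds and reproduces the reduction formula from the stats query. It is an approximation under assumptions: `chunk_state` and `reduction_pct` are hypothetical helpers, and a chunk's age is measured from the end of its time range.

```python
from datetime import datetime, timedelta, timezone

COMPRESS_AFTER = timedelta(days=7)   # matches add_compression_policy above
DROP_AFTER = timedelta(days=90)      # matches add_retention_policy above


def chunk_state(chunk_end: datetime, now: datetime) -> str:
    """Classify a chunk by the age of its newest possible row (assumed model)."""
    age = now - chunk_end
    if age > DROP_AFTER:
        return "dropped"
    if age > COMPRESS_AFTER:
        return "compressed"
    return "uncompressed"


def reduction_pct(before_bytes: int, after_bytes: int) -> float:
    """Same formula as the stats query: share of bytes saved, one decimal place."""
    return round(100.0 * (1 - after_bytes / before_bytes), 1)


now = datetime(2026, 11, 11, tzinfo=timezone.utc)
for chunk_end in (
    datetime(2026, 11, 10, tzinfo=timezone.utc),  # 1 day old
    datetime(2026, 10, 1, tzinfo=timezone.utc),   # 41 days old
    datetime(2026, 7, 1, tzinfo=timezone.utc),    # 133 days old
):
    print(chunk_end.date(), chunk_state(chunk_end, now))

print(reduction_pct(1_000_000, 80_000))  # prints 92.0
```

Both policies run as background jobs, so a chunk changes state on the first job run after it crosses a threshold rather than at the exact moment it does.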

Python Client

# timescale/client.py — insert and query time-series data
import asyncpg
from datetime import datetime, timezone

async def insert_metrics(pool: asyncpg.Pool, device_id: str, metrics: dict):
    """Batch insert metrics for a device."""
    rows = [
        (datetime.now(timezone.utc), device_id, name, value)
        for name, value in metrics.items()
    ]
    
    await pool.executemany(
        "INSERT INTO device_metrics (time, device_id, metric_name, value) VALUES ($1, $2, $3, $4)",
        rows,
    )

async def get_device_summary(pool: asyncpg.Pool, device_id: str, hours: int = 24) -> list[dict]:
    """Fetch hourly summary from continuous aggregate."""
    rows = await pool.fetch(
        """
        SELECT hour, metric_name, avg_value, max_value
        FROM hourly_device_metrics
        WHERE device_id = $1 AND hour > NOW() - INTERVAL '1 hour' * $2
        ORDER BY hour DESC
        """,
        device_id, hours,
    )
    return [dict(row) for row in rows]
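
When a single call carries many thousands of rows, it can help to send them in bounded batches. The sketch below shows one way to do that around the same INSERT statement. The names `insert_in_batches`, `batched`, and `RecordingPool` are hypothetical, the batch size of 1000 is an arbitrary assumption rather than a tuned value, and the fake pool exists only so the example runs without a database. With asyncpg, a real `asyncpg.Pool` would take its place, since it provides an `executemany` method.

```python
import asyncio
from typing import Iterable, Iterator, Protocol, Sequence

INSERT_SQL = (
    "INSERT INTO device_metrics (time, device_id, metric_name, value) "
    "VALUES ($1, $2, $3, $4)"
)


class SupportsExecuteMany(Protocol):
    """Anything with an asyncpg-style executemany, such as asyncpg.Pool."""

    async def executemany(self, command: str, args: Iterable[Sequence]) -> None: ...


def batched(rows: Sequence[tuple], size: int) -> Iterator[Sequence[tuple]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


async def insert_in_batches(pool: SupportsExecuteMany, rows: Sequence[tuple], batch_size: int = 1000) -> int:
    """Send rows in bounded batches and return how many rows were sent."""
    sent = 0
    for batch in batched(rows, batch_size):
        await pool.executemany(INSERT_SQL, batch)
        sent += len(batch)
    return sent


class RecordingPool:
    """Stand-in for a real pool: records batch sizes instead of writing to a database."""

    def __init__(self) -> None:
        self.batch_sizes: list[int] = []

    async def executemany(self, command: str, args: Iterable[Sequence]) -> None:
        self.batch_sizes.append(len(list(args)))


fake_pool = RecordingPool()
rows = [("2026-11-11T00:00:00Z", "device-123", "cpu_usage", float(i)) for i in range(2500)]
print(asyncio.run(insert_in_batches(fake_pool, rows)))  # prints 2500
print(fake_pool.batch_sizes)                            # prints [1000, 1000, 500]
```

Bounded batches keep each round trip and its memory footprint predictable, at the cost of a few more calls.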

For pure analytics at extreme scale, the ClickHouse guide covers columnar analytics over billions of rows as an alternative to TimescaleDB. For tuning the PostgreSQL base that TimescaleDB runs on, the PostgreSQL advanced guide covers index strategies and query plans. The Claude Skills 360 bundle includes TimescaleDB skill sets covering hypertable design, continuous aggregates, compression policies, and time_bucket queries. Start with the free tier to try time-series schema generation.
