InfluxDB is purpose-built for time series data: high-ingest metrics, IoT sensor streams, and APM telemetry. InfluxDB v3 (Cloud Serverless and Dedicated) serves queries over Apache Arrow Flight SQL and accepts writes over HTTP. With @influxdata/influxdb3-client, connect via new InfluxDBClient({ host: INFLUXDB_HOST, token: TOKEN, database: BUCKET }). Write line protocol directly, e.g. client.write("cpu,host=server01,region=us-east usage_user=45.2,usage_system=3.1"); the format is measurement,tag1=val1,tag2=val2 field1=val1,field2=val2 nanosecond_timestamp. The fluent Point API is an alternative: client.writePoints([new Point("temperature").tag("sensor", "s1").floatField("value", 23.5)]). Query with SQL: for await (const row of client.query("SELECT * FROM cpu WHERE time > now() - INTERVAL '1 hour'", { database: BUCKET })). Downsample with INSERT INTO cpu_1h SELECT time_bucket(INTERVAL '1 hour', time) AS time, mean(usage_user) AS usage_user FROM cpu GROUP BY time_bucket(INTERVAL '1 hour', time), host. On v2, use Flux: from(bucket: "metrics") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "cpu") |> mean(). Collect host metrics with Telegraf inputs ([[inputs.cpu]], [[inputs.mem]], [[inputs.disk]]) shipping through [[outputs.influxdb_v2]]. Retention: Cloud Serverless sets retention in bucket settings; on v2, use influx bucket update --retention 30d. Claude Code generates InfluxDB write clients, SQL time series queries, downsampling pipelines, and Telegraf configurations.
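Line protocol has escaping rules that are easy to get wrong when building strings by hand: commas, spaces, and equals signs in tag keys and values must be backslash-escaped, and commas and spaces in measurement names as well. A minimal sketch of a hypothetical builder (toLineProtocol is illustrative, not part of the client library; it handles numeric fields only, since string fields would also need double quotes and integer fields an i suffix):

```typescript
// Escape characters that are significant in the tag section of line protocol
function escapeTag(s: string): string {
  return s.replace(/([,= ])/g, "\\$1")
}

// Measurement names escape only commas and spaces (equals is allowed)
function escapeMeasurement(s: string): string {
  return s.replace(/([, ])/g, "\\$1")
}

// Build one line of line protocol: measurement,tags fields [timestamp]
export function toLineProtocol(
  measurement: string,
  tags: Record<string, string>,
  fields: Record<string, number>,
  timestampNs?: string, // nanosecond epoch as a string, optional
): string {
  const tagPart = Object.entries(tags)
    .map(([k, v]) => `,${escapeTag(k)}=${escapeTag(v)}`)
    .join("")
  const fieldPart = Object.entries(fields)
    .map(([k, v]) => `${escapeTag(k)}=${v}`)
    .join(",")
  const ts = timestampNs !== undefined ? ` ${timestampNs}` : ""
  return `${escapeMeasurement(measurement)}${tagPart} ${fieldPart}${ts}`
}
```

In practice the Point class does this for you; the sketch is mainly useful for understanding why a tag value like "us east" must arrive at the server as us\ east.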
## CLAUDE.md for InfluxDB
## InfluxDB Stack
- Client: @influxdata/influxdb3-client (v3/Cloud Serverless) or @influxdata/influxdb-client (v2)
- Connect: new InfluxDBClient({ host: process.env.INFLUXDB_HOST!, token: process.env.INFLUXDB_TOKEN!, database: BUCKET })
- Write: client.write(lineProtocol) or client.writePoints([new Point(...).tag().floatField()])
- Query: for await (const row of client.query(sql)) — returns AsyncIterable<Record<string, unknown>>
- Line protocol: measurement,tag=val field=val nanosecond_timestamp
- Batching: collect 1000 points or 1s intervals before writing — reduces HTTP overhead
## InfluxDB Client
// lib/influxdb/client.ts — InfluxDB v3 write and query client
import { InfluxDBClient, Point } from "@influxdata/influxdb3-client"
const INFLUXDB_HOST = process.env.INFLUXDB_HOST! // e.g. https://us-east-1-1.aws.cloud2.influxdata.com
const INFLUXDB_TOKEN = process.env.INFLUXDB_TOKEN!
const INFLUXDB_DATABASE = process.env.INFLUXDB_DATABASE ?? "metrics"
// Singleton client
const globalInflux = globalThis as unknown as { influxClient?: InfluxDBClient }
export function getClient(): InfluxDBClient {
if (!globalInflux.influxClient) {
globalInflux.influxClient = new InfluxDBClient({
host: INFLUXDB_HOST,
token: INFLUXDB_TOKEN,
database: INFLUXDB_DATABASE,
})
}
return globalInflux.influxClient
}
// ── Write helpers ──────────────────────────────────────────────────────────
export type MetricPoint = {
measurement: string
tags?: Record<string, string>
fields: Record<string, number | string | boolean>
timestamp?: Date
}
/** Write a single metric point */
export async function writeMetric(metric: MetricPoint): Promise<void> {
const client = getClient()
const point = new Point(metric.measurement)
for (const [k, v] of Object.entries(metric.tags ?? {})) {
point.tag(k, v)
}
for (const [k, v] of Object.entries(metric.fields)) {
if (typeof v === "number") {
// Int and float are distinct InfluxDB field types; mixing them on one field causes a type conflict
Number.isInteger(v) ? point.intField(k, v) : point.floatField(k, v)
} else if (typeof v === "boolean") {
point.booleanField(k, v)
} else {
point.stringField(k, v)
}
}
if (metric.timestamp) point.timestamp(metric.timestamp)
await client.write(point)
}
/** Batch write for high-throughput scenarios */
export async function writeMetricsBatch(metrics: MetricPoint[]): Promise<void> {
const client = getClient()
const points = metrics.map((m) => {
const p = new Point(m.measurement)
for (const [k, v] of Object.entries(m.tags ?? {})) p.tag(k, v)
for (const [k, v] of Object.entries(m.fields)) {
if (typeof v === "number") {
// Same int/float field type rule as writeMetric above
Number.isInteger(v) ? p.intField(k, v) : p.floatField(k, v)
} else if (typeof v === "boolean") {
p.booleanField(k, v)
} else {
p.stringField(k, v)
}
}
if (m.timestamp) p.timestamp(m.timestamp)
return p
})
await client.write(points)
}
// ── Query helpers ──────────────────────────────────────────────────────────
export type TimeSeriesRow = {
time: string
measurement: string
[field: string]: unknown
}
/** Execute a SQL query and collect all rows */
export async function queryRows<T extends TimeSeriesRow = TimeSeriesRow>(
sql: string,
options: { database?: string } = {},
): Promise<T[]> {
const client = getClient()
const rows: T[] = []
for await (const row of client.query(sql, { database: options.database ?? INFLUXDB_DATABASE })) {
rows.push(row as T)
}
return rows
}
/** Time series aggregation — returns { time, value }[] */
export async function timeseries(options: {
measurement: string
field: string
agg?: "mean" | "sum" | "count" | "min" | "max" | "last"
interval?: string // e.g. "5 minutes", "1 hour"
where?: string // extra WHERE conditions
lookback?: string // e.g. "24 hours", "7 days"
tags?: Record<string, string>
}): Promise<Array<{ time: string; value: number }>> {
const {
measurement, field,
agg = "mean",
interval = "5 minutes",
lookback = "1 hour",
where,
tags = {},
} = options
// Tag values are interpolated into SQL: escape single quotes, and keep
// measurement, field, and interval out of untrusted input
const tagFilters = Object.entries(tags)
.map(([k, v]) => `${k} = '${v.replace(/'/g, "''")}'`)
.join(" AND ")
const whereClause = [
`time > now() - INTERVAL '${lookback}'`,
tagFilters || null,
where || null,
].filter(Boolean).join(" AND ")
const sql = `
SELECT
time_bucket(INTERVAL '${interval}', time) AS time,
${agg}(${field}) AS value
FROM ${measurement}
WHERE ${whereClause}
GROUP BY 1
ORDER BY 1 ASC
`
const rows = await queryRows<{ time: string; value: number }>(sql)
return rows
}
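The downsampling statement shown in the intro (INSERT INTO ... SELECT time_bucket(...)) can also be generated programmatically. A hypothetical sketch (downsampleSql is not part of the client library; it assumes mean() aggregation and trusted identifiers, since they are interpolated directly into the SQL):

```typescript
// Build an INSERT INTO ... SELECT time_bucket(...) downsampling statement.
// All identifiers are assumed to come from trusted application code.
export function downsampleSql(opts: {
  source: string // raw measurement, e.g. "cpu"
  target: string // downsampled table, e.g. "cpu_1h"
  field: string // field to aggregate, e.g. "usage_user"
  interval: string // bucket width, e.g. "1 hour"
  groupTags?: string[] // tags to preserve, e.g. ["host"]
}): string {
  const tags = opts.groupTags ?? []
  const tagCols = tags.length ? `, ${tags.join(", ")}` : ""
  return [
    `INSERT INTO ${opts.target}`,
    `SELECT time_bucket(INTERVAL '${opts.interval}', time) AS time,`,
    `mean(${opts.field}) AS ${opts.field}${tagCols}`,
    `FROM ${opts.source}`,
    `GROUP BY time_bucket(INTERVAL '${opts.interval}', time)${tagCols}`,
  ].join("\n")
}
```

Run the generated statement on a schedule (cron, a scheduled task runner) to keep rollup tables current; whether INSERT INTO ... SELECT is available depends on your v3 deployment tier.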
## Batch Write Buffer
// lib/influxdb/buffer.ts — buffered write for high-frequency metrics
import { writeMetricsBatch, type MetricPoint } from "./client"
export class MetricBuffer {
private buffer: MetricPoint[] = []
private flushTimer: ReturnType<typeof setTimeout> | null = null
private readonly maxSize: number
private readonly flushInterval: number
constructor(options: { maxSize?: number; flushIntervalMs?: number } = {}) {
this.maxSize = options.maxSize ?? 1000
this.flushInterval = options.flushIntervalMs ?? 1000
}
/** Add a metric to the buffer — flushes automatically */
push(metric: MetricPoint): void {
this.buffer.push(metric)
if (this.buffer.length >= this.maxSize) {
void this.flush()
return
}
if (!this.flushTimer) {
this.flushTimer = setTimeout(() => void this.flush(), this.flushInterval)
}
}
/** Flush buffer to InfluxDB */
async flush(): Promise<void> {
if (this.flushTimer) {
clearTimeout(this.flushTimer)
this.flushTimer = null
}
if (this.buffer.length === 0) return
const batch = this.buffer.splice(0, this.buffer.length)
try {
await writeMetricsBatch(batch)
} catch (err) {
console.error("[InfluxDB] Batch write failed:", err)
// Re-add to buffer on failure (up to maxSize)
if (this.buffer.length + batch.length <= this.maxSize * 2) {
this.buffer.unshift(...batch)
}
}
}
}
// Global buffer singleton
export const metricBuffer = new MetricBuffer({ maxSize: 500, flushIntervalMs: 2000 })
// Flush pending points on shutdown; note that installing a signal handler
// suppresses Node's default exit, so exit explicitly after flushing
const drain = (code: number) => () => {
metricBuffer.flush().catch(console.error).finally(() => process.exit(code))
}
process.once("SIGTERM", drain(0))
process.once("SIGINT", drain(130))
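flush() above re-queues the failed batch and waits for the next flush cycle. Another option is to retry the write itself with exponential backoff before giving up, so transient network errors never reach the re-queue path. A hypothetical sketch (withRetry is not part of the client library):

```typescript
// Retry an async operation with exponential backoff (baseMs, 2*baseMs,
// 4*baseMs, ...) and rethrow the last error once attempts are exhausted.
export async function withRetry<T>(
  fn: () => Promise<T>,
  attempts = 3,
  baseMs = 200,
): Promise<T> {
  let lastErr: unknown
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn()
    } catch (err) {
      lastErr = err
      // Sleep before the next attempt, but not after the final failure
      if (i < attempts - 1) {
        await new Promise((resolve) => setTimeout(resolve, baseMs * 2 ** i))
      }
    }
  }
  throw lastErr
}
```

Inside flush(), `await withRetry(() => writeMetricsBatch(batch))` would replace the bare write; the re-queue fallback then only runs after every attempt has failed.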
## Next.js API Routes
// app/api/metrics/write/route.ts — ingest metrics from client/services
import { NextResponse } from "next/server"
import { z } from "zod"
import { metricBuffer } from "@/lib/influxdb/buffer"
import { auth } from "@/lib/auth"
const MetricSchema = z.object({
measurement: z.string().max(64),
tags: z.record(z.string()).optional(),
fields: z.record(z.union([z.number(), z.string(), z.boolean()])),
timestamp: z.string().datetime().optional(),
})
const BatchSchema = z.array(MetricSchema).max(500)
export async function POST(req: Request) {
const session = await auth()
if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 })
const body = await req.json().catch(() => null)
const parsed = BatchSchema.safeParse(Array.isArray(body) ? body : [body])
if (!parsed.success) {
return NextResponse.json({ error: parsed.error.flatten() }, { status: 400 })
}
const points = parsed.data
for (const p of points) {
metricBuffer.push({
...p,
timestamp: p.timestamp ? new Date(p.timestamp) : undefined,
// Add user context as a tag (watch series cardinality with many unique users)
tags: { ...p.tags, userId: session.user.id },
})
}
return NextResponse.json({ queued: points.length })
}
// app/api/metrics/query/route.ts — query time series data
import { NextResponse } from "next/server"
import { timeseries } from "@/lib/influxdb/client"
import { auth } from "@/lib/auth"
export async function GET(req: Request) {
const session = await auth()
if (!session) return NextResponse.json({ error: "Unauthorized" }, { status: 401 })
const url = new URL(req.url)
// These params are interpolated into SQL by timeseries(); validate them first
const ident = /^[A-Za-z_][A-Za-z0-9_]*$/
const measurement = url.searchParams.get("measurement") ?? "events"
const field = url.searchParams.get("field") ?? "value"
if (!ident.test(measurement) || !ident.test(field)) {
return NextResponse.json({ error: "Invalid measurement or field" }, { status: 400 })
}
const aggs = ["mean", "sum", "count", "min", "max", "last"] as const
const agg = aggs.find((a) => a === url.searchParams.get("agg")) ?? "mean"
const interval = url.searchParams.get("interval") ?? "5 minutes"
const lookback = url.searchParams.get("lookback") ?? "24 hours"
const data = await timeseries({ measurement, field, agg, interval, lookback })
return NextResponse.json({ data })
}
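One rendering caveat: time_bucket only returns buckets that contain at least one row, so a chart drawn from this endpoint will silently connect points across gaps in the data. A hypothetical client-side helper that reinserts the missing buckets as null values (fillGaps is illustrative, not part of any library; it assumes evenly spaced buckets of stepMs milliseconds):

```typescript
type Bucket = { time: string; value: number | null }

// Reinsert empty buckets between the first and last returned rows so that
// charting libraries render gaps instead of interpolated lines.
export function fillGaps(
  rows: { time: string; value: number }[],
  stepMs: number,
): Bucket[] {
  if (rows.length === 0) return []
  const byTime = new Map(rows.map((r) => [Date.parse(r.time), r.value]))
  const start = Date.parse(rows[0].time)
  const end = Date.parse(rows[rows.length - 1].time)
  const out: Bucket[] = []
  for (let t = start; t <= end; t += stepMs) {
    out.push({ time: new Date(t).toISOString(), value: byTime.get(t) ?? null })
  }
  return out
}
```

Pass the same bucket width you queried with (e.g. 5 * 60 * 1000 for "5 minutes") so the filled timeline lines up with the server-side buckets.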
## Telegraf Configuration
# telegraf.conf — collect system metrics and ship to InfluxDB v3
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
flush_interval = "10s"
# System inputs
[[inputs.cpu]]
percpu = false
totalcpu = true
collect_cpu_time = false
[[inputs.mem]]
[[inputs.disk]]
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
[[inputs.diskio]]
[[inputs.net]]
interfaces = ["eth*", "en*"]
# HTTP endpoint metrics (scrape your /metrics route)
[[inputs.http]]
urls = ["http://localhost:3000/api/metrics?format=json"]
method = "GET"
headers = { Authorization = "Bearer ${METRICS_TOKEN}" }
data_format = "json"
# Output to InfluxDB v3
[[outputs.influxdb_v2]]
urls = ["${INFLUXDB_HOST}"]
token = "${INFLUXDB_TOKEN}"
organization = "" # not used in v3
bucket = "${INFLUXDB_DATABASE}"
Choose Prometheus/Grafana instead when you need pull-based metrics scraping across many services, the PromQL query language, Alertmanager for complex alert routing, and the broad Prometheus exporter ecosystem. Prometheus is the standard for infrastructure monitoring, while InfluxDB is optimized for write-heavy time series workloads (IoT sensor streams, high-frequency trading, observability pipelines) where compressed columnar storage and fast range queries matter most.

Choose TimescaleDB instead when you need a time series database with full PostgreSQL compatibility: relational joins, triggers, existing Postgres tooling (pgAdmin, Prisma, Drizzle), and PostGIS geography support. TimescaleDB extends PostgreSQL with hypertables, while InfluxDB is a purpose-built TSDB whose line protocol is optimized for high-cardinality metric streams.

The Claude Skills 360 bundle includes InfluxDB skills covering write clients, SQL time series queries, and Telegraf configuration. Start with the free tier to try time series generation.