Change data capture turns your database’s transaction log into a stream of events, letting downstream services react to data changes without polling or tight coupling. Debezium captures every INSERT, UPDATE, and DELETE from PostgreSQL’s WAL and publishes each change event to a Kafka topic. Claude Code generates CDC pipeline configurations, consumer implementations, and the schema evolution patterns needed for long-running pipelines.
This guide covers CDC with Claude Code: Debezium configuration, Kafka consumer patterns, handling tombstones, schema evolution, and dead letter queue processing.
CLAUDE.md for CDC Pipelines
## Change Data Capture Stack
- PostgreSQL 15 with logical replication (wal_level = logical)
- Debezium 2.4 via Kafka Connect
- Kafka with Schema Registry (Confluent or Apicurio)
- Consumers: TypeScript (kafkajs) or Python (confluent-kafka)
## Key patterns
- Debezium publishes to topic: {server}.{schema}.{table}
- Each message: { before, after, op, source } where op = c/u/d/r (create/update/delete/read/snapshot)
- Tombstone records (null value) follow delete events — consumers must handle null
- Schema Registry for Avro — never parse raw JSON in production CDC
- Always track consumer group offset — never reset to beginning in production
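The naming and envelope conventions above can be sketched in TypeScript. These helpers are illustrative (the names are not part of Debezium), but they encode the same rules the consumers below rely on:

```typescript
// Debezium topic naming convention: {server}.{schema}.{table}
function topicFor(server: string, schema: string, table: string): string {
  return `${server}.${schema}.${table}`;
}

// Map Debezium op codes to readable operation names
const OP_NAMES: Record<string, string> = {
  c: 'create',
  u: 'update',
  d: 'delete',
  r: 'snapshot-read',
};

// A tombstone is a record whose value is null — it follows every delete event
function isTombstone(value: Buffer | null): boolean {
  return value === null;
}
```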
PostgreSQL Setup for Logical Replication
Set up PostgreSQL for Debezium CDC.
What configuration changes are needed and what permissions does Debezium need?
-- postgresql.conf changes (requires restart)
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4
-- Create dedicated Debezium user with minimal permissions
CREATE USER debezium WITH PASSWORD 'secure_password' REPLICATION;
-- Grant read access to tables being captured
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
-- Allow access to objects in the schema (replication slot creation is
-- already covered by the REPLICATION attribute granted above)
GRANT USAGE ON SCHEMA public TO debezium;
-- With the pgoutput plugin, pre-create the publication so the debezium
-- user does not need table ownership or superuser to create it
CREATE PUBLICATION dbz_publication FOR TABLE public.orders, public.order_items, public.products;
-- Required for Debezium's heartbeat table (keeps slot alive during quiet periods)
CREATE TABLE public.debezium_heartbeat (id SERIAL PRIMARY KEY, ts TIMESTAMP DEFAULT now());
GRANT ALL ON TABLE public.debezium_heartbeat TO debezium;
-- For tables with TOAST columns or large objects
ALTER TABLE orders REPLICA IDENTITY FULL; -- Captures full before/after rows
-- Default REPLICA IDENTITY only captures primary key in before row
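After the restart, it is worth verifying the settings actually took effect before deploying the connector. A sketch of a consumer-side check — the function name is an assumption, and the `settings` map would be populated from queries like `SHOW wal_level`:

```typescript
// Validate PostgreSQL settings required for Debezium logical replication.
// `settings` would come from: SHOW wal_level; SHOW max_replication_slots;
// SHOW max_wal_senders;
function checkReplicationSettings(settings: Record<string, string>): string[] {
  const problems: string[] = [];
  if (settings['wal_level'] !== 'logical') {
    problems.push(`wal_level must be 'logical', got '${settings['wal_level']}'`);
  }
  if (Number(settings['max_replication_slots'] ?? 0) < 1) {
    problems.push('max_replication_slots must be >= 1');
  }
  if (Number(settings['max_wal_senders'] ?? 0) < 1) {
    problems.push('max_wal_senders must be >= 1');
  }
  return problems;
}
```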
Debezium Connector Configuration
Configure a Debezium PostgreSQL connector for the orders and
order_items tables. Include heartbeat, snapshot mode, and
schema change handling.
{
"name": "orders-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/kafka/connect/secrets:postgres.password}",
"database.dbname": "production",
"database.server.name": "prod-db",
"table.include.list": "public.orders,public.order_items,public.products",
"plugin.name": "pgoutput",
"slot.name": "debezium_orders_slot",
"publication.name": "dbz_publication",
"snapshot.mode": "initial",
"snapshot.isolation.mode": "read_committed",
"key.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.converter": "io.confluent.kafka.serializers.KafkaAvroSerializer",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"heartbeat.interval.ms": "30000",
"heartbeat.action.query": "UPDATE public.debezium_heartbeat SET ts = now()",
"tombstones.on.delete": "true",
"decimal.handling.mode": "string",
"time.precision.mode": "adaptive_time_microseconds",
"topic.prefix": "prod-db",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.orders-connector",
"errors.deadletterqueue.context.headers.enable": "true"
}
}
Deploy via Kafka Connect REST API:
curl -X POST http://kafka-connect:8083/connectors \
-H "Content-Type: application/json" \
-d @connector-config.json
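Once deployed, the Connect REST API exposes `GET /connectors/{name}/status`, which reports the state of the connector and each task. A sketch of interpreting that response — the `isHealthy` helper is illustrative, while the response shape follows the Kafka Connect REST API:

```typescript
// Shape of GET /connectors/{name}/status from the Kafka Connect REST API
interface ConnectorStatus {
  name: string;
  connector: { state: string; worker_id: string };
  tasks: Array<{ id: number; state: string; worker_id: string; trace?: string }>;
}

// Healthy means the connector and every task report RUNNING
function isHealthy(status: ConnectorStatus): boolean {
  return (
    status.connector.state === 'RUNNING' &&
    status.tasks.length > 0 &&
    status.tasks.every((t) => t.state === 'RUNNING')
  );
}
```

A `FAILED` task keeps the connector's own state at `RUNNING`, so checking the connector state alone is not enough — the tasks array must be inspected too.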
Consumer Implementation
Write a consumer that processes order change events.
It should update a read model in Redis and handle all event types
including deletes and snapshots.
// src/consumers/orders-cdc-consumer.ts
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { Redis } from 'ioredis';
interface DebeziumOrderEvent {
before: OrderRecord | null;
after: OrderRecord | null;
op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
source: {
ts_ms: number;
db: string;
schema: string;
table: string;
txId: number;
lsn: number;
};
ts_ms: number;
}
interface OrderRecord {
id: string;
user_id: string;
status: string;
total_cents: number;
created_at: string;
updated_at: string;
}
export class OrdersCDCConsumer {
private consumer: Consumer;
private registry: SchemaRegistry;
private redis: Redis;
constructor(kafka: Kafka, registry: SchemaRegistry, redis: Redis) {
this.consumer = kafka.consumer({ groupId: 'order-read-model-updater' });
this.registry = registry;
this.redis = redis;
}
async start() {
await this.consumer.connect();
await this.consumer.subscribe({
topic: 'prod-db.public.orders',
fromBeginning: false, // Don't reprocess history on restart
});
await this.consumer.run({
eachMessage: async (payload) => {
await this.processMessage(payload);
},
});
}
private async processMessage({ message, heartbeat }: EachMessagePayload) {
// Tombstone record — null value follows a delete event
if (message.value === null) {
const keyDecoded = await this.registry.decode(message.key!);
await this.handleTombstone(keyDecoded.id);
return;
}
// Decode Avro message via Schema Registry
const event = await this.registry.decode(message.value) as DebeziumOrderEvent;
switch (event.op) {
case 'c': // INSERT
case 'r': // Snapshot/read — treat same as insert for read model
await this.upsertOrderCache(event.after!);
break;
case 'u': // UPDATE
await this.upsertOrderCache(event.after!);
// Detect status transitions for downstream notifications
if (event.before?.status !== event.after?.status) {
await this.handleStatusChange(event.before!, event.after!);
}
break;
case 'd': // DELETE
// after is null for deletes — use before for the deleted record
await this.removeOrderCache(event.before!.id);
break;
}
// Heartbeat to Kafka — prevents consumer group session timeout on slow processing
await heartbeat();
}
private async upsertOrderCache(order: OrderRecord) {
const key = `order:${order.id}`;
const userOrdersKey = `user:${order.user_id}:orders`;
await this.redis.pipeline()
.setex(key, 3600, JSON.stringify(order)) // Cache individual order for 1h
.zadd(userOrdersKey, new Date(order.created_at).getTime(), order.id) // Sorted set for listing
.expire(userOrdersKey, 3600)
.exec();
}
private async removeOrderCache(orderId: string) {
// We need the user_id to remove from the sorted set
const orderJson = await this.redis.get(`order:${orderId}`);
if (orderJson) {
const order = JSON.parse(orderJson) as OrderRecord;
await this.redis.pipeline()
.del(`order:${orderId}`)
.zrem(`user:${order.user_id}:orders`, orderId)
.exec();
}
}
private async handleTombstone(orderId: string) {
// Tombstone confirms the delete event — clean up anything missed
await this.removeOrderCache(orderId);
}
private async handleStatusChange(before: OrderRecord, after: OrderRecord) {
// Publish to internal event bus for notification service
// This decouples notifications from the CDC consumer
await this.redis.publish(
'order:status-changed',
JSON.stringify({
orderId: after.id,
userId: after.user_id,
previousStatus: before.status,
newStatus: after.status,
timestamp: after.updated_at,
}),
);
}
async stop() {
await this.consumer.disconnect();
}
}
Schema Evolution
The orders table is adding a new column. How do I handle
the schema change in Debezium without breaking consumers?
I need to add a `source_channel` column to the orders table.
Walk me through the CDC-safe migration procedure.
The safe migration sequence for CDC pipelines:
-- Step 1: Add nullable column with default (no lock, backward compatible)
ALTER TABLE orders ADD COLUMN source_channel VARCHAR(50) DEFAULT 'web';
-- Step 2: Debezium picks up the schema change automatically via Schema Registry
-- New events will include source_channel in after
-- Old events (from before migration) will NOT have source_channel
-- Verify Debezium schema is updated:
-- GET http://schema-registry:8081/subjects/prod-db.public.orders-value/versions/latest
Consumers handle schema evolution through the Schema Registry’s Avro compatibility rules:
// The registry handles schema evolution — consumers using GenericRecord
// automatically get new fields with their default values
// Consumers using a typed interface need to handle optional new fields:
interface OrderRecord {
id: string;
user_id: string;
status: string;
total_cents: number;
// Optional — won't exist in events from before the migration
source_channel?: string;
created_at: string;
}
// In consumer, handle missing field gracefully:
const sourceChannel = order.source_channel ?? 'web'; // Default for historical events
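That defaulting logic is easy to scatter across call sites; centralizing it in a small normalizer keeps every consumer applying the same fallback. A sketch — `normalizeOrder` and the `'web'` default are assumptions carried over from this example:

```typescript
interface RawOrder {
  id: string;
  user_id: string;
  status: string;
  total_cents: number;
  source_channel?: string; // absent in pre-migration events
  created_at: string;
}

// Apply defaults for fields added after the initial schema, so historical
// and new events look identical to everything downstream
function normalizeOrder(order: RawOrder): Required<RawOrder> {
  return { ...order, source_channel: order.source_channel ?? 'web' };
}
```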
Dead Letter Queue Processing
// src/consumers/dlq-processor.ts
// Process failed CDC messages from the dead letter queue
const dlqConsumer = kafka.consumer({ groupId: 'dlq-processor' });
await dlqConsumer.subscribe({ topic: 'dlq.orders-connector' });
await dlqConsumer.run({
eachMessage: async ({ message }) => {
// Kafka Connect DLQ headers (enabled via errors.deadletterqueue.context.headers.enable)
// carry the original error context under the __connect.errors.* prefix
const errorClass = message.headers?.['__connect.errors.exception.class.name']?.toString();
const errorMessage = message.headers?.['__connect.errors.exception.message']?.toString();
const originalTopic = message.headers?.['__connect.errors.topic']?.toString();
const originalOffset = message.headers?.['__connect.errors.offset']?.toString();
console.error('DLQ message', { errorClass, errorMessage, originalTopic, originalOffset });
// Alert and store for manual review
await alerting.error('CDC message failed', { errorClass, errorMessage });
await db.failedEvents.insert({
topic: originalTopic,
offset: originalOffset,
payload: message.value?.toString(),
error: errorMessage,
receivedAt: new Date(),
});
},
});
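Not every DLQ message deserves the same handling: deserialization and schema errors are deterministic and will fail again on retry, while transient errors might succeed. A triage sketch — the classification table is an assumption, not an exhaustive list:

```typescript
type DlqAction = 'retry' | 'park-for-review';

// Exception classes that fail deterministically — retrying won't help
const NON_RETRYABLE = [
  'org.apache.kafka.connect.errors.DataException',
  'org.apache.kafka.common.errors.SerializationException',
];

// Decide what to do with a failed message based on its exception class
function triage(errorClass: string | undefined): DlqAction {
  if (errorClass && NON_RETRYABLE.some((c) => errorClass.startsWith(c))) {
    return 'park-for-review';
  }
  return 'retry';
}
```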
For the full Kafka event streaming setup including topics, partitions, and consumers, see the Kafka guide. For event-driven architecture patterns including outbox and sagas that complement CDC, see the event-driven architecture guide. The Claude Skills 360 bundle includes data pipeline skill sets covering CDC, stream processing, and real-time analytics. Start with the free tier to try event streaming code generation.