
Claude Code for Change Data Capture: Debezium, Kafka Connect, and Event Streaming

Published: August 16, 2026
Read time: 9 min read
By: Claude Skills 360

Change data capture turns your database’s transaction log into a stream of events — letting downstream services react to data changes without polling or tight coupling. Debezium captures every INSERT, UPDATE, and DELETE from PostgreSQL’s WAL and publishes them to Kafka topics. Claude Code generates CDC pipeline configurations, consumer implementations, and the schema evolution patterns needed for long-running pipelines.

This guide covers CDC with Claude Code: Debezium configuration, Kafka consumer patterns, handling tombstones, schema evolution, and dead letter queue processing.

CLAUDE.md for CDC Pipelines

## Change Data Capture Stack
- PostgreSQL 15 with logical replication (wal_level = logical)
- Debezium 2.4 via Kafka Connect
- Kafka with Schema Registry (Confluent or Apicurio)
- Consumers: TypeScript (kafkajs) or Python (confluent-kafka)

## Key patterns
- Debezium publishes to topic: {server}.{schema}.{table}
- Each message: { before, after, op, source } where op = c (create), u (update), d (delete), r (read, emitted during snapshots)
- Tombstone records (null value) follow delete events — consumers must handle null
- Schema Registry for Avro — never parse raw JSON in production CDC
- Always track consumer group offset — never reset to beginning in production
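
The envelope and topic-naming patterns above can be sketched in TypeScript. This is an illustrative example, not generated by a connector: the `ChangeEvent` shape, `sampleUpdate` record, and `topicFor` helper are hypothetical names that mirror the `{ before, after, op, source }` envelope and the `{server}.{schema}.{table}` topic convention.

```typescript
type DebeziumOp = 'c' | 'u' | 'd' | 'r';

interface ChangeEvent<T> {
  before: T | null;
  after: T | null;
  op: DebeziumOp;
  source: { db: string; schema: string; table: string; ts_ms: number };
}

// A hypothetical UPDATE event for an order moving from pending to paid
const sampleUpdate: ChangeEvent<{ id: string; status: string }> = {
  before: { id: 'ord-1', status: 'pending' },
  after: { id: 'ord-1', status: 'paid' },
  op: 'u',
  source: { db: 'production', schema: 'public', table: 'orders', ts_ms: 1723800000000 },
};

// Derive the topic name from the envelope's source block: {server}.{schema}.{table}
function topicFor(serverName: string, e: ChangeEvent<unknown>): string {
  return `${serverName}.${e.source.schema}.${e.source.table}`;
}

console.log(topicFor('prod-db', sampleUpdate)); // prod-db.public.orders
```

Note that `before` is null for creates and snapshot reads, and `after` is null for deletes, so both sides of the envelope must be nullable in consumer types.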

PostgreSQL Setup for Logical Replication

Set up PostgreSQL for Debezium CDC.
What configuration changes are needed and what permissions does Debezium need?
-- postgresql.conf changes (requires restart)
-- wal_level = logical
-- max_replication_slots = 4
-- max_wal_senders = 4

-- Create dedicated Debezium user with minimal permissions
CREATE USER debezium WITH PASSWORD 'secure_password' REPLICATION;

-- Grant read access to tables being captured
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;

-- Schema access for snapshot queries (replication slot creation
-- is already granted by the REPLICATION attribute above)
GRANT USAGE ON SCHEMA public TO debezium;

-- Debezium's heartbeat table (keeps the replication slot advancing during quiet periods)
CREATE TABLE public.debezium_heartbeat (id SERIAL PRIMARY KEY, ts TIMESTAMP DEFAULT now());
INSERT INTO public.debezium_heartbeat (ts) VALUES (now()); -- Seed row so the heartbeat UPDATE has something to touch
GRANT ALL ON TABLE public.debezium_heartbeat TO debezium;

-- Capture complete before-images (needed when consumers diff before/after;
-- also avoids placeholder values for unchanged TOAST columns)
ALTER TABLE orders REPLICA IDENTITY FULL;
-- With the default REPLICA IDENTITY, the before row contains only the primary key
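
A pre-flight check can catch misconfiguration before deploying the connector. This sketch assumes you have already fetched the live values (e.g. via `SHOW wal_level` and friends); `checkReplicationSettings` is a hypothetical helper mirroring the postgresql.conf values above.

```typescript
// Validate the settings Debezium needs. Pass in values read from the
// database (SHOW wal_level, SHOW max_replication_slots, SHOW max_wal_senders).
function checkReplicationSettings(settings: Record<string, string>): string[] {
  const problems: string[] = [];
  if (settings['wal_level'] !== 'logical') {
    problems.push(`wal_level is '${settings['wal_level']}', must be 'logical'`);
  }
  for (const key of ['max_replication_slots', 'max_wal_senders']) {
    const n = Number(settings[key] ?? '0');
    // Each Debezium connector consumes one slot and one WAL sender
    if (n < 1) problems.push(`${key} must be at least 1 (got ${n})`);
  }
  return problems;
}

// 'replica' is the PostgreSQL default, which is why the restart is required
console.log(checkReplicationSettings({
  wal_level: 'replica',
  max_replication_slots: '4',
  max_wal_senders: '4',
})); // reports the wal_level problem
```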

Debezium Connector Configuration

Configure a Debezium PostgreSQL connector for the orders and
order_items tables. Include heartbeat, snapshot mode, and
schema change handling.
{
  "name": "orders-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",

    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${file:/kafka/connect/secrets:postgres.password}",
    "database.dbname": "production",

    "table.include.list": "public.orders,public.order_items,public.products",

    "plugin.name": "pgoutput",
    "slot.name": "debezium_orders_slot",
    "publication.name": "dbz_publication",

    "snapshot.mode": "initial",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "heartbeat.interval.ms": "30000",
    "heartbeat.action.query": "UPDATE public.debezium_heartbeat SET ts = now()",

    "tombstones.on.delete": "true",

    "decimal.handling.mode": "string",
    "time.precision.mode": "adaptive_time_microseconds",

    "topic.prefix": "prod-db",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq.orders-connector",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}

Deploy via Kafka Connect REST API:

curl -X POST http://kafka-connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json
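
After the POST, poll `GET http://kafka-connect:8083/connectors/orders-postgres-connector/status` to confirm the connector actually started. The sketch below shows the health rule as a pure function; `isConnectorHealthy` is a hypothetical helper, and the `ConnectorStatus` interface mirrors the JSON shape the Connect REST API returns.

```typescript
// Shape returned by GET /connectors/{name}/status on the Connect REST API
interface ConnectorStatus {
  name: string;
  connector: { state: string };
  tasks: { id: number; state: string; trace?: string }[];
}

// Healthy only when the connector AND every task report RUNNING.
// A task can be FAILED while the connector-level state is still RUNNING,
// so checking tasks (and that at least one exists) matters.
function isConnectorHealthy(status: ConnectorStatus): boolean {
  return (
    status.connector.state === 'RUNNING' &&
    status.tasks.length > 0 &&
    status.tasks.every((t) => t.state === 'RUNNING')
  );
}
```

When a task is FAILED, its `trace` field carries the stack trace; restarting via `POST /connectors/{name}/tasks/{id}/restart` is the usual first remediation.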

Consumer Implementation

Write a consumer that processes order change events.
It should update a read model in Redis and handle all event types
including deletes and snapshots.
// src/consumers/orders-cdc-consumer.ts
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { Redis } from 'ioredis';

interface DebeziumOrderEvent {
  before: OrderRecord | null;
  after: OrderRecord | null;
  op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
  source: {
    ts_ms: number;
    db: string;
    schema: string;
    table: string;
    txId: number;
    lsn: number;
  };
  ts_ms: number;
}

interface OrderRecord {
  id: string;
  user_id: string;
  status: string;
  total_cents: number;
  created_at: string;
  updated_at: string;
}

export class OrdersCDCConsumer {
  private consumer: Consumer;
  private registry: SchemaRegistry;
  private redis: Redis;

  constructor(kafka: Kafka, registry: SchemaRegistry, redis: Redis) {
    this.consumer = kafka.consumer({ groupId: 'order-read-model-updater' });
    this.registry = registry;
    this.redis = redis;
  }

  async start() {
    await this.consumer.connect();
    await this.consumer.subscribe({
      topic: 'prod-db.public.orders',
      fromBeginning: false, // Don't reprocess history on restart
    });

    await this.consumer.run({
      eachMessage: async (payload) => {
        await this.processMessage(payload);
      },
    });
  }

  private async processMessage({ message, heartbeat }: EachMessagePayload) {
    // Tombstone record — null value follows a delete event
    if (message.value === null) {
      const keyDecoded = await this.registry.decode(message.key!);
      await this.handleTombstone(keyDecoded.id);
      return;
    }

    // Decode Avro message via Schema Registry
    const event = await this.registry.decode(message.value) as DebeziumOrderEvent;

    switch (event.op) {
      case 'c': // INSERT
      case 'r': // Snapshot/read — treat same as insert for read model
        await this.upsertOrderCache(event.after!);
        break;

      case 'u': // UPDATE
        await this.upsertOrderCache(event.after!);
        // Detect status transitions for downstream notifications
        if (event.before?.status !== event.after?.status) {
          await this.handleStatusChange(event.before!, event.after!);
        }
        break;

      case 'd': // DELETE
        // after is null for deletes — use before for the deleted record
        await this.removeOrderCache(event.before!.id);
        break;
    }

    // Heartbeat to Kafka — prevents consumer group session timeout on slow processing
    await heartbeat();
  }

  private async upsertOrderCache(order: OrderRecord) {
    const key = `order:${order.id}`;
    const userOrdersKey = `user:${order.user_id}:orders`;

    await this.redis.pipeline()
      .setex(key, 3600, JSON.stringify(order)) // Cache individual order for 1h
      .zadd(userOrdersKey, new Date(order.created_at).getTime(), order.id) // Sorted set for listing
      .expire(userOrdersKey, 3600)
      .exec();
  }

  private async removeOrderCache(orderId: string) {
    // We need the user_id to remove from the sorted set
    const orderJson = await this.redis.get(`order:${orderId}`);
    if (orderJson) {
      const order = JSON.parse(orderJson) as OrderRecord;
      await this.redis.pipeline()
        .del(`order:${orderId}`)
        .zrem(`user:${order.user_id}:orders`, orderId)
        .exec();
    }
  }

  private async handleTombstone(orderId: string) {
    // Tombstone confirms the delete event — clean up anything missed
    await this.removeOrderCache(orderId);
  }

  private async handleStatusChange(before: OrderRecord, after: OrderRecord) {
    // Publish to internal event bus for notification service
    // This decouples notifications from the CDC consumer
    await this.redis.publish(
      'order:status-changed',
      JSON.stringify({
        orderId: after.id,
        userId: after.user_id,
        previousStatus: before.status,
        newStatus: after.status,
        timestamp: after.updated_at,
      }),
    );
  }

  async stop() {
    await this.consumer.disconnect();
  }
}

Schema Evolution

I need to add a `source_channel` column to the orders table.
Walk me through the CDC-safe migration procedure so the schema
change doesn't break existing consumers.

The safe migration sequence for CDC pipelines:

-- Step 1: Add a nullable column with a constant default
-- (metadata-only change on PostgreSQL 11+, backward compatible)
ALTER TABLE orders ADD COLUMN source_channel VARCHAR(50) DEFAULT 'web';

-- Step 2: Debezium picks up the schema change automatically via Schema Registry
-- New events will include source_channel in after
-- Old events (from before migration) will NOT have source_channel

-- Verify Debezium schema is updated:
-- GET http://schema-registry:8081/subjects/prod-db.public.orders-value/versions/latest

Consumer handles schema evolution using the Schema Registry’s Avro compatibility:

// The registry handles schema evolution — consumers using GenericRecord
// automatically get new fields with their default values
// Consumers using a typed interface need to handle optional new fields:

interface OrderRecord {
  id: string;
  user_id: string;
  status: string;
  total_cents: number;
  // Optional — won't exist in events from before the migration
  source_channel?: string;
  created_at: string;
}

// In consumer, handle missing field gracefully:
const sourceChannel = order.source_channel ?? 'web'; // Default for historical events
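
The underlying Avro rule can be checked before running the migration: a new schema stays backward compatible with old events only if every added field carries a default. Schema Registry performs the authoritative check; this sketch mirrors just the added-field rule, and `addedFieldsWithoutDefaults` is a hypothetical helper for illustration.

```typescript
// Minimal view of an Avro record field for compatibility checking
interface AvroField { name: string; type: unknown; default?: unknown }

// Return the names of fields present in the new schema but not the old one
// that lack a default — each such field breaks backward compatibility.
function addedFieldsWithoutDefaults(oldFields: AvroField[], newFields: AvroField[]): string[] {
  const oldNames = new Set(oldFields.map((f) => f.name));
  return newFields
    .filter((f) => !oldNames.has(f.name) && !('default' in f))
    .map((f) => f.name);
}

const oldSchema: AvroField[] = [
  { name: 'id', type: 'string' },
  { name: 'status', type: 'string' },
];
// Nullable addition with default null — the safe pattern for CDC columns
const newSchema: AvroField[] = [
  ...oldSchema,
  { name: 'source_channel', type: ['null', 'string'], default: null },
];

console.log(addedFieldsWithoutDefaults(oldSchema, newSchema)); // → []
```

An addition without a default would be flagged here, matching the rejection you would get from a registry subject configured with BACKWARD compatibility.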

Dead Letter Queue Processing

// src/consumers/dlq-processor.ts
// Process failed CDC messages from the dead letter queue
const dlqConsumer = kafka.consumer({ groupId: 'dlq-processor' });
await dlqConsumer.subscribe({ topic: 'dlq.orders-connector' });

await dlqConsumer.run({
  eachMessage: async ({ message }) => {
    // DLQ headers contain the original error context
    const errorClass = message.headers?.['__connect.errors.exception.class.name']?.toString();
    const errorMessage = message.headers?.['__connect.errors.exception.message']?.toString();
    const originalTopic = message.headers?.['__connect.errors.topic']?.toString();
    const originalOffset = message.headers?.['__connect.errors.offset']?.toString();

    console.error('DLQ message', { errorClass, errorMessage, originalTopic, originalOffset });

    // Alert and store for manual review
    await alerting.error('CDC message failed', { errorClass, errorMessage });
    await db.failedEvents.insert({
      topic: originalTopic,
      offset: originalOffset,
      payload: message.value?.toString(),
      error: errorMessage,
      receivedAt: new Date(),
    });
  },
});
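
Once the root cause is fixed, stored DLQ messages can be replayed to their original topic. This is a sketch under the assumption that context headers were enabled on the connector (Kafka Connect's `__connect.errors.*` convention); `buildReplayRecord` and `DlqMessage` are hypothetical names.

```typescript
// Minimal view of a consumed DLQ message
interface DlqMessage {
  value: Buffer | null;
  headers?: Record<string, Buffer | undefined>;
}

// Recover the original destination from the DLQ context headers.
// Without the original topic there is nowhere safe to replay to.
function buildReplayRecord(msg: DlqMessage): { topic: string; value: Buffer | null } | null {
  const topic = msg.headers?.['__connect.errors.topic']?.toString();
  if (!topic) return null;
  return { topic, value: msg.value };
}

// Usage after repair (producer is a connected kafkajs producer):
// const rec = buildReplayRecord(msg);
// if (rec) await producer.send({ topic: rec.topic, messages: [{ value: rec.value }] });
```

Replayed messages go through the normal consumer path again, so the consumer's idempotent upserts (as in the Redis read model above) keep duplicates harmless.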

For the full Kafka event streaming setup including topics, partitions, and consumers, see the Kafka guide. For event-driven architecture patterns including outbox and sagas that complement CDC, see the event-driven architecture guide. The Claude Skills 360 bundle includes data pipeline skill sets covering CDC, stream processing, and real-time analytics. Start with the free tier to try event streaming code generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.
