Claude Code for Redis Advanced: Streams, Lua Scripts, Pub/Sub, and Clustering — Claude Skills 360 Blog
Blog / Infrastructure / Claude Code for Redis Advanced: Streams, Lua Scripts, Pub/Sub, and Clustering
Infrastructure

Claude Code for Redis Advanced: Streams, Lua Scripts, Pub/Sub, and Clustering

Published: December 5, 2026
Read time: 9 min read
By: Claude Skills 360

Redis is far more than a cache — it’s a multi-model data store with streams, time-series, search, JSON documents, and probabilistic data structures. Redis Streams enable event sourcing and consumer groups with replay capability. Lua scripts provide atomic multi-step operations without transactions. RedisSearch powers full-text and vector similarity search without Elasticsearch. Claude Code generates Redis Streams consumers, Lua atomic scripts, search index configuration, and the cluster setup for production high availability.

CLAUDE.md for Redis Projects

## Redis Stack
- Redis Stack 7.x (includes Search, JSON, TimeSeries, Bloom)
- Client: ioredis (Node.js) or redis-py with asyncio (Python)
- Streams: XADD for event sourcing, consumer groups for competing consumers
- Lua: atomic multi-step operations (rate limiting, distributed locks, credit deduction)
- Search: RedisSearch for full-text + vector similarity on documents
- Keys: namespaced with colons (user:{id}:session, order:{id}:status)
- Expire: always set TTL for session/cache keys — never set for Streams
- Cluster: 3-node cluster for HA (no sentinel — cluster preferred)

Redis Streams for Event Sourcing

// lib/eventStore.ts — Redis Streams as event log
import IORedis from 'ioredis';

// Shared ioredis connection for the event store ('redis' host resolves via container DNS).
const redis = new IORedis({ host: 'redis', port: 6379 });

// One event as appended to / replayed from a per-aggregate stream.
interface DomainEvent {
    type: string;                       // event name, e.g. 'OrderPlaced'
    aggregateId: string;                // selects the stream: events:{aggregateId}
    payload: Record<string, unknown>;   // stored JSON-serialized in the stream entry
    version: number;                    // per-aggregate version, used for replay filtering
}

// Append event to stream
/**
 * Appends a domain event to its per-aggregate stream, then mirrors a
 * reference into the global 'events:all' stream that projection consumers
 * tail.
 *
 * @param event the event to persist
 * @returns the auto-generated (timestamp-based) stream message ID
 * @throws Error when Redis does not return a message ID for the XADD
 */
export async function appendEvent(event: DomainEvent): Promise<string> {
    const aggregateStream = `events:${event.aggregateId}`;

    // Flat field/value list, the shape XADD expects.
    const entryFields = [
        'type', event.type,
        'aggregate_id', event.aggregateId,
        'version', String(event.version),
        'payload', JSON.stringify(event.payload),
        'timestamp', String(Date.now()),
    ];

    // '*' asks Redis to assign a timestamp-based message ID.
    const messageId = await redis.xadd(aggregateStream, '*', ...entryFields);
    if (!messageId) throw new Error('Failed to append event');

    // Pointer entry in the global log so projections can follow one stream.
    await redis.xadd(
        'events:all',
        '*',
        'stream_key', aggregateStream,
        'message_id', messageId,
        'type', event.type,
    );

    return messageId;
}

// Read events for an aggregate (replay)
/**
 * Replays the events of one aggregate, skipping versions below `fromVersion`.
 *
 * Note: XRANGE positions are message IDs, not aggregate versions, so the
 * whole stream is read and filtered client-side; very long streams would
 * need a version→message-ID index to avoid the full scan.
 *
 * @param aggregateId aggregate whose stream (events:{id}) is replayed
 * @param fromVersion inclusive lower bound on event version (default 0)
 * @returns the aggregate's events in stream order
 */
export async function getEvents(aggregateId: string, fromVersion = 0): Promise<DomainEvent[]> {
    const streamKey = `events:${aggregateId}`;

    // '-' / '+' span the entire stream from first to last entry.
    const rawMessages = await redis.xrange(streamKey, '-', '+');

    return rawMessages
        .map(([, fields]) => {
            // Fold the flat [field, value, field, value, ...] reply into an object.
            const obj: Record<string, string> = {};
            for (let i = 0; i < fields.length; i += 2) {
                obj[fields[i]] = fields[i + 1];
            }
            return {
                type: obj.type,
                aggregateId: obj.aggregate_id,
                payload: JSON.parse(obj.payload) as Record<string, unknown>,
                version: Number.parseInt(obj.version, 10),
            };
        })
        .filter(e => e.version >= fromVersion);
}

Consumer Groups for Event Processing

// consumers/orderProjection.ts — competing consumers
/**
 * Long-running consumer loop: reads batches from 'events:all' through the
 * 'order-projections' consumer group, applies each event to the projection,
 * and XACKs on success. Failed messages are left unacked in the Pending
 * Entry List so they can be redelivered or reclaimed.
 *
 * @param consumerId unique name for this consumer within the group
 */
async function startOrderProjectionConsumer(consumerId: string) {
    const GROUP_NAME = 'order-projections';
    const STREAM_KEY = 'events:all';
    const BATCH_SIZE = 10;

    // Create the group at the stream start ('0'); MKSTREAM creates the stream
    // if missing. XGROUP CREATE replies BUSYGROUP when the group already
    // exists — treated as success, making startup idempotent.
    try {
        await redis.xgroup('CREATE', STREAM_KEY, GROUP_NAME, '0', 'MKSTREAM');
    } catch (e: any) {
        if (!e.message.includes('BUSYGROUP')) throw e;
    }

    while (true) {
        try {
            // Read up to BATCH_SIZE messages not yet delivered to any consumer
            // in this group ('>'), blocking up to 2s when the stream is idle.
            const results = await redis.xreadgroup(
                'GROUP', GROUP_NAME, consumerId,
                'COUNT', BATCH_SIZE,
                'BLOCK', 2000,  // Block for 2 seconds if no messages
                'STREAMS', STREAM_KEY,
                '>',  // Only undelivered messages
            );

            if (!results) continue;  // Timeout — loop again

            // One stream requested, so results[0] is [streamKey, messages].
            const [, messages] = results[0];

            for (const [messageId, fields] of messages) {
                try {
                    await processProjectionEvent(fields);
                    // Acknowledge — removes from Pending Entry List
                    await redis.xack(STREAM_KEY, GROUP_NAME, messageId);
                } catch (e) {
                    console.error(`Failed to process ${messageId}:`, e);
                    // Don't ack — stays in the PEL so it can be redelivered
                    // or reclaimed later (see reclaimStuckMessages).
                }
            }
        } catch (e) {
            console.error('Consumer error:', e);
            // Brief backoff so a persistent failure doesn't spin the loop.
            await new Promise(r => setTimeout(r, 1000));
        }
    }
}

// Claim stuck messages (PEL cleanup — run periodically)
/**
 * Reclaims messages stuck in the Pending Entry List (e.g. after a consumer
 * crash) by transferring ownership to 'recovery-consumer'.
 *
 * Loops the XAUTOCLAIM cursor so the WHOLE PEL is scanned — a single call
 * with COUNT 100 only claims the first page.
 *
 * @param maxIdleMs only messages pending at least this long are claimed
 * @returns total number of reclaimed messages
 */
async function reclaimStuckMessages(maxIdleMs = 30_000) {
    const STREAM_KEY = 'events:all';
    const GROUP_NAME = 'order-projections';

    let cursor = '0-0';
    let reclaimed = 0;

    // XAUTOCLAIM replies [nextCursor, claimedMessages, ...]; a next cursor of
    // '0-0' means the scan wrapped around and the PEL is fully covered.
    do {
        const [nextCursor, messages] = await redis.xautoclaim(
            STREAM_KEY, GROUP_NAME, 'recovery-consumer',
            maxIdleMs,
            cursor,
            'COUNT', 100,
        ) as [string, unknown[]];
        cursor = nextCursor;
        reclaimed += messages.length;
    } while (cursor !== '0-0');

    return reclaimed;
}

Lua Scripts for Atomic Operations

-- scripts/ratelimit.lua — sliding window rate limiter
-- KEYS[1] = rate limit key (e.g., "ratelimit:user:123:api")
-- ARGV[1] = current timestamp (ms)
-- ARGV[2] = window size (ms)
-- ARGV[3] = max requests in window
-- Returns a 2-element array: { allowed (1) / limited (0), remaining count }

local bucket = KEYS[1]
local now_ms = tonumber(ARGV[1])
local window_ms = tonumber(ARGV[2])
local max_requests = tonumber(ARGV[3])
local cutoff = now_ms - window_ms

-- Drop sorted-set entries that have fallen out of the sliding window.
redis.call('ZREMRANGEBYSCORE', bucket, '-inf', cutoff)

-- How many requests are still inside the window?
local in_window = redis.call('ZCARD', bucket)

if in_window >= max_requests then
    -- Over the limit: reject without recording the attempt.
    return {0, 0}
end

-- Record this request; the random suffix keeps same-millisecond members unique.
redis.call('ZADD', bucket, now_ms, now_ms .. '-' .. math.random(1000000))
-- Let the key expire once a full window has elapsed with no activity.
redis.call('PEXPIRE', bucket, window_ms)

return {1, max_requests - in_window - 1}
// Using Lua scripts from Node.js
// Load the script once at startup (requires `fs` — import it from 'node:fs').
const rateLimitScript = fs.readFileSync('./scripts/ratelimit.lua', 'utf-8');

// Registers redis.ratelimit(key, now, windowMs, max); ioredis handles
// SCRIPT LOAD / EVALSHA caching for defined commands internally.
redis.defineCommand('ratelimit', {
    numberOfKeys: 1,
    lua: rateLimitScript,
});

// Structural type for the command registered by defineCommand('ratelimit').
type RateLimitCommand = {
    ratelimit(key: string, nowMs: number, windowMs: number, max: number): Promise<[number, number]>;
};

/**
 * Checks (and consumes) one request against the sliding-window rate limiter.
 *
 * @param identifier logical subject of the limit, e.g. "user:123:api"
 * @param windowMs   sliding window length in milliseconds
 * @param max        maximum requests allowed inside the window
 * @returns allowed flag plus how many requests remain in the window
 */
async function checkRateLimit(identifier: string, windowMs: number, max: number) {
    const key = `ratelimit:${identifier}`;
    // defineCommand augments the client at runtime only, so a cast is
    // unavoidable; the structural type keeps the call site fully typed
    // (no `any`, no return-side tuple assertion).
    const limiter = redis as unknown as RateLimitCommand;
    const [allowed, remaining] = await limiter.ratelimit(key, Date.now(), windowMs, max);
    return { allowed: allowed === 1, remaining };
}
// lib/search.ts — RedisSearch index
import { createClient } from 'redis';
import { SchemaFieldTypes, VectorAlgorithms } from 'redis';

// Shared node-redis client used by the search helpers below.
const client = createClient({ url: 'redis://redis:6379' });

// Create search index on JSON documents
/**
 * Creates the RedisSearch index over JSON documents keyed 'order:*'.
 * Safe to call on every startup: an "Index already exists" error is
 * swallowed; any other failure is rethrown.
 */
async function createOrderIndex() {
    try {
        await client.ft.create('idx:orders', {
            // Boost customer-name matches over product-name matches.
            'customer_name': { type: SchemaFieldTypes.TEXT, WEIGHT: 2.0 },
            'product_names': { type: SchemaFieldTypes.TEXT },
            'status': { type: SchemaFieldTypes.TAG },
            'total_cents': { type: SchemaFieldTypes.NUMERIC, SORTABLE: true },
            'created_at': { type: SchemaFieldTypes.NUMERIC, SORTABLE: true },
            'embedding': {
                type: SchemaFieldTypes.VECTOR,
                ALGORITHM: VectorAlgorithms.HNSW,
                TYPE: 'FLOAT32',
                DIM: 1536,  // OpenAI text-embedding-3-small dimensions
                DISTANCE_METRIC: 'COSINE',
            },
        }, {
            ON: 'JSON',
            PREFIX: 'order:',
        });
    } catch (e: unknown) {
        // Narrow before touching .message — with `e: any`, a non-Error throw
        // would crash here with a TypeError and mask the original failure.
        if (e instanceof Error && e.message.includes('Index already exists')) return;
        throw e;
    }
}

// Full-text search
/**
 * Full-text search over orders, optionally restricted to one status TAG,
 * sorted newest first.
 *
 * NOTE(review): `query` and `status` are interpolated into the RedisSearch
 * query string unescaped — special characters ("@", "{", "-", …) in
 * untrusted input can alter the query. Escape/sanitize before calling.
 *
 * @param query  free text matched against customer_name and product_names
 * @param status optional exact status tag filter
 * @param limit  maximum number of results (default 20)
 */
async function searchOrders(query: string, status?: string, limit = 20) {
    const filter = status ? `@status:{${status}}` : '';
    // '@field1|field2:term' searches both TEXT fields; '*' matches everything.
    const textQuery = query ? `(@customer_name|product_names:${query})` : '*';
    
    const results = await client.ft.search(
        'idx:orders',
        `${textQuery} ${filter}`.trim(),
        {
            LIMIT: { from: 0, size: limit },
            SORTBY: { BY: 'created_at', DIRECTION: 'DESC' },
            // Only these fields come back — keeps payloads small.
            RETURN: ['customer_name', 'status', 'total_cents', 'created_at'],
        },
    );
    
    return results.documents.map(doc => doc.value);
}

// Vector similarity search (semantic)
/**
 * KNN vector search over order embeddings: returns the k nearest orders,
 * each annotated with a similarity derived from the returned distance score.
 *
 * @param embedding query vector (expected to match the index's 1536 dims)
 * @param k         number of nearest neighbors to return (default 10)
 */
async function semanticOrderSearch(embedding: number[], k = 10): Promise<any[]> {
    // Serialize the vector as raw FLOAT32 bytes, the format the index expects.
    const blob = Buffer.from(new Float32Array(embedding).buffer);

    const searchResult = await client.ft.search(
        'idx:orders',
        `*=>[KNN ${k} @embedding $BLOB AS score]`,
        {
            PARAMS: { BLOB: blob },
            SORTBY: { BY: 'score', DIRECTION: 'ASC' },  // smallest distance first
            DIALECT: 2,
        },
    );

    // Convert distance to similarity; a missing score falls back to distance 1.
    return searchResult.documents.map(doc => {
        const distance = parseFloat(String(doc.value.score ?? '1'));
        return { ...doc.value, similarity: 1 - distance };
    });
}

For the NATS messaging that complements Redis Streams for lighter-weight pub/sub, see the NATS messaging guide for JetStream consumer patterns. For background job queues built on Redis with BullMQ, the background jobs guide covers queue patterns and dead letter handling. The Claude Skills 360 bundle includes Redis skill sets covering Streams consumer groups, Lua atomics, and RedisSearch configuration. Start with the free tier to try Redis Streams pattern generation.

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.

Back to Blog

Get 360 skills free