Claude Code for BullMQ: Background Job Processing with Redis

Published: April 3, 2027
Read time: 8 min read
By: Claude Skills 360

BullMQ processes background jobs using Redis as the broker. new Queue("queue-name", { connection }) creates a queue, and queue.add("job-name", data, opts) enqueues a job. new Worker("queue-name", processor, { connection, concurrency }) processes jobs with a handler function. Job options include attempts, backoff, delay, priority, and removeOnComplete. Delayed and repeatable jobs need no extra component in current releases: QueueScheduler was required only in BullMQ 1.x and was removed in 2.0. queue.add("job", data, { repeat: { pattern: "0 * * * *" } }) schedules an hourly recurring job. FlowProducer creates parent-child job trees in which every child must complete before the parent runs. QueueEvents subscribes to job lifecycle events across all workers of a queue. Bull Board provides a React dashboard for queue inspection. Claude Code generates typed BullMQ workers, retry configuration, cron-based recurring jobs, job priority systems, and monitoring patterns for production Node.js job queues.

CLAUDE.md for BullMQ

## BullMQ Stack
- Version: bullmq >= 5.10
- Queue: new Queue("name", { connection: { host, port } }) — enqueues jobs
- Worker: new Worker("name", async job => { ... }, { connection, concurrency: 5 })
- Opts: { attempts: 3, backoff: { type: "exponential", delay: 1000 }, removeOnComplete: 100 }
- Cron: queue.add("job", data, { repeat: { pattern: "0 9 * * *" } }) (no QueueScheduler needed; it was removed in BullMQ 2.0)
- Priority: { priority: 1 } — lower number = higher priority (1 is highest)
- Events: queueEvents.on("completed", ({ jobId }) => ...) — global queue monitoring
- Flow: FlowProducer for parent-child chains — parent waits for all children
- Dashboard: @bull-board/express + BullMQAdapter
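
To make the Opts line concrete, the sketch below computes how long a job waits before each retry under the exponential strategy. It assumes the documented formula of 2^(attemptsMade - 1) × delay with no jitter configured. retryDelays is a hypothetical helper written for this illustration, not a BullMQ API.

```typescript
// Hypothetical helper: not part of BullMQ. Assumes the documented exponential
// formula 2^(attemptsMade - 1) * delay, with no jitter configured.
function retryDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = []
  // With `attempts: n` a job runs at most n times, so there are n - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.pow(2, attemptsMade - 1) * baseDelayMs)
  }
  return delays
}

console.log(retryDelays(3, 1000)) // [ 1000, 2000 ]
console.log(retryDelays(5, 2000)) // [ 2000, 4000, 8000, 16000 ]
```

With attempts: 3 and delay: 1000, a job that keeps failing is given up on about three seconds after its first failure, which may be too short to ride out a provider outage. Raising attempts widens that window much faster than raising delay.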

Queue Setup

// lib/queues/connection.ts — shared Redis connection
import type { ConnectionOptions } from "bullmq"

export const redisConnection: ConnectionOptions = {
  host: process.env.REDIS_HOST ?? "localhost",
  port: Number.parseInt(process.env.REDIS_PORT ?? "6379", 10),
  password: process.env.REDIS_PASSWORD,
  tls: process.env.NODE_ENV === "production" ? {} : undefined,
  maxRetriesPerRequest: null,  // Required by BullMQ workers, which hold blocking Redis connections
}

// lib/queues/email-queue.ts — typed queue with job data
import { Queue } from "bullmq"
import { redisConnection } from "./connection"

// Typed job data unions
export type EmailJobData =
  | { type: "order_confirmation"; orderId: string; userId: string }
  | { type: "welcome"; userId: string; name: string; email: string }
  | { type: "password_reset"; userId: string; resetToken: string; email: string }
  | { type: "marketing_blast"; campaignId: string; recipientIds: string[] }

export const emailQueue = new Queue<EmailJobData>("email", {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: "exponential", delay: 2_000 },
    removeOnComplete: { count: 1000, age: 24 * 3600 },  // Keep last 1000 or 24h
    removeOnFail: { count: 5000 },  // Keep failed jobs longer for inspection
  },
})

// No QueueScheduler here: BullMQ 2.0 removed it, and workers now promote
// delayed and repeatable jobs themselves.

// Job addition helpers
export const enqueueOrderConfirmation = (orderId: string, userId: string) =>
  emailQueue.add("order_confirmation", { type: "order_confirmation", orderId, userId }, {
    priority: 1,  // High priority — customer is waiting
    attempts: 5,
  })

export const scheduleWeeklyDigest = () =>
  emailQueue.add(
    "weekly_digest",
    { type: "marketing_blast", campaignId: "weekly", recipientIds: [] },
    {
      repeat: { pattern: "0 9 * * 1" },  // Every Monday at 09:00, server time unless repeat.tz is set
      jobId: "weekly-digest",  // Re-adding with identical repeat options does not create a second schedule
    }
  )
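
For one-off jobs, BullMQ ignores an add() call whose jobId matches a job that still exists in the queue, which gives a simple guard against double-enqueueing. The sketch below derives such an ID from the job data. emailJobId is a hypothetical helper, the union is copied locally so the snippet runs on its own, and the choice of which job types to deduplicate is an assumption for illustration.

```typescript
// Local copy of the article's job data union so the snippet is self-contained.
type EmailJobData =
  | { type: "order_confirmation"; orderId: string; userId: string }
  | { type: "welcome"; userId: string; name: string; email: string }
  | { type: "password_reset"; userId: string; resetToken: string; email: string }
  | { type: "marketing_blast"; campaignId: string; recipientIds: string[] }

// Hypothetical helper: returns a stable job ID for emails that should be sent
// at most once, or undefined to let BullMQ assign an ID.
// "_" is used as the separator because ":" is BullMQ's Redis key delimiter.
function emailJobId(data: EmailJobData): string | undefined {
  switch (data.type) {
    case "order_confirmation":
      return `order_confirmation_${data.orderId}`
    case "welcome":
      return `welcome_${data.userId}`
    case "password_reset":
    case "marketing_blast":
      // These can legitimately repeat, so they are not deduplicated.
      return undefined
    default: {
      // Compile-time exhaustiveness check: stops type-checking if a variant is added.
      const unreachable: never = data
      throw new Error(`Unhandled email job: ${JSON.stringify(unreachable)}`)
    }
  }
}

console.log(emailJobId({ type: "order_confirmation", orderId: "o-42", userId: "u-1" }))
// order_confirmation_o-42
```

A helper like enqueueOrderConfirmation could pass jobId: emailJobId(data) in its options. Deduplication only lasts while the earlier job is still stored, so the removeOnComplete settings determine how long a repeat request is suppressed.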

Worker Implementation

// lib/workers/email-worker.ts — processes email jobs
import { Worker, type Job } from "bullmq"
import { redisConnection } from "../queues/connection"
import type { EmailJobData } from "../queues/email-queue"
import { sendOrderConfirmation, sendWelcomeEmail, sendPasswordReset } from "@/lib/email"

async function processEmailJob(job: Job<EmailJobData>): Promise<void> {
  const { data } = job

  switch (data.type) {
    case "order_confirmation": {
      // Braces scope the const declaration to this case
      await job.updateProgress(10)
      const orderData = await fetchOrderData(data.orderId)
      await job.updateProgress(50)
      await sendOrderConfirmation({ to: orderData.email, ...orderData })
      await job.updateProgress(100)
      break
    }

    case "welcome":
      await sendWelcomeEmail({ to: data.email, name: data.name })
      break

    case "password_reset":
      await sendPasswordReset({
        to: data.email,
        resetToken: data.resetToken,
        expiresIn: "1 hour",
      })
      break

    case "marketing_blast": {
      // Process in batches to avoid rate limits
      const BATCH_SIZE = 50
      const recipients = await fetchRecipients(data.recipientIds)

      for (let i = 0; i < recipients.length; i += BATCH_SIZE) {
        const batch = recipients.slice(i, i + BATCH_SIZE)
        await sendBatchEmails(batch, data.campaignId)
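        // Clamp to the recipient count: the final batch may be smaller than BATCH_SIZE
        const sent = Math.min(i + BATCH_SIZE, recipients.length)
        await job.updateProgress(Math.round((sent / recipients.length) * 100))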

        // Rate limit: 50 emails per second
        if (i + BATCH_SIZE < recipients.length) {
          await new Promise(resolve => setTimeout(resolve, 1_000))
        }
      }
      break
    }

    default:
      throw new Error(`Unknown email job type: ${(data as any).type}`)
  }
}

export const emailWorker = new Worker<EmailJobData>(
  "email",
  processEmailJob,
  {
    connection: redisConnection,
    concurrency: 10,  // Up to 10 jobs in flight in this worker process
    limiter: {
      max: 100,     // At most 100 jobs
      duration: 60_000,  // per 60 s window, enforced across all workers on this queue
    },
  }
)

// Worker lifecycle events
emailWorker.on("completed", job => {
  console.log(`[Email Worker] Job ${job.id} (${job.data.type}) completed`)
})

emailWorker.on("failed", (job, err) => {
  console.error(`[Email Worker] Job ${job?.id} failed:`, {
    type: job?.data.type,
    attempt: job?.attemptsMade,
    error: err.message,
  })
})

emailWorker.on("error", err => {
  console.error("[Email Worker] Worker error:", err)
})

// Graceful shutdown
process.on("SIGTERM", async () => {
  await emailWorker.close()
  process.exit(0)
})

async function fetchOrderData(_orderId: string) {
  // Placeholder stub data; replace with a real lookup
  return { email: "customer@example.com", orderNumber: "123", items: [], totalCents: 0 }
}
async function fetchRecipients(ids: string[]) { return ids.map(id => ({ id, email: "" })) }
async function sendBatchEmails(_recipients: unknown[], _campaignId: string) { /* ... */ }
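
The batching arithmetic in the marketing_blast branch is easy to get subtly wrong on the final, partial batch. As a minimal sketch, the same idea can be written as two pure helpers that are testable without Redis. toBatches and progressPercent are hypothetical names for this illustration, not BullMQ APIs.

```typescript
// Hypothetical helpers for illustration; not part of BullMQ.

// Splits items into consecutive batches of at most `size` elements.
function toBatches<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer")
  }
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}

// Percentage complete after `done` of `total` items, clamped to 0..100.
function progressPercent(done: number, total: number): number {
  if (total <= 0) return 100 // nothing to do counts as finished
  return Math.min(100, Math.max(0, Math.round((done / total) * 100)))
}

const demoRecipients = Array.from({ length: 120 }, (_, i) => `user-${i}`)
let sentSoFar = 0
for (const batch of toBatches(demoRecipients, 50)) {
  sentSoFar += batch.length
  console.log(`${batch.length} sent, ${progressPercent(sentSoFar, demoRecipients.length)}% done`)
}
// prints: "50 sent, 42% done", "50 sent, 83% done", "20 sent, 100% done"
```

Inside the worker, the value from progressPercent would be passed to job.updateProgress after each batch. Counting items actually sent, rather than the loop index plus the batch size, keeps the reported figure from exceeding 100.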

FlowProducer for Job Chains

// lib/queues/order-flow.ts — dependent job chains
import { FlowProducer } from "bullmq"
import { redisConnection } from "./connection"

const flowProducer = new FlowProducer({ connection: redisConnection })

// Create order → send confirmation, update inventory, notify warehouse (parallel children)
// All three children must complete before the parent (fulfillment) job runs
export async function createOrderFlow(orderId: string, userId: string) {
  return flowProducer.add({
    name: "order_fulfillment",
    queueName: "fulfillment",
    data: { orderId },
    children: [
      {
        name: "send_confirmation",
        queueName: "email",
        data: { type: "order_confirmation", orderId, userId },
      },
      {
        name: "update_inventory",
        queueName: "inventory",
        data: { orderId },
      },
      {
        name: "notify_warehouse",
        queueName: "notifications",
        data: { orderId, channel: "warehouse" },
      },
    ],
  })
}
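
Each child in a flow is added to its own queue, and the parent stays in the waiting-children state until every child has completed. If any of those queues has no worker, the parent never runs. The sketch below lists the queues a flow touches so that deployment can be checked against it. FlowNode is a local stand-in for the shape passed to flowProducer.add (BullMQ exports its own FlowJob type with more optional fields), and queuesInFlow is a hypothetical helper.

```typescript
// Local stand-in for the object passed to flowProducer.add(); BullMQ's own
// FlowJob type has more optional fields than are modelled here.
interface FlowNode {
  name: string
  queueName: string
  data?: unknown
  children?: FlowNode[]
}

// Hypothetical helper: collects every queue a flow touches, parent included.
function queuesInFlow(node: FlowNode, seen: Set<string> = new Set<string>()): Set<string> {
  seen.add(node.queueName)
  for (const child of node.children ?? []) {
    queuesInFlow(child, seen)
  }
  return seen
}

const orderFlow: FlowNode = {
  name: "order_fulfillment",
  queueName: "fulfillment",
  data: { orderId: "order-1" },
  children: [
    { name: "send_confirmation", queueName: "email" },
    { name: "update_inventory", queueName: "inventory" },
    { name: "notify_warehouse", queueName: "notifications" },
  ],
}

console.log(Array.from(queuesInFlow(orderFlow)).sort())
// [ 'email', 'fulfillment', 'inventory', 'notifications' ]
```

For the order flow in this section, that means four workers must be running: one each for fulfillment, email, inventory, and notifications.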

Bull Board Dashboard

// app.ts — Express server with Bull Board
import express from "express"
import { createBullBoard } from "@bull-board/api"
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter"
import { ExpressAdapter } from "@bull-board/express"
import { emailQueue } from "./lib/queues/email-queue"

const serverAdapter = new ExpressAdapter()
serverAdapter.setBasePath("/admin/queues")

createBullBoard({
  queues: [new BullMQAdapter(emailQueue)],
  serverAdapter,
})

const app = express()
app.use("/admin/queues", serverAdapter.getRouter())

If you would rather back jobs with PostgreSQL than Redis, pg-boss stores them in a jobs table in your existing database. That removes the Redis dependency at the cost of lower throughput and higher database load for high-volume queues; see the database-backed queue guide. If you need a serverless-native, event-driven platform with no persistent infrastructure to run (neither Redis nor a database), Inngest runs each step as a serverless function invocation orchestrated by its cloud, which allows long-running workflows without a queue server; see the serverless workflow guide. The Claude Skills 360 bundle includes BullMQ skill sets covering workers, cron jobs, and job chains. Start with the free tier to try queue generation.
