BullMQ processes background jobs using Redis as the broker — new Queue("queue-name", { connection }) creates a queue. queue.add("job-name", data, opts) enqueues a job. new Worker("queue-name", processor, { connection, concurrency }) processes jobs with a handler function. Job options include attempts, backoff, delay, priority, and removeOnComplete. Delayed and repeatable jobs work out of the box in BullMQ v2+ — the legacy QueueScheduler class was removed and is no longer needed. queue.add("job", data, { repeat: { pattern: "0 * * * *" } }) schedules hourly recurring jobs. FlowProducer creates parent-child job chains where children complete before the parent. QueueEvents monitors job lifecycle events globally. Bull Board provides a React dashboard for queue inspection. Claude Code generates typed BullMQ workers, retry configuration, cron-based recurring jobs, job priority systems, and monitoring patterns for production Node.js job queues.
CLAUDE.md for BullMQ
## BullMQ Stack
- Version: bullmq >= 5.10
- Queue: new Queue("name", { connection: { host, port } }) — enqueues jobs
- Worker: new Worker("name", async job => { ... }, { connection, concurrency: 5 })
- Opts: { attempts: 3, backoff: { type: "exponential", delay: 1000 }, removeOnComplete: 100 }
- Cron: queue.add("job", data, { repeat: { pattern: "0 9 * * *" } }) — handled natively in BullMQ v2+ (no QueueScheduler needed)
- Priority: { priority: 1 } — lower number = higher priority (1 is highest)
- Events: queueEvents.on("completed", ({ jobId }) => ...) — global queue monitoring
- Flow: FlowProducer for parent-child chains — parent waits for all children
- Dashboard: @bull-board/express + BullMQAdapter
Queue Setup
// lib/queues/connection.ts — shared Redis connection
import { ConnectionOptions } from "bullmq"
// Shared Redis connection options reused by every queue and worker.
export const redisConnection: ConnectionOptions = {
  host: process.env.REDIS_HOST ?? "localhost",
  // Explicit radix — parseInt without one is a classic footgun.
  port: Number.parseInt(process.env.REDIS_PORT ?? "6379", 10),
  password: process.env.REDIS_PASSWORD,
  // `{}` enables TLS with default options; disabled outside production.
  tls: process.env.NODE_ENV === "production" ? {} : undefined,
  maxRetriesPerRequest: null, // Required for BullMQ — it manages retries itself
}
// lib/queues/email-queue.ts — typed queue with job data
import { Queue, QueueScheduler } from "bullmq"
import { redisConnection } from "./connection"
// Typed job data unions — a discriminated union keyed on `type`;
// the worker's switch statement narrows on this tag.
export type EmailJobData =
  | { type: "order_confirmation"; orderId: string; userId: string } // post-checkout receipt
  | { type: "welcome"; userId: string; name: string; email: string } // new-user onboarding
  | { type: "password_reset"; userId: string; resetToken: string; email: string } // reset link — presumably single-use token; confirm in @/lib/email
  | { type: "marketing_blast"; campaignId: string; recipientIds: string[] } // bulk campaign send
// Single typed queue for all outbound email; job payloads are EmailJobData.
// Per-job options passed to queue.add() override these defaults.
export const emailQueue = new Queue<EmailJobData>("email", {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3, // retry transient send failures (3 attempts total)
    backoff: { type: "exponential", delay: 2_000 }, // 2s, 4s, 8s between attempts
    removeOnComplete: { count: 1000, age: 24 * 3600 }, // Keep last 1000 or 24h
    removeOnFail: { count: 5000 }, // Keep failed jobs longer for inspection
  },
})
// QueueScheduler was removed in BullMQ v2 — Queue/Worker handle delayed and
// repeatable jobs natively, so no companion scheduler process exists (or is
// needed) in bullmq >= 5.10; `new QueueScheduler(...)` fails to compile there.
// NOTE(review): drop `QueueScheduler` from the bullmq import at the top of
// this file as well.
// Job addition helpers
export const enqueueOrderConfirmation = (orderId: string, userId: string) =>
emailQueue.add("order_confirmation", { type: "order_confirmation", orderId, userId }, {
priority: 1, // High priority — customer is waiting
attempts: 5,
})
export const scheduleWeeklyDigest = () =>
emailQueue.add(
"weekly_digest",
{ type: "marketing_blast", campaignId: "weekly", recipientIds: [] },
{
repeat: { pattern: "0 9 * * 1" }, // Every Monday at 9am
jobId: "weekly-digest", // Unique ID prevents duplicate scheduled jobs
}
)
Worker Implementation
// lib/workers/email-worker.ts — processes email jobs
import { Worker, type Job } from "bullmq"
import { redisConnection } from "../queues/connection"
import type { EmailJobData } from "../queues/email-queue"
import { sendOrderConfirmation, sendWelcomeEmail, sendPasswordReset } from "@/lib/email"
async function processEmailJob(job: Job<EmailJobData>): Promise<void> {
const { data } = job
switch (data.type) {
case "order_confirmation":
await job.updateProgress(10)
const orderData = await fetchOrderData(data.orderId)
await job.updateProgress(50)
await sendOrderConfirmation({ to: orderData.email, ...orderData })
await job.updateProgress(100)
break
case "welcome":
await sendWelcomeEmail({ to: data.email, name: data.name })
break
case "password_reset":
await sendPasswordReset({
to: data.email,
resetToken: data.resetToken,
expiresIn: "1 hour",
})
break
case "marketing_blast": {
// Process in batches to avoid rate limits
const BATCH_SIZE = 50
const recipients = await fetchRecipients(data.recipientIds)
for (let i = 0; i < recipients.length; i += BATCH_SIZE) {
const batch = recipients.slice(i, i + BATCH_SIZE)
await sendBatchEmails(batch, data.campaignId)
await job.updateProgress(Math.round(((i + BATCH_SIZE) / recipients.length) * 100))
// Rate limit: 50 emails per second
if (i + BATCH_SIZE < recipients.length) {
await new Promise(resolve => setTimeout(resolve, 1_000))
}
}
break
}
default:
throw new Error(`Unknown email job type: ${(data as any).type}`)
}
}
// Worker: pulls jobs off the "email" queue and runs processEmailJob on each.
// The limiter caps throughput across this worker regardless of concurrency.
export const emailWorker = new Worker<EmailJobData>(
  "email",
  processEmailJob,
  {
    connection: redisConnection,
    concurrency: 10, // Process 10 jobs simultaneously
    limiter: {
      max: 100, // Max 100 jobs
      duration: 60_000, // Per minute (rate limiting at worker level)
    },
  }
)
// Worker lifecycle events
emailWorker.on("completed", job => {
console.log(`[Email Worker] Job ${job.id} (${job.data.type}) completed`)
})
emailWorker.on("failed", (job, err) => {
console.error(`[Email Worker] Job ${job?.id} failed:`, {
type: job?.data.type,
attempt: job?.attemptsMade,
error: err.message,
})
})
emailWorker.on("error", err => {
console.error("[Email Worker] Worker error:", err)
})
// Graceful shutdown
process.on("SIGTERM", async () => {
await emailWorker.close()
process.exit(0)
})
// --- Placeholder data-access helpers (swap for real implementations) ---
async function fetchOrderData(_orderId: string) {
  const stubOrder = { email: "[email protected]", orderNumber: "123", items: [], totalCents: 0 }
  return stubOrder
}
async function fetchRecipients(ids: string[]) {
  const recipients = []
  for (const id of ids) {
    recipients.push({ id, email: "" })
  }
  return recipients
}
async function sendBatchEmails(_recipients: unknown[], _campaignId: string) {
  // intentional no-op stub
}
FlowProducer for Job Chains
// lib/queues/order-flow.ts — dependent job chains
import { FlowProducer } from "bullmq"
import { redisConnection } from "./connection"
const flowProducer = new FlowProducer({ connection: redisConnection })
/**
 * Order flow: three children run in parallel (confirmation email, inventory
 * update, warehouse notification); the parent fulfillment job runs only
 * after every child has completed.
 */
export async function createOrderFlow(orderId: string, userId: string) {
  const children = [
    {
      name: "send_confirmation",
      queueName: "email",
      data: { type: "order_confirmation", orderId, userId },
    },
    {
      name: "update_inventory",
      queueName: "inventory",
      data: { orderId },
    },
    {
      name: "notify_warehouse",
      queueName: "notifications",
      data: { orderId, channel: "warehouse" },
    },
  ]
  return flowProducer.add({
    name: "order_fulfillment",
    queueName: "fulfillment",
    data: { orderId },
    children,
  })
}
Bull Board Dashboard
// app.ts — Express server with Bull Board
import express from "express"
import { createBullBoard } from "@bull-board/api"
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter"
import { ExpressAdapter } from "@bull-board/express"
import { emailQueue } from "./lib/queues/email-queue"
// Mount the Bull Board inspection UI on an Express app at /admin/queues.
const app = express()
const serverAdapter = new ExpressAdapter()
serverAdapter.setBasePath("/admin/queues")
// Register each queue to inspect; one BullMQAdapter per queue.
createBullBoard({
  queues: [new BullMQAdapter(emailQueue)],
  serverAdapter,
})
app.use("/admin/queues", serverAdapter.getRouter())
For the pg-boss alternative — background jobs backed by PostgreSQL instead of Redis — pg-boss uses a jobs table in your existing database, eliminating the Redis dependency at the cost of lower throughput and higher database load for high-volume queues; see the database-backed queue guide. For the Inngest alternative — a serverless-native, event-driven function platform with no persistent infrastructure (neither Redis nor a database) — Inngest runs steps as serverless function invocations orchestrated by its cloud, allowing long-running workflows without a queue server; see the serverless workflow guide. The Claude Skills 360 bundle includes BullMQ skill sets covering workers, cron jobs, and job chains. Start with the free tier to try queue generation.