RQ (Redis Queue) processes background jobs with Redis. pip install rq. Connect: from redis import Redis; from rq import Queue; redis = Redis(); q = Queue(connection=redis). Enqueue: job = q.enqueue(my_func, arg1, arg2). Named queue: Queue("emails", connection=redis). Job id: job.id. Status: job.get_status(). Result: job.result (after completion). Refresh: job.refresh(). Fetch: from rq.job import Job; Job.fetch(job_id, connection=redis). Worker CLI: rq worker — processes default queue. Multiple: rq worker emails high default. Retry: q.enqueue(fn, retry=Retry(max=3, interval=[10, 30, 60])). Timeout: q.enqueue(fn, job_timeout=120). TTL: q.enqueue(fn, result_ttl=3600). At: q.enqueue_at(datetime(2024,4,1,9), fn). In: q.enqueue_in(timedelta(minutes=5), fn). Depends on: q.enqueue(fn2, depends_on=job1). On success: from rq.job import Callback; q.enqueue(fn, on_success=Callback(success_fn)). Failed: from rq.registry import FailedJobRegistry; FailedJobRegistry(queue=q). job.exc_info — exception traceback. Cancel: job.cancel(). Delete: job.delete(). Scheduler: pip install rq-scheduler. from rq_scheduler import Scheduler. scheduler.enqueue_in(timedelta(hours=1), fn). scheduler.cron("0 9 * * *", fn). Simple worker (sync): from rq import SimpleWorker. Claude Code generates RQ task queues, worker pools, job pipelines, and FastAPI background task systems.
CLAUDE.md for RQ
## RQ Stack
- Version: rq >= 1.16 | pip install rq
- Queue: from rq import Queue; q = Queue(connection=Redis())
- Enqueue: job = q.enqueue(fn, *args, job_timeout=120, result_ttl=3600)
- Retry: q.enqueue(fn, retry=Retry(max=3, interval=[10, 30, 60]))
- Worker: rq worker [queue_names] | SimpleWorker for in-process testing
- Monitor: job.get_status() | Job.fetch(id, conn) | FailedJobRegistry
RQ Background Task Pipeline
# app/tasks.py — RQ Queue, enqueue, retry, callbacks, job tracking, and FastAPI
from __future__ import annotations
import time
from datetime import timedelta
from typing import Any, Callable
from redis import Redis
from rq import Queue, Retry
from rq.job import Job, Callback
from rq.registry import FailedJobRegistry, StartedJobRegistry
from rq.worker import SimpleWorker
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection and queue factory
# ─────────────────────────────────────────────────────────────────────────────
def make_redis(
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: str | None = None,
    decode_responses: bool = False,
) -> Redis:
    """Build and return a Redis client configured for use with RQ."""
    return Redis(
        host=host,
        port=port,
        db=db,
        password=password,
        decode_responses=decode_responses,
    )
def make_queue(
    name: str = "default",
    redis: Redis | None = None,
    host: str = "localhost",
    port: int = 6379,
    is_async: bool = True,
) -> Queue:
    """
    Build an RQ Queue, creating a Redis connection when none is supplied.

    is_async=False executes enqueued jobs immediately in-process, which is
    useful for testing. NOTE(review): RQ may still persist job state through
    the connection even in sync mode — confirm for your RQ version before
    relying on "no Redis needed".

    Example:
        q = make_queue("emails")
        q_low = make_queue("low")
        q_test = make_queue(is_async=False)
    """
    if redis is None:
        redis = make_redis(host=host, port=port)
    return Queue(name, connection=redis, is_async=is_async)
# ─────────────────────────────────────────────────────────────────────────────
# 2. Enqueue helpers
# ─────────────────────────────────────────────────────────────────────────────
def enqueue(
    q: Queue,
    fn: Callable,
    *args: Any,
    timeout: int = 180,
    result_ttl: int = 3600,
    failure_ttl: int = 86400,
    retry_max: int = 0,
    retry_intervals: list[int] | None = None,
    description: str | None = None,
    on_success: Callback | None = None,
    on_failure: Callback | None = None,
    **kwargs: Any,
) -> Job:
    """
    Enqueue a function for background execution with common RQ options.

    Args:
        q: target queue.
        fn: callable executed by the worker.
        *args: positional arguments forwarded to fn.
        timeout: max seconds the job may run (RQ ``job_timeout``; default 3 min).
        result_ttl: seconds to keep the result in Redis after completion.
        failure_ttl: seconds to keep failed-job info.
        retry_max: number of automatic retries on failure (0 disables retry).
        retry_intervals: backoff seconds between retries (default [10, 30, 60]).
        description: human-readable description shown in monitoring tools.
        on_success / on_failure: RQ Callback hooks.
        **kwargs: extra enqueue options or fn keyword arguments; these
            override the defaults built here.

    Returns:
        The created Job.

    Example:
        job = enqueue(emails_q, send_welcome_email, user_id=42, timeout=30)
        print(job.id)
    """
    kw: dict[str, Any] = {
        "job_timeout": timeout,
        "result_ttl": result_ttl,
        "failure_ttl": failure_ttl,
    }
    if retry_max > 0:
        kw["retry"] = Retry(max=retry_max, interval=retry_intervals or [10, 30, 60])
    if description:
        # BUG FIX: RQ's enqueue takes ``description=``. The previous
        # ``job_description`` key was not an RQ option, so it was silently
        # forwarded to fn as an unexpected keyword argument.
        kw["description"] = description
    if on_success:
        kw["on_success"] = on_success
    if on_failure:
        kw["on_failure"] = on_failure
    return q.enqueue(fn, *args, **{**kw, **kwargs})
def enqueue_in(
    q: Queue,
    delay: timedelta,
    fn: Callable,
    *args: Any,
    **kwargs: Any,
) -> Job:
    """
    Schedule *fn* to run once *delay* has elapsed.

    Requires rq >= 1.10 for the built-in ``Queue.enqueue_in`` support.
    """
    return q.enqueue_in(delay, fn, *args, **kwargs)
def enqueue_chain(
    q: Queue,
    steps: list[tuple[Callable, tuple, dict]],
    **enqueue_kwargs: Any,
) -> list[Job]:
    """
    Enqueue a pipeline of dependent jobs: each step waits on the previous one.

    steps: sequence of (fn, args, kwargs) tuples, in execution order.
    enqueue_kwargs: extra enqueue options applied to every step.

    Returns the created Job objects in enqueue order.

    Example:
        jobs = enqueue_chain(q, [
            (extract_data, (url,), {}),
            (transform_data, (), {}),
            (load_data, (db_url,), {}),
        ])
    """
    chain: list[Job] = []
    previous: Job | None = None
    for fn, step_args, step_kwargs in steps:
        options = dict(enqueue_kwargs)
        if previous:
            # All steps after the first wait for the preceding job.
            options["depends_on"] = previous
        previous = q.enqueue(fn, *step_args, **{**options, **step_kwargs})
        chain.append(previous)
    return chain
# ─────────────────────────────────────────────────────────────────────────────
# 3. Job monitoring
# ─────────────────────────────────────────────────────────────────────────────
def get_job(job_id: str, redis: Redis) -> Job | None:
    """Look up a job by ID; returns None when it is missing or unreadable."""
    try:
        return Job.fetch(job_id, connection=redis)
    except Exception:
        # Broad on purpose: any fetch/deserialization problem means "not found".
        return None
def job_status(job_id: str, redis: Redis) -> dict[str, Any]:
    """
    Build a JSON-friendly snapshot of a job's current state.

    The "status" field is one of: "queued", "started", "finished",
    "failed", "stopped", "canceled" — or "not_found" when the job
    cannot be fetched.
    """
    job = get_job(job_id, redis)
    if job is None:
        return {"id": job_id, "status": "not_found"}
    # Pull the latest state from Redis before reading any fields.
    job.refresh()
    snapshot = {
        "id": job.id,
        "status": job.get_status().value,
        "result": job.result,
        "description": job.description,
        "enqueued_at": str(job.enqueued_at),
        "started_at": str(job.started_at),
        "ended_at": str(job.ended_at),
        "exc_info": job.exc_info,
    }
    return snapshot
def wait_for_job(
    job: Job,
    poll_interval: float = 0.5,
    timeout: float = 30.0,
) -> Any:
    """
    Block until *job* reaches a terminal state, then return its result.

    Raises:
        TimeoutError: the job did not finish within *timeout* seconds.
        RuntimeError: the job ended in the "failed" state.
    """
    deadline = time.monotonic() + timeout
    while True:
        job.refresh()
        state = job.get_status().value
        if state == "finished":
            return job.result
        if state == "failed":
            raise RuntimeError(f"Job {job.id} failed: {job.exc_info}")
        if time.monotonic() > deadline:
            raise TimeoutError(f"Job {job.id} timed out after {timeout}s")
        time.sleep(poll_interval)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Registry helpers
# ─────────────────────────────────────────────────────────────────────────────
def failed_jobs(q: Queue, count: int = 10) -> list[dict]:
    """Summarize up to *count* recent failed jobs (id, func, truncated traceback)."""
    registry = FailedJobRegistry(queue=q)
    conn = q.connection
    summaries: list[dict] = []
    for jid in registry.get_job_ids()[:count]:
        job = get_job(jid, conn)
        if job is None:
            continue  # job data expired or unreadable — skip it
        summaries.append({
            "id": job.id,
            "func": job.func_name,
            "exc_info": str(job.exc_info)[:200],  # keep payload small
        })
    return summaries
def requeue_failed(q: Queue, max_jobs: int = 50) -> int:
    """Best-effort re-enqueue of failed jobs; returns how many were re-queued."""
    registry = FailedJobRegistry(queue=q)
    requeued = 0
    for jid in registry.get_job_ids()[:max_jobs]:
        try:
            registry.requeue(jid)
        except Exception:
            # Best-effort: skip jobs that can no longer be requeued.
            continue
        requeued += 1
    return requeued
# ─────────────────────────────────────────────────────────────────────────────
# 5. Example tasks (define in separate module for worker imports)
# ─────────────────────────────────────────────────────────────────────────────
def send_email(to: str, subject: str, body: str) -> dict:
    """Example task: pretend to send an email and report the delivery."""
    time.sleep(0.1)  # stand-in for SMTP/API latency
    return {"status": "sent", "to": to, "subject": subject}
def process_image(image_url: str, size: tuple[int, int] = (128, 128)) -> dict:
    """Example task: pretend to resize/process an image."""
    time.sleep(0.2)  # stand-in for CPU-bound image work
    return {"url": image_url, "size": size, "status": "processed"}
def generate_report(report_id: int, format: str = "pdf") -> dict:
    """Example task: pretend to render a report and return its download URL."""
    time.sleep(0.5)  # stand-in for report rendering
    return {
        "report_id": report_id,
        "format": format,
        "url": f"/reports/{report_id}.{format}",
    }
# ─────────────────────────────────────────────────────────────────────────────
# 6. FastAPI integration
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI integration example, stored as a string so this module carries no
# FastAPI dependency. Copy into an app module to use. (String content is kept
# verbatim; NOTE(review): the example imports BackgroundTasks without using it.)
FASTAPI_EXAMPLE = '''
from fastapi import FastAPI, BackgroundTasks, Depends
from rq import Queue
from redis import Redis
from app.tasks import enqueue, job_status, send_email, generate_report
app = FastAPI()
_redis = Redis()
_queue = Queue("emails", connection=_redis)
_report_queue = Queue("reports", connection=_redis)
def get_queue() -> Queue:
return _queue
@app.post("/send-email/")
def trigger_email(to: str, subject: str, body: str, q: Queue = Depends(get_queue)):
job = enqueue(q, send_email, to, subject, body, timeout=30)
return {"job_id": job.id, "status": "queued"}
@app.get("/jobs/{job_id}")
def check_job(job_id: str):
return job_status(job_id, _redis)
@app.post("/reports/")
def create_report(report_id: int, format: str = "pdf"):
job = enqueue(_report_queue, generate_report, report_id, format=format, timeout=120)
return {"job_id": job.id}
'''
# ─────────────────────────────────────────────────────────────────────────────
# Demo (sync mode — no Redis needed)
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo runs jobs synchronously in-process (is_async=False), so no worker
    # is required. NOTE(review): depending on the RQ version, job state may
    # still be written through the Redis connection even in sync mode.
    print("=== RQ in sync mode (no Redis) ===")
    sync_q = make_queue(is_async=False)

    email_job = enqueue(sync_q, send_email, "[email protected]", "Welcome!", "Hello Alice")
    print(f" Job {email_job.id}: {email_job.get_status()}")
    print(f" Result: {email_job.result}")

    report_job = enqueue(sync_q, generate_report, 42, format="pdf", timeout=60)
    print(f" Report job {report_job.id}: {report_job.result}")

    print("\n=== Job chain ===")
    chain = enqueue_chain(sync_q, [
        (send_email, ("[email protected]", "Step 1", "body"), {}),
        (generate_report, (99,), {"format": "xlsx"}),
    ])
    print(f" Chain jobs: {[job.id for job in chain]}")
    for job in chain:
        print(f" {job.id}: {job.result}")
For the Celery alternative — Celery supports multiple brokers (Redis, RabbitMQ, SQS), task routing, canvas primitives (chord, chain, group), and periodic tasks via Celery Beat — it is the enterprise choice for complex task graphs; RQ is simpler (Redis-only, pure Python, zero configuration) and is the right choice for most web apps where you want background jobs without Celery’s complexity and configuration surface area. For the dramatiq alternative — dramatiq uses actor-based message passing with retry/rate-limiting middleware and supports RabbitMQ and Redis; RQ’s API is more Pythonic (q.enqueue(fn, arg)) and pairs with the rq-dashboard web UI (a separately installed companion package), making it faster to get started while still handling retries, job chaining, and failure registries. The Claude Skills 360 bundle includes RQ skill sets covering make_redis()/make_queue() factory, enqueue() with timeout/retry/callbacks, enqueue_in() delayed execution, enqueue_chain() dependent pipelines, job_status()/wait_for_job() monitoring, failed_jobs()/requeue_failed() registry helpers, send_email/process_image/generate_report example tasks, and FastAPI Depends(get_queue) integration. Start with the free tier to try Redis Queue background task code generation.