Dramatiq is a fast background task library. pip install dramatiq[rabbitmq] or pip install dramatiq[redis]. Define actor: import dramatiq; @dramatiq.actor def send_email(to, subject): .... Enqueue: send_email.send("[email protected]", "Hi"). Options: send_email.send_with_options(args=["[email protected]","Hi"], delay=5000) — 5s delay. Retry: @dramatiq.actor(max_retries=3, min_backoff=1000). Broker: from dramatiq.brokers.rabbitmq import RabbitmqBroker; dramatiq.set_broker(RabbitmqBroker(url="amqp://...")). Redis: from dramatiq.brokers.redis import RedisBroker; dramatiq.set_broker(RedisBroker(url="redis://localhost:6379")). Middleware: broker.add_middleware(Retries(max_retries=3, min_backoff=1000, max_backoff=60000)). broker.add_middleware(TimeLimit(time_limit=300000)) — 5 min. broker.add_middleware(AgeLimit(max_age=3600000)) — discard after 1h. broker.add_middleware(Results(backend=RedisBackend())). Queue: @dramatiq.actor(queue_name="emails"). Priority: @dramatiq.actor(priority=0) — lower = higher priority. Pipeline: (fetch_data.message(url) | process_data.message() | store_results.message()).run(). Group: dramatiq.group([send_email.message(u) for u in users]).run(). Rate limit: with rate_limiter(backend, "key", limit=10): actor.send(...). Worker: python -m dramatiq tasks --processes 4 --threads 8. StubBroker for tests: stub_broker = StubBroker(); dramatiq.set_broker(stub_broker); stub_broker.emit_after("enqueue", ...). @dramatiq.actor(store_results=True) def compute(..) -> int: return 42. future = compute.send(x), result = future.get_result(timeout=5000). Claude Code generates Dramatiq actor definitions, pipeline compositions, and broker configuration for RabbitMQ and Redis.
CLAUDE.md for Dramatiq
## Dramatiq Stack
- Version: dramatiq >= 1.17 | pip install "dramatiq[rabbitmq]" or "[redis]"
- Actor: @dramatiq.actor(max_retries=3, queue_name="default", priority=0)
- Enqueue: actor.send(*args) | actor.send_with_options(delay=ms, priority=n)
- Broker: RabbitmqBroker(url=) | RedisBroker(url=) | StubBroker (tests)
- Middleware: Retries | TimeLimit | AgeLimit | Results | CurrentMessage
- Compose: pipeline([a.message(), b.message()]).run() | group([...]).run()
- Worker: python -m dramatiq tasks --processes N --threads M
Dramatiq Background Task Pipeline
# tasks.py — Dramatiq actor definitions
from __future__ import annotations
import json
import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.middleware import (
AgeLimit,
CurrentMessage,
Retries,
TimeLimit,
)
from dramatiq.rate_limits import ConcurrentRateLimiter
from dramatiq.rate_limits.backends import RedisBackend
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Broker configuration
# ─────────────────────────────────────────────────────────────────────────────
def configure_broker(redis_url: str = "redis://localhost:6379") -> RedisBroker:
"""
Configure and register the Redis broker with standard middleware.
Call once at application startup before importing/defining actors.
"""
broker = RedisBroker(url=redis_url)
broker.add_middleware(
Retries(
max_retries=5,
min_backoff=1_000, # 1 second
max_backoff=120_000, # 2 minutes
)
)
broker.add_middleware(
TimeLimit(time_limit=300_000) # 5-minute hard limit per message
)
broker.add_middleware(
AgeLimit(max_age=3_600_000) # discard messages older than 1 hour
)
broker.add_middleware(CurrentMessage()) # access current message inside actor
dramatiq.set_broker(broker)
return broker
# ─────────────────────────────────────────────────────────────────────────────
# Simple actors
# ─────────────────────────────────────────────────────────────────────────────
@dramatiq.actor(queue_name="emails", max_retries=3, time_limit=30_000)
def send_email(to: str, subject: str, body: str, from_addr: str | None = None) -> None:
"""
Send a transactional email. Retries up to 3 times with exponential backoff.
Queue: emails (separate from default to prevent email delays from blocking other work)
"""
sender = from_addr or os.environ.get("EMAIL_FROM", "[email protected]")
logger.info("Sending email to=%s subject=%r", to, subject)
# In production: call your email provider API here
# e.g. requests.post("https://api.sendgrid.com/...", json={...})
time.sleep(0.1) # simulate API call
logger.info("Email sent to=%s message_id=sim-%d", to, int(time.time()))
@dramatiq.actor(queue_name="notifications", max_retries=5)
def send_push_notification(user_id: int, title: str, message: str) -> None:
"""Push notification via FCM/APNs. Retried up to 5 times."""
logger.info("Push notification user_id=%d title=%r", user_id, title)
time.sleep(0.05)
@dramatiq.actor(queue_name="default", max_retries=3, time_limit=600_000)
def process_image(image_id: int, operations: list[str]) -> dict[str, Any]:
"""
Image processing pipeline. Up to 10 minutes. Returns result dict.
Use Results middleware to retrieve return value from caller.
"""
logger.info("Processing image id=%d ops=%s", image_id, operations)
# Simulate processing
result = {"image_id": image_id, "processed": operations, "url": f"/images/{image_id}.webp"}
logger.info("Image processed id=%d", image_id)
return result
# ─────────────────────────────────────────────────────────────────────────────
# Actors with rate limiting
# ─────────────────────────────────────────────────────────────────────────────
@dramatiq.actor(queue_name="webhooks", max_retries=10, min_backoff=2_000)
def deliver_webhook(endpoint: str, payload: dict, attempt: int = 0) -> None:
"""
Deliver a webhook with per-endpoint rate limiting.
Max 10 concurrent requests to the same endpoint.
"""
import requests
try:
resp = requests.post(
endpoint,
json=payload,
headers={"Content-Type": "application/json", "X-Attempt": str(attempt)},
timeout=10,
)
resp.raise_for_status()
logger.info("Webhook delivered endpoint=%s status=%d", endpoint, resp.status_code)
except requests.HTTPError as exc:
if exc.response is not None and exc.response.status_code < 500:
# 4xx: don't retry (client-side error)
logger.warning("Webhook rejected endpoint=%s status=%d", endpoint,
exc.response.status_code)
return
raise # 5xx: let Dramatiq retry
# ─────────────────────────────────────────────────────────────────────────────
# Pipeline — chained actors
# ─────────────────────────────────────────────────────────────────────────────
@dramatiq.actor(queue_name="etl")
def extract_data(source_url: str, job_id: str) -> dict:
"""Step 1: fetch raw data from source."""
logger.info("Extracting data job_id=%s from=%s", job_id, source_url)
time.sleep(0.2)
return {"job_id": job_id, "rows": 1000, "source": source_url}
@dramatiq.actor(queue_name="etl")
def transform_data(extract_result: dict) -> dict:
"""Step 2: transform raw data. Receives output of extract_data."""
job_id = extract_result["job_id"]
logger.info("Transforming data job_id=%s rows=%d", job_id, extract_result["rows"])
time.sleep(0.3)
return {**extract_result, "transformed": True, "clean_rows": extract_result["rows"] - 10}
@dramatiq.actor(queue_name="etl")
def load_data(transform_result: dict, target_table: str) -> None:
"""Step 3: load into data warehouse."""
logger.info("Loading data job_id=%s rows=%d target=%s",
transform_result["job_id"], transform_result["clean_rows"], target_table)
time.sleep(0.2)
logger.info("Load complete job_id=%s", transform_result["job_id"])
def run_etl_pipeline(source_url: str, target_table: str, job_id: str | None = None) -> None:
"""Enqueue an ETL pipeline: extract → transform → load."""
if job_id is None:
import uuid
job_id = str(uuid.uuid4())
pipe = dramatiq.pipeline([
extract_data.message(source_url, job_id),
transform_data.message(),
load_data.message(target_table),
])
pipe.run()
logger.info("ETL pipeline enqueued job_id=%s", job_id)
# ─────────────────────────────────────────────────────────────────────────────
# Group — fan-out parallel actors
# ─────────────────────────────────────────────────────────────────────────────
@dramatiq.actor(queue_name="reports")
def generate_report(report_type: str, user_id: int, date_range: list[str]) -> None:
"""Generate one report section (runs in parallel via group)."""
logger.info("Generating %s report for user_id=%d range=%s",
report_type, user_id, date_range)
time.sleep(0.5)
logger.info("Report %s done user_id=%d", report_type, user_id)
def send_user_digest(user_id: int, date_range: list[str]) -> None:
"""Fan-out: generate all report sections in parallel, then send email."""
report_types = ["sales", "inventory", "traffic", "conversions"]
# All reports start simultaneously
g = dramatiq.group([
generate_report.message(rt, user_id, date_range)
for rt in report_types
])
# Chain with email after all reports finish
pipe = dramatiq.pipeline([g, send_email.message_with_options(
args=[f"user{user_id}@example.com", "Your Weekly Digest", "Reports ready."],
)])
pipe.run()
# ─────────────────────────────────────────────────────────────────────────────
# Delayed / scheduled tasks
# ─────────────────────────────────────────────────────────────────────────────
def schedule_reminder(user_id: int, message: str, delay_ms: int) -> None:
"""Send a push notification after a delay."""
send_push_notification.send_with_options(
args=[user_id, "Reminder", message],
delay=delay_ms,
)
# ─────────────────────────────────────────────────────────────────────────────
# Test helper — StubBroker for unit tests
# ─────────────────────────────────────────────────────────────────────────────
def get_stub_broker():
"""
Use StubBroker in tests — actors run synchronously without a real broker.
Usage:
broker = get_stub_broker()
with dramatiq.set_broker(broker):
send_email.send("[email protected]", "Hello", "Body")
broker.join("emails") # wait for all messages to process
"""
from dramatiq.brokers.stub import StubBroker
stub = StubBroker()
stub.add_middleware(Retries(max_retries=0)) # no retries in tests
return stub
if __name__ == "__main__":
# Configure broker before any actors run
configure_broker(os.environ.get("REDIS_URL", "redis://localhost:6379"))
# Enqueue a single email
send_email.send("[email protected]", "Dramatiq Demo", "Hello from background worker!")
print("Email task enqueued.")
print("Start worker with: python -m dramatiq tasks --processes 2 --threads 4")
print("Monitor with: python -m dramatiq tasks --use-spawning --verbose")
For the Celery alternative — Celery has a larger ecosystem (Flower monitoring, beat scheduler, canvas) but requires choosing between prefork/eventlet/gevent concurrency models and configuring CELERY_TASK_SERIALIZER, task_always_eager for tests, and autoretry_for per-task, while Dramatiq’s Retries middleware applies globally to all actors with min_backoff/max_backoff in milliseconds, StubBroker runs actors synchronously in tests without any configuration, and pipeline/group compose workflows with the same message-passing model as single actors. For the Redis Queue (RQ) alternative — RQ uses Python functions directly as jobs which prevents retry configuration per-function and provides no pipeline composition, while Dramatiq’s @dramatiq.actor(max_retries=5, time_limit=300_000) decorates any function with per-actor retry and timeout settings, dramatiq.pipeline chains actors by passing the return value of each to the args of the next, and dramatiq.group fans out to N workers simultaneously without custom aggregation code. The Claude Skills 360 bundle includes Dramatiq skill sets covering actor definition and enqueuing, RedisBroker and RabbitmqBroker setup, Retries/TimeLimit/AgeLimit middleware, pipeline chaining, group fan-out, send_with_options for delay and priority, Results middleware for return values, StubBroker unit testing, worker CLI startup, and rate limiting with ConcurrentRateLimiter. Start with the free tier to try background task code generation.