Celery is the standard for background task processing in Python: send emails, process images, call external APIs, and run scheduled jobs — all outside the web request/response cycle. It connects to a broker (Redis or RabbitMQ), distributes tasks across workers, retries on failure, and routes different task types to different queues. Claude Code writes Celery task definitions, retry policies, routing configurations, and the monitoring setup that makes distributed queues observable in production.
CLAUDE.md for Celery Projects
## Task Queue Stack
- Celery 5.x with Redis broker (RedisJSON for production result backend)
- Task priorities: critical (payment, webhooks) | default (email) | low (export, report)
- Always set task_soft_time_limit and task_time_limit — prevent zombie tasks
- Retries: exponential backoff with countdown=2**retry_count; max_retries=5
- Idempotency: every task must be safe to run multiple times
- Chord/chain: use for multi-step workflows; understand error propagation
- Beat scheduler: django-celery-beat for database-backed schedules
Celery App Configuration
# celery_app/config.py
from celery import Celery
from kombu import Queue, Exchange
app = Celery('myapp')
app.conf.update(
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/1',
# Serialization
task_serializer='json',
result_serializer='json',
accept_content=['json'],
# Time limits — prevent zombie tasks
task_soft_time_limit=300, # Raises SoftTimeLimitExceeded after 5 min
task_time_limit=360, # Hard kill after 6 min
# Acknowledgment — only ack after task completes (at-least-once delivery)
task_acks_late=True,
task_reject_on_worker_lost=True,
# Result expiry
result_expires=86400, # 24 hours
# Rate limiting
worker_prefetch_multiplier=1, # Fair distribution across workers
# Queue routing
task_queues=[
Queue('critical', Exchange('critical'), routing_key='critical', priority=10),
Queue('default', Exchange('default'), routing_key='default', priority=5),
Queue('low', Exchange('low'), routing_key='low', priority=1),
],
task_default_queue='default',
task_default_exchange='default',
task_default_routing_key='default',
# Route specific tasks to queues
task_routes={
'tasks.payments.*': {'queue': 'critical'},
'tasks.reports.*': {'queue': 'low'},
'tasks.exports.*': {'queue': 'low'},
},
)
Task Definitions
# tasks/email_tasks.py
from celery_app.config import app
from celery.utils.log import get_task_logger
from celery.exceptions import SoftTimeLimitExceeded
logger = get_task_logger(__name__)
@app.task(
bind=True,
name='tasks.email.send_order_confirmation',
max_retries=5,
default_retry_delay=60,
soft_time_limit=30,
acks_late=True,
)
def send_order_confirmation(self, order_id: str, user_email: str) -> dict:
"""
Idempotent: safe to call multiple times — email provider deduplicates by order_id.
"""
logger.info(f"Sending order confirmation for {order_id} to {user_email}")
try:
result = email_service.send_template(
to=user_email,
template='order_confirmation',
context={'order_id': order_id},
idempotency_key=f'order_confirm_{order_id}',
)
return {'ok': True, 'message_id': result.id}
except email_service.RateLimitError as exc:
# Exponential backoff on rate limit
countdown = 2 ** self.request.retries * 60 # 60s, 120s, 240s...
raise self.retry(exc=exc, countdown=countdown)
except email_service.TemporaryError as exc:
raise self.retry(exc=exc, countdown=30)
except email_service.PermanentError as exc:
# Don't retry on permanent errors (bad address, etc.)
logger.error(f"Permanent email failure for {order_id}: {exc}")
return {'ok': False, 'error': str(exc)}
except SoftTimeLimitExceeded:
logger.warning(f"Email task for {order_id} approaching time limit — aborting")
raise
@app.task(
bind=True,
name='tasks.payments.process_refund',
queue='critical',
max_retries=3,
)
def process_refund(self, charge_id: str, amount_cents: int) -> dict:
try:
refund = stripe.Refund.create(charge=charge_id, amount=amount_cents)
return {'ok': True, 'refund_id': refund.id}
except stripe.RateLimitError as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
Task Chains and Chords
# tasks/workflows.py — multi-step workflows
from celery import chain, chord, group
from .email_tasks import send_order_confirmation
from .inventory_tasks import reserve_inventory, release_inventory
from .fulfillment_tasks import create_shipment
def process_order_workflow(order_id: str, user_email: str, items: list):
"""
Sequential steps with error handling.
chain: step1 → step2 → step3 (each gets previous result)
"""
workflow = chain(
reserve_inventory.si(order_id, items),
create_shipment.s(order_id), # .s() passes previous result
send_order_confirmation.si(order_id, user_email),
)
return workflow.apply_async()
def generate_reports_workflow(date: str, report_ids: list[str]):
"""
Parallel tasks with callback when all complete.
chord: [task1, task2, task3] → callback(all_results)
"""
generate_tasks = group(
generate_report.s(report_id, date)
for report_id in report_ids
)
return chord(generate_tasks)(combine_and_email_reports.s(date))
Celery Beat: Scheduled Tasks
# tasks/scheduled.py — periodic tasks
from celery.schedules import crontab
app.conf.beat_schedule = {
# Every day at 2am UTC: generate daily reports
'daily-reports': {
'task': 'tasks.reports.generate_daily',
'schedule': crontab(hour=2, minute=0),
'args': (),
},
# Every 5 minutes: process pending webhooks
'process-webhooks': {
'task': 'tasks.webhooks.process_pending',
'schedule': 300, # seconds
},
# Every Monday at 9am: weekly digest email
'weekly-digest': {
'task': 'tasks.email.send_weekly_digest',
'schedule': crontab(hour=9, minute=0, day_of_week=1),
},
}
@app.task(name='tasks.reports.generate_daily')
def generate_daily_reports():
"""Enqueue per-account reports in parallel."""
accounts = db.get_active_accounts()
group(
generate_account_report.s(account.id)
for account in accounts
).apply_async()
Worker Startup Command
# Start worker consuming all queues (dev)
celery -A celery_app.config worker --loglevel=info
# Production: separate workers per queue priority
celery -A celery_app.config worker -Q critical --concurrency=4 --loglevel=warning &
celery -A celery_app.config worker -Q default --concurrency=8 --loglevel=warning &
celery -A celery_app.config worker -Q low --concurrency=2 --loglevel=warning &
# Celery Beat scheduler
celery -A celery_app.config beat --loglevel=info
# Flower monitoring UI (http://localhost:5555)
celery -A celery_app.config flower --port=5555
For the Redis configuration that backs Celery’s broker and result store, the Redis guide covers connection pooling and Redis Streams as an alternative broker. For the Django integration that triggers Celery tasks from views, the Django REST Framework guide covers the signal-based task dispatch pattern. The Claude Skills 360 bundle includes Celery skill sets covering task definitions, retry strategies, chord/chain workflows, and Beat scheduling. Start with the free tier to try Celery task generation.