Claude Code for Celery: Distributed Task Queues and Background Jobs — Claude Skills 360 Blog

Claude Code for Celery: Distributed Task Queues and Background Jobs

Published: October 29, 2026
Read time: 8 min
By: Claude Skills 360

Celery is the standard for background task processing in Python: send emails, process images, call external APIs, and run scheduled jobs — all outside the web request/response cycle. It connects to a broker (Redis or RabbitMQ), distributes tasks across workers, retries on failure, and routes different task types to different queues. Claude Code writes Celery task definitions, retry policies, routing configurations, and the monitoring setup that makes distributed queues observable in production.

CLAUDE.md for Celery Projects

## Task Queue Stack
- Celery 5.x with Redis broker and a separate Redis DB as result backend
- Task priorities: critical (payment, webhooks) | default (email) | low (export, report)
- Always set task_soft_time_limit and task_time_limit — prevent zombie tasks
- Retries: exponential backoff with countdown=2**retry_count; max_retries=5
- Idempotency: every task must be safe to run multiple times
- Chord/chain: use for multi-step workflows; understand error propagation
- Beat scheduler: django-celery-beat for database-backed schedules
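The idempotency rule above can be enforced with a small guard. A hedged sketch, using a dict where production code would use Redis `SET NX EX` — `idempotency_key` and `run_once` are illustrative names, not Celery APIs:

```python
def idempotency_key(task_name: str, *args) -> str:
    """Deterministic key: the same task + args yields the same key on every retry."""
    return f"{task_name}:{':'.join(str(a) for a in args)}"

def run_once(cache: dict, key: str, fn):
    """Run fn at most once per key; duplicate deliveries return the cached result.
    In production the cache would be Redis (SET key value NX EX <ttl>)."""
    if key in cache:
        return cache[key]          # duplicate delivery: replay prior result
    result = fn()
    cache[key] = result
    return result
```

Because `task_acks_late=True` gives at-least-once delivery, any task that has visible side effects needs a guard like this, or a provider-side idempotency key as in the email task below.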

Celery App Configuration

# celery_app/config.py
from celery import Celery
from kombu import Queue, Exchange

app = Celery('myapp')

app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/1',
    
    # Serialization
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    
    # Time limits — prevent zombie tasks
    task_soft_time_limit=300,   # Raises SoftTimeLimitExceeded after 5 min
    task_time_limit=360,         # Hard kill after 6 min
    
    # Acknowledgment — only ack after task completes (at-least-once delivery)
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    
    # Result expiry
    result_expires=86400,  # 24 hours
    
    # Prefetch — one unacked task per worker process for fair distribution
    worker_prefetch_multiplier=1,
    
    # Queue routing (note: the priority argument maps to RabbitMQ's
    # x-max-priority; the Redis transport emulates priorities via
    # broker_transport_options instead)
    task_queues=[
        Queue('critical', Exchange('critical'), routing_key='critical', priority=10),
        Queue('default',  Exchange('default'),  routing_key='default',  priority=5),
        Queue('low',      Exchange('low'),      routing_key='low',      priority=1),
    ],
    task_default_queue='default',
    task_default_exchange='default',
    task_default_routing_key='default',
    
    # Route specific tasks to queues
    task_routes={
        'tasks.payments.*': {'queue': 'critical'},
        'tasks.reports.*': {'queue': 'low'},
        'tasks.exports.*': {'queue': 'low'},
    },
)
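The glob patterns in `task_routes` match against full task names, so `tasks.payments.process_refund` lands on the critical queue while unrouted tasks fall through to `task_default_queue`. A testable sketch of that resolution rule — this mirrors the mapping above for illustration and is not Celery's actual router:

```python
from fnmatch import fnmatch

# Illustrative mirror of the task_routes mapping in the config above.
TASK_ROUTES = {
    'tasks.payments.*': 'critical',
    'tasks.reports.*': 'low',
    'tasks.exports.*': 'low',
}

def resolve_queue(task_name: str, default: str = 'default') -> str:
    """First matching glob wins, as with Celery's pattern-based routes."""
    for pattern, queue in TASK_ROUTES.items():
        if fnmatch(task_name, pattern):
            return queue
    return default
```

A per-call override is always available too: `task.apply_async(args=..., queue='critical')` beats the route table.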

Task Definitions

# tasks/email_tasks.py
from celery_app.config import app
from celery.utils.log import get_task_logger
from celery.exceptions import SoftTimeLimitExceeded

logger = get_task_logger(__name__)

@app.task(
    bind=True,
    name='tasks.email.send_order_confirmation',
    max_retries=5,
    default_retry_delay=60,
    soft_time_limit=30,
    acks_late=True,
)
def send_order_confirmation(self, order_id: str, user_email: str) -> dict:
    """
    Idempotent: safe to call multiple times — email provider deduplicates by order_id.
    """
    logger.info(f"Sending order confirmation for {order_id} to {user_email}")
    
    try:
        result = email_service.send_template(
            to=user_email,
            template='order_confirmation',
            context={'order_id': order_id},
            idempotency_key=f'order_confirm_{order_id}',
        )
        return {'ok': True, 'message_id': result.id}
        
    except email_service.RateLimitError as exc:
        # Exponential backoff on rate limit
        countdown = 2 ** self.request.retries * 60  # 60s, 120s, 240s...
        raise self.retry(exc=exc, countdown=countdown)
    
    except email_service.TemporaryError as exc:
        raise self.retry(exc=exc, countdown=30)
    
    except email_service.PermanentError as exc:
        # Don't retry on permanent errors (bad address, etc.)
        logger.error(f"Permanent email failure for {order_id}: {exc}")
        return {'ok': False, 'error': str(exc)}
    
    except SoftTimeLimitExceeded:
        logger.warning(f"Email task for {order_id} approaching time limit — aborting")
        raise

import stripe  # assumes the Stripe client is configured elsewhere

@app.task(
    bind=True,
    name='tasks.payments.process_refund',
    queue='critical',
    max_retries=3,
)
def process_refund(self, charge_id: str, amount_cents: int) -> dict:
    try:
        refund = stripe.Refund.create(charge=charge_id, amount=amount_cents)
        return {'ok': True, 'refund_id': refund.id}
    except stripe.RateLimitError as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
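The `countdown = 2 ** self.request.retries` pattern grows without bound; capping it and adding jitter is safer when many tasks fail at once. A minimal sketch of that policy — `backoff_seconds` is illustrative, not a Celery API. (Celery can also do this declaratively with `@app.task(autoretry_for=(...,), retry_backoff=True, retry_backoff_max=..., retry_jitter=True)`.)

```python
import random

def backoff_seconds(retries: int, base: float = 60.0,
                    cap: float = 900.0, jitter: bool = True) -> float:
    """Exponential backoff with a cap; full jitter spreads retry storms."""
    delay = min(base * (2 ** retries), cap)
    if jitter:
        delay = random.uniform(0, delay)  # pick uniformly in [0, delay]
    return delay
```

Used as `raise self.retry(exc=exc, countdown=backoff_seconds(self.request.retries))`.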

Task Chains and Chords

# tasks/workflows.py — multi-step workflows
from celery import chain, chord, group
from .email_tasks import send_order_confirmation
from .inventory_tasks import reserve_inventory, release_inventory
from .fulfillment_tasks import create_shipment
from .report_tasks import generate_report, combine_and_email_reports  # module name assumed

def process_order_workflow(order_id: str, user_email: str, items: list):
    """
    Sequential steps with error handling.
    chain: step1 → step2 → step3 (each gets previous result)
    """
    workflow = chain(
        reserve_inventory.si(order_id, items),
        create_shipment.s(order_id),             # .s() passes previous result
        send_order_confirmation.si(order_id, user_email),
    )
    
    return workflow.apply_async()

def generate_reports_workflow(date: str, report_ids: list[str]):
    """
    Parallel tasks with callback when all complete.
    chord: [task1, task2, task3] → callback(all_results)
    """
    generate_tasks = group(
        generate_report.s(report_id, date)
        for report_id in report_ids
    )
    
    return chord(generate_tasks)(combine_and_email_reports.s(date))
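One chord subtlety worth planning for: if any header task raises, the callback fails with a `ChordError` and never sees the successful results. A common workaround is to have header tasks return error dicts instead of raising, so the callback can aggregate partial failures. A hedged sketch of such a combiner body (`combine_results` is illustrative, not from the source):

```python
def combine_results(results: list[dict]) -> dict:
    """Chord-callback body that tolerates partial failures. Assumes each
    header task returns {'ok': bool, ...} instead of raising, so the chord
    completes and the callback sees every outcome."""
    ok = [r for r in results if r.get('ok')]
    failed = [r for r in results if not r.get('ok')]
    return {'ok': not failed, 'combined': len(ok), 'failed': len(failed)}
```

The callback can then decide whether a partial report set is worth emailing or whether to alert instead.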

Celery Beat: Scheduled Tasks

# tasks/scheduled.py — periodic tasks
from celery import group
from celery.schedules import crontab

from celery_app.config import app

app.conf.beat_schedule = {
    # Every day at 2am UTC: generate daily reports
    'daily-reports': {
        'task': 'tasks.reports.generate_daily',
        'schedule': crontab(hour=2, minute=0),
        'args': (),
    },
    
    # Every 5 minutes: process pending webhooks
    'process-webhooks': {
        'task': 'tasks.webhooks.process_pending',
        'schedule': 300,  # seconds
    },
    
    # Every Monday at 9am: weekly digest email
    'weekly-digest': {
        'task': 'tasks.email.send_weekly_digest',
        'schedule': crontab(hour=9, minute=0, day_of_week=1),
    },
}

@app.task(name='tasks.reports.generate_daily')
def generate_daily_reports():
    """Enqueue per-account reports in parallel."""
    accounts = db.get_active_accounts()  # db and generate_account_report defined elsewhere
    
    group(
        generate_account_report.s(account.id)
        for account in accounts
    ).apply_async()

Worker Startup Commands

# Start worker consuming all queues (dev)
celery -A celery_app.config worker --loglevel=info

# Production: separate workers per queue priority
celery -A celery_app.config worker -Q critical --concurrency=4 --loglevel=warning &
celery -A celery_app.config worker -Q default  --concurrency=8 --loglevel=warning &
celery -A celery_app.config worker -Q low      --concurrency=2 --loglevel=warning &

# Celery Beat scheduler
celery -A celery_app.config beat --loglevel=info

# Flower monitoring UI (http://localhost:5555)
celery -A celery_app.config flower --port=5555
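Beyond Flower, a liveness check can use `app.control.inspect().ping()`, which returns a dict keyed by worker hostname (`{'celery@host': {'ok': 'pong'}, ...}`), or `None` when no worker responds. A small helper for interpreting that response — the function name and expected-hostname convention are illustrative:

```python
def missing_workers(ping_response, expected_hostnames):
    """ping_response is the dict returned by app.control.inspect().ping(),
    or None when no worker replies. Returns expected hostnames that did not
    answer, sorted, so an empty list means every worker is alive."""
    responding = set(ping_response or {})
    return sorted(set(expected_hostnames) - responding)
```

Wired into a cron job or healthcheck endpoint, a non-empty result can page before the queues back up.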

For the Redis configuration that backs Celery’s broker and result store, the Redis guide covers connection pooling and Redis Streams as an alternative broker. For the Django integration that triggers Celery tasks from views, the Django REST Framework guide covers the signal-based task dispatch pattern. The Claude Skills 360 bundle includes Celery skill sets covering task definitions, retry strategies, chord/chain workflows, and Beat scheduling. Start with the free tier to try Celery task generation.
