arq is an async Redis job queue for Python asyncio. pip install arq. Define job function: async def send_email(ctx, to, subject): .... Worker settings: class WorkerSettings: functions = [send_email]; redis_settings = RedisSettings(host="localhost"). Run worker: arq tasks.WorkerSettings. Enqueue from app: redis = await create_pool(RedisSettings()); await redis.enqueue_job("send_email", "[email protected]", "Hi"). Job result: job = await redis.enqueue_job("compute", x); result = await job.result(timeout=30). Timeout: class WorkerSettings: job_timeout = 300 — 5 min default. Per-job: async def slow_task(ctx): ... ; slow_task.job_timeout = 600. Max retries: async def fetch(ctx, url): ... ; fetch.max_tries = 5. Retry: raise Retry(defer=timedelta(seconds=10)). on_startup: async def startup(ctx): ctx["db"] = await asyncpg.connect(...). on_shutdown: async def shutdown(ctx): await ctx["db"].close(). class WorkerSettings: on_startup=startup; on_shutdown=shutdown. Cron: cron(send_digest, hour=8, minute=0) — daily at 8am. class WorkerSettings: cron_jobs=[cron(...)]. Delayed: await redis.enqueue_job("task", defer_by=timedelta(minutes=5)). Scheduled: await redis.enqueue_job("task", run_at=datetime(2024,12,25,9,0,0)). Health: class WorkerSettings: health_check_interval=30. Queue inspection: await redis.queued_jobs(). await job.status() — deferred/queued/in_progress/complete/not_found. await job.abort(). Max jobs: class WorkerSettings: max_jobs=10. Keep result: keep_result=timedelta(hours=1). Unique: job = await redis.enqueue_job("task", _job_id="unique-key") — deduplicate. Claude Code generates arq WorkerSettings, FastAPI integration patterns, and scheduled job configurations.
CLAUDE.md for arq
## arq Stack
- Version: arq >= 0.26 | pip install arq
- Worker: class WorkerSettings: functions=[fn,...] redis_settings=RedisSettings()
- Job fn: async def task(ctx, *args) → ctx["db"] is injected via on_startup
- Enqueue: await redis.enqueue_job("fn_name", *args, _job_id=, _defer_by=)
- Retry: raise Retry(defer=timedelta(seconds=N)) inside job function
- Cron: cron(fn, hour=H, minute=M) in WorkerSettings.cron_jobs
- Start: arq tasks.WorkerSettings | arq tasks.WorkerSettings --burst
arq Async Job Queue Pipeline
# tasks.py — arq job definitions and WorkerSettings
from __future__ import annotations

import asyncio
import logging
import os
import re
from datetime import datetime, timedelta, timezone
from typing import Any

from arq import cron
from arq.connections import ArqRedis, RedisSettings, create_pool
from arq.worker import Retry
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Startup / shutdown — dependency injection via ctx dict
# ─────────────────────────────────────────────────────────────────────────────
async def startup(ctx: dict) -> None:
    """
    Worker startup hook — runs exactly once before any job executes.

    Long-lived resources (HTTP session, DB pool) are stored in ``ctx`` so
    that every job function reuses them instead of reconnecting per job.
    """
    import aiohttp
    import asyncpg

    dsn = os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/mydb")
    ctx["http"] = aiohttp.ClientSession()
    ctx["db_pool"] = await asyncpg.create_pool(dsn=dsn, min_size=2, max_size=10)
    logging.getLogger(__name__).info("Worker startup: HTTP session and DB pool ready")
async def shutdown(ctx: dict) -> None:
    """Worker shutdown hook — release the connections opened in startup()."""
    await ctx["http"].close()
    await ctx["db_pool"].close()
    logging.getLogger(__name__).info("Worker shutdown: connections closed")
# ─────────────────────────────────────────────────────────────────────────────
# Job functions
# ─────────────────────────────────────────────────────────────────────────────
async def send_email(
    ctx: dict,
    to: str,
    subject: str,
    body: str,
    from_addr: str = "[email protected]",
) -> dict:
    """
    Send a transactional email via an HTTP API.

    Args:
        ctx: arq job context; ``ctx["http"]`` must be an aiohttp session
            (injected by the worker's on_startup hook).
        to: Recipient address.
        subject: Subject line.
        body: Plain-text message body.
        from_addr: Sender address; defaults to the transactional mailbox.

    Returns:
        The provider's JSON response (expected to include a message ``id``).

    Raises:
        Retry: On HTTP 429 — arq re-runs the job after the provider's
            Retry-After delay.
        Exception: Any other transport/HTTP error, after logging a warning.
    """
    # BUG FIX: aiohttp was previously only imported inside startup(), so the
    # ClientTimeout reference below raised NameError the first time this job ran.
    import aiohttp

    session: Any = ctx["http"]
    logger.info("Sending email to=%s subject=%r", to, subject)
    try:
        async with session.post(
            "https://api.email.com/send",
            json={"from": from_addr, "to": to, "subject": subject, "body": body},
            headers={"Authorization": f"Bearer {os.environ.get('EMAIL_API_KEY','')}"},
            timeout=aiohttp.ClientTimeout(total=15),
        ) as resp:
            if resp.status == 429:
                # Rate limited — ask arq to re-run after the provider's hint.
                retry_after = int(resp.headers.get("Retry-After", "30"))
                raise Retry(defer=timedelta(seconds=retry_after))
            resp.raise_for_status()
            result = await resp.json()
    except Retry:
        # A retry is expected control flow, not a failure — don't log it as
        # an error; let arq reschedule the job.
        raise
    except Exception as exc:  # noqa: BLE001
        logger.warning("Email failed to=%s error=%s", to, exc)
        raise
    logger.info("Email sent to=%s message_id=%s", to, result.get("id"))
    return result


# Per-function settings read by the arq worker.
send_email.max_tries = 5  # type: ignore[attr-defined]
send_email.job_timeout = 30  # type: ignore[attr-defined]
async def process_image(ctx: dict, image_id: int, ops: list[str]) -> dict:
    """
    Run the heavy image-processing pipeline for a single image.

    Marks the DB row 'processing', performs the work (simulated here), then
    records the output URL and marks the row 'done'. Worker-level settings:
    10-minute timeout, up to 3 attempts.
    """
    log = logging.getLogger(__name__)
    db_pool = ctx["db_pool"]
    log.info("Processing image id=%d ops=%s", image_id, ops)

    async with db_pool.acquire() as db:
        await db.execute(
            "UPDATE images SET status='processing' WHERE id=$1", image_id
        )

    await asyncio.sleep(0.5)  # simulate work
    dest = f"/processed/{image_id}.webp"

    async with db_pool.acquire() as db:
        await db.execute(
            "UPDATE images SET status='done', output_url=$1 WHERE id=$2",
            dest, image_id,
        )

    return {"image_id": image_id, "output_url": dest, "ops": ops}


process_image.max_tries = 3  # type: ignore[attr-defined]
process_image.job_timeout = 600  # type: ignore[attr-defined]
async def sync_external_data(ctx: dict, source: str, table: str) -> int:
    """
    Pull records from an external API and upsert them into *table*.

    Enqueue with a stable ``_job_id`` per (source, table) to deduplicate.

    Args:
        ctx: arq context; needs ``ctx["http"]`` (aiohttp session) and
            ``ctx["db_pool"]`` (asyncpg pool) from on_startup.
        source: Path segment of the external data endpoint.
        table: Destination table name — validated below because it is
            interpolated into SQL.

    Returns:
        Number of records fetched (and submitted for upsert).

    Raises:
        ValueError: If *table* is not a plain SQL identifier.
    """
    # SECURITY: a table name cannot be a bind parameter, so it must be
    # interpolated into the statement text. Restrict it to a bare
    # identifier so job arguments cannot inject SQL.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table):
        raise ValueError(f"invalid table name: {table!r}")

    session = ctx["http"]
    pool = ctx["db_pool"]
    logger.info("Syncing source=%s table=%s", source, table)

    async with session.get(f"https://data.example.com/{source}") as resp:
        resp.raise_for_status()
        records = await resp.json()

    async with pool.acquire() as conn:
        # NOTE(review): the INSERT never supplies an id column, so the
        # ON CONFLICT (id) clause only fires if the target table derives
        # id from the data — confirm the schema actually upserts here.
        await conn.executemany(
            f"INSERT INTO {table} (data, synced_at) VALUES ($1, NOW()) "
            "ON CONFLICT (id) DO UPDATE SET data=$1, synced_at=NOW()",
            [(record,) for record in records],
        )

    logger.info("Synced %d records source=%s", len(records), source)
    return len(records)


sync_external_data.job_timeout = 120  # type: ignore[attr-defined]
# ─────────────────────────────────────────────────────────────────────────────
# Cron jobs — run on a schedule
# ─────────────────────────────────────────────────────────────────────────────
async def send_daily_digest(ctx: dict) -> None:
    """Cron job (08:00 UTC): email a daily summary to every opted-in user."""
    log = logging.getLogger(__name__)
    db_pool = ctx["db_pool"]

    async with db_pool.acquire() as conn:
        users = await conn.fetch(
            "SELECT id, email FROM users WHERE digest_enabled=TRUE"
        )

    log.info("Sending daily digest to %d users", len(users))
    for user in users:
        await asyncio.sleep(0.01)  # crude rate limit between sends
        log.debug("Digest sent to user_id=%d", user["id"])
    log.info("Daily digest complete users=%d", len(users))
async def cleanup_old_jobs(ctx: dict) -> None:
    """
    Cron job (02:00 UTC): purge job bookkeeping rows older than 7 days.

    BUG FIX: the original statement used ``DELETE ... RETURNING count(*)``,
    which is invalid PostgreSQL — aggregate functions are not allowed in a
    RETURNING clause. A data-modifying CTE counts the deleted rows instead.
    """
    log = logging.getLogger(__name__)
    pool = ctx["db_pool"]
    async with pool.acquire() as conn:
        deleted = await conn.fetchval(
            "WITH gone AS ("
            " DELETE FROM arq_jobs"
            " WHERE finished_at < NOW() - INTERVAL '7 days'"
            " RETURNING 1"
            ") SELECT count(*) FROM gone"
        )
    log.info("Cleaned up %s old job records", deleted)
async def health_ping(ctx: dict) -> str:
    """Cheap liveness probe — returning normally proves the worker runs jobs."""
    now = datetime.now(timezone.utc)
    return f"ok at {now.isoformat()}"
# ─────────────────────────────────────────────────────────────────────────────
# WorkerSettings
# ─────────────────────────────────────────────────────────────────────────────
class WorkerSettings:
    """
    arq worker configuration — the class the CLI points at.

    Start: arq tasks.WorkerSettings
    Burst: arq tasks.WorkerSettings --burst (drain the queue, then exit —
           useful for CI and one-shot batch runs)
    """

    # Redis connection — every value overridable via environment.
    redis_settings = RedisSettings(
        host=os.environ.get("REDIS_HOST", "localhost"),
        port=int(os.environ.get("REDIS_PORT", "6379")),
        password=os.environ.get("REDIS_PASSWORD"),
        database=int(os.environ.get("REDIS_DB", "0")),
    )

    # Job functions this worker can execute (looked up by name on enqueue).
    functions = [
        send_email,
        process_image,
        sync_external_data,
        health_ping,
    ]

    # Scheduled jobs.
    cron_jobs = [
        cron(send_daily_digest, hour=8, minute=0),  # 08:00 UTC daily
        cron(cleanup_old_jobs, hour=2, minute=0),   # 02:00 UTC daily
        # BUG FIX: minute=1 ran this hourly (at :01), not "every minute" as
        # intended. In arq, leaving minute unset runs the job at every
        # minute (second defaults to 0).
        cron(health_ping),                          # every minute
    ]

    # Lifecycle hooks — inject/release shared connections via ctx.
    on_startup = startup
    on_shutdown = shutdown

    # Max jobs this worker process runs concurrently.
    max_jobs = 20

    # Default per-job run-time limit in seconds; individual functions may
    # override it via their own job_timeout attribute.
    job_timeout = 300

    # How long completed job results stay in Redis.
    keep_result = timedelta(hours=24)

    # Whether failed jobs are retried (up to each function's max_tries).
    # (The previous comment wrongly described a retry delay.)
    retry_jobs: bool = True

    # Interval (seconds) at which the worker records its health in Redis.
    health_check_interval = 30
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI integration — enqueueing from web request handlers
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI integration example, embedded as a string so this module stays
# importable without FastAPI installed.
# FIXES vs the previous revision of the example:
#   * `timedelta` was used without being imported.
#   * `enqueue_job(..., _job_id=...)` returns None when a job with that id
#     already exists — calling `job.job_id` on None raised AttributeError.
FASTAPI_EXAMPLE = """
# app/main.py — FastAPI + arq integration
from contextlib import asynccontextmanager
from datetime import timedelta

from fastapi import FastAPI, Depends, HTTPException
from arq.connections import create_pool, RedisSettings, ArqRedis

_redis_pool: ArqRedis | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global _redis_pool
    _redis_pool = await create_pool(RedisSettings())
    yield
    await _redis_pool.close()

app = FastAPI(lifespan=lifespan)

async def get_redis() -> ArqRedis:
    return _redis_pool

@app.post("/users/{user_id}/welcome")
async def send_welcome(user_id: int, email: str, redis: ArqRedis = Depends(get_redis)):
    job = await redis.enqueue_job(
        "send_email",
        email,
        "Welcome!",
        "Thanks for signing up.",
    )
    return {"job_id": job.job_id}

@app.post("/images/{image_id}/process")
async def trigger_processing(image_id: int, ops: list[str],
                             redis: ArqRedis = Depends(get_redis)):
    job = await redis.enqueue_job(
        "process_image",
        image_id,
        ops,
        _job_id=f"image-{image_id}",     # deduplicate: same image not processed twice
        _defer_by=timedelta(seconds=2),  # slight delay to batch multiple requests
    )
    # enqueue_job returns None when a job with this _job_id already exists.
    if job is None:
        raise HTTPException(status_code=409, detail="image already queued")
    return {"job_id": job.job_id, "status": await job.status()}
"""
if __name__ == "__main__":
    # Removed the bogus `import aiohttp  # noqa` that claimed to be "needed
    # for type reference in startup" — startup() imports aiohttp itself and
    # nothing in this demo uses it.
    async def demo() -> None:
        """Enqueue one email job and report its status.

        A worker (arq tasks.WorkerSettings) must be running separately for
        the job to actually execute.
        """
        redis = await create_pool(RedisSettings())
        job = await redis.enqueue_job(
            "send_email", "[email protected]", "arq demo", "Hello from arq!"
        )
        print(f"Enqueued job_id={job.job_id}")
        print(f"Status: {await job.status()}")
        await redis.close()

    asyncio.run(demo())
    print("Start worker with: arq tasks.WorkerSettings")
For the Celery with eventlet/gevent alternative — Celery’s async support requires choosing an eventlet or gevent concurrency pool and monkey-patching the stdlib, which can break asyncio-native libraries (asyncpg, aiohttp), while arq uses Python’s native asyncio event loop: a single worker thread runs all jobs as coroutines sharing one event loop, on_startup injects a single asyncpg connection pool across all jobs, and arq tasks.WorkerSettings --burst exits after draining the queue making it trivially usable in serverless or CI environments. For the Redis Queue (RQ) alternative — RQ is synchronous and requires threading for concurrency which adds GIL contention when jobs perform I/O, while arq’s max_jobs=20 runs 20 concurrent I/O-bound jobs in one process with negligible memory overhead, raise Retry(defer=timedelta(seconds=retry_after)) implements dynamic retry delays based on Retry-After headers, and cron(fn, hour=8, minute=0) schedules jobs from the same WorkerSettings without a separate scheduler process. The Claude Skills 360 bundle includes arq skill sets covering WorkerSettings definition, async job functions with ctx injection, on_startup connection pooling, Retry exception for dynamic backoff, cron scheduling, FastAPI lifespan integration, enqueue_job with _job_id deduplication and _defer_by, job.result() for return values, burst mode for CI, and health_check_interval monitoring. Start with the free tier to try async job queue code generation.