
Claude Code for arq: Async Redis Job Queues

Published: December 23, 2027
Read time: 5 min read
By: Claude Skills 360

arq is an async Redis job queue for Python asyncio, installed with pip install arq. The core API at a glance:

- Job function: async def send_email(ctx, to, subject): ...
- Worker settings: class WorkerSettings: functions = [send_email]; redis_settings = RedisSettings(host="localhost")
- Run worker: arq tasks.WorkerSettings
- Enqueue from app: redis = await create_pool(RedisSettings()); await redis.enqueue_job("send_email", "[email protected]", "Hi")
- Job result: job = await redis.enqueue_job("compute", x); result = await job.result(timeout=30)
- Timeout: class WorkerSettings: job_timeout = 300 (the 5-minute default); per-function: slow_task.job_timeout = 600
- Max retries: fetch.max_tries = 5; retry from inside a job: raise Retry(defer=timedelta(seconds=10))
- Lifecycle: async def startup(ctx): ctx["db"] = await asyncpg.connect(...); async def shutdown(ctx): await ctx["db"].close(); class WorkerSettings: on_startup = startup; on_shutdown = shutdown
- Cron: cron(send_digest, hour=8, minute=0) runs daily at 08:00; class WorkerSettings: cron_jobs = [cron(...)]
- Delayed: await redis.enqueue_job("task", _defer_by=timedelta(minutes=5))
- Scheduled: await redis.enqueue_job("task", _defer_until=datetime(2024, 12, 25, 9, 0, 0))
- Health: class WorkerSettings: health_check_interval = 30
- Queue inspection: await redis.queued_jobs(); await job.status() returns deferred/queued/in_progress/complete/not_found; await job.abort() (the worker must set allow_abort_jobs = True)
- Concurrency: class WorkerSettings: max_jobs = 10
- Result retention: keep_result = timedelta(hours=1)
- Deduplication: job = await redis.enqueue_job("task", _job_id="unique-key")

Claude Code generates arq WorkerSettings, FastAPI integration patterns, and scheduled job configurations.

CLAUDE.md for arq

## arq Stack
- Version: arq >= 0.26 | pip install arq
- Worker: class WorkerSettings: functions=[fn,...] redis_settings=RedisSettings()
- Job fn: async def task(ctx, *args) → ctx["db"] is injected via on_startup
- Enqueue: await redis.enqueue_job("fn_name", *args, _job_id=, _defer_by=)
- Retry: raise Retry(defer=timedelta(seconds=N)) inside job function
- Cron: cron(fn, hour=H, minute=M) in WorkerSettings.cron_jobs
- Start: arq tasks.WorkerSettings | arq tasks.WorkerSettings --burst

arq Async Job Queue Pipeline

# tasks.py — arq job definitions and WorkerSettings
from __future__ import annotations

import asyncio
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Any

import aiohttp  # used directly in send_email for ClientTimeout
from arq import cron
from arq.connections import ArqRedis, RedisSettings, create_pool
from arq.worker import Retry

logger = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# Startup / shutdown — dependency injection via ctx dict
# ─────────────────────────────────────────────────────────────────────────────

async def startup(ctx: dict) -> None:
    """
    Runs once when the worker starts.
    Inject long-lived connections into ctx — they're available in every job.
    """
    import aiohttp
    import asyncpg

    ctx["http"]    = aiohttp.ClientSession()
    ctx["db_pool"] = await asyncpg.create_pool(
        dsn=os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/mydb"),
        min_size=2,
        max_size=10,
    )
    logger.info("Worker startup: HTTP session and DB pool ready")


async def shutdown(ctx: dict) -> None:
    """Called once when the worker shuts down gracefully."""
    await ctx["http"].close()
    await ctx["db_pool"].close()
    logger.info("Worker shutdown: connections closed")


# ─────────────────────────────────────────────────────────────────────────────
# Job functions
# ─────────────────────────────────────────────────────────────────────────────

async def send_email(
    ctx: dict,
    to: str,
    subject: str,
    body: str,
    from_addr: str = "[email protected]",
) -> dict:
    """
    Send transactional email via HTTP API.
    Retries up to 5 times with exponential backoff.
    """
    session: Any = ctx["http"]
    logger.info("Sending email to=%s subject=%r", to, subject)

    try:
        async with session.post(
            "https://api.email.com/send",
            json={"from": from_addr, "to": to, "subject": subject, "body": body},
            headers={"Authorization": f"Bearer {os.environ.get('EMAIL_API_KEY','')}"},
            timeout=aiohttp.ClientTimeout(total=15),
        ) as resp:
            if resp.status == 429:
                # Rate limited — retry after the Retry-After header
                retry_after = int(resp.headers.get("Retry-After", "30"))
                raise Retry(defer=timedelta(seconds=retry_after))
            resp.raise_for_status()
            result = await resp.json()

    except Retry:
        # Let arq's retry machinery handle the deferral — don't log it as a failure.
        raise
    except Exception as exc:  # noqa: BLE001
        logger.warning("Email failed to=%s error=%s", to, exc)
        raise

    logger.info("Email sent to=%s message_id=%s", to, result.get("id"))
    return result


# Attach per-function settings as attributes
send_email.max_tries = 5  # type: ignore[attr-defined]
send_email.job_timeout = 30  # type: ignore[attr-defined]


async def process_image(ctx: dict, image_id: int, ops: list[str]) -> dict:
    """
    Heavy image processing — 10-minute timeout, 3 attempts.
    Reads and writes to DB for status tracking.
    """
    pool = ctx["db_pool"]
    logger.info("Processing image id=%d ops=%s", image_id, ops)

    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE images SET status='processing' WHERE id=$1", image_id
        )

    await asyncio.sleep(0.5)   # simulate work

    output_url = f"/processed/{image_id}.webp"

    async with pool.acquire() as conn:
        await conn.execute(
            "UPDATE images SET status='done', output_url=$1 WHERE id=$2",
            output_url, image_id,
        )

    return {"image_id": image_id, "output_url": output_url, "ops": ops}


process_image.max_tries   = 3    # type: ignore[attr-defined]
process_image.job_timeout = 600  # type: ignore[attr-defined]


async def sync_external_data(ctx: dict, source: str, table: str) -> int:
    """Sync data from external API — unique job per (source, table) to avoid duplication."""
    session = ctx["http"]
    pool    = ctx["db_pool"]

    logger.info("Syncing source=%s table=%s", source, table)

    async with session.get(f"https://data.example.com/{source}") as resp:
        resp.raise_for_status()
        records = await resp.json()

    import json  # local import keeps this sketch self-contained

    # NOTE: this assumes each record carries an "id" key, and that `table`
    # comes from trusted code — f-strings in SQL are only safe for
    # identifiers you control, never user input.
    async with pool.acquire() as conn:
        await conn.executemany(
            f"INSERT INTO {table} (id, data, synced_at) VALUES ($1, $2, NOW()) "
            "ON CONFLICT (id) DO UPDATE SET data=EXCLUDED.data, synced_at=NOW()",
            [(record["id"], json.dumps(record)) for record in records],
        )

    logger.info("Synced %d records source=%s", len(records), source)
    return len(records)


sync_external_data.job_timeout = 120  # type: ignore[attr-defined]


# ─────────────────────────────────────────────────────────────────────────────
# Cron jobs — run on a schedule
# ─────────────────────────────────────────────────────────────────────────────

async def send_daily_digest(ctx: dict) -> None:
    """Send daily summary emails. Scheduled at 08:00 UTC."""
    pool = ctx["db_pool"]

    async with pool.acquire() as conn:
        users = await conn.fetch("SELECT id, email FROM users WHERE digest_enabled=TRUE")

    logger.info("Sending daily digest to %d users", len(users))
    for user in users:
        await asyncio.sleep(0.01)   # rate limit
        logger.debug("Digest sent to user_id=%d", user["id"])

    logger.info("Daily digest complete users=%d", len(users))


async def cleanup_old_jobs(ctx: dict) -> None:
    """Remove completed job results older than 7 days. Runs nightly at 02:00 UTC."""
    pool = ctx["db_pool"]
    async with pool.acquire() as conn:
        # execute() returns a command status string like "DELETE 42"
        status = await conn.execute(
            "DELETE FROM arq_jobs WHERE finished_at < NOW() - INTERVAL '7 days'"
        )
    logger.info("Cleanup finished: %s", status)


async def health_ping(ctx: dict) -> str:
    """Lightweight health check — proves worker is alive. Runs every minute."""
    return f"ok at {datetime.now(timezone.utc).isoformat()}"


# ─────────────────────────────────────────────────────────────────────────────
# WorkerSettings
# ─────────────────────────────────────────────────────────────────────────────

class WorkerSettings:
    """
    arq worker configuration.
    Start: arq tasks.WorkerSettings
    Burst: arq tasks.WorkerSettings --burst  (process queue then exit — good for CI)
    """

    redis_settings = RedisSettings(
        host=os.environ.get("REDIS_HOST", "localhost"),
        port=int(os.environ.get("REDIS_PORT", "6379")),
        password=os.environ.get("REDIS_PASSWORD"),
        database=int(os.environ.get("REDIS_DB", "0")),
    )

    # Job functions available to this worker
    functions = [
        send_email,
        process_image,
        sync_external_data,
        health_ping,
    ]

    # Scheduled jobs
    cron_jobs = [
        cron(send_daily_digest, hour=8, minute=0),   # 08:00 UTC daily
        cron(cleanup_old_jobs,  hour=2, minute=0),   # 02:00 UTC daily
        cron(health_ping),                           # every minute (unset fields are wildcards)
    ]

    # Lifecycle hooks
    on_startup  = startup
    on_shutdown = shutdown

    # Concurrency — max simultaneous jobs
    max_jobs = 20

    # Default max run time per job in seconds (arq's default is 300);
    # per-function job_timeout attributes override this
    job_timeout = 300

    # Result retention — how long completed job results stay in Redis
    keep_result = timedelta(hours=24)

    # Whether jobs interrupted by shutdown or raising Retry are re-enqueued
    retry_jobs = True

    # Health check — prove worker is alive
    health_check_interval = 30


# ─────────────────────────────────────────────────────────────────────────────
# FastAPI integration — enqueueing from web request handlers
# ─────────────────────────────────────────────────────────────────────────────

FASTAPI_EXAMPLE = """
# app/main.py — FastAPI + arq integration
from contextlib import asynccontextmanager
from datetime import timedelta

from arq.connections import ArqRedis, RedisSettings, create_pool
from fastapi import Depends, FastAPI

_redis_pool: ArqRedis | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global _redis_pool
    _redis_pool = await create_pool(RedisSettings())
    yield
    await _redis_pool.close()

app = FastAPI(lifespan=lifespan)

async def get_redis() -> ArqRedis:
    assert _redis_pool is not None, "lifespan has not run"
    return _redis_pool

@app.post("/users/{user_id}/welcome")
async def send_welcome(user_id: int, email: str, redis: ArqRedis = Depends(get_redis)):
    job = await redis.enqueue_job(
        "send_email",
        email,
        "Welcome!",
        "Thanks for signing up.",
    )
    return {"job_id": job.job_id}

@app.post("/images/{image_id}/process")
async def trigger_processing(image_id: int, ops: list[str],
                              redis: ArqRedis = Depends(get_redis)):
    job = await redis.enqueue_job(
        "process_image",
        image_id,
        ops,
        _job_id=f"image-{image_id}",   # deduplicate: same image not processed twice
        _defer_by=timedelta(seconds=2), # slight delay to batch multiple requests
    )
    return {"job_id": job.job_id, "status": await job.status()}
"""


if __name__ == "__main__":

    async def demo():
        redis = await create_pool(RedisSettings())
        job = await redis.enqueue_job("send_email", "[email protected]",
                                      "arq demo", "Hello from arq!")
        print(f"Enqueued job_id={job.job_id}")
        status = await job.status()
        print(f"Status: {status}")
        await redis.close()

    asyncio.run(demo())
    print("Start worker with: arq tasks.WorkerSettings")

Compared with Celery: Celery's async story requires choosing an eventlet or gevent concurrency pool and monkey-patching the standard library, which can break asyncio-native libraries such as asyncpg and aiohttp. arq instead runs on Python's native asyncio event loop: a single worker process runs all jobs as coroutines sharing one loop, on_startup can inject a single asyncpg connection pool shared by every job, and arq tasks.WorkerSettings --burst exits after draining the queue, which makes it easy to use in serverless or CI environments.

Compared with Redis Queue (RQ): RQ is synchronous and needs threads (or multiple worker processes) for concurrency, which adds GIL contention when jobs perform I/O. With arq, max_jobs = 20 runs 20 concurrent I/O-bound jobs in one process with negligible memory overhead, raise Retry(defer=timedelta(seconds=retry_after)) implements dynamic retry delays driven by Retry-After headers, and cron(fn, hour=8, minute=0) schedules jobs from the same WorkerSettings without a separate scheduler process.

The Claude Skills 360 bundle includes arq skill sets covering WorkerSettings definition, async job functions with ctx injection, on_startup connection pooling, the Retry exception for dynamic backoff, cron scheduling, FastAPI lifespan integration, enqueue_job with _job_id deduplication and _defer_by, job.result() for return values, burst mode for CI, and health_check_interval monitoring. Start with the free tier to try async job queue code generation.
