Huey is a lightweight Python task queue. pip install huey. Redis backend: from huey import RedisHuey; huey = RedisHuey("myapp", host="localhost"). SQLite: from huey.contrib.mini import MiniHuey; huey = MiniHuey(). SQLite persistent: from huey import SqliteHuey; huey = SqliteHuey("myapp.db"). Task: @huey.task() def send_email(to, subject): .... Enqueue: send_email("[email protected]", "Hello") — returns AsyncResult. Delayed: send_email.schedule(args=("[email protected]","Hi"), delay=60) — runs in 60s. Result: result = send_email(...). result.get(blocking=True, timeout=10). result.is_ready(). result.get(preserve=True) — keep in store. Periodic: from huey import crontab. @huey.periodic_task(crontab(minute="0", hour="8")). crontab(minute="*/5") — every 5 min. crontab(day_of_week="1-5") — weekdays. Revoke: result.revoke(). send_email.revoke_by_id(task_id). send_email.restore(task_id). Retry: @huey.task(retries=3, retry_delay=10). raise RetryTask — explicit retry. Pipeline: add.s(1, 2).then(multiply, 3) — chain results. Pre/post hooks: @huey.pre_execute() def before(task): log(task.name). Immediate mode: huey.immediate = True — execute inline for tests. Start consumer: huey_consumer.py myapp.huey -w 4 -k process. -k thread for thread workers. -k greenlet for gevent. Claude Code generates Huey task definitions, periodic schedules, and consumer configuration.
CLAUDE.md for Huey
## Huey Stack
- Version: huey >= 2.5 | pip install "huey[redis]" or huey[sqlite]
- Backend: RedisHuey("app", host="localhost") | SqliteHuey("jobs.db")
- Task: @huey.task() def fn(*args): ... → fn(*args) returns AsyncResult
- Periodic: @huey.periodic_task(crontab(minute="*/15")) — no args
- Result: result.get(blocking=True, timeout=10) | result.is_ready()
- Retry: @huey.task(retries=3, retry_delay=30) | raise RetryTask in body
- Test: huey.immediate = True — runs tasks synchronously inline
Huey Task Queue Pipeline
# app/tasks.py — Huey task definitions, periodic jobs, and result handling
from __future__ import annotations
import logging
import os
import time
from datetime import datetime, timezone
from typing import Any
from huey import RedisHuey, SqliteHuey, crontab
from huey.exceptions import RetryTask
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Huey instance — choose backend from environment
# ─────────────────────────────────────────────────────────────────────────────
def _make_huey():
backend = os.environ.get("HUEY_BACKEND", "sqlite")
if backend == "redis":
return RedisHuey(
"myapp",
host=os.environ.get("REDIS_HOST", "localhost"),
port=int(os.environ.get("REDIS_PORT", "6379")),
db=int(os.environ.get("REDIS_DB", "0")),
results=True,
store_none=False,
)
# Default: SQLite — good for single-machine deployments
return SqliteHuey(
os.environ.get("HUEY_DB", "huey_tasks.db"),
results=True,
)
huey = _make_huey()
# ─────────────────────────────────────────────────────────────────────────────
# 1. Basic async tasks
# ─────────────────────────────────────────────────────────────────────────────
@huey.task()
def send_email(to: str, subject: str, body: str = "") -> dict:
"""
Enqueue: result = send_email("[email protected]", "Welcome")
Result: result.get(blocking=True, timeout=30)
"""
log.info("send_email", extra={"to": to, "subject": subject})
time.sleep(0.1) # simulate SMTP call
return {"sent": True, "to": to, "timestamp": datetime.now(timezone.utc).isoformat()}
@huey.task()
def resize_image(image_id: int, width: int, height: int) -> dict:
"""CPU-bound task — runs in a worker process."""
log.info("resize_image", extra={"id": image_id, "size": f"{width}x{height}"})
time.sleep(0.2) # simulate image processing
return {"image_id": image_id, "new_path": f"/processed/{image_id}_{width}x{height}.jpg"}
@huey.task()
def export_report(user_id: int, format: str = "csv") -> str:
"""Long-running task that returns a file path."""
time.sleep(0.5)
path = f"/tmp/reports/user_{user_id}_{format}_{int(time.time())}.{format}"
return path
# ─────────────────────────────────────────────────────────────────────────────
# 2. Retrying tasks — transient failures
# ─────────────────────────────────────────────────────────────────────────────
@huey.task(retries=3, retry_delay=10)
def call_external_api(endpoint: str, payload: dict) -> dict:
"""
Automatically retried up to 3 times with 10s delay on any exception.
Raise RetryTask explicitly for controlled retry (e.g., rate limit 429).
"""
log.info("call_external_api", extra={"endpoint": endpoint})
# Simulate occasional 429 rate limit
import random
if random.random() < 0.3:
log.warning("rate_limited", extra={"endpoint": endpoint})
raise RetryTask # explicit retry — does not count against retries limit
time.sleep(0.05)
return {"endpoint": endpoint, "status": 200, "data": payload}
@huey.task(retries=5, retry_delay=30)
def sync_inventory(product_id: int) -> dict:
"""
Retry with exponential back-off via retry_delay (fixed in Huey).
For exponential back-off, raise RetryTask with a calculated delay:
raise RetryTask(eta=datetime.now() + timedelta(seconds=2**attempt))
"""
log.info("sync_inventory", extra={"product_id": product_id})
time.sleep(0.02)
return {"product_id": product_id, "stock": 100, "synced": True}
# ─────────────────────────────────────────────────────────────────────────────
# 3. Periodic tasks — cron schedule
# ─────────────────────────────────────────────────────────────────────────────
@huey.periodic_task(crontab(minute="0", hour="8", day_of_week="1-5"))
def send_daily_digest() -> None:
"""Runs Mon–Fri at 08:00. Periodic tasks take no arguments."""
log.info("send_daily_digest")
print(f"[{datetime.now().isoformat()}] Sending daily digest")
@huey.periodic_task(crontab(minute="*/15"))
def cleanup_temp_files() -> None:
"""Runs every 15 minutes."""
log.info("cleanup_temp_files")
# os.path.glob("/tmp/reports/*.tmp") and delete old files
@huey.periodic_task(crontab(minute="0", hour="*/4"))
def sync_all_inventory() -> None:
"""Runs every 4 hours — fan out to per-product tasks."""
product_ids = [1, 2, 3, 4, 5] # in production: query DB
for pid in product_ids:
sync_inventory(pid) # enqueue individual tasks
@huey.periodic_task(crontab(minute="0", hour="0", day="1"))
def generate_monthly_report() -> None:
"""First day of each month at midnight."""
log.info("generate_monthly_report")
export_report(user_id=0, format="csv")
# ─────────────────────────────────────────────────────────────────────────────
# 4. Delayed / scheduled execution
# ─────────────────────────────────────────────────────────────────────────────
def schedule_welcome_email(user_id: int, email: str, delay_seconds: int = 300) -> Any:
"""
.schedule(args=..., delay=N) enqueues the task to run after N seconds.
Returns an AsyncResult — can still .revoke() before it fires.
"""
return send_email.schedule(
args=(email, "Welcome to the platform!"),
kwargs={"body": f"Hi, your user ID is {user_id}."},
delay=delay_seconds,
)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Pre/post execute hooks — cross-cutting middleware
# ─────────────────────────────────────────────────────────────────────────────
@huey.pre_execute()
def log_task_start(task) -> None:
"""Runs before every task — useful for tracing, auth checks."""
log.debug("task_start", extra={"task": task.name, "id": task.id})
@huey.post_execute()
def log_task_end(task, task_value, exc) -> None:
"""Runs after every task — task_value is the return value, exc is any exception."""
if exc:
log.error("task_failed", extra={"task": task.name, "error": str(exc)})
else:
log.debug("task_done", extra={"task": task.name})
# ─────────────────────────────────────────────────────────────────────────────
# 6. Task pipeline — chaining
# ─────────────────────────────────────────────────────────────────────────────
@huey.task()
def validate_upload(file_path: str) -> str:
"""Step 1: validate uploaded file, return path."""
time.sleep(0.05)
return file_path
@huey.task()
def process_upload(file_path: str) -> str:
"""Step 2: process the file, return output path."""
time.sleep(0.1)
return file_path.replace("uploads", "processed")
@huey.task()
def notify_upload_done(output_path: str) -> dict:
"""Step 3: notify user that processing is complete."""
return {"done": True, "path": output_path}
def chain_upload_pipeline(file_path: str):
"""
Pipeline: each task's return value is passed as the first arg of the next.
Executed in sequence by the consumer — not in parallel.
"""
return (
validate_upload.s(file_path)
.then(process_upload)
.then(notify_upload_done)
)
# ─────────────────────────────────────────────────────────────────────────────
# 7. Immediate mode — synchronous execution for tests
# ─────────────────────────────────────────────────────────────────────────────
def run_tests_demo() -> None:
"""
huey.immediate = True causes all tasks to execute synchronously
in the calling thread — no consumer process needed for tests.
"""
huey.immediate = True
try:
result = send_email("[email protected]", "Test")
# In immediate mode .get() returns the value directly
value = result()
print(f"Immediate result: {value}")
img_result = resize_image(42, 800, 600)
print(f"Resize result: {img_result()}")
finally:
huey.immediate = False
# ─────────────────────────────────────────────────────────────────────────────
# 8. FastAPI integration
# ─────────────────────────────────────────────────────────────────────────────
try:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()
class EmailRequest(BaseModel):
to: str
subject: str
body: str = ""
class ImageRequest(BaseModel):
image_id: int
width: int
height: int
@app.post("/tasks/email")
async def enqueue_email(req: EmailRequest) -> dict:
result = send_email(req.to, req.subject, req.body)
return {"task_id": result.id, "status": "queued"}
@app.post("/tasks/resize")
async def enqueue_resize(req: ImageRequest) -> dict:
result = resize_image(req.image_id, req.width, req.height)
return {"task_id": result.id, "status": "queued"}
@app.get("/tasks/{task_id}")
async def get_task_result(task_id: str) -> dict:
result = huey.result(task_id)
if result is None:
return {"task_id": task_id, "status": "pending"}
return {"task_id": task_id, "status": "done", "result": result}
@app.delete("/tasks/{task_id}")
async def revoke_task(task_id: str) -> dict:
huey.revoke_by_id(task_id)
return {"task_id": task_id, "status": "revoked"}
except ImportError:
app = None # type: ignore[assignment]
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== Immediate mode demo ===")
run_tests_demo()
print("\n=== Enqueue tasks (requires consumer) ===")
r1 = send_email("[email protected]", "Hello", "World")
r2 = resize_image(1, 1920, 1080)
print(f"Enqueued: email={r1.id} resize={r2.id}")
print("\n=== Scheduled task ===")
r3 = schedule_welcome_email(99, "[email protected]", delay_seconds=30)
print(f"Scheduled in 30s: {r3.id}")
r3.revoke()
print("Revoked scheduled task.")
For the Celery alternative — Celery requires a broker (Redis or RabbitMQ), a result backend, a CELERY_ settings module, at least three running processes (broker, worker, beat), and ~500ms startup time per worker, while Huey runs with a single huey_consumer.py myapp.tasks.huey command against Redis or SQLite, zero configuration files, and a huey.immediate = True flag that makes all tasks run synchronously in tests without any mocking. For the rq (Redis Queue) alternative — rq requires Redis exclusively and surfaces a separate rqscheduler process for cron jobs, while Huey’s @huey.periodic_task(crontab(...)) decorator bakes the schedule directly into the task definition and the single consumer process handles both on-demand and periodic tasks without a separate scheduler daemon — and Huey’s SQLite backend means zero external dependencies for development and single-machine deployments. The Claude Skills 360 bundle includes Huey skill sets covering RedisHuey and SqliteHuey backend configuration, @huey.task with retries and retry_delay, RetryTask for controlled retry logic, @huey.periodic_task with crontab expressions, task.schedule for delayed execution, AsyncResult.get blocking result retrieval, revoke and restore for task cancellation, pre_execute and post_execute hooks, pipeline chaining with .then(), huey.immediate mode for test isolation, and FastAPI /tasks endpoint integration. Start with the free tier to try lightweight task queue code generation.