APScheduler runs scheduled jobs in Python processes. pip install apscheduler. Scheduler: from apscheduler.schedulers.background import BackgroundScheduler. scheduler = BackgroundScheduler(); scheduler.start(); scheduler.shutdown(). Async: from apscheduler.schedulers.asyncio import AsyncIOScheduler. scheduler = AsyncIOScheduler(); scheduler.start(). Add job: scheduler.add_job(send_report, "cron", hour=8, minute=0). Interval trigger: scheduler.add_job(ping, "interval", seconds=30). One-shot: scheduler.add_job(cleanup, "date", run_date=datetime(2024,12,31)). Cron: from apscheduler.triggers.cron import CronTrigger. CronTrigger(hour=9, minute=0, day_of_week="mon-fri"). CronTrigger.from_crontab("0 9 * * 1-5"). Interval: from apscheduler.triggers.interval import IntervalTrigger. IntervalTrigger(minutes=15, start_date=datetime.now()). Job kwargs: scheduler.add_job(fn, "interval", seconds=60, id="my_job", name="My Job", replace_existing=True, coalesce=True, max_instances=1, misfire_grace_time=30). coalesce: True — if a job missed multiple fires, run only once on recovery. max_instances: 1 — don’t overlap. Remove: scheduler.remove_job("my_job"). Pause/resume: scheduler.pause_job("my_job"). scheduler.resume_job("my_job"). List: scheduler.get_jobs(). Job stores — persist scheduled jobs across restarts: from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore. jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///jobs.db")}. scheduler = BackgroundScheduler(jobstores=jobstores). Redis: from apscheduler.jobstores.redis import RedisJobStore. Events: from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR. scheduler.add_listener(listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR). FastAPI: add scheduler to app lifespan. @asynccontextmanager async def lifespan(app): scheduler.start(); yield; scheduler.shutdown(). Claude Code generates APScheduler job definitions, job store configs, and FastAPI lifespan integration.
CLAUDE.md for APScheduler
## APScheduler Stack
- Version: apscheduler >= 3.10 | pip install "apscheduler[sqlalchemy,redis]"
- Sync: BackgroundScheduler — runs in a background thread
- Async: AsyncIOScheduler — runs in the asyncio event loop
- Cron: add_job(fn, "cron", hour=8) | CronTrigger.from_crontab("0 8 * * *")
- Rate: add_job(fn, "interval", seconds=30, coalesce=True, max_instances=1)
- Store: SQLAlchemyJobStore(url=...) — persist jobs across restarts
- Events: add_listener(fn, EVENT_JOB_ERROR) — alert on failures
APScheduler Scheduling Pipeline
# app/scheduler.py — APScheduler with cron, interval, jobstore, and FastAPI
from __future__ import annotations
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, AsyncGenerator
from apscheduler.events import (
EVENT_JOB_ERROR,
EVENT_JOB_EXECUTED,
EVENT_JOB_MISSED,
JobExecutionEvent,
)
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Jobs — plain functions called by the scheduler
# ─────────────────────────────────────────────────────────────────────────────
def send_daily_digest(recipient: str = "[email protected]") -> None:
    """Email the team digest to *recipient* — scheduled 08:00 Mon-Fri UTC."""
    log.info("sending_daily_digest", extra={"recipient": recipient})
    # ... send email ...
    sent_at = datetime.now().isoformat()
    print(f"[{sent_at}] Daily digest sent to {recipient}")
def cleanup_expired_sessions() -> None:
    """Purge expired sessions — fires every 15 minutes."""
    log.info("cleanup_expired_sessions")
    ran_at = datetime.now().isoformat()
    print(f"[{ran_at}] Expired sessions cleaned up")
def health_ping() -> None:
    """Heartbeat for external monitoring — fires every 60 seconds."""
    ping_at = datetime.now().isoformat()
    print(f"[{ping_at}] Ping")
def generate_monthly_report(month: int, year: int) -> None:
    """Produce the report for *year*-*month* — one-shot, scheduled dynamically."""
    stamp = datetime.now().isoformat()
    print(f"[{stamp}] Monthly report for {year}-{month:02d}")
async def sync_external_data(source: str = "partner-api") -> None:
    """Pull data from *source* — coroutine job for AsyncIOScheduler."""
    import asyncio

    await asyncio.sleep(0)  # simulated async I/O
    done_at = datetime.now().isoformat()
    print(f"[{done_at}] Synced data from {source}")
async def send_push_notifications(batch_size: int = 500) -> None:
    """Dispatch up to *batch_size* push notifications — runs every 5 minutes."""
    import asyncio

    await asyncio.sleep(0)
    done_at = datetime.now().isoformat()
    print(f"[{done_at}] Sent {batch_size} push notifications")
# ─────────────────────────────────────────────────────────────────────────────
# 1. BackgroundScheduler — for sync applications (Flask, CLI, etc.)
# ─────────────────────────────────────────────────────────────────────────────
def build_background_scheduler(db_url: str = "sqlite:///jobs.db") -> BackgroundScheduler:
    """
    Create a BackgroundScheduler for synchronous apps (Flask, CLI, ...).

    The scheduler runs in a daemon thread. Jobs placed in the "default"
    store are persisted via SQLAlchemy at *db_url* and survive restarts;
    the "memory" store holds ephemeral jobs re-registered on every boot.
    """
    return BackgroundScheduler(
        jobstores={
            "default": SQLAlchemyJobStore(url=db_url),  # durable across restarts
            "memory": MemoryJobStore(),  # for ephemeral jobs
        },
        executors={
            "default": ThreadPoolExecutor(max_workers=10),
            "process": ProcessPoolExecutor(max_workers=2),
        },
        job_defaults={
            "coalesce": True,  # collapse a backlog of missed fires into one run
            "max_instances": 1,  # never overlap executions of the same job
            "misfire_grace_time": 60,  # tolerate up to 60s startup delay
        },
        timezone="UTC",
    )
def register_background_jobs(scheduler: BackgroundScheduler) -> None:
    """Attach every recurring job to *scheduler* (idempotent via replace_existing)."""
    # Daily digest — every weekday at 08:00 UTC; persisted in the durable store.
    scheduler.add_job(
        send_daily_digest,
        CronTrigger.from_crontab("0 8 * * 1-5", timezone="UTC"),
        id="daily_digest",
        name="Daily Team Digest",
        kwargs={"recipient": "[email protected]"},
        replace_existing=True,
        jobstore="default",
    )
    # Interval jobs — kept in the memory store, so recreated on every restart.
    interval_jobs = (
        (cleanup_expired_sessions, IntervalTrigger(minutes=15), "session_cleanup", "Session Cleanup"),
        (health_ping, IntervalTrigger(seconds=60), "health_ping", "Health Ping"),
    )
    for job_fn, trigger, job_id, label in interval_jobs:
        scheduler.add_job(
            job_fn,
            trigger,
            id=job_id,
            name=label,
            replace_existing=True,
            jobstore="memory",  # ephemeral — recreated on restart
        )
# ─────────────────────────────────────────────────────────────────────────────
# 2. Event listener — alert on failures
# ─────────────────────────────────────────────────────────────────────────────
def job_event_listener(event: JobExecutionEvent) -> None:
    """Log the outcome of every scheduler job event; escalate failures."""
    if event.exception:
        # In production: send alert to PagerDuty/Slack
        log.error(
            "scheduled_job_failed",
            extra={
                "job_id": event.job_id,
                "exception": str(event.exception),
                "traceback": str(event.traceback),
            },
        )
        return
    if event.code == EVENT_JOB_MISSED:
        log.warning("scheduled_job_missed", extra={"job_id": event.job_id})
        return
    log.debug("scheduled_job_executed", extra={"job_id": event.job_id})
# ─────────────────────────────────────────────────────────────────────────────
# 3. AsyncIOScheduler — for FastAPI / asyncio applications
# ─────────────────────────────────────────────────────────────────────────────
def build_async_scheduler() -> AsyncIOScheduler:
    """
    Build an AsyncIOScheduler wired with async + sync jobs and a listener.

    Runs inside the asyncio event loop — use with FastAPI, Starlette, or any
    asyncio application. Sync callables are dispatched to a thread pool.
    """
    scheduler = AsyncIOScheduler(timezone="UTC")
    # Shared safety knobs: no overlap, collapse missed fires, idempotent adds.
    guard = {"max_instances": 1, "coalesce": True, "replace_existing": True}
    # Coroutine jobs — awaited directly on the event loop.
    scheduler.add_job(
        sync_external_data,
        CronTrigger(hour="*/4", minute=0),  # every 4 hours on the hour
        id="sync_external",
        name="External Data Sync",
        kwargs={"source": "partner-api"},
        **guard,
    )
    scheduler.add_job(
        send_push_notifications,
        IntervalTrigger(minutes=5),
        id="push_notifications",
        name="Push Notifications",
        kwargs={"batch_size": 500},
        **guard,
    )
    # Sync function — APScheduler runs it in its default thread pool.
    scheduler.add_job(
        health_ping,
        "interval",
        seconds=60,
        id="health_ping",
        name="Health Ping",
        replace_existing=True,
    )
    scheduler.add_listener(
        job_event_listener,
        EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED,
    )
    return scheduler
# ─────────────────────────────────────────────────────────────────────────────
# 4. FastAPI lifespan integration
# ─────────────────────────────────────────────────────────────────────────────
try:
    from fastapi import FastAPI
    from fastapi.responses import JSONResponse

    # Raised by pause_job/resume_job when the job id is unknown; imported
    # here (inside the try) so the ImportError fallback still covers it.
    from apscheduler.jobstores.base import JobLookupError

    # Module-level handle so route handlers can reach the running scheduler.
    _scheduler: AsyncIOScheduler | None = None

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        """Start the scheduler with the application; stop it on shutdown."""
        global _scheduler
        _scheduler = build_async_scheduler()
        _scheduler.start()
        yield
        # wait=False: don't block process shutdown on in-flight jobs.
        _scheduler.shutdown(wait=False)

    app = FastAPI(lifespan=lifespan)

    @app.get("/health")
    async def health() -> dict:
        """Report scheduler status plus id/name/next-run of every job."""
        if _scheduler is None:
            return {"status": "scheduler_not_started"}
        jobs = [
            {
                "id": j.id,
                "name": j.name,
                "next_run": j.next_run_time.isoformat() if j.next_run_time else None,
            }
            for j in _scheduler.get_jobs()
        ]
        return {"status": "ok", "scheduled_jobs": len(jobs), "jobs": jobs}

    @app.post("/jobs/{job_id}/pause")
    async def pause_job(job_id: str) -> dict:
        """Pause a scheduled job. 404 for unknown ids, 503 before startup."""
        if _scheduler is None:
            return JSONResponse({"error": "scheduler not started"}, status_code=503)
        try:
            _scheduler.pause_job(job_id)
        except JobLookupError:
            return JSONResponse({"error": "job not found"}, status_code=404)
        return {"job_id": job_id, "status": "paused"}

    @app.post("/jobs/{job_id}/resume")
    async def resume_job(job_id: str) -> dict:
        """Resume a paused job. 404 for unknown ids, 503 before startup."""
        if _scheduler is None:
            return JSONResponse({"error": "scheduler not started"}, status_code=503)
        try:
            _scheduler.resume_job(job_id)
        except JobLookupError:
            return JSONResponse({"error": "job not found"}, status_code=404)
        return {"job_id": job_id, "status": "resumed"}

    @app.post("/jobs/{job_id}/trigger")
    async def trigger_job(job_id: str) -> dict:
        """Manually fire a job immediately.

        Reschedules the job's next run to "now" so the scheduler executes it
        through its normal machinery: coroutines are awaited on the event
        loop, sync functions run in the executor. Calling job.func() inline
        (the naive approach) would block the event loop for sync jobs and
        leave async jobs as un-awaited coroutines that never run.
        """
        if _scheduler is None:
            return JSONResponse({"error": "scheduler not started"}, status_code=503)
        job = _scheduler.get_job(job_id)
        if job is None:
            return JSONResponse({"error": "job not found"}, status_code=404)
        job.modify(next_run_time=datetime.now(timezone.utc))
        return {"job_id": job_id, "status": "triggered"}
except ImportError:
    app = None  # type: ignore[assignment]
# ─────────────────────────────────────────────────────────────────────────────
# 5. Dynamic one-shot job — e.g., "send report at end of month"
# ─────────────────────────────────────────────────────────────────────────────
def schedule_one_shot(
    scheduler: BackgroundScheduler | AsyncIOScheduler,
    run_at: datetime,
    job_fn,
    job_id: str,
    **kwargs: Any,
) -> str:
    """Register *job_fn* to run exactly once at *run_at*.

    Keyword arguments are forwarded to the job; returns *job_id* so callers
    can later pause, resume, or remove the scheduled run.
    """
    one_shot_args = {
        "run_date": run_at,
        "id": job_id,
        "kwargs": kwargs,
        "replace_existing": True,  # idempotent: re-scheduling updates in place
    }
    scheduler.add_job(job_fn, "date", **one_shot_args)
    return job_id
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import time

    # In-memory SQLite keeps the demo self-contained (no jobs.db left behind).
    scheduler = build_background_scheduler(db_url="sqlite:///:memory:")
    scheduler.add_listener(job_event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    register_background_jobs(scheduler)
    # Fast-firing job so the demo produces visible output within seconds.
    scheduler.add_job(health_ping, "interval", seconds=2, id="test_ping")
    scheduler.start()
    job_ids = [job.id for job in scheduler.get_jobs()]
    print(f"Scheduler started. Jobs: {job_ids}")
    time.sleep(5)  # long enough for test_ping to fire twice
    print(f"\nJob next_run times:")
    for job in scheduler.get_jobs():
        print(f"  {job.id}: {job.next_run_time}")
    scheduler.shutdown()
    print("Scheduler stopped.")
For the Celery Beat alternative — Celery Beat requires a running Redis/RabbitMQ broker, a Celery worker process, and a separate celery beat process — three moving parts before a single cron job runs — while APScheduler’s BackgroundScheduler runs in the same Python process with no external dependencies, making it ideal for monoliths, CLI tools, and lower-volume scheduled tasks where broker infrastructure overhead is not justified. For the sched.scheduler alternative — Python’s stdlib sched.scheduler is a one-shot event queue that requires an explicit event loop call to process pending events and has no persistent job store, no cron syntax, and no missed-fire handling, while APScheduler’s CronTrigger.from_crontab("0 8 * * 1-5") handles daylight saving transitions, persists jobs to SQLite or Redis across restarts, and fires missed jobs with a configurable misfire_grace_time. The Claude Skills 360 bundle includes APScheduler skill sets covering BackgroundScheduler and AsyncIOScheduler setup, CronTrigger from_crontab and explicit fields, IntervalTrigger with coalesce/max_instances, DateTrigger for one-shot execution, SQLAlchemyJobStore and MemoryJobStore, add_listener for job error alerting, pause/resume/remove_job lifecycle, schedule_one_shot dynamic scheduling, FastAPI lifespan integration, and /jobs API endpoint management. Start with the free tier to try job scheduling code generation.