
Claude Code for APScheduler: Python Job Scheduling

Published: January 21, 2028
Read time: 5 min read
By: Claude Skills 360

APScheduler runs scheduled jobs inside a Python process. Install it with pip install apscheduler.

Schedulers. For sync apps: from apscheduler.schedulers.background import BackgroundScheduler; then scheduler = BackgroundScheduler(); scheduler.start(); scheduler.shutdown(). For asyncio apps: from apscheduler.schedulers.asyncio import AsyncIOScheduler; then scheduler = AsyncIOScheduler(); scheduler.start().

Triggers. Cron: scheduler.add_job(send_report, "cron", hour=8, minute=0). Interval: scheduler.add_job(ping, "interval", seconds=30). One-shot date: scheduler.add_job(cleanup, "date", run_date=datetime(2024,12,31)). Explicit trigger objects give more control: CronTrigger(hour=9, minute=0, day_of_week="mon-fri") and CronTrigger.from_crontab("0 9 * * 1-5") from apscheduler.triggers.cron, or IntervalTrigger(minutes=15, start_date=datetime.now()) from apscheduler.triggers.interval.

Job options. scheduler.add_job(fn, "interval", seconds=60, id="my_job", name="My Job", replace_existing=True, coalesce=True, max_instances=1, misfire_grace_time=30). coalesce=True collapses a backlog of missed fires into a single run on recovery; max_instances=1 prevents overlapping executions. Manage jobs with scheduler.remove_job("my_job"), scheduler.pause_job("my_job"), scheduler.resume_job("my_job"), and scheduler.get_jobs().

Job stores persist scheduled jobs across restarts: from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore; jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///jobs.db")}; scheduler = BackgroundScheduler(jobstores=jobstores). A Redis-backed store is available via from apscheduler.jobstores.redis import RedisJobStore.

Events let you react to job outcomes: from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR; scheduler.add_listener(listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR).

In FastAPI, tie the scheduler to the app lifespan: @asynccontextmanager async def lifespan(app): scheduler.start(); yield; scheduler.shutdown(). Claude Code generates APScheduler job definitions, job store configs, and FastAPI lifespan integration.

CLAUDE.md for APScheduler

## APScheduler Stack
- Version: apscheduler >= 3.10 | pip install "apscheduler[sqlalchemy,redis]"
- Sync:  BackgroundScheduler — runs in a background thread
- Async: AsyncIOScheduler — runs in the asyncio event loop
- Cron:  add_job(fn, "cron", hour=8) | CronTrigger.from_crontab("0 8 * * *")
- Rate:  add_job(fn, "interval", seconds=30, coalesce=True, max_instances=1)
- Store: SQLAlchemyJobStore(url=...) — persist jobs across restarts
- Events: add_listener(fn, EVENT_JOB_ERROR) — alert on failures

APScheduler Scheduling Pipeline

# app/scheduler.py — APScheduler with cron, interval, jobstore, and FastAPI
from __future__ import annotations

import logging
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, AsyncGenerator

from apscheduler.events import (
    EVENT_JOB_ERROR,
    EVENT_JOB_EXECUTED,
    EVENT_JOB_MISSED,
    JobExecutionEvent,
)
from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

log = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# Jobs — plain functions called by the scheduler
# ─────────────────────────────────────────────────────────────────────────────

def send_daily_digest(recipient: str = "team@example.com") -> None:
    """Runs at 08:00 Mon–Fri."""
    log.info("sending_daily_digest", extra={"recipient": recipient})
    # ... send email ...
    print(f"[{datetime.now().isoformat()}] Daily digest sent to {recipient}")


def cleanup_expired_sessions() -> None:
    """Runs every 15 minutes."""
    log.info("cleanup_expired_sessions")
    print(f"[{datetime.now().isoformat()}] Expired sessions cleaned up")


def health_ping() -> None:
    """Runs every 60 seconds — heartbeat for monitoring."""
    print(f"[{datetime.now().isoformat()}] Ping")


def generate_monthly_report(month: int, year: int) -> None:
    """One-shot report generation job — scheduled dynamically."""
    print(f"[{datetime.now().isoformat()}] Monthly report for {year}-{month:02d}")


async def sync_external_data(source: str = "partner-api") -> None:
    """Async job — works with AsyncIOScheduler."""
    import asyncio
    await asyncio.sleep(0)   # simulated async I/O
    print(f"[{datetime.now().isoformat()}] Synced data from {source}")


async def send_push_notifications(batch_size: int = 500) -> None:
    """Async job — runs every 5 minutes."""
    import asyncio
    await asyncio.sleep(0)
    print(f"[{datetime.now().isoformat()}] Sent {batch_size} push notifications")


# ─────────────────────────────────────────────────────────────────────────────
# 1. BackgroundScheduler — for sync applications (Flask, CLI, etc.)
# ─────────────────────────────────────────────────────────────────────────────

def build_background_scheduler(db_url: str = "sqlite:///jobs.db") -> BackgroundScheduler:
    """
    BackgroundScheduler starts a daemon thread — ideal for Flask and CLI apps.
    Job state is persisted in SQLite — jobs survive application restart.
    """
    jobstores = {
        "default": SQLAlchemyJobStore(url=db_url),
        "memory":  MemoryJobStore(),       # for ephemeral jobs
    }
    executors = {
        "default": ThreadPoolExecutor(max_workers=10),
        "process": ProcessPoolExecutor(max_workers=2),
    }
    job_defaults = {
        "coalesce":           True,   # run once if multiple fires were missed
        "max_instances":      1,      # no overlapping executions
        "misfire_grace_time": 60,     # tolerate up to 60s startup delay
    }
    return BackgroundScheduler(
        jobstores=jobstores,
        executors=executors,
        job_defaults=job_defaults,
        timezone="UTC",
    )


def register_background_jobs(scheduler: BackgroundScheduler) -> None:
    """Add all jobs to the scheduler."""

    # Daily digest — every weekday at 08:00 UTC
    scheduler.add_job(
        send_daily_digest,
        CronTrigger.from_crontab("0 8 * * 1-5", timezone="UTC"),
        id="daily_digest",
        name="Daily Team Digest",
        kwargs={"recipient": "team@example.com"},
        replace_existing=True,
        jobstore="default",
    )

    # Session cleanup — every 15 minutes
    scheduler.add_job(
        cleanup_expired_sessions,
        IntervalTrigger(minutes=15),
        id="session_cleanup",
        name="Session Cleanup",
        replace_existing=True,
        jobstore="memory",   # ephemeral — recreated on restart
    )

    # Heartbeat — every 60 seconds
    scheduler.add_job(
        health_ping,
        "interval",
        seconds=60,
        id="health_ping",
        name="Health Ping",
        replace_existing=True,
        jobstore="memory",
    )


# ─────────────────────────────────────────────────────────────────────────────
# 2. Event listener — alert on failures
# ─────────────────────────────────────────────────────────────────────────────

def job_event_listener(event: JobExecutionEvent) -> None:
    if event.exception:
        log.error(
            "scheduled_job_failed",
            extra={
                "job_id":    event.job_id,
                "exception": str(event.exception),
                "traceback": str(event.traceback),
            },
        )
        # In production: send alert to PagerDuty/Slack
    elif event.code == EVENT_JOB_MISSED:
        log.warning("scheduled_job_missed", extra={"job_id": event.job_id})
    else:
        log.debug("scheduled_job_executed", extra={"job_id": event.job_id})


# ─────────────────────────────────────────────────────────────────────────────
# 3. AsyncIOScheduler — for FastAPI / asyncio applications
# ─────────────────────────────────────────────────────────────────────────────

def build_async_scheduler() -> AsyncIOScheduler:
    """
    AsyncIOScheduler runs inside the asyncio event loop.
    Use this with FastAPI, Starlette, or any asyncio application.
    """
    scheduler = AsyncIOScheduler(timezone="UTC")

    # Async jobs
    scheduler.add_job(
        sync_external_data,
        CronTrigger(hour="*/4", minute=0),   # every 4 hours
        id="sync_external",
        name="External Data Sync",
        kwargs={"source": "partner-api"},
        max_instances=1,
        coalesce=True,
        replace_existing=True,
    )

    scheduler.add_job(
        send_push_notifications,
        IntervalTrigger(minutes=5),
        id="push_notifications",
        name="Push Notifications",
        kwargs={"batch_size": 500},
        max_instances=1,
        coalesce=True,
        replace_existing=True,
    )

    # Mix sync jobs — APScheduler runs them in a thread pool
    scheduler.add_job(
        health_ping,
        "interval",
        seconds=60,
        id="health_ping",
        name="Health Ping",
        replace_existing=True,
    )

    scheduler.add_listener(
        job_event_listener,
        EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED,
    )

    return scheduler


# ─────────────────────────────────────────────────────────────────────────────
# 4. FastAPI lifespan integration
# ─────────────────────────────────────────────────────────────────────────────

try:
    from fastapi import FastAPI
    from fastapi.responses import JSONResponse

    _scheduler: AsyncIOScheduler | None = None

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        """Start scheduler on startup, shutdown on exit."""
        global _scheduler
        _scheduler = build_async_scheduler()
        _scheduler.start()
        yield
        _scheduler.shutdown(wait=False)

    app = FastAPI(lifespan=lifespan)

    @app.get("/health")
    async def health() -> dict:
        if _scheduler is None:
            return {"status": "scheduler_not_started"}
        jobs = [
            {
                "id":       j.id,
                "name":     j.name,
                "next_run": j.next_run_time.isoformat() if j.next_run_time else None,
            }
            for j in _scheduler.get_jobs()
        ]
        return {"status": "ok", "scheduled_jobs": len(jobs), "jobs": jobs}

    @app.post("/jobs/{job_id}/pause")
    async def pause_job(job_id: str) -> dict:
        if _scheduler:
            _scheduler.pause_job(job_id)
        return {"job_id": job_id, "status": "paused"}

    @app.post("/jobs/{job_id}/resume")
    async def resume_job(job_id: str) -> dict:
        if _scheduler:
            _scheduler.resume_job(job_id)
        return {"job_id": job_id, "status": "resumed"}

    @app.post("/jobs/{job_id}/trigger")
    async def trigger_job(job_id: str):
        """Manually fire a job as soon as possible."""
        if _scheduler:
            job = _scheduler.get_job(job_id)
            if job:
                # Reschedule the next run to "now" so the job executes through
                # the scheduler's normal executor. Calling job.func directly
                # would block the event loop for sync jobs and would never
                # await coroutine jobs.
                job.modify(next_run_time=datetime.now(timezone.utc))
                return {"job_id": job_id, "status": "triggered"}
        return JSONResponse({"error": "job not found"}, status_code=404)

except ImportError:
    app = None  # type: ignore[assignment]


# ─────────────────────────────────────────────────────────────────────────────
# 5. Dynamic one-shot job — e.g., "send report at end of month"
# ─────────────────────────────────────────────────────────────────────────────

def schedule_one_shot(
    scheduler: BackgroundScheduler | AsyncIOScheduler,
    run_at: datetime,
    job_fn,
    job_id: str,
    **kwargs: Any,
) -> str:
    """Schedule a job to run once at a specific datetime."""
    scheduler.add_job(
        job_fn,
        "date",
        run_date=run_at,
        id=job_id,
        kwargs=kwargs,
        replace_existing=True,
    )
    return job_id


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    import time

    # Use a file-backed DB: with sqlite:///:memory:, each thread's connection
    # gets its own empty database, so the scheduler thread cannot see the jobs.
    scheduler = build_background_scheduler(db_url="sqlite:///jobs_demo.db")
    scheduler.add_listener(job_event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    register_background_jobs(scheduler)

    # Add a fast-firing test job
    scheduler.add_job(
        health_ping, "interval", seconds=2, id="test_ping",
        replace_existing=True, jobstore="memory",   # ephemeral test job
    )

    scheduler.start()
    print(f"Scheduler started. Jobs: {[j.id for j in scheduler.get_jobs()]}")

    time.sleep(5)   # let the test_ping fire twice

    print("\nJob next_run times:")
    for job in scheduler.get_jobs():
        print(f"  {job.id}: {job.next_run_time}")

    scheduler.shutdown()
    print("Scheduler stopped.")

Versus Celery Beat: Celery Beat requires a running Redis/RabbitMQ broker, a Celery worker process, and a separate celery beat process: three moving parts before a single cron job runs. APScheduler's BackgroundScheduler runs in the same Python process with no external dependencies, which makes it ideal for monoliths, CLI tools, and lower-volume scheduled tasks where broker infrastructure overhead is not justified.

Versus sched.scheduler: Python's stdlib sched.scheduler is a one-shot event queue that requires an explicit run() call to process pending events, and it has no persistent job store, no cron syntax, and no missed-fire handling. APScheduler's CronTrigger.from_crontab("0 8 * * 1-5") handles daylight saving transitions, persists jobs to SQLite or Redis across restarts, and fires missed jobs within a configurable misfire_grace_time.

The Claude Skills 360 bundle includes APScheduler skill sets covering BackgroundScheduler and AsyncIOScheduler setup, CronTrigger via from_crontab and explicit fields, IntervalTrigger with coalesce/max_instances, DateTrigger for one-shot execution, SQLAlchemyJobStore and MemoryJobStore, add_listener for job error alerting, the pause/resume/remove_job lifecycle, dynamic one-shot scheduling, FastAPI lifespan integration, and /jobs API endpoint management. Start with the free tier to try job scheduling code generation.
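To make the sched.scheduler comparison concrete, here is the stdlib approach, pure standard library with nothing from APScheduler. The run loop blocks the caller, and recurrence only happens because each run manually re-enters itself in the queue; there is no cron syntax, no persistence, and a fire time that passes while the process is down is simply lost:

```python
import sched
import time

s = sched.scheduler(time.monotonic, time.sleep)
fired = []


def tick(count: int) -> None:
    fired.append(count)
    if count < 3:
        # Recurrence is manual: each run must re-schedule the next one.
        s.enter(0.1, priority=1, action=tick, argument=(count + 1,))


s.enter(0.1, priority=1, action=tick, argument=(1,))
s.run()  # blocks the calling thread until the event queue is empty

print(fired)  # [1, 2, 3]
```

Everything APScheduler layers on top of this (background threads, triggers, jobstores, misfire handling) is what you would otherwise build by hand.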
