schedule runs periodic jobs in Python. pip install schedule. Basic: import schedule; schedule.every(10).minutes.do(job). Interval: schedule.every(2).hours.do(job). Daily: schedule.every().day.at("09:30").do(job). Weekly: schedule.every().monday.at("08:00").do(job). Weekdays: schedule.every().monday.do(f); schedule.every().tuesday.do(f) (etc.). Args: schedule.every(5).minutes.do(job, "arg1", key="val"). Tag: schedule.every().hour.do(job).tag("reports","email"). Cancel tagged: schedule.clear("reports"). Cancel one: schedule.cancel_job(job). Next run: schedule.next_run(). All jobs: schedule.jobs. Idle seconds: schedule.idle_seconds(). Run loop: while True: schedule.run_pending(); time.sleep(1). Run all: schedule.run_all(). Custom scheduler: from schedule import Scheduler; s = Scheduler(). Multiple schedulers: separate Scheduler instances. Thread: threading.Thread(target=run_loop, daemon=True).start(). Until: schedule.every().hour.until("23:59").do(job). Seconds: schedule.every().second.do(job) (minimum interval is 1 second). Random interval: schedule.every(5).to(10).minutes.do(job). A job cancels itself by returning schedule.CancelJob. Claude Code generates schedule cron jobs, background loops, FastAPI startup schedulers, and taggable periodic task systems.
CLAUDE.md for schedule
## schedule Stack
- Version: schedule >= 1.2 | pip install schedule
- Interval: schedule.every(N).minutes/hours/days.do(fn, *args, **kwargs)
- Daily: schedule.every().day.at("HH:MM").do(fn)
- Loop: while True: schedule.run_pending(); time.sleep(1)
- Thread: threading.Thread(target=loop, daemon=True).start()
- Cancel: schedule.cancel_job(job) | schedule.clear("tag")
schedule Periodic Task Pipeline
# app/scheduler.py — schedule jobs, threading, tags, error handling, and FastAPI
from __future__ import annotations

import logging
import random
import threading
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from typing import Any, Callable

import schedule
from schedule import Job, Scheduler
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Scheduler runner helpers
# ─────────────────────────────────────────────────────────────────────────────
def run_pending_loop(
    interval_secs: float = 1.0,
    stop_event: threading.Event | None = None,
    scheduler: Scheduler | None = None,
) -> None:
    """
    Blocking loop that calls run_pending() every interval_secs.

    Stops when stop_event is set (or on KeyboardInterrupt).

    Usage (foreground):
        run_pending_loop()
    Usage (thread):
        stop = threading.Event()
        t = threading.Thread(target=run_pending_loop, args=(1.0, stop), daemon=True)
        t.start()
        ...
        stop.set()
    """
    s = scheduler or schedule.default_scheduler
    try:
        while not (stop_event and stop_event.is_set()):
            s.run_pending()
            # Sleep only as long as the next job needs — saves CPU.
            # NOTE: idle_seconds is None when no jobs are scheduled, and can
            # legitimately be 0 (a job is due right now) or negative (a job is
            # overdue).  Compare against None explicitly — a truthiness test
            # would mistake 0 for "no jobs" and oversleep a full interval.
            idle = s.idle_seconds
            sleep_time = interval_secs if idle is None else min(idle, interval_secs)
            time.sleep(max(sleep_time, 0.0))
    except KeyboardInterrupt:
        pass  # allow Ctrl-C to end the foreground loop quietly
def start_background_scheduler(
    interval_secs: float = 1.0,
    scheduler: Scheduler | None = None,
) -> threading.Event:
    """
    Launch run_pending_loop() in a daemon thread.

    Returns the threading.Event that stops the loop — set it to shut the
    worker down cleanly.

    Usage:
        stop = start_background_scheduler()
        # ... application runs ...
        stop.set()
    """
    stop = threading.Event()
    worker = threading.Thread(
        target=run_pending_loop,
        args=(interval_secs, stop, scheduler),
        daemon=True,
        name="schedule-worker",
    )
    worker.start()
    log.info("Background scheduler started (thread: %s)", worker.name)
    return stop
# ─────────────────────────────────────────────────────────────────────────────
# 2. Error-resilient job wrapper
# ─────────────────────────────────────────────────────────────────────────────
def safe_job(fn: Callable, *args: Any, reraise: bool = False, **kwargs: Any) -> Callable:
    """
    Wrap *fn* so a failing job cannot kill the scheduler loop.

    Produces a zero-argument callable that invokes fn(*args, **kwargs); any
    exception is logged with its traceback instead of propagating.  Pass
    reraise=True to log and then re-raise.

    Usage:
        schedule.every().hour.do(safe_job(send_reports, "[email protected]"))
    """
    def _guarded() -> Any:
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            log.error("Scheduled job '%s' failed: %s", fn.__name__, exc, exc_info=True)
            if reraise:
                raise
    _guarded.__name__ = fn.__name__
    return _guarded
def cancellable_job(fn: Callable, *args: Any, **kwargs: Any) -> Callable:
    """
    Return a wrapper that calls fn and cancels the job if fn returns False,
    returns schedule.CancelJob, or raises.

    Any other return value is passed through to schedule (which stores it on
    the job as the last result).

    Usage:
        # Job cancels itself after completing successfully once
        def one_shot_task():
            do_work()
            return schedule.CancelJob
        schedule.every().day.at("07:00").do(cancellable_job(one_shot_task))
    """
    def wrapper() -> Any:
        # The docstring contract is "cancel on raise": catch the exception,
        # log it, and return CancelJob so schedule removes the job instead of
        # letting the exception crash run_pending().
        try:
            result = fn(*args, **kwargs)
        except Exception as exc:
            log.error("Cancellable job '%s' raised: %s", fn.__name__, exc, exc_info=True)
            return schedule.CancelJob
        if result is schedule.CancelJob or result is False:
            return schedule.CancelJob
        return result
    wrapper.__name__ = fn.__name__
    return wrapper
# ─────────────────────────────────────────────────────────────────────────────
# 3. Job factory helpers
# ─────────────────────────────────────────────────────────────────────────────
def add_interval_job(
    fn: Callable,
    *args: Any,
    every: int = 1,
    unit: str = "minutes",
    tag: str | None = None,
    safe: bool = True,
    scheduler: Scheduler | None = None,
    **kwargs: Any,
) -> Job:
    """
    Register fn to run repeatedly on a fixed interval.

    unit must be one of: "seconds" | "minutes" | "hours" | "days" | "weeks".

    Example:
        add_interval_job(sync_users, every=30, unit="minutes", tag="sync")
    """
    target = scheduler or schedule
    if safe:
        runner = safe_job(fn, *args, **kwargs)
    else:
        def runner() -> Any:
            return fn(*args, **kwargs)
    job = getattr(target.every(every), unit).do(runner)
    if tag:
        job.tag(tag)
    return job
def add_daily_job(
    fn: Callable,
    *args: Any,
    at: str = "00:00",
    tag: str | None = None,
    safe: bool = True,
    scheduler: Scheduler | None = None,
    **kwargs: Any,
) -> Job:
    """
    Register fn to run once per day at a fixed time ("HH:MM" or "HH:MM:SS").

    Example:
        add_daily_job(send_digest, "[email protected]", at="08:00", tag="email")
    """
    target = scheduler or schedule
    if safe:
        runner = safe_job(fn, *args, **kwargs)
    else:
        def runner() -> Any:
            return fn(*args, **kwargs)
    job = target.every().day.at(at).do(runner)
    if tag:
        job.tag(tag)
    return job
def add_weekly_job(
    fn: Callable,
    *args: Any,
    day: str = "monday",
    at: str = "09:00",
    tag: str | None = None,
    safe: bool = True,
    scheduler: Scheduler | None = None,
    **kwargs: Any,
) -> Job:
    """
    Register fn to run once per week.

    day must be a lowercase weekday name: "monday" | "tuesday" | ... | "sunday".

    Example:
        add_weekly_job(generate_weekly_report, at="06:00", day="monday", tag="reports")
    """
    target = scheduler or schedule
    if safe:
        runner = safe_job(fn, *args, **kwargs)
    else:
        def runner() -> Any:
            return fn(*args, **kwargs)
    job = getattr(target.every(), day).at(at).do(runner)
    if tag:
        job.tag(tag)
    return job
def add_random_interval_job(
    fn: Callable,
    *args: Any,
    min_interval: int = 5,
    max_interval: int = 15,
    unit: str = "minutes",
    tag: str | None = None,
    safe: bool = True,
    scheduler: Scheduler | None = None,
    **kwargs: Any,
) -> Job:
    """
    Register fn with a randomised interval (e.g. every 5–15 minutes).

    The jitter helps avoid thundering-herd effects when many workers poll
    the same resource.

    Example:
        add_random_interval_job(poll_external_api, min_interval=10, max_interval=20)
    """
    target = scheduler or schedule
    if safe:
        runner = safe_job(fn, *args, **kwargs)
    else:
        def runner() -> Any:
            return fn(*args, **kwargs)
    pending = target.every(min_interval).to(max_interval)
    job = getattr(pending, unit).do(runner)
    if tag:
        job.tag(tag)
    return job
# ─────────────────────────────────────────────────────────────────────────────
# 4. Job inspection / management
# ─────────────────────────────────────────────────────────────────────────────
def list_jobs(scheduler: Scheduler | None = None) -> list[dict]:
    """Summarise every job on *scheduler* (the default scheduler when None)."""
    target = scheduler or schedule.default_scheduler
    return [
        {
            "next_run": str(job.next_run),
            "last_run": str(job.last_run),
            "interval": str(job.interval),
            "unit": job.unit,
            "tags": list(job.tags),
            "job_func": getattr(job.job_func, "__name__", str(job.job_func)),
        }
        for job in target.jobs
    ]
def cancel_by_tag(tag: str, scheduler: Scheduler | None = None) -> int:
    """
    Cancel all jobs carrying *tag* on the given scheduler (the default
    scheduler when None).  Returns the number of jobs removed.
    """
    # Resolve the concrete Scheduler exactly once.  The previous version
    # resolved it three times and mixed the `schedule` module facade with
    # `schedule.default_scheduler` — equivalent but redundant and fragile.
    s = scheduler or schedule.default_scheduler
    before = len(s.jobs)
    s.clear(tag)
    return before - len(s.jobs)
def next_run_in(scheduler: Scheduler | None = None) -> float | None:
    """Seconds until the next job is due, or None when nothing is scheduled."""
    target = scheduler or schedule.default_scheduler
    return target.idle_seconds
# ─────────────────────────────────────────────────────────────────────────────
# 5. Example periodic tasks
# ─────────────────────────────────────────────────────────────────────────────
def heartbeat(name: str = "default") -> None:
    """Emit a log heartbeat — useful for liveness monitoring.

    Uses a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    since Python 3.12 in favour of datetime.now(timezone.utc).
    """
    log.info("[heartbeat] %s at %s", name, datetime.now(timezone.utc).isoformat())
def cleanup_temp_files(directory: str = "/tmp", max_age_hours: int = 24) -> int:
    """Simulate removing temp files older than max_age_hours from *directory*.

    Returns the number of files removed (always 0 in this stub).
    """
    log.info("Cleaning up %s (files older than %dh)", directory, max_age_hours)
    removed = 0  # a real implementation would unlink files and count them
    return removed
def send_daily_digest(recipients: list[str]) -> dict:
    """
    Simulate sending a daily digest email.

    Returns a {recipient: "sent"} status mapping.
    """
    log.info("Sending daily digest to %d recipients", len(recipients))
    return dict.fromkeys(recipients, "sent")
def sync_database(source: str, destination: str) -> bool:
    """Simulate a database sync job; always reports success."""
    log.info("Syncing %s → %s", source, destination)
    ok = True  # stub — a real sync would report the actual outcome
    return ok
# ─────────────────────────────────────────────────────────────────────────────
# 6. FastAPI integration
# ─────────────────────────────────────────────────────────────────────────────
# Copy-paste FastAPI integration example: jobs are registered inside the
# lifespan context manager on startup, and the background scheduler thread is
# stopped on shutdown via the returned Event.
# NOTE(review): the /scheduler/run-now/{tag} endpoint calls schedule.run_all(),
# which runs EVERY scheduled job immediately regardless of the tag path
# parameter — confirm whether tag-filtered execution was intended.
FASTAPI_EXAMPLE = '''
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.scheduler import (
add_interval_job, add_daily_job, add_weekly_job,
start_background_scheduler, list_jobs, cancel_by_tag,
cleanup_temp_files, send_daily_digest, heartbeat,
)
import schedule
_stop_event = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global _stop_event
# Register jobs
add_interval_job(heartbeat, "api", every=1, unit="minutes", tag="monitoring")
add_interval_job(cleanup_temp_files, "/tmp", 12, every=6, unit="hours", tag="maintenance")
add_daily_job(send_daily_digest, ["[email protected]"], at="08:30", tag="email")
add_weekly_job(lambda: None, day="sunday", at="02:00", tag="maintenance")
_stop_event = start_background_scheduler()
yield
_stop_event.set() # stop the scheduler thread
app = FastAPI(lifespan=lifespan)
@app.get("/scheduler/jobs")
def get_jobs():
return {"jobs": list_jobs()}
@app.delete("/scheduler/jobs/{tag}")
def cancel_jobs(tag: str):
count = cancel_by_tag(tag)
return {"cancelled": count, "tag": tag}
@app.post("/scheduler/run-now/{tag}")
def run_tag_now(tag: str):
schedule.run_all(delay_seconds=0)
return {"status": "triggered", "tag": tag}
'''
# ─────────────────────────────────────────────────────────────────────────────
# Demo (sync mode — runs for a few seconds to show jobs firing)
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo: register a few jobs on an isolated Scheduler, let them fire in a
    # background thread for a few seconds, then cancel the tagged ones.
    # (Removed: an unused `import schedule as sch` and the unused j1/j2/j3
    # bindings — the Job handles were never referenced.)
    s = Scheduler()  # fresh scheduler so the demo never touches the default one
    print("=== Registering jobs ===")
    add_interval_job(heartbeat, "demo", every=2, unit="seconds", tag="demo", scheduler=s)
    add_interval_job(cleanup_temp_files, "/tmp", every=5, unit="seconds", tag="maint", scheduler=s)
    add_daily_job(send_daily_digest, ["[email protected]"], at="23:59", tag="email", scheduler=s)
    print(f" {len(s.jobs)} jobs scheduled:")
    for j in list_jobs(s):
        print(f" {j['job_func']}: every {j['interval']} {j['unit']} | next={j['next_run']}")
    print("\n=== Running for 6 seconds ===")
    stop = start_background_scheduler(scheduler=s)
    time.sleep(6)
    stop.set()
    print("\n=== Cancel maintenance jobs ===")
    cancelled = cancel_by_tag("maint", scheduler=s)
    print(f" Cancelled {cancelled} jobs tagged 'maint'")
    print(f" Remaining jobs: {len(s.jobs)}")
For the APScheduler alternative — APScheduler supports cron, interval, date triggers, multiple job stores (Redis, SQLAlchemy), and async schedulers via AsyncIOScheduler; schedule is a single-file zero-dependency library with a fluent API (every().monday.at("08:00").do(fn)) that is ideal for simple scripts and applications that don’t need persistence or distributed scheduling. For the Celery Beat alternative — Celery Beat provides production-grade periodic task scheduling with Redis/RabbitMQ brokers, task routing, and rate limiting as part of the full Celery ecosystem; schedule is the right choice when you want lightweight in-process scheduling without a broker, suitable for single-process web apps, CLI tools, and simple automation scripts. The Claude Skills 360 bundle includes schedule skill sets covering every().minutes/hours/days/weekday interval scheduling, at(“HH:MM”) time-based triggers, run_pending_loop() with idle_seconds sleep optimization, start_background_scheduler() daemon thread, safe_job() exception-resilient wrapper, cancellable_job() self-cancelling pattern, add_interval_job()/add_daily_job()/add_weekly_job() factory helpers, add_random_interval_job() for jitter, list_jobs()/cancel_by_tag() management, and FastAPI lifespan integration. Start with the free tier to try Python periodic job scheduling code generation.