Python’s sched module implements a general-purpose event scheduler. import sched. Create: s = sched.scheduler(timefunc=time.monotonic, delayfunc=time.sleep) — timefunc() returns the current time; delayfunc(t) sleeps for t seconds; both are replaceable for testing. Schedule relative: event = s.enter(delay, priority, action, argument=(), kwargs={}) — fires action(*argument, **kwargs) after delay seconds; returns an event handle. Schedule absolute: s.enterabs(time, priority, action, ...) — fires at absolute time value. Cancel: s.cancel(event) — removes the event from the queue; raises ValueError if not found. Run: s.run(blocking=True) — runs all events in time order; blocking=True sleeps between events; blocking=False fires only events that are already due (non-blocking drain). Empty: s.empty() → True if no events queued. Queue inspection: s.queue → list of Event(time, priority, action, argument, kwargs) named tuples. Priority: lower number = higher priority when two events share the same scheduled time. Claude Code generates single-threaded retry loops, debounce helpers, cron-style schedulers, background heartbeats, and deadline-based timeouts.
CLAUDE.md for sched
## sched Stack
- Stdlib: import sched, time, threading
- Create: s = sched.scheduler() # uses time.monotonic + time.sleep
- Relative: ev = s.enter(delay, 1, fn, (args,))
- Absolute: ev = s.enterabs(abs_time, 1, fn)
- Cancel: s.cancel(ev)
- Run: s.run() # blocks until queue empty
- Non-blocking drain: s.run(blocking=False) # fires only ready events
- Note: sched.scheduler is thread-safe (CPython ≥ 3.3), but a blocking run() will not wake early for events added from another thread; prefer threading.Timer or APScheduler for concurrent/async workloads
sched Event Scheduler Pipeline
# app/schedutil.py — scheduler, retry, debounce, heartbeat, threadloop
from __future__ import annotations
import sched
import threading
import time
from dataclasses import dataclass, field
from typing import Callable
# ─────────────────────────────────────────────────────────────────────────────
# 1. Scheduler helpers
# ─────────────────────────────────────────────────────────────────────────────
def run_after(
    delay: float,
    fn: Callable,
    *args: object,
    **kwargs: object,
) -> sched.Event:
    """
    Block for roughly *delay* seconds, then invoke fn(*args, **kwargs) once
    via a throwaway scheduler. Returns the (already fired) event handle.

    Example:
        run_after(0.1, print, "hello from the future")
    """
    scheduler = sched.scheduler()
    event = scheduler.enter(delay, 1, fn, args, kwargs)
    scheduler.run()  # sleeps until the single event is due, fires it, returns
    return event
def run_at(
    abs_time: float,
    fn: Callable,
    *args: object,
    **kwargs: object,
) -> None:
    """
    Sleep until the monotonic clock reaches *abs_time*, then call
    fn(*args, **kwargs). Blocks the caller.

    Example:
        run_at(time.monotonic() + 2.0, print, "now!")
    """
    scheduler = sched.scheduler()
    scheduler.enterabs(abs_time, 1, fn, args, kwargs)
    scheduler.run()
# ─────────────────────────────────────────────────────────────────────────────
# 2. Retry scheduler
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class RetryScheduler:
    """
    Retry a callable up to max_attempts times with exponential backoff.
    Uses sched under the hood (single-threaded, blocking).

    Example:
        def flaky():
            import random
            if random.random() < 0.7:
                raise RuntimeError("transient error")
            return "ok"
        rs = RetryScheduler(flaky, max_attempts=5, base_delay=0.05)
        result = rs.run()
    """
    fn: Callable
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    max_attempts: int = 3      # total tries, including the first
    base_delay: float = 1.0    # delay before the 2nd attempt
    backoff: float = 2.0       # multiplier applied per failed attempt
    max_delay: float = 60.0    # backoff is capped at this many seconds

    def run(self) -> object:
        """
        Try fn up to max_attempts times. Returns the result on success.
        Raises the last exception if all attempts fail.
        """
        queue = sched.scheduler()
        outcome: dict = {}  # "result" on success, "error" on failure

        def _try_once(attempt_no: int) -> None:
            # One attempt; on failure, queue the next attempt after backoff.
            try:
                outcome["result"] = self.fn(*self.args, **self.kwargs)
            except Exception as exc:
                outcome["error"] = exc
                if attempt_no < self.max_attempts:
                    wait = self.base_delay * self.backoff ** (attempt_no - 1)
                    queue.enter(min(wait, self.max_delay), 1, _try_once, (attempt_no + 1,))

        queue.enter(0, 1, _try_once, (1,))
        queue.run()  # blocks until the chain of attempts ends
        if "result" in outcome:
            return outcome["result"]
        if "error" in outcome:
            raise outcome["error"]
        raise RuntimeError("RetryScheduler: no result and no exception")
# ─────────────────────────────────────────────────────────────────────────────
# 3. Debounce helper
# ─────────────────────────────────────────────────────────────────────────────
class Debouncer:
    """
    Trailing-edge debouncer: collapse a burst of calls into a single
    invocation of *fn*, fired ``wait`` seconds after the most recent call.

    Thread-safe: a lock guards the internal threading.Timer.

    Example:
        save = Debouncer(persist_to_disk, wait=0.5)
        # called rapidly from many places:
        save(); save(); save()
        # persist_to_disk fires once, 0.5s after the last call
    """

    def __init__(self, fn: Callable, wait: float, *args: object, **kwargs: object) -> None:
        self._fn = fn
        self._wait = wait
        # Constructor-time defaults, used when __call__ supplies no args.
        self._args = args
        self._kwargs = kwargs
        # Arguments of the most recent trigger — what flush() must replay.
        self._last_args: tuple = args
        self._last_kwargs: dict = dict(kwargs)
        self._timer: threading.Timer | None = None
        self._lock = threading.Lock()

    def __call__(self, *args: object, **kwargs: object) -> None:
        """(Re)arm the timer; fn fires wait seconds after the last call."""
        effective_args = args or self._args
        effective_kwargs = kwargs or self._kwargs
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
            # Remember what was scheduled so flush() replays the same call.
            # BUG FIX: flush() previously always used the constructor-time
            # args and silently ignored the latest call's arguments.
            self._last_args = effective_args
            self._last_kwargs = dict(effective_kwargs)
            self._timer = threading.Timer(
                self._wait, self._fn, effective_args, effective_kwargs
            )
            self._timer.daemon = True  # never keep the process alive
            self._timer.start()

    def cancel(self) -> None:
        """Cancel the pending call if not yet fired."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None

    def flush(self) -> None:
        """Fire the callback immediately (cancels the pending timer).

        Replays the arguments of the most recent trigger, falling back to
        the constructor defaults if the debouncer was never triggered.
        """
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
        # Fire outside the lock so a re-triggering callback cannot deadlock.
        self._fn(*self._last_args, **self._last_kwargs)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Recurring scheduler (heartbeat / cron-style)
# ─────────────────────────────────────────────────────────────────────────────
class RecurringScheduler:
    """
    Run a callback at a fixed interval in a daemon background thread.

    Each fire re-schedules the next tick relative to completion time, so the
    period is "gap between fires", not a fixed-rate cadence — long callbacks
    drift the schedule.

    Example:
        def heartbeat():
            print(f" beat at {time.monotonic():.2f}")
        rs = RecurringScheduler(heartbeat, interval=0.1)
        rs.start()
        time.sleep(0.35)
        rs.stop()
    """

    def __init__(self, fn: Callable, interval: float, *args: object, **kwargs: object) -> None:
        self._fn = fn                    # callback fired every interval
        self._interval = interval        # seconds between fires
        self._args = args
        self._kwargs = kwargs
        self._running = False            # flag polled by the worker loop
        self._thread: threading.Thread | None = None
        self._sched = sched.scheduler()  # replaced with a fresh queue on start()

    def _tick(self) -> None:
        """Fire the callback once and queue the next tick."""
        if not self._running:
            return
        try:
            self._fn(*self._args, **self._kwargs)
        except Exception:
            # Best-effort heartbeat: a failing callback must not kill the loop.
            pass
        if self._running:
            # Queue the next tick; the _loop drain will fire it when due.
            # BUG FIX: the original also called self._sched.run(blocking=False)
            # here — a redundant re-entrant drain (we are already inside the
            # run() invoked by _loop, and the freshly entered event is never
            # due yet). Removed.
            self._sched.enter(self._interval, 1, self._tick)

    def _loop(self) -> None:
        """Worker thread body: poll the queue until stop() clears the flag."""
        self._sched.enter(self._interval, 1, self._tick)
        while self._running:
            self._sched.run(blocking=False)             # fire only already-due events
            time.sleep(min(self._interval / 10, 0.05))  # poll granularity / stop latency
    def start(self) -> "RecurringScheduler":
        """Begin ticking in a daemon thread. Returns self for chaining."""
        self._running = True
        self._sched = sched.scheduler()  # fresh queue so stop()/start() restarts cleanly
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()
        return self

    def stop(self, timeout: float = 2.0) -> None:
        """Signal the loop to exit and join the worker thread (bounded wait)."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=timeout)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Multi-event scheduler (cron-style dispatcher)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ScheduledJob:
    """A named job: fire fn after delay seconds, optionally repeating."""
    name: str
    fn: Callable
    delay: float  # delay after start() or after previous fire
    repeat: bool = False


class JobScheduler:
    """
    Run a set of named jobs, optionally repeating.

    Example:
        js = JobScheduler()
        js.add("cleanup", gc.collect, delay=5.0, repeat=True)
        js.add("ping", send_ping, delay=1.0, repeat=True)
        js.run(duration=10.0)
    """

    def __init__(self) -> None:
        self._jobs: dict[str, ScheduledJob] = {}
        self._sched = sched.scheduler()

    def add(self, name: str, fn: Callable, delay: float, repeat: bool = False) -> None:
        """Register (or replace) a job under *name*."""
        self._jobs[name] = ScheduledJob(name=name, fn=fn, delay=delay, repeat=repeat)

    def remove(self, name: str) -> None:
        """Forget a job; no-op if the name is unknown."""
        self._jobs.pop(name, None)

    def run(self, duration: "float | None" = None) -> None:
        """
        Fire all jobs in delay order, blocking.

        Repeating jobs re-schedule themselves until *duration* seconds have
        elapsed. With duration=None, non-repeating jobs fire once each and
        run() returns; any repeat=True job keeps re-scheduling indefinitely,
        so run() never returns. (The original docstring claimed "only one
        pass" for None — the code has always repeated forever.)

        BUG FIX: deadline checks now use `is not None` — previously a
        duration of 0 (or 0.0) was falsy and silently treated as None.
        """
        start = time.monotonic()
        self._sched = sched.scheduler()  # fresh queue per run()

        def _fire(job: ScheduledJob) -> None:
            # Past the deadline: skip the fire and end this job's repeat chain.
            if duration is not None and (time.monotonic() - start) >= duration:
                return
            try:
                job.fn()
            except Exception:
                pass  # best-effort: one failing job must not stop the others
            if job.repeat:
                # Re-schedule only if the next fire would land inside the window.
                if duration is None or (time.monotonic() - start + job.delay) < duration:
                    self._sched.enter(job.delay, 1, _fire, (job,))

        for job in self._jobs.values():
            self._sched.enter(job.delay, 1, _fire, (job,))
        if duration is not None:
            self._sched.enter(duration, 99, lambda: None)  # sentinel to end blocking run
        self._sched.run()
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("=== sched demo ===")

    # ── run_after: three blocking one-shot calls, in sequence ─────────────────
    print("\n--- run_after ---")
    fired: list[str] = []
    run_after(0.05, lambda: fired.append("A"))
    run_after(0.05, lambda: fired.append("B"))
    run_after(0.05, lambda: fired.append("C"))
    print(f" fired: {fired}")

    # ── sched priority: lower number fires first at the SAME time ────────────
    print("\n--- priority ordering ---")
    order: list[int] = []
    s = sched.scheduler()
    # BUG FIX: capture one timestamp. The original called time.monotonic()
    # inside each enterabs(), producing three *different* times, so events
    # fired in time order (3, 1, 2) and priority was never exercised.
    now = time.monotonic()
    s.enterabs(now, 3, lambda: order.append(3))
    s.enterabs(now, 1, lambda: order.append(1))
    s.enterabs(now, 2, lambda: order.append(2))
    s.run()
    print(f" priority order: {order}")

    # ── RetryScheduler: succeeds on the 3rd attempt ───────────────────────────
    print("\n--- RetryScheduler ---")
    attempt_count = [0]
    def flaky_fn() -> str:
        attempt_count[0] += 1
        if attempt_count[0] < 3:
            raise RuntimeError("not ready")
        return f"succeeded on attempt {attempt_count[0]}"
    rs = RetryScheduler(flaky_fn, max_attempts=5, base_delay=0.01, backoff=1.5)
    result = rs.run()
    print(f" {result}")

    # ── Debouncer: 5 rapid triggers collapse to one trailing fire ────────────
    print("\n--- Debouncer ---")
    calls: list[float] = []
    debounced = Debouncer(lambda: calls.append(time.monotonic()), wait=0.1)
    for _ in range(5):
        debounced()
        time.sleep(0.01)
    time.sleep(0.15)  # let the trailing timer fire
    print(f" called 5x rapidly, fired {len(calls)} time(s)")

    # ── RecurringScheduler: background heartbeat ──────────────────────────────
    print("\n--- RecurringScheduler ---")
    beats: list[float] = []
    rs2 = RecurringScheduler(lambda: beats.append(time.monotonic()), interval=0.05)
    rs2.start()
    time.sleep(0.25)
    rs2.stop()
    print(f" fired {len(beats)} times in ~0.25s")

    # ── JobScheduler: two repeating jobs bounded by duration ─────────────────
    print("\n--- JobScheduler ---")
    log: list[str] = []
    js = JobScheduler()
    js.add("fast", lambda: log.append("fast"), delay=0.05, repeat=True)
    js.add("slow", lambda: log.append("slow"), delay=0.15, repeat=True)
    js.run(duration=0.35)
    print(f" jobs fired: fast×{log.count('fast')} slow×{log.count('slow')}")
    print("\n=== done ===")
For the threading.Timer alternative — threading.Timer(interval, fn).start() fires a callable once after a delay in a background thread — use threading.Timer for simple fire-once delayed calls when you need non-blocking behavior; use sched.scheduler for deterministic single-threaded queuing of multiple events with priorities, or when you want to replace the time source for testing (pass a fake timefunc and delayfunc). For the APScheduler / schedule (PyPI) alternatives — apscheduler supports cron-style, interval, and one-shot jobs with async backends; the schedule library offers a fluent API (schedule.every(10).minutes.do(job)) — use these for long-running daemon processes that need persistent schedules, timezone support, job stores, or async execution; use sched for lightweight single-process scripting where a subprocess-free stdlib solution is preferred. The Claude Skills 360 bundle includes sched skill sets covering run_after()/run_at() one-shot helpers, RetryScheduler with exponential backoff via re-scheduling, Debouncer thread-safe trailing-edge debounce, RecurringScheduler background interval heartbeat, and JobScheduler with add()/remove()/run() named-job cron-style dispatcher. Start with the free tier to try event scheduling patterns and sched pipeline code generation.