Claude Code for sched: Python Event Scheduler

Published: December 8, 2028
Read time: 5 min read
By: Claude Skills 360

Python’s sched module implements a general-purpose event scheduler. Import it with import sched. Create a scheduler with s = sched.scheduler(timefunc=time.monotonic, delayfunc=time.sleep): timefunc() returns the current time and delayfunc(t) sleeps for t seconds; both are replaceable for testing. Schedule relative: event = s.enter(delay, priority, action, argument=(), kwargs={}) fires action(*argument, **kwargs) after delay time units (seconds with the default clock) and returns an event handle. Schedule absolute: s.enterabs(time, priority, action, ...) fires at an absolute value of timefunc(). Cancel: s.cancel(event) removes the event from the queue and raises ValueError if it is not found. Run: s.run(blocking=True) runs all events in time order, sleeping between them; s.run(blocking=False) fires only events that are already due (a non-blocking drain) and returns the delay until the next event, or None if the queue is empty. Empty: s.empty() → True if no events are queued. Queue inspection: s.queue → a list of Event named tuples with time, priority, action, argument, and kwargs fields, in the order they will run. Priority: a lower number means higher priority when two events share the same scheduled time. Claude Code generates single-threaded retry loops, debounce helpers, cron-style schedulers, background heartbeats, and deadline-based timeouts.
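
The calls described above can be exercised without any real waiting by swapping in a fake clock. The following is a minimal sketch, assuming a hypothetical FakeClock class (not part of sched) whose sleep() simply advances a counter; it shows priority ordering at equal times, cancellation, and an absolute-time event.

```python
import sched


class FakeClock:
    """Hypothetical test clock: time only moves when the scheduler 'sleeps'."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        # sched also calls delayfunc(0) after each event; adding 0 is harmless.
        self.now += seconds


clock = FakeClock()
s = sched.scheduler(timefunc=clock.time, delayfunc=clock.sleep)
fired = []


def record(label: str) -> None:
    fired.append((label, clock.now))


s.enter(5, 2, record, argument=("low priority",))
s.enter(5, 1, record, argument=("high priority",))   # same time, runs first
doomed = s.enter(3, 1, record, argument=("cancelled",))
s.enterabs(10, 1, record, kwargs={"label": "absolute"})

s.cancel(doomed)            # removed before it can fire
queued = len(s.queue)       # 3 events remain
s.run()                     # returns immediately in wall-clock terms
print(fired)
# [('high priority', 5.0), ('low priority', 5.0), ('absolute', 10.0)]
```

Because the scheduler only ever asks timefunc for the time and delayfunc to wait, the same pattern lets a test suite check ordering and timing logic in microseconds.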

CLAUDE.md for sched

## sched Stack
- Stdlib: import sched, time, threading
- Create: s = sched.scheduler()              # uses time.monotonic + time.sleep
- Relative: ev = s.enter(delay, 1, fn, (args,))
- Absolute: ev = s.enterabs(abs_time, 1, fn)
- Cancel: s.cancel(ev)
- Run: s.run()                              # blocks until queue empty
- Non-blocking drain: s.run(blocking=False) # fires only ready events
- Note:    Thread-safe since Python 3.3, but run() blocks its thread: use threading.Timer or APScheduler for background/async use
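
The non-blocking drain in the list above is easiest to see with a clock that is advanced by hand. This is a small sketch under that assumption; the now list and fake_sleep function are illustrative stand-ins for time.monotonic and time.sleep, not part of the sched API.

```python
import sched

now = [0.0]                          # hypothetical manual clock, in seconds


def fake_sleep(seconds: float) -> None:
    now[0] += seconds


s = sched.scheduler(timefunc=lambda: now[0], delayfunc=fake_sleep)
log = []
s.enter(1.0, 1, log.append, ("first",))
s.enter(3.0, 1, log.append, ("second",))

remaining = s.run(blocking=False)        # nothing is due yet
# remaining == 1.0 and log == []

now[0] = 1.5                             # move past the first event
remaining_after = s.run(blocking=False)  # fires "first" only
# remaining_after == 1.5 (time left until "second")

now[0] = 3.0
final = s.run(blocking=False)            # fires "second"; queue is now empty
# final is None
```

The return value is what makes a polling loop practical: a caller can sleep for at most that long before draining again.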

sched Event Scheduler Pipeline

# app/schedutil.py — scheduler, retry, debounce, heartbeat, threadloop
from __future__ import annotations

import sched
import threading
import time
from dataclasses import dataclass, field
from typing import Callable


# ─────────────────────────────────────────────────────────────────────────────
# 1. Scheduler helpers
# ─────────────────────────────────────────────────────────────────────────────

def run_after(
    delay: float,
    fn: Callable,
    *args: object,
    **kwargs: object,
) -> sched.Event:
    """
    Schedule fn(*args, **kwargs) to run after delay seconds using a fresh
    scheduler. Blocks until the callback has fired.

    Example:
        run_after(0.1, print, "hello from the future")
    """
    s = sched.scheduler()
    ev = s.enter(delay, 1, fn, args, kwargs)
    s.run()
    return ev


def run_at(
    abs_time: float,
    fn: Callable,
    *args: object,
    **kwargs: object,
) -> None:
    """
    Schedule fn to run at absolute monotonic time abs_time. Blocks.

    Example:
        run_at(time.monotonic() + 2.0, print, "now!")
    """
    s = sched.scheduler()
    s.enterabs(abs_time, 1, fn, args, kwargs)
    s.run()


# ─────────────────────────────────────────────────────────────────────────────
# 2. Retry scheduler
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class RetryScheduler:
    """
    Retry a callable up to max_attempts times with exponential backoff.
    Uses sched under the hood (single-threaded, blocking).

    Example:
        def flaky():
            import random
            if random.random() < 0.7:
                raise RuntimeError("transient error")
            return "ok"

        rs = RetryScheduler(flaky, max_attempts=5, base_delay=0.05)
        result = rs.run()
    """
    fn:          Callable
    args:        tuple = ()
    kwargs:      dict = field(default_factory=dict)
    max_attempts: int = 3
    base_delay:  float = 1.0
    backoff:     float = 2.0
    max_delay:   float = 60.0

    def run(self) -> object:
        """
        Try fn up to max_attempts times. Returns the result on success.
        Raises the last exception if all attempts fail.
        """
        s = sched.scheduler()
        last_exc: BaseException | None = None
        result_holder: list = []

        def attempt(n: int) -> None:
            nonlocal last_exc
            try:
                result_holder.append(self.fn(*self.args, **self.kwargs))
            except Exception as exc:
                last_exc = exc
                if n < self.max_attempts:
                    delay = min(self.base_delay * (self.backoff ** (n - 1)), self.max_delay)
                    s.enter(delay, 1, attempt, (n + 1,))

        s.enter(0, 1, attempt, (1,))
        s.run()

        if result_holder:
            return result_holder[0]
        if last_exc:
            raise last_exc
        raise RuntimeError("RetryScheduler: no result and no exception")


# ─────────────────────────────────────────────────────────────────────────────
# 3. Debounce helper
# ─────────────────────────────────────────────────────────────────────────────

class Debouncer:
    """
    Delay a call until wait seconds have passed without a new trigger
    (trailing-edge debounce), so a burst of calls fires the function once.
    Thread-safe: built on threading.Timer rather than sched, guarded by a lock.

    Example:
        save = Debouncer(persist_to_disk, wait=0.5)
        # called rapidly from many places:
        save(); save(); save()
        # persist_to_disk fires once, 0.5s after the last call
    """

    def __init__(self, fn: Callable, wait: float, *args: object, **kwargs: object) -> None:
        self._fn = fn
        self._args = args
        self._kwargs = kwargs
        self._wait = wait
        self._timer: threading.Timer | None = None
        self._lock = threading.Lock()

    def __call__(self, *args: object, **kwargs: object) -> None:
        effective_args = args or self._args
        effective_kwargs = kwargs or self._kwargs
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(
                self._wait, self._fn, effective_args, effective_kwargs
            )
            self._timer.daemon = True
            self._timer.start()

    def cancel(self) -> None:
        """Cancel the pending call if not yet fired."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None

    def flush(self) -> None:
        """Fire the callback immediately (cancels the pending timer)."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
        self._fn(*self._args, **self._kwargs)


# ─────────────────────────────────────────────────────────────────────────────
# 4. Recurring scheduler (heartbeat / cron-style)
# ─────────────────────────────────────────────────────────────────────────────

class RecurringScheduler:
    """
    Run a callback at a fixed interval in a background thread.
    Uses sched.scheduler with re-scheduling on each fire.

    Example:
        def heartbeat():
            print(f"  beat at {time.monotonic():.2f}")

        rs = RecurringScheduler(heartbeat, interval=0.1)
        rs.start()
        time.sleep(0.35)
        rs.stop()
    """

    def __init__(self, fn: Callable, interval: float, *args: object, **kwargs: object) -> None:
        self._fn = fn
        self._interval = interval
        self._args = args
        self._kwargs = kwargs
        self._running = False
        self._thread: threading.Thread | None = None
        self._sched = sched.scheduler()

    def _tick(self) -> None:
        if not self._running:
            return
        try:
            self._fn(*self._args, **self._kwargs)
        except Exception:
            pass
        if self._running:
            # Re-arm only; _loop() drains the queue, so no nested run() here.
            self._sched.enter(self._interval, 1, self._tick)

    def _loop(self) -> None:
        self._sched.enter(self._interval, 1, self._tick)
        while self._running:
            self._sched.run(blocking=False)
            time.sleep(min(self._interval / 10, 0.05))

    def start(self) -> "RecurringScheduler":
        self._running = True
        self._sched = sched.scheduler()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()
        return self

    def stop(self, timeout: float = 2.0) -> None:
        self._running = False
        if self._thread:
            self._thread.join(timeout=timeout)


# ─────────────────────────────────────────────────────────────────────────────
# 5. Multi-event scheduler (cron-style dispatcher)
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class ScheduledJob:
    name:     str
    fn:       Callable
    delay:    float           # delay after start() or after previous fire
    repeat:   bool = False


class JobScheduler:
    """
    Run a set of named jobs, optionally repeating.

    Example:
        js = JobScheduler()
        js.add("cleanup", gc.collect, delay=5.0, repeat=True)
        js.add("ping",    send_ping,  delay=1.0, repeat=True)
        js.run(duration=10.0)
    """

    def __init__(self) -> None:
        self._jobs: dict[str, ScheduledJob] = {}
        self._sched = sched.scheduler()

    def add(self, name: str, fn: Callable, delay: float, repeat: bool = False) -> None:
        self._jobs[name] = ScheduledJob(name=name, fn=fn, delay=delay, repeat=repeat)

    def remove(self, name: str) -> None:
        self._jobs.pop(name, None)

    def run(self, duration: "float | None" = None) -> None:
        """
        Fire all jobs in delay order. If repeat=True, re-schedules automatically.
        With a duration, blocks for about duration seconds and stops re-scheduling
        once the next fire would fall past it. With duration=None, runs one pass
        of the non-repeating jobs, but never returns if any job repeats.
        """
        start = time.monotonic()
        self._sched = sched.scheduler()

        def _fire(job: ScheduledJob) -> None:
            if duration and (time.monotonic() - start) >= duration:
                return
            try:
                job.fn()
            except Exception:
                pass
            if job.repeat:
                if not duration or (time.monotonic() - start + job.delay) < duration:
                    self._sched.enter(job.delay, 1, _fire, (job,))

        for job in self._jobs.values():
            self._sched.enter(job.delay, 1, _fire, (job,))

        if duration:
            # Low-priority no-op so run() keeps blocking until duration has elapsed.
            self._sched.enter(duration, 99, lambda: None)
        self._sched.run()


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== sched demo ===")

    # ── run_after ─────────────────────────────────────────────────────────────
    print("\n--- run_after ---")
    fired: list[str] = []
    run_after(0.05, lambda: fired.append("A"))
    run_after(0.05, lambda: fired.append("B"))
    run_after(0.05, lambda: fired.append("C"))
    print(f"  fired: {fired}")

    # ── sched priority ────────────────────────────────────────────────────────
    print("\n--- priority ordering ---")
    order: list[int] = []
    s = sched.scheduler()
    # Capture one timestamp so all three events share the same scheduled time;
    # separate time.monotonic() calls would order them by time, not priority.
    now = time.monotonic()
    s.enterabs(now, 3, lambda: order.append(3))
    s.enterabs(now, 1, lambda: order.append(1))
    s.enterabs(now, 2, lambda: order.append(2))
    s.run()
    print(f"  priority order: {order}")

    # ── RetryScheduler ────────────────────────────────────────────────────────
    print("\n--- RetryScheduler ---")
    attempt_count = [0]

    def flaky_fn() -> str:
        attempt_count[0] += 1
        if attempt_count[0] < 3:
            raise RuntimeError("not ready")
        return f"succeeded on attempt {attempt_count[0]}"

    rs = RetryScheduler(flaky_fn, max_attempts=5, base_delay=0.01, backoff=1.5)
    result = rs.run()
    print(f"  {result}")

    # ── Debouncer ─────────────────────────────────────────────────────────────
    print("\n--- Debouncer ---")
    calls: list[float] = []
    debounced = Debouncer(lambda: calls.append(time.monotonic()), wait=0.1)
    for _ in range(5):
        debounced()
        time.sleep(0.01)
    time.sleep(0.15)
    print(f"  called 5x rapidly, fired {len(calls)} time(s)")

    # ── RecurringScheduler ────────────────────────────────────────────────────
    print("\n--- RecurringScheduler ---")
    beats: list[float] = []
    rs2 = RecurringScheduler(lambda: beats.append(time.monotonic()), interval=0.05)
    rs2.start()
    time.sleep(0.25)
    rs2.stop()
    print(f"  fired {len(beats)} times in ~0.25s")

    # ── JobScheduler ──────────────────────────────────────────────────────────
    print("\n--- JobScheduler ---")
    log: list[str] = []
    js = JobScheduler()
    js.add("fast", lambda: log.append("fast"), delay=0.05, repeat=True)
    js.add("slow", lambda: log.append("slow"), delay=0.15, repeat=True)
    js.run(duration=0.35)
    print(f"  jobs fired: fast×{log.count('fast')} slow×{log.count('slow')}")

    print("\n=== done ===")
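
The retry delays in RetryScheduler.run() above follow min(base_delay * backoff ** (n - 1), max_delay), where n is the attempt that just failed. As a rough sketch, the hypothetical helper backoff_delays below (not part of the module above) lists the waits for a given configuration, which may help when choosing max_attempts and max_delay.

```python
from __future__ import annotations


def backoff_delays(
    max_attempts: int,
    base_delay: float = 1.0,
    backoff: float = 2.0,
    max_delay: float = 60.0,
) -> list[float]:
    """Waits after failed attempts 1 .. max_attempts - 1 (hypothetical helper)."""
    return [
        min(base_delay * backoff ** (n - 1), max_delay)
        for n in range(1, max_attempts)   # no wait follows the final attempt
    ]


delays = backoff_delays(8)
print(delays)        # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
print(sum(delays))   # 123.0
```

With these defaults the cap only takes effect on the seventh wait, so the worst case for eight attempts is about two minutes of sleeping plus the time spent in the calls themselves.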

For the threading.Timer alternative: threading.Timer(interval, fn).start() fires a callable once after a delay in a background thread. Use threading.Timer for simple fire-once delayed calls when you need non-blocking behavior; use sched.scheduler for deterministic single-threaded queuing of multiple events with priorities, or when you want to replace the time source for testing (pass a fake timefunc and delayfunc). For the APScheduler and schedule (PyPI) alternatives: apscheduler supports cron-style, interval, and one-shot jobs with async backends, and the schedule library offers a fluent API (schedule.every(10).minutes.do(job)). Use these for long-running daemon processes, and APScheduler in particular when you need persistent schedules, timezone support, job stores, or async execution; use sched for lightweight single-process scripting where a dependency-free stdlib solution is preferred. The Claude Skills 360 bundle includes sched skill sets covering the run_after()/run_at() one-shot helpers, RetryScheduler with exponential backoff via re-scheduling, Debouncer for thread-safe trailing-edge debounce, RecurringScheduler for a background interval heartbeat, and JobScheduler, a named-job cron-style dispatcher with add()/remove()/run(). Start with the free tier to try event scheduling patterns and sched pipeline code generation.
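
For comparison with the blocking s.run() model, here is a minimal sketch of the threading.Timer approach. The on_timeout callback and the done event are illustrative names chosen for this example; the intervals are arbitrary.

```python
import threading

done = threading.Event()
results = []


def on_timeout(label: str) -> None:
    results.append(label)
    done.set()


# Fire-once timer: start() returns immediately and the callback
# runs later in a background thread.
timer = threading.Timer(0.05, on_timeout, args=("timer fired",))
timer.start()

# A second timer, cancelled before its interval elapses, never fires.
cancelled = threading.Timer(0.2, on_timeout, args=("never",))
cancelled.start()
cancelled.cancel()

fired_in_time = done.wait(timeout=2.0)   # the main thread blocks only here
timer.join()
cancelled.join()
print(results)   # ['timer fired']
```

Each Timer is its own thread with no shared queue or priorities, which is the trade-off against sched.scheduler described above.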
