Python’s signal module intercepts Unix signals for graceful shutdown, timeouts, and daemon lifecycle management. import signal. signal.signal: signal.signal(signal.SIGTERM, handler) — register handler(signum, frame) for a signal; only the main thread can register handlers. SIG_DFL / SIG_IGN: signal.signal(signal.SIGHUP, signal.SIG_IGN) — ignore; signal.SIG_DFL restores OS default. getsignal: signal.getsignal(signal.SIGINT) → current handler. raise_signal: signal.raise_signal(signal.SIGUSR1) — send to self (Python 3.8+). alarm: signal.alarm(5) — schedules SIGALRM in 5 seconds (Unix only); signal.alarm(0) cancels. setitimer: signal.setitimer(signal.ITIMER_REAL, 1.5) — floating-point alarm; fires SIGALRM once after 1.5 seconds, and repeats only if an interval is passed as the third argument. Signals: signal.Signals.SIGTERM (15), signal.SIGINT (2), signal.SIGHUP (1), signal.SIGCHLD (17), signal.SIGUSR1 (10), signal.SIGUSR2 (12) — the numbers shown are the Linux values; SIGCHLD, SIGUSR1, and SIGUSR2 differ on macOS/BSD, so always use the symbolic names. strsignal: signal.strsignal(signal.SIGTERM) → “Terminated”. sigwait: sig = signal.sigwait({signal.SIGUSR1, signal.SIGTERM}) — block until one arrives (use in threads). pthread_kill: signal.pthread_kill(tid, signal.SIGUSR1) — send to a specific thread. signal.valid_signals() → set of all valid signal numbers. Limitations: Python-level handlers always run in the main thread, even when the signal is delivered to another thread; use threading.Event or queues to relay signals to worker threads. Claude Code generates graceful shutdown managers, timeout decorators, daemon reload handlers, and child-process reaping utilities.
CLAUDE.md for signal
## signal Stack
- Stdlib: import signal, os, threading
- Handle: signal.signal(signal.SIGTERM, lambda s, f: shutdown_event.set())
- Ignore: signal.signal(signal.SIGHUP, signal.SIG_IGN)
- Alarm: signal.alarm(10) # SIGALRM in 10s (Unix only)
- Names: signal.strsignal(signum)
- Thread: use threading.Event / queue.Queue to relay signal to worker threads
signal Signal Handling Pipeline
# app/signalutil.py — shutdown, timeout, reload, child reap, signal relay
from __future__ import annotations
import os
import queue
import signal
import sys
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Callable, Generator, TypeVar
# Generic result type shared by run_with_alarm_timeout's callable and default.
T = TypeVar("T")
# True on every platform except Windows (sys.platform == "win32"); used to
# gate the Unix-only signal features (SIGALRM, SIGHUP, SIGCHLD) in this module.
_IS_UNIX = sys.platform != "win32"
# ─────────────────────────────────────────────────────────────────────────────
# 1. Graceful shutdown manager
# ─────────────────────────────────────────────────────────────────────────────
class ShutdownManager:
    """
    Turn termination signals into a threading.Event plus cleanup callbacks.

    install() registers one handler for every managed signal (SIGTERM and
    SIGINT by default). When one arrives the handler records the signal
    number, runs the registered callbacks in registration order, and sets
    the event that `should_exit` / `wait()` observe. uninstall() puts back
    the handlers that install() displaced.

    Example:
        sm = ShutdownManager()
        sm.install()
        while not sm.should_exit:
            do_work()
        sm.wait()
    """

    def __init__(self, signals: list[int] | None = None) -> None:
        """
        Args:
            signals: Signal numbers to manage. None or an empty list selects
                the default pair [SIGTERM, SIGINT].
        """
        # Copy so later mutation of the caller's list cannot desynchronise
        # install() and uninstall().
        self._sigs = list(signals) if signals else [signal.SIGTERM, signal.SIGINT]
        self._event = threading.Event()
        self._callbacks: list[Callable[[], None]] = []
        self._signum: int | None = None
        # Handlers displaced by install(), keyed by signal number.
        self._previous: dict[int, Any] = {}
        # Exceptions raised by cleanup callbacks (kept instead of discarded).
        self._errors: list[Exception] = []

    def on_shutdown(self, fn: Callable[[], None]) -> None:
        """Register a cleanup callback invoked when a shutdown signal arrives."""
        self._callbacks.append(fn)

    def install(self) -> "ShutdownManager":
        """
        Install signal handlers. Must be called from the main thread.

        Returns:
            self, so the call can be chained after construction.
        """
        for sig in self._sigs:
            displaced = signal.signal(sig, self._handler)
            # Remember only the first displaced handler: on a repeated
            # install() the "previous" handler would be our own.
            self._previous.setdefault(sig, displaced)
        return self

    def uninstall(self) -> None:
        """
        Restore the handlers that were active before install().

        Signals for which no previous handler is known (install() was never
        called, or the old handler was not installed from Python and is
        reported as None) are reset to SIG_DFL.
        """
        for sig in self._sigs:
            displaced = self._previous.pop(sig, None)
            signal.signal(sig, signal.SIG_DFL if displaced is None else displaced)

    def _handler(self, signum: int, frame: Any) -> None:
        """Signal handler: record the signal, run callbacks, set the event."""
        self._signum = signum
        try:
            # Iterate over a snapshot so a callback may register another one.
            for cb in tuple(self._callbacks):
                try:
                    cb()
                except Exception as exc:
                    # Best effort: one failing callback must not stop the
                    # others, but keep the error so it can be inspected.
                    self._errors.append(exc)
        finally:
            # Always wake waiters, even if a callback raised something that
            # is not an Exception subclass (e.g. SystemExit).
            self._event.set()

    @property
    def should_exit(self) -> bool:
        """True once a managed signal has been handled."""
        return self._event.is_set()

    def wait(self, timeout: float | None = None) -> bool:
        """
        Block until a shutdown signal is received or timeout expires.

        Returns:
            True if the event is set, False if the timeout elapsed first.
        """
        return self._event.wait(timeout=timeout)

    @property
    def signal_name(self) -> str:
        """Description of the handled signal, or "none" if none arrived yet."""
        if self._signum is None:
            return "none"
        return signal.strsignal(self._signum) or str(self._signum)

    @property
    def callback_errors(self) -> list[Exception]:
        """Exceptions raised by cleanup callbacks so far (a copy)."""
        return list(self._errors)
# ─────────────────────────────────────────────────────────────────────────────
# 2. Signal-based timeout (Unix only — uses SIGALRM)
# ─────────────────────────────────────────────────────────────────────────────
class SignalTimeout(Exception):
    """Raised when a signal-based timeout expires."""


@contextmanager
def alarm_timeout(seconds: float) -> Generator[None, None, None]:
    """
    Context manager that raises SignalTimeout after `seconds`.

    Unix-only (uses SIGALRM via ITIMER_REAL). Must run in the main thread,
    because only the main thread may install signal handlers. On platforms
    without signal.setitimer the body runs with no time limit.

    On exit the previous SIGALRM handler is restored, and a timer that was
    already pending on entry is re-armed with whatever time it has left, so
    alarm_timeout blocks can be nested. Passing 0 arms no timer (setitimer
    treats zero as "clear"), so the body then runs unbounded.

    Args:
        seconds: Time limit in seconds; fractions are honoured.

    Raises:
        SignalTimeout: inside the `with` body when the limit expires.

    Example:
        try:
            with alarm_timeout(5.0):
                slow_operation()
        except SignalTimeout:
            print("operation timed out")
    """
    # Feature-detect rather than test the platform name, so the block also
    # degrades cleanly on any other platform lacking interval timers.
    if not hasattr(signal, "setitimer"):
        yield
        return

    def _handler(signum: int, frame: Any) -> None:
        raise SignalTimeout(f"operation timed out after {seconds}s")

    # Smallest delay used to re-arm an enclosing timer that is already overdue.
    min_rearm = 1e-6

    old_handler = signal.signal(signal.SIGALRM, _handler)
    outer_timer = None  # (delay, interval) pending before we armed ours
    started = time.monotonic()
    try:
        outer_timer = signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        if outer_timer is not None:
            # Disarm our timer before swapping handlers so a late SIGALRM
            # cannot reach the restored handler.
            signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() reports None for handlers not installed from
        # Python; None cannot be passed back, so fall back to the default.
        signal.signal(
            signal.SIGALRM,
            signal.SIG_DFL if old_handler is None else old_handler,
        )
        if outer_timer is not None and outer_timer[0] > 0:
            # Give the enclosing timer back the time it still has; if its
            # deadline passed while we held the timer, fire it right away.
            outer_delay, outer_interval = outer_timer
            remaining = outer_delay - (time.monotonic() - started)
            signal.setitimer(
                signal.ITIMER_REAL, max(remaining, min_rearm), outer_interval
            )


def run_with_alarm_timeout(fn: Callable[[], T], seconds: float, default: T | None = None) -> T | None:
    """
    Run fn(); return result or default if SIGALRM fires first.

    Unix-only; subject to the same main-thread restriction as alarm_timeout.

    Args:
        fn: Zero-argument callable to execute.
        seconds: Time limit handed to alarm_timeout.
        default: Value returned when the limit expires.

    Returns:
        fn()'s result, or `default` when SignalTimeout was raised.

    Example:
        result = run_with_alarm_timeout(slow_fn, 3.0, default=None)
    """
    try:
        with alarm_timeout(seconds):
            return fn()
    except SignalTimeout:
        return default
# ─────────────────────────────────────────────────────────────────────────────
# 3. Signal relay to worker threads
# ─────────────────────────────────────────────────────────────────────────────
class SignalRelay:
    """
    Bridges signal handlers (main-thread-only) to worker threads via a queue.

    The handler enqueues one SignalEvent per delivered signal; workers pull
    them with get(). A queue.SimpleQueue is used because its put() is
    reentrant: the handler runs in the main thread and may interrupt
    main-thread code that is itself inside a queue call, where a
    lock-guarded queue.Queue could deadlock.

    Example:
        relay = SignalRelay([signal.SIGUSR1, signal.SIGHUP])
        relay.install()

        def worker():
            while True:
                event = relay.get(timeout=1.0)
                if event:
                    print(f"received {event.name}")

        thread = threading.Thread(target=worker, daemon=True)
        thread.start()
    """

    @dataclass
    class SignalEvent:
        """One delivered signal."""

        signum: int

        @property
        def name(self) -> str:
            """Description of the signal, falling back to its number."""
            return signal.strsignal(self.signum) or str(self.signum)

    def __init__(self, signals: list[int]) -> None:
        """
        Args:
            signals: Signal numbers to relay.
        """
        # Copy so later mutation of the caller's list cannot change which
        # signals install()/uninstall() touch.
        self._sigs = list(signals)
        self._queue: queue.SimpleQueue["SignalRelay.SignalEvent"] = queue.SimpleQueue()
        # Handlers displaced by install(), keyed by signal number.
        self._previous: dict[int, Any] = {}

    def install(self) -> "SignalRelay":
        """Register the relay handler for every signal; returns self."""
        for sig in self._sigs:
            displaced = signal.signal(sig, self._handler)
            # Keep the first displaced handler only, so a repeated install()
            # does not record our own handler as the one to restore.
            self._previous.setdefault(sig, displaced)
        return self

    def uninstall(self) -> None:
        """
        Restore the handlers displaced by install().

        Signals with no known previous handler are reset to SIG_DFL.
        """
        for sig in self._sigs:
            displaced = self._previous.pop(sig, None)
            signal.signal(sig, signal.SIG_DFL if displaced is None else displaced)

    def _handler(self, signum: int, frame: Any) -> None:
        """Signal handler: enqueue an event and return immediately."""
        self._queue.put_nowait(SignalRelay.SignalEvent(signum=signum))

    def get(self, timeout: float | None = None) -> "SignalRelay.SignalEvent | None":
        """
        Wait for the next relayed signal.

        Args:
            timeout: Maximum seconds to wait; None waits indefinitely.

        Returns:
            The next SignalEvent, or None if the timeout elapsed first.
        """
        try:
            return self._queue.get(timeout=timeout)
        except queue.Empty:
            return None
# ─────────────────────────────────────────────────────────────────────────────
# 4. SIGHUP reload pattern
# ─────────────────────────────────────────────────────────────────────────────
class ReloadManager:
    """
    Listens for SIGHUP and invokes a reload callback.

    Typical use: reload configuration without restarting the process.
    An exception raised by the callback is captured in `last_error` instead
    of escaping the signal handler, where it would surface asynchronously in
    whatever code the main thread happened to be running.

    Example:
        def reload_config():
            global CONFIG
            CONFIG = load_config("app.toml")
            print("Config reloaded")

        rlm = ReloadManager(reload_config)
        rlm.install()
    """

    def __init__(self, callback: Callable[[], None]) -> None:
        """
        Args:
            callback: Zero-argument function run on every SIGHUP.
        """
        self._callback = callback
        self._reload_count = 0
        self._last_error: Exception | None = None

    def install(self) -> "ReloadManager":
        """
        Register the SIGHUP handler where SIGHUP exists (no-op on Windows).

        Must be called from the main thread. Returns self for chaining.
        """
        if hasattr(signal, "SIGHUP"):
            signal.signal(signal.SIGHUP, self._handler)
        return self

    def _handler(self, signum: int, frame: Any) -> None:
        """Signal handler: count the request, then run the callback."""
        # Counted before the callback runs, so failed attempts are included.
        self._reload_count += 1
        try:
            self._callback()
        except Exception as exc:
            # A broken reload must not take the process down mid-operation.
            self._last_error = exc
        else:
            self._last_error = None

    @property
    def reload_count(self) -> int:
        """Number of reload requests handled so far, including failed ones."""
        return self._reload_count

    @property
    def last_error(self) -> Exception | None:
        """Exception raised by the most recent reload, or None if it succeeded."""
        return self._last_error
# ─────────────────────────────────────────────────────────────────────────────
# 5. Child process reaping (SIGCHLD)
# ─────────────────────────────────────────────────────────────────────────────
class ChildReaper:
    """
    Installs a SIGCHLD handler that reaps zombie children automatically.

    Records (pid, status) for each reaped child in `reaped`, where status is
    the raw wait status returned by os.waitpid (an exit code of 3 is stored
    as 3 << 8 on Linux). Use `exit_codes` for decoded values.

    Example:
        reaper = ChildReaper()
        reaper.install()
        pid = os.fork()
        if pid == 0:
            sys.exit(0)  # child exits
        time.sleep(0.1)
        print(reaper.reaped)
    """

    def __init__(self) -> None:
        # (pid, raw wait status) per reaped child, in reaping order.
        self.reaped: list[tuple[int, int]] = []
        # Reentrant: the handler runs in the main thread and may interrupt
        # main-thread code that already holds this lock.
        self._lock = threading.RLock()

    def install(self) -> "ChildReaper":
        """
        Register the SIGCHLD handler where SIGCHLD exists (no-op on Windows).

        Must be called from the main thread. Returns self for chaining.
        """
        if hasattr(signal, "SIGCHLD"):
            signal.signal(signal.SIGCHLD, self._handler)
        return self

    def _handler(self, signum: int, frame: Any) -> None:
        """Signal handler: reap every child that has already terminated."""
        # One SIGCHLD may stand for several exits, so drain until waitpid
        # reports nothing more to collect.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                break  # no child processes at all
            if pid == 0:
                break  # children exist, but none has terminated yet
            with self._lock:
                self.reaped.append((pid, status))

    @staticmethod
    def _decode(status: int) -> int:
        """Map a raw wait status to an exit code; -N means killed by signal N."""
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)
        if os.WIFSIGNALED(status):
            return -os.WTERMSIG(status)
        return status

    @property
    def exit_codes(self) -> list[tuple[int, int]]:
        """Decoded (pid, exit_code) pairs for every child reaped so far."""
        with self._lock:
            snapshot = list(self.reaped)
        return [(pid, self._decode(status)) for pid, status in snapshot]
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
# Demo driver: exercises each utility once and prints what happened.
# Every section that needs a Unix-only signal is guarded by _IS_UNIX so the
# script also runs to completion on Windows.
if __name__ == "__main__":
    print("=== signal demo ===")

    # ── ShutdownManager (send SIGUSR1 to self to simulate) ────────────────────
    if _IS_UNIX:
        print("\n--- ShutdownManager ---")
        sm = ShutdownManager(signals=[signal.SIGUSR1])
        cleanup_called = []
        sm.on_shutdown(lambda: cleanup_called.append(True))
        sm.install()

        # Simulate signal from a background thread
        def _send_sig():
            time.sleep(0.05)
            # Process-directed delivery: signal.raise_signal() here would
            # target this helper thread, leaving the main thread blocked in
            # wait() until its timeout before the handler could run.
            os.kill(os.getpid(), signal.SIGUSR1)

        threading.Thread(target=_send_sig, daemon=True).start()
        sm.wait(timeout=2.0)
        sm.uninstall()
        print(f" should_exit={sm.should_exit} signal={sm.signal_name} cleanup_called={cleanup_called}")
    else:
        print("\n--- ShutdownManager skipped (Windows) ---")

    # ── alarm_timeout ─────────────────────────────────────────────────────────
    if _IS_UNIX:
        print("\n--- alarm_timeout (success) ---")
        try:
            with alarm_timeout(2.0):
                time.sleep(0.01)
            print(" completed before timeout")
        except SignalTimeout:
            print(" timed out (unexpected)")

        print("\n--- alarm_timeout (timeout fires) ---")
        try:
            with alarm_timeout(0.05):
                time.sleep(5.0)  # will be interrupted
        except SignalTimeout:
            print(" SignalTimeout raised as expected")
    else:
        print("\n--- alarm_timeout skipped (Windows) ---")

    # ── SignalRelay ────────────────────────────────────────────────────────────
    if _IS_UNIX:
        print("\n--- SignalRelay ---")
        relay = SignalRelay([signal.SIGUSR2])
        relay.install()
        received = []

        def _worker():
            event = relay.get(timeout=1.0)
            if event:
                received.append(event.name)

        t = threading.Thread(target=_worker, daemon=True)
        t.start()
        time.sleep(0.01)
        # Raised from the main thread, so the handler runs promptly.
        signal.raise_signal(signal.SIGUSR2)
        t.join(timeout=1.5)
        print(f" relay received: {received}")

    # ── strsignal / Signals enum ──────────────────────────────────────────────
    print("\n--- signal names ---")
    for sig_attr in ("SIGTERM", "SIGINT", "SIGUSR1"):
        # SIGUSR1 does not exist on Windows; skip names this platform lacks.
        sig = getattr(signal, sig_attr, None)
        if sig is None:
            continue
        print(f" {sig}: {signal.strsignal(sig)}")

    print("\n=== done ===")
For the atexit alternative — atexit.register(fn) adds a callback executed when the Python interpreter exits normally (after sys.exit(), when the main module finishes, or when an uncaught exception such as the KeyboardInterrupt raised by the default SIGINT handler ends the program) but NOT when the process is killed by a signal Python does not handle, such as SIGKILL or an unhandled SIGTERM, nor when os._exit() is called — use atexit for cleanup that should run on normal program exit (flushing logs, closing files); combine both: install a signal.signal(SIGTERM, ...) handler that calls sys.exit() so atexit handlers also fire on SIGTERM. For the threading.Event alternative — threading.Event used alongside signal handlers provides a safer inter-thread communication pattern than setting global flags in signal handlers; Python-level signal handlers always run in the main thread and can interrupt main-thread code between almost any two bytecode instructions, so keep them minimal (set a flag, call threading.Event.set(), or enqueue an item) — use threading.Event or queue.Queue (as shown in SignalRelay above) to communicate signal arrivals to worker threads; avoid long-running work, blocking I/O, or acquiring locks the main thread may already hold directly inside a signal handler. The Claude Skills 360 bundle includes signal skill sets covering ShutdownManager with install()/uninstall()/wait()/signal_name, SignalTimeout exception with alarm_timeout() context manager and run_with_alarm_timeout(), SignalRelay queue-bridged multi-thread signal delivery, ReloadManager SIGHUP config-reload pattern, and ChildReaper SIGCHLD zombie reaper. Start with the free tier to try Unix signal patterns and signal pipeline code generation.