Python’s atexit module registers functions to call automatically when the interpreter exits normally. import atexit. Register: atexit.register(fn, *args, **kwargs) — stores (fn, args, kwargs); returns fn so it can be used as a decorator. Unregister: atexit.unregister(fn) — removes all registrations of fn (silent if not registered). Execution order: LIFO (last in, first out) — the most recently registered handler runs first, mirroring the order resources were acquired. Triggers: handlers run on normal interpreter exit (sys.exit(), end of __main__), and on exceptions that propagate to the top level; they do NOT run on os._exit(), os.abort(), SIGKILL, or when the process is killed by an uncaught signal that terminates without Python cleanup. Exception handling: if a handler raises, the traceback is printed to sys.stderr and the remaining handlers continue running. Decorator usage: @atexit.register works because register returns its first argument. Clear all: atexit._clear() — private but useful in test teardown. Manual run: atexit._run_exitfuncs() — calls all handlers immediately; each is removed after running. Claude Code generates cleanup registrars, resource finalizers, log flushers, lock releasers, and temporary file removers.
CLAUDE.md for atexit
## atexit Stack
- Stdlib: import atexit
- Register: atexit.register(fn, arg1, kwarg=val) # returns fn
- Decorator: @atexit.register # zero-arg function
- Unregister: atexit.unregister(fn) # all copies removed
- Order: LIFO — last registered runs first
- Note: does NOT fire on os._exit() or SIGKILL
- exceptions in handlers are printed, remaining handlers still run
atexit Shutdown Handler Pipeline
# app/atexitutil.py — cleanup registry, temp files, log flush, resource guard
from __future__ import annotations
import atexit
import logging
import os
import shutil
import sys
import tempfile
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
# ─────────────────────────────────────────────────────────────────────────────
# 1. Named cleanup registry
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class CleanupEntry:
name: str
fn: "callable"
registered_at: float = field(default_factory=time.time)
ran: bool = False
error: str | None = None
class CleanupRegistry:
"""
Named cleanup registry backed by atexit.
Tracks which handlers ran, which failed, and time of registration.
Example:
registry = CleanupRegistry()
registry.register("close-db", db.close)
registry.register("flush-cache", lambda: cache.flush())
# handlers run automatically at exit in LIFO order
"""
def __init__(self) -> None:
self._entries: dict[str, CleanupEntry] = {}
atexit.register(self._run_all)
def register(self, name: str, fn: "callable", *args: object, **kwargs: object) -> None:
"""
Register a named cleanup function.
If name already exists, the old registration is replaced.
Example:
registry.register("close-db", conn.close)
registry.register("flush-log", flush, level="INFO")
"""
def _wrapped() -> None:
fn(*args, **kwargs)
self._entries[name] = CleanupEntry(name=name, fn=_wrapped)
def unregister(self, name: str) -> bool:
"""Remove a named cleanup. Returns True if it existed."""
return self._entries.pop(name, None) is not None
def _run_all(self) -> None:
# LIFO: reverse insertion order
for name in list(reversed(list(self._entries))):
entry = self._entries[name]
try:
entry.fn()
entry.ran = True
except Exception as exc:
entry.ran = True
entry.error = str(exc)
print(f"[atexit] cleanup '{name}' failed: {exc}", file=sys.stderr)
def report(self) -> list[dict]:
return [
{
"name": e.name,
"ran": e.ran,
"error": e.error,
"registered_at": e.registered_at,
}
for e in self._entries.values()
]
# ─────────────────────────────────────────────────────────────────────────────
# 2. Temporary path manager
# ─────────────────────────────────────────────────────────────────────────────
class TempManager:
"""
Create temporary files and directories that are automatically removed at exit.
Example:
tm = TempManager()
path = tm.mkdtemp(prefix="work_")
# ... use path ...
# removed at interpreter exit
"""
def __init__(self) -> None:
self._paths: list[Path] = []
atexit.register(self._cleanup)
def mkdtemp(self, **kwargs: object) -> Path:
"""Create a temporary directory and track it for deletion."""
p = Path(tempfile.mkdtemp(**kwargs))
self._paths.append(p)
return p
def mkstemp(self, **kwargs: object) -> tuple[int, Path]:
"""Create a temporary file and track it for deletion. Returns (fd, path)."""
fd, name = tempfile.mkstemp(**kwargs)
p = Path(name)
self._paths.append(p)
return fd, p
def track(self, path: "str | Path") -> Path:
"""Track an existing path for deletion at exit."""
p = Path(path)
self._paths.append(p)
return p
def _cleanup(self) -> None:
for p in reversed(self._paths):
try:
if p.is_dir():
shutil.rmtree(p, ignore_errors=True)
elif p.exists():
p.unlink(missing_ok=True)
except Exception:
pass
# ─────────────────────────────────────────────────────────────────────────────
# 3. Logging flush + close on exit
# ─────────────────────────────────────────────────────────────────────────────
def register_logging_shutdown() -> None:
"""
Register logging.shutdown() as an atexit handler.
Ensures all log handlers flush and close cleanly before the interpreter exits.
Safe to call multiple times (atexit deduplicates on the same function object).
Example:
register_logging_shutdown()
logging.basicConfig(filename="app.log", level=logging.DEBUG)
"""
atexit.register(logging.shutdown)
def register_file_logger(
path: "str | Path",
name: str = "app",
level: int = logging.DEBUG,
) -> logging.Logger:
"""
Create a file logger and register its handler for flush+close at exit.
Example:
logger = register_file_logger("/tmp/app.log")
logger.info("started")
# file handler flushed + closed at exit automatically
"""
logger = logging.getLogger(name)
handler = logging.FileHandler(str(path))
handler.setLevel(level)
logger.addHandler(handler)
logger.setLevel(level)
def _close_handler() -> None:
handler.flush()
handler.close()
atexit.register(_close_handler)
return logger
# ─────────────────────────────────────────────────────────────────────────────
# 4. Resource guard (generic acquire / release)
# ─────────────────────────────────────────────────────────────────────────────
class ResourceGuard:
"""
Acquire a lockable resource and register its release with atexit.
Useful for process-level locks (e.g., pid files, advisory file locks).
Example:
guard = ResourceGuard("my-lock", acquire=lock.acquire, release=lock.release)
guard.acquire()
# released at exit even if the program crashes mid-run
"""
def __init__(
self,
name: str,
acquire: "callable",
release: "callable",
) -> None:
self.name = name
self._acquire = acquire
self._release = release
self._held = False
def acquire(self) -> bool:
"""Acquire the resource and register release for atexit."""
result = self._acquire()
self._held = True
atexit.register(self._release_once)
return result
def release(self) -> None:
"""Release immediately and unregister the atexit handler."""
self._release_once()
atexit.unregister(self._release_once)
def _release_once(self) -> None:
if self._held:
self._held = False
try:
self._release()
except Exception as exc:
print(f"[atexit] ResourceGuard '{self.name}' release failed: {exc}",
file=sys.stderr)
# ─────────────────────────────────────────────────────────────────────────────
# 5. PID file helper
# ─────────────────────────────────────────────────────────────────────────────
def write_pidfile(path: "str | Path") -> Path:
"""
Write the current PID to path and register removal at exit.
Raises FileExistsError if the pid file already exists.
Example:
pidfile = write_pidfile("/var/run/myapp.pid")
"""
p = Path(path)
if p.exists():
existing_pid = p.read_text().strip()
raise FileExistsError(
f"PID file {p} already exists (pid {existing_pid}); "
"another instance may be running"
)
p.write_text(str(os.getpid()))
def _remove() -> None:
try:
p.unlink(missing_ok=True)
except Exception:
pass
atexit.register(_remove)
return p
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== atexit demo ===")
# ── @atexit.register decorator ────────────────────────────────────────────
print("\n--- @atexit.register decorator ---")
order: list[str] = []
@atexit.register
def _last_handler() -> None:
order.append("last_handler")
atexit.register(lambda: order.append("first_handler"))
# LIFO: first_handler runs before last_handler
# (shown via manual _run_exitfuncs in demo only)
import atexit as _ax
_ax._run_exitfuncs()
print(f" LIFO order: {order}")
# ── CleanupRegistry ───────────────────────────────────────────────────────
print("\n--- CleanupRegistry ---")
registry = CleanupRegistry()
log_items: list[str] = []
registry.register("step-1", lambda: log_items.append("step-1 cleanup"))
registry.register("step-2", lambda: log_items.append("step-2 cleanup"))
registry.register("step-3", lambda: log_items.append("step-3 cleanup"))
registry._run_all() # simulate exit
print(f" cleanup order (LIFO): {log_items}")
for r in registry.report():
print(f" {r['name']}: ran={r['ran']} error={r['error']}")
# ── TempManager ───────────────────────────────────────────────────────────
print("\n--- TempManager ---")
tm = TempManager()
tmpdir = tm.mkdtemp(prefix="atexit_demo_")
(tmpdir / "test.txt").write_text("hello")
print(f" tmpdir={tmpdir.name} exists={tmpdir.exists()}")
tm._cleanup() # simulate exit
print(f" after cleanup: exists={tmpdir.exists()}")
# ── write_pidfile ─────────────────────────────────────────────────────────
print("\n--- write_pidfile ---")
with tempfile.TemporaryDirectory() as td:
pidpath = write_pidfile(Path(td) / "demo.pid")
print(f" pid written: {pidpath.read_text()} (current: {os.getpid()})")
atexit._run_exitfuncs()
print(f" pid file removed: {not pidpath.exists()}")
# ── ResourceGuard ─────────────────────────────────────────────────────────
print("\n--- ResourceGuard ---")
lock = threading.Lock()
guard = ResourceGuard("demo-lock", lock.acquire, lock.release)
guard.acquire()
print(f" lock held: {not lock.acquire(blocking=False)}")
guard.release()
print(f" lock free: {lock.acquire(blocking=False)}")
lock.release()
print("\n=== done ===")
For the contextlib.ExitStack alternative — contextlib.ExitStack() used as a context manager registers cleanup callbacks via .callback(fn) or .enter_context(cm) and runs them in LIFO order when the with block exits — use ExitStack when cleanup is scoped to a function or block and you want the cleanup to be explicit and visible in the code; use atexit for process-level cleanup that must run regardless of which code path exits, especially when the cleanup cannot be expressed as a context manager (e.g., cleaning up resources acquired in a background thread or across module boundaries). For the signal module alternative — signal.signal(signal.SIGTERM, handler) and signal.signal(signal.SIGINT, handler) register handlers for OS signals that terminate the process — use signal handlers when you need to respond to SIGTERM (graceful shutdown) or SIGINT (Ctrl+C) since atexit handlers do not run on unhandled signals; combine both: register atexit for normal exit cleanup and a SIGTERM handler that calls sys.exit(0) to trigger the atexit handlers on signal termination. The Claude Skills 360 bundle includes atexit skill sets covering CleanupRegistry with named LIFO handlers and report(), TempManager with mkdtemp()/mkstemp()/track() auto-remove on exit, register_logging_shutdown()/register_file_logger() log flush helpers, ResourceGuard with acquire()/release()/_release_once() for process-level locks, and write_pidfile() PID file with auto-remove. Start with the free tier to try shutdown handler patterns and atexit pipeline code generation.