watchdog monitors file system events in Python. pip install watchdog. Observer: from watchdog.observers import Observer. Handler: from watchdog.events import FileSystemEventHandler. handler = FileSystemEventHandler(). observer = Observer(); observer.schedule(handler, path=".", recursive=True); observer.start(). Subclass: class MyHandler(FileSystemEventHandler): def on_modified(self, event): print(event.src_path). Events: on_created, on_modified, on_deleted, on_moved. Event attrs: event.src_path, event.dest_path (moved), event.is_directory. Pattern: from watchdog.events import PatternMatchingEventHandler. PatternMatchingEventHandler(patterns=["*.py","*.json"], ignore_patterns=["*.pyc"], ignore_directories=True, case_sensitive=False). Regex: from watchdog.events import RegexMatchingEventHandler. RegexMatchingEventHandler(regexes=[r".*\.py$"], ignore_regexes=[r".*__pycache__.*"]). Observer lifecycle: observer.start(). observer.stop(); observer.join(). Multiple watches: observer.schedule(h1, "src/", recursive=True). observer.schedule(h2, "config/", recursive=False). observer.unschedule(watch). observer.unschedule_all(). Non-blocking: Observer runs in a daemon thread — main thread stays free. Liveness check: observer.is_alive() reports whether the observer thread is still running. Debounce: use threading.Timer or time.time() delta to collapse rapid events. inotify backend: Linux uses inotify by default — efficient. Polling: from watchdog.observers.polling import PollingObserver — for networked/NFS paths. PollingObserver(timeout=1). Claude Code generates watchdog handlers, debounced file watchers, and auto-reload pipelines.
CLAUDE.md for watchdog
## watchdog Stack
- Version: watchdog >= 4.0 | pip install watchdog
- Observer: observer = Observer(); observer.schedule(handler, path, recursive=True)
- Handler: class H(FileSystemEventHandler): on_created/on_modified/on_deleted/on_moved
- Pattern: PatternMatchingEventHandler(patterns=["*.py"], ignore_directories=True)
- Debounce: collapse rapid on_modified events with threading.Timer(delay, callback)
- Threading: Observer is a daemon thread — call observer.stop(); observer.join() to exit
- Polling: PollingObserver for network mounts where inotify is unavailable
watchdog File System Monitoring Pipeline
# app/file_watcher.py — watchdog event handlers, debouncing, and auto-reload
from __future__ import annotations
import logging
import threading
import time
from collections import defaultdict
from pathlib import Path
from queue import Empty, Queue
from typing import Callable
from watchdog.events import (
FileCreatedEvent,
FileDeletedEvent,
FileModifiedEvent,
FileMovedEvent,
FileSystemEvent,
FileSystemEventHandler,
PatternMatchingEventHandler,
RegexMatchingEventHandler,
)
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Basic handler — log all events
# ─────────────────────────────────────────────────────────────────────────────
class LoggingHandler(FileSystemEventHandler):
    """Emit a structured log record plus a console line for every file event.

    Directory events are ignored; only file-level activity is reported.
    """

    def _report(self, kind: str, event: FileSystemEvent) -> None:
        # Shared path for the single-path event types (created/modified/deleted).
        log.info(f"file_{kind.lower()}", extra={"path": event.src_path})
        print(f"[{kind}] {event.src_path}")

    def on_created(self, event: FileSystemEvent) -> None:
        if event.is_directory:
            return
        self._report("CREATED", event)

    def on_modified(self, event: FileSystemEvent) -> None:
        if event.is_directory:
            return
        self._report("MODIFIED", event)

    def on_deleted(self, event: FileSystemEvent) -> None:
        if event.is_directory:
            return
        self._report("DELETED", event)

    def on_moved(self, event: FileMovedEvent) -> None:
        # Moves carry both endpoints, so they bypass the shared helper.
        if event.is_directory:
            return
        log.info("file_moved", extra={
            "src": event.src_path,
            "dest": event.dest_path,
        })
        print(f"[MOVED] {event.src_path} → {event.dest_path}")
# ─────────────────────────────────────────────────────────────────────────────
# 2. Pattern-filtered handler — only Python/config files
# ─────────────────────────────────────────────────────────────────────────────
class SourceChangeHandler(PatternMatchingEventHandler):
    """Invoke a callback whenever a source or config file appears or changes.

    Matches .py, .toml, .json, .yaml/.yml, and .env files while skipping
    __pycache__, .git internals, and compiled artefacts.  Directory events
    never reach the callback (ignore_directories=True).
    """

    def __init__(self, on_change: Callable[[str], None]) -> None:
        super().__init__(
            patterns=["*.py", "*.toml", "*.json", "*.yaml", "*.yml", "*.env"],
            ignore_patterns=["*/__pycache__/*", "*/.git/*", "*.pyc", "*.pyo"],
            ignore_directories=True,
            case_sensitive=False,
        )
        self._on_change = on_change

    def on_created(self, event: FileSystemEvent) -> None:
        # A brand-new file counts as a change too.
        self._on_change(event.src_path)

    def on_modified(self, event: FileSystemEvent) -> None:
        self._on_change(event.src_path)
# ─────────────────────────────────────────────────────────────────────────────
# 3. Debounced handler — collapse rapid on_modified bursts
# ─────────────────────────────────────────────────────────────────────────────
class DebouncedHandler(FileSystemEventHandler):
    """Fire a callback once per path after a burst of events settles.

    Editors typically save in several steps (write temp file, rename), so a
    single "save" can produce 3-5 on_modified events.  Each incoming event
    (re)arms a per-path timer; the callback runs only once no further event
    has arrived for `delay` seconds.

    Args:
        callback: Called with the path after the quiet period elapses.
        delay: Quiet period in seconds (default 0.5).
        patterns: Optional glob patterns; empty/None means "match everything".
    """

    def __init__(
        self,
        callback: Callable[[str], None],
        delay: float = 0.5,
        patterns: list[str] | None = None,
    ) -> None:
        super().__init__()
        self._callback = callback
        self._delay = delay
        self._patterns = set(patterns or [])
        # One pending Timer per path, guarded by the lock below.
        self._timers: dict[str, threading.Timer] = {}
        self._lock = threading.Lock()

    def _matches(self, path: str) -> bool:
        """Return True when `path` matches any configured pattern (or none are set)."""
        if not self._patterns:
            return True
        candidate = Path(path)
        for pattern in self._patterns:
            if candidate.match(pattern):
                return True
        return False

    def _schedule(self, path: str) -> None:
        """Arm (or re-arm) the debounce timer for `path`."""
        if not self._matches(path):
            return
        with self._lock:
            pending = self._timers.get(path)
            if pending is not None:
                pending.cancel()  # restart the quiet period
            timer = threading.Timer(self._delay, self._fire, args=(path,))
            self._timers[path] = timer
            timer.start()

    def _fire(self, path: str) -> None:
        """Timer expiry: forget the timer, then invoke the callback."""
        with self._lock:
            self._timers.pop(path, None)
        log.debug("debounced_event", extra={"path": path})
        # Runs on the Timer thread, deliberately outside the lock.
        self._callback(path)

    def on_modified(self, event: FileSystemEvent) -> None:
        if event.is_directory:
            return
        self._schedule(event.src_path)

    def on_created(self, event: FileSystemEvent) -> None:
        if event.is_directory:
            return
        self._schedule(event.src_path)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Queue-based handler — decouple event detection from processing
# ─────────────────────────────────────────────────────────────────────────────
class QueueHandler(FileSystemEventHandler):
    """Bridge watchdog callbacks onto a thread-safe queue.

    The Observer thread only enqueues; consumers drain `self.queue` on
    other threads, so slow processing never blocks event delivery.
    """

    def __init__(self) -> None:
        super().__init__()
        # Unbounded FIFO of file (non-directory) events.
        self.queue: Queue[FileSystemEvent] = Queue()

    def on_any_event(self, event: FileSystemEvent) -> None:
        if event.is_directory:
            return
        self.queue.put(event)
def queue_consumer(
    queue: Queue[FileSystemEvent],
    stop_event: threading.Event,
    processor: Callable[[FileSystemEvent], None],
) -> None:
    """Drain `queue` until `stop_event` is set, calling `processor` per event.

    Intended to run in its own thread.  Each event is marked done via
    `task_done()` whether or not processing succeeded, so `queue.join()`
    never hangs on a failed item.  Processor errors are logged with a
    traceback and swallowed to keep the consumer alive.

    Args:
        queue: Source of FileSystemEvent objects (e.g. QueueHandler.queue).
        stop_event: Cooperative shutdown flag, checked every 0.1 s.
        processor: Callback invoked once per dequeued event.
    """
    while not stop_event.is_set():
        # Keep the try body minimal: only the blocking get can raise Empty.
        try:
            event = queue.get(timeout=0.1)  # short timeout keeps shutdown responsive
        except Empty:
            continue
        try:
            processor(event)
        except Exception:
            # log.exception captures the active traceback; no need to bind `exc`.
            log.exception("event_processing_failed", extra={"path": event.src_path})
        finally:
            queue.task_done()
# ─────────────────────────────────────────────────────────────────────────────
# 5. File ingestion pipeline — watch drop folder, process new files
# ─────────────────────────────────────────────────────────────────────────────
class IngestionHandler(PatternMatchingEventHandler):
    """Watch a drop folder and process each new CSV/JSON file exactly once.

    Matches *.csv / *.json / *.jsonl, skips partial uploads (*.tmp, *.part),
    and never fires for directories (ignore_directories=True).

    Args:
        output_dir: Directory where a done-marker file is written per input.
        processed: Optional caller-owned set of already-handled paths; it is
            mutated in place so callers can persist or inspect the dedup state.
    """

    def __init__(
        self,
        output_dir: Path,
        processed: set[str] | None = None,
    ) -> None:
        super().__init__(
            patterns=["*.csv", "*.json", "*.jsonl"],
            ignore_patterns=["*.tmp", "*.part"],
            ignore_directories=True,
            case_sensitive=False,
        )
        self._output_dir = output_dir
        # BUG FIX: `processed or set()` silently replaced a caller-supplied
        # *empty* set with a private one, breaking external dedup tracking.
        self._processed = processed if processed is not None else set()

    def on_created(self, event: FileCreatedEvent) -> None:
        """Ingest a newly created file, at most once per path."""
        path = Path(event.src_path)
        if str(path) in self._processed:
            return
        self._processed.add(str(path))
        self._ingest(path)

    def _ingest(self, path: Path) -> None:
        # NOTE(review): this fires on creation, so a large file may still be
        # mid-write here; pair with a *.tmp upload + rename in production.
        log.info("ingesting_file", extra={"path": str(path)})
        print(f"[INGEST] {path.name} ({path.stat().st_size} bytes)")
        # In production: parse CSV/JSON, validate, write to DB
        output = self._output_dir / f"done_{path.name}"
        output.touch()
# ─────────────────────────────────────────────────────────────────────────────
# 6. Observer factory — local vs network paths
# ─────────────────────────────────────────────────────────────────────────────
def make_observer(polling: bool = False) -> Observer | PollingObserver:
    """Create an observer appropriate for the watched path's filesystem.

    Use PollingObserver for network mounts (NFS, SMB, Docker volumes)
    where inotify/kqueue events are not delivered to the client.

    Args:
        polling: When True, return a PollingObserver with a 1-second scan
            interval; otherwise return the platform-native Observer.

    Returns:
        An unstarted observer; the caller must schedule() and start() it.

    Note:
        The return annotation is a union because PollingObserver is not a
        subclass of the platform-selected Observer class, so `-> Observer`
        alone would be inaccurate.  (Lazy annotations via
        `from __future__ import annotations` make the union syntax safe.)
    """
    if polling:
        return PollingObserver(timeout=1)
    return Observer()
# ─────────────────────────────────────────────────────────────────────────────
# 7. Context manager — clean observer lifecycle
# ─────────────────────────────────────────────────────────────────────────────
class WatcherContext:
    """Run an Observer for the duration of a `with` block.

    Schedules `handler` on `path` and starts the observer on entry; stops
    and joins it on exit, even when the body raises.

    Usage:
        with WatcherContext("src/", handler) as _:
            while True:
                time.sleep(1)
    """

    def __init__(
        self,
        path: str | Path,
        handler: FileSystemEventHandler,
        recursive: bool = True,
        polling: bool = False,
    ) -> None:
        self._path = str(path)
        self._handler = handler
        self._recursive = recursive
        self._observer = make_observer(polling)

    def __enter__(self) -> "WatcherContext":
        obs = self._observer
        obs.schedule(self._handler, self._path, recursive=self._recursive)
        obs.start()
        log.info("watcher_started", extra={"path": self._path})
        return self

    def __exit__(self, *_) -> None:
        obs = self._observer
        obs.stop()
        obs.join()  # wait for the observer thread to drain and exit
        log.info("watcher_stopped", extra={"path": self._path})
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Self-contained demo: everything happens inside a throwaway temp dir.
    import tempfile
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        (root / "src").mkdir()
        (root / "drop").mkdir()
        (root / "done").mkdir()
        # ── Debounce demo: 5 rapid writes should collapse to ~1 callback ──
        changed: list[str] = []
        debounced = DebouncedHandler(
            callback=lambda p: changed.append(p),
            delay=0.1,
            patterns=["*.py"],
        )
        with WatcherContext(root / "src", debounced, recursive=True):
            # Simulate rapid saves (editor burst)
            for _ in range(5):
                (root / "src" / "app.py").write_text(f"# {time.time()}")
                time.sleep(0.02)
            time.sleep(0.3)  # wait for debounce timer
        print(f"Debounce fired {len(changed)} time(s) for 5 rapid writes: {changed}")
        # Ingestion
        # ── Drop-folder demo: each new CSV triggers IngestionHandler once ──
        ingester = IngestionHandler(output_dir=root / "done")
        with WatcherContext(root / "drop", ingester, recursive=False):
            for i in range(3):
                (root / "drop" / f"data_{i}.csv").write_text("a,b\n1,2\n")
                time.sleep(0.05)
            time.sleep(0.2)  # let events land
        print("Ingestion demo complete.")
For the inotifywait shell alternative — polling with inotifywait -m or fswatch requires spawning a subprocess, parsing unstructured text output, and re-implementing filtering logic in shell, while watchdog’s PatternMatchingEventHandler(patterns=["*.py"]) filters in Python, receives typed event objects (FileCreatedEvent, FileMovedEvent) with src_path and dest_path attributes, and runs cross-platform on Linux (inotify), macOS (FSEvents), and Windows (ReadDirectoryChangesW) from a single code path. For the polling with os.listdir alternative — a polling loop that calls os.listdir() and compares snapshot diffs consumes CPU continuously and introduces latency proportional to the poll interval, while watchdog’s Observer uses the OS’s native async notification API so the thread wakes only when an event actually occurs — at idle, a watchdog Observer consumes near-zero CPU. The Claude Skills 360 bundle includes watchdog skill sets covering FileSystemEventHandler subclassing, PatternMatchingEventHandler with patterns/ignore_patterns, RegexMatchingEventHandler, Observer schedule and unschedule, debounced handler with threading.Timer to collapse editor burst saves, Queue-based handler for decoupled processing, file ingestion pipeline for drop-folder automation, PollingObserver for network mounts, WatcherContext manager for clean lifecycle, and multi-directory watch management. Start with the free tier to try file system monitoring code generation.