filelock provides OS-level file locking for cross-process mutual exclusion. pip install filelock. Basic: from filelock import FileLock; lock = FileLock("file.lock"); with lock: write_data(). Timeout: FileLock("file.lock", timeout=5) — raises Timeout after 5 seconds. Non-blocking: FileLock("file.lock", timeout=0) — raises Timeout immediately if locked. Status: lock.is_locked → True/False. Acquire: lock.acquire() / lock.release(). Context: with lock.acquire(timeout=10):. Thread-safe: FileLock is also thread-safe within a process. Soft lock: from filelock import SoftFileLock — creates a .lock file but advisory only (no OS-level lock). Good for NFS/network filesystems where hard flock() fails. Lock path: FileLock("/tmp/myapp.lock") — use /tmp for system-wide locks. Per-file: FileLock(f"{data_file}.lock") — pair a lock file with each data file. Stale: if a process crashes, the OS releases the hard lock on process exit (any leftover .lock file is harmless). SoftFileLock leaves a stale lock file; clean it with lock_path.unlink(missing_ok=True). Multiprocessing: works across multiprocessing.Pool workers. filelock raises filelock.Timeout on timeout: from filelock import Timeout. Claude Code generates filelock write guards, multiprocessing safe writers, and atomic update patterns.
CLAUDE.md for filelock
## filelock Stack
- Version: filelock >= 3.12 | pip install filelock
- Hard lock: FileLock("path.lock") — OS flock/LockFile, released on process exit
- Soft lock: SoftFileLock("path.lock") — advisory .lock file for NFS/network FS
- Timeout: FileLock("path.lock", timeout=10) — raises filelock.Timeout
- Non-blocking: timeout=0 — raises Timeout immediately if already locked
- Status: lock.is_locked | acquire()/release() for manual control
- Pattern: FileLock(f"{data_path}.lock") — one .lock file per data file
filelock File Locking Pipeline
# app/file_locking.py — filelock cross-process write coordination and atomic updates
from __future__ import annotations

import json
import os
import tempfile
import time
from pathlib import Path
from typing import Any, Callable

from filelock import FileLock, SoftFileLock, Timeout
# ─────────────────────────────────────────────────────────────────────────────
# 1. Core lock helpers
# ─────────────────────────────────────────────────────────────────────────────
def locked_write(
    path: str | Path,
    data: str | bytes,
    mode: str = "w",
    lock_timeout: float = 30.0,
) -> None:
    """
    Serialize writes to `path` across processes via an exclusive lock.

    A sibling ``<path>.lock`` file guards the write; concurrent callers
    block until the lock is free or `lock_timeout` seconds elapse, at
    which point filelock.Timeout is raised.
    """
    target = Path(path)
    guard = FileLock(
        str(target.with_suffix(target.suffix + ".lock")),
        timeout=lock_timeout,
    )
    with guard:
        with open(target, mode) as handle:
            handle.write(data)
def locked_read(
    path: str | Path,
    mode: str = "r",
    lock_timeout: float = 30.0,
) -> str | bytes:
    """
    Read `path` while holding the same exclusive lock writers use.

    Because FileLock has no shared/reader mode, every access — read or
    write — is fully serialized; this simply guarantees a read never
    overlaps an in-progress write.
    """
    source = Path(path)
    lock_file = source.with_suffix(source.suffix + ".lock")
    with FileLock(str(lock_file), timeout=lock_timeout):
        with open(source, mode) as handle:
            return handle.read()
def try_locked_write(
    path: str | Path,
    data: str | bytes,
    mode: str = "w",
) -> bool:
    """
    Best-effort locked write that never blocks.

    Returns True if the lock was free and the write happened; returns
    False (writing nothing) when another process already holds the lock.
    """
    target = Path(path)
    lock_file = target.with_suffix(target.suffix + ".lock")
    try:
        # timeout=0 fails fast with Timeout instead of waiting.
        with FileLock(str(lock_file), timeout=0):
            with open(target, mode) as handle:
                handle.write(data)
    except Timeout:
        return False
    return True
# ─────────────────────────────────────────────────────────────────────────────
# 2. Atomic file update
# ─────────────────────────────────────────────────────────────────────────────
def atomic_write(
    path: str | Path,
    data: str | bytes,
    mode: str = "w",
    lock_timeout: float = 30.0,
) -> None:
    """
    Atomically replace a file's contents:
      1. Write to a temp file in the same directory (no lock needed)
      2. flush + fsync so the bytes are on disk before the swap
      3. Acquire the lock, then os.replace() — atomic on POSIX and Windows
    Guarantees no partial writes are ever visible to readers.

    path: destination file.
    data: full new contents (str for text modes, bytes for binary).
    mode: open mode for the temp file; text modes are written as UTF-8,
          consistent with the other helpers in this module.
    lock_timeout: seconds to wait for the lock; raises filelock.Timeout.
    """
    path = Path(path)
    lock_path = path.with_suffix(path.suffix + ".lock")
    suffix = ".tmp" + str(os.getpid())
    # Pin the encoding for text modes so output does not depend on the
    # platform locale; binary modes must not pass an encoding at all.
    encoding = None if "b" in mode else "utf-8"
    tmp_path: str | None = None
    try:
        # Temp file lives in the same directory so os.replace() is a
        # same-filesystem rename (rename across filesystems is not atomic).
        with tempfile.NamedTemporaryFile(
            mode=mode,
            encoding=encoding,
            dir=path.parent,
            prefix=path.stem + "_",
            suffix=suffix,
            delete=False,
        ) as tmp:
            tmp_path = tmp.name
            tmp.write(data)
            # Make the data durable before it becomes visible via rename.
            tmp.flush()
            os.fsync(tmp.fileno())
        # Then lock and atomically replace.
        with FileLock(str(lock_path), timeout=lock_timeout):
            os.replace(tmp_path, path)
    except Exception:
        # Clean up the orphan temp file on any failure (write, lock, or
        # replace) — the original only covered lock/replace failures.
        if tmp_path is not None:
            Path(tmp_path).unlink(missing_ok=True)
        raise
def atomic_json_update(
    path: str | Path,
    update_fn: Callable[[dict[str, Any]], dict[str, Any]],
    lock_timeout: float = 30.0,
) -> dict[str, Any]:
    """
    Read a JSON file, apply `update_fn`, and write back atomically.

    Safe for concurrent processes modifying the same JSON store: the
    whole read-modify-write runs under an exclusive lock, and the new
    contents land via os.replace() so readers never see a partial file.

    update_fn: receives the current dict ({} if the file is missing)
               and returns the new dict to persist.
    Returns the dict produced by `update_fn`.
    Raises filelock.Timeout if the lock is not acquired in time.
    """
    path = Path(path)
    lock_path = path.with_suffix(path.suffix + ".lock")
    with FileLock(str(lock_path), timeout=lock_timeout):
        # Read current state; a missing file is an empty store.
        if path.exists():
            data = json.loads(path.read_text(encoding="utf-8"))
        else:
            data = {}
        # Apply update
        new_data = update_fn(data)
        # Append the temp marker to the full file name. The original used
        # with_suffix(), which *replaces* the extension — "a.json" and
        # "a.txt" would both map to "a.tmp<pid>" and could collide.
        tmp_path = path.with_name(path.name + ".tmp" + str(os.getpid()))
        try:
            tmp_path.write_text(json.dumps(new_data, indent=2), encoding="utf-8")
            os.replace(tmp_path, path)
        except Exception:
            # Don't leave an orphan temp file behind on failure.
            tmp_path.unlink(missing_ok=True)
            raise
    return new_data
# ─────────────────────────────────────────────────────────────────────────────
# 3. Singleton / one-at-a-time execution guard
# ─────────────────────────────────────────────────────────────────────────────
class SingletonGuard:
    """
    Ensure only one process at a time runs a named application or task.

    A FileLock on a well-known path inside `lock_dir` acts as the mutex:
    whichever process grabs it first is the singleton; every other
    process fails fast (the lock is created with timeout=0).
    """

    def __init__(self, name: str, lock_dir: str = "/tmp") -> None:
        self._lock_dir = Path(lock_dir)
        self._name = name
        self._lock_path = self._lock_dir / f"{name}.singleton.lock"
        self._lock = FileLock(str(self._lock_path), timeout=0)
        self._acquired = False

    def acquire(self) -> bool:
        """Return True if this process became the sole instance, else False."""
        try:
            self._lock.acquire()
        except Timeout:
            return False
        self._acquired = True
        return True

    def release(self) -> None:
        """Give up the guard; a no-op unless we currently hold it."""
        if not self._acquired:
            return
        self._lock.release()
        self._acquired = False

    def __enter__(self) -> "SingletonGuard":
        if not self.acquire():
            raise RuntimeError(
                f"Another instance of '{self._name}' is already running."
            )
        return self

    def __exit__(self, *args) -> None:
        self.release()
# ─────────────────────────────────────────────────────────────────────────────
# 4. Per-key resource locks (lock striping)
# ─────────────────────────────────────────────────────────────────────────────
class KeyedLockRegistry:
    """
    Maintain one FileLock per key (e.g. per user_id or per filename).

    Writers to *different* keys proceed concurrently, while writers to
    the *same* key are serialized. Lock objects are cached so repeated
    lookups of a key hand back the identical FileLock instance.
    """

    def __init__(self, lock_dir: str = "/tmp", timeout: float = 30.0) -> None:
        self._dir = Path(lock_dir)
        self._timeout = timeout
        self._locks: dict[str, FileLock] = {}

    def get_lock(self, key: str) -> FileLock:
        """Return the FileLock for `key`, creating and caching it on first use."""
        lock = self._locks.get(key)
        if lock is None:
            # Sanitize the key into a filesystem-safe filename component.
            # NOTE(review): distinct keys can sanitize to the same name
            # (e.g. "a/b" and "a_b") and would then share one lock file —
            # extra serialization, not a safety problem.
            cleaned = "".join(
                ch if ch.isalnum() or ch in "-_." else "_" for ch in key
            )
            lock = FileLock(
                str(self._dir / f"keyed_{cleaned}.lock"),
                timeout=self._timeout,
            )
            self._locks[key] = lock
        return lock

    def lock(self, key: str):
        """Context manager: acquire the lock for this key."""
        return self.get_lock(key)
# ─────────────────────────────────────────────────────────────────────────────
# 5. NFS / network filesystem — SoftFileLock
# ─────────────────────────────────────────────────────────────────────────────
def nfs_safe_write(
    path: str | Path,
    data: str,
    lock_timeout: float = 60.0,
    stale_after: float = 300.0,
) -> None:
    """
    Write to a file on a network/NFS filesystem using SoftFileLock.

    SoftFileLock creates a .lock marker file rather than using OS
    flock(), which may fail on some NFS mounts. Because a crashed
    process leaves that marker behind, an existing lock file older
    than `stale_after` seconds is removed before acquiring.

    path: destination file (written as UTF-8 text).
    lock_timeout: seconds to wait for the soft lock; raises Timeout.
    stale_after: age in seconds past which an existing lock file is
                 considered abandoned and deleted.
    """
    path = Path(path)
    lock_path = path.with_suffix(path.suffix + ".lock")
    # Best-effort stale-lock cleanup. The lock file can disappear between
    # a stat and the unlink (its holder may release it), so tolerate
    # OSError instead of crashing on that race — the original's
    # exists()/getmtime() pair could raise FileNotFoundError here.
    try:
        lock_mtime = os.path.getmtime(lock_path)  # seconds since epoch
    except OSError:
        lock_mtime = None
    if lock_mtime is not None and time.time() - lock_mtime > stale_after:
        lock_path.unlink(missing_ok=True)
    with SoftFileLock(str(lock_path), timeout=lock_timeout):
        with open(path, "w", encoding="utf-8") as fh:
            fh.write(data)
# ─────────────────────────────────────────────────────────────────────────────
# 6. Safe CSV/JSONL append
# ─────────────────────────────────────────────────────────────────────────────
def locked_append(
    path: str | Path,
    line: str,
    lock_timeout: float = 30.0,
) -> None:
    """
    Append one line to a file under an exclusive lock.

    Concurrent appenders (multiple processes) are serialized so their
    lines never interleave. A trailing newline is added when `line`
    lacks one.
    """
    target = Path(path)
    lock_file = target.with_suffix(target.suffix + ".lock")
    text = line if line.endswith("\n") else line + "\n"
    with FileLock(str(lock_file), timeout=lock_timeout):
        with open(target, "a", encoding="utf-8") as handle:
            handle.write(text)
def locked_append_json(
    path: str | Path,
    record: dict[str, Any],
    lock_timeout: float = 30.0,
) -> None:
    """Serialize `record` to JSON and append it as one line to a JSONL file with locking."""
    payload = json.dumps(record, ensure_ascii=False)
    locked_append(path, payload, lock_timeout=lock_timeout)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
# Demo: exercises the counter lock, atomic_json_update, SingletonGuard,
# and the JSONL append helper inside a throwaway temp directory.
if __name__ == "__main__":
    import multiprocessing
    import time
    with tempfile.TemporaryDirectory() as tmpdir:
        counter_path = Path(tmpdir) / "counter.txt"
        counter_path.write_text("0")
        # NOTE(review): this worker is defined under the __main__ guard, so
        # it is only importable by child processes under the "fork" start
        # method (Linux default). Under "spawn" (Windows/macOS default) the
        # child re-imports the module with __name__ != "__main__" and will
        # not find `increment` — confirm the target platform.
        def increment(n: int) -> None:
            # Read-modify-write on a shared file; FileLock makes the whole
            # sequence atomic across processes, and the sleep widens the
            # race window to prove the lock is actually doing the work.
            for _ in range(n):
                lock_path = counter_path.with_suffix(".txt.lock")
                with FileLock(str(lock_path)):
                    val = int(counter_path.read_text())
                    time.sleep(0.001)
                    counter_path.write_text(str(val + 1))
        print("=== Concurrent counter (4 processes × 5 increments) ===")
        with multiprocessing.Pool(4) as pool:
            pool.map(increment, [5] * 4)
        final = int(counter_path.read_text())
        print(f" Expected: 20 | Got: {final} | Correct: {final == 20}")
        print("\n=== atomic_json_update ===")
        json_path = Path(tmpdir) / "scores.json"
        def add_score(data: dict) -> dict:
            # get() covers the first run, when the store is missing and
            # atomic_json_update hands the callback an empty dict.
            data["alice"] = data.get("alice", 0) + 10
            return data
        for _ in range(3):
            result = atomic_json_update(json_path, add_score)
        print(f" After 3 updates: {result}")
        print("\n=== SingletonGuard ===")
        with SingletonGuard("demo", lock_dir=tmpdir) as guard:
            # While the first guard is held, a second acquire must fail.
            already_running = SingletonGuard("demo", lock_dir=tmpdir).acquire()
            print(f" Second acquire while first held: {already_running}")
        # After the `with` block released the first guard, a fresh acquire
        # succeeds.
        second_try = SingletonGuard("demo", lock_dir=tmpdir).acquire()
        print(f" Acquire after first released: {second_try}")
        print("\n=== locked_append_json (JSONL log) ===")
        log_path = Path(tmpdir) / "events.jsonl"
        for i in range(3):
            locked_append_json(log_path, {"event": "click", "i": i})
        lines = log_path.read_text().strip().split("\n")
        print(f" {len(lines)} lines written")
        for line in lines:
            print(f" {json.loads(line)}")
For the threading.Lock alternative — threading.Lock protects shared state across threads within a single process, but offers no protection when two separate Python processes write to the same file concurrently; FileLock uses OS-level flock/LockFile so it works across processes, Docker containers, and multiprocessing Pool workers — use threading.Lock inside a process and FileLock across processes. For the fcntl.flock / msvcrt.locking alternative — fcntl is POSIX-only and msvcrt is Windows-only; writing cross-platform file locking code by hand requires platform detection and separate implementations; filelock wraps both OS mechanisms behind a single FileLock class that works on Linux, macOS, and Windows — the right abstraction for library code that must run everywhere. The Claude Skills 360 bundle includes filelock skill sets covering FileLock context manager, SoftFileLock for NFS/network filesystems, timeout and non-blocking timeout=0, locked_write() and locked_read() convenience helpers, try_locked_write() non-blocking write attempt, atomic_write() temp-file swap pattern, atomic_json_update() read-modify-write for JSON, SingletonGuard for one-instance enforcement, KeyedLockRegistry for per-key striped locking, nfs_safe_write() with stale lock cleanup, locked_append() and locked_append_json() for concurrent log files. Start with the free tier to try file locking code generation.