portalocker wraps OS file locking (flock on POSIX, LockFile on Windows) with a clean Python API. pip install portalocker. Exclusive: import portalocker; portalocker.lock(fh, portalocker.LOCK_EX) — blocks until exclusive lock acquired. Shared: portalocker.lock(fh, portalocker.LOCK_SH) — multiple readers, no writers. Non-blocking: portalocker.LOCK_EX | portalocker.LOCK_NB — raises LockException immediately if locked. Unlock: portalocker.unlock(fh). Context manager: with portalocker.Lock("file.txt", "r") as fh:. Write: with portalocker.Lock("file.txt", "w", timeout=5) as fh: fh.write(data). Timeout: portalocker.Lock("file.txt", timeout=10, check_interval=0.1). RLock: portalocker.RLock("file.txt") — reentrant, same process can acquire multiple times. Semaphore: portalocker.BoundedSemaphore(2, "sem.txt") — allow N concurrent processes. Exception: portalocker.LockException (for LOCK_NB fail). portalocker.AlreadyLocked. Truncate: portalocker.Lock("file.txt", mode="w", truncate=True). flags=0 — no flags, use portalocker.LOCK_EX by default. PID file: lock a .pid file for singleton process guard. portalocker.lock(fh, portalocker.LOCK_EX | portalocker.LOCK_NB) in startup. Claude Code generates portalocker read-write guards, semaphore pools, and PID file patterns.
CLAUDE.md for portalocker
## portalocker Stack
- Version: portalocker >= 2.8 | pip install portalocker
- Exclusive: portalocker.lock(fh, portalocker.LOCK_EX) | LOCK_SH for shared read
- Non-blocking: LOCK_EX | LOCK_NB — raises LockException immediately
- Class: portalocker.Lock("file.txt", "a+", timeout=10) context manager
- RLock: portalocker.RLock — reentrant lock, same process can acquire multiple times
- Semaphore: portalocker.BoundedSemaphore(n, "sem.lock") — n concurrent processes
- Exceptions: portalocker.LockException | portalocker.AlreadyLocked
portalocker Cross-Platform File Locking Pipeline
# app/port_locking.py — portalocker exclusive/shared locks, semaphore, and PID files
from __future__ import annotations
import json
import os
import tempfile
from pathlib import Path
from typing import Any
import portalocker
# ─────────────────────────────────────────────────────────────────────────────
# 1. Low-level lock/unlock wrappers
# ─────────────────────────────────────────────────────────────────────────────
def exclusive_lock(fh, blocking: bool = True) -> None:
"""
Acquire an exclusive (write) lock on an open file handle.
blocking=False: raises portalocker.LockException immediately if locked.
"""
flags = portalocker.LOCK_EX
if not blocking:
flags |= portalocker.LOCK_NB
portalocker.lock(fh, flags)
def shared_lock(fh, blocking: bool = True) -> None:
"""
Acquire a shared (read) lock.
Multiple processes can hold shared locks simultaneously.
Any exclusive lock request will block until all shared locks are released.
"""
flags = portalocker.LOCK_SH
if not blocking:
flags |= portalocker.LOCK_NB
portalocker.lock(fh, flags)
def release_lock(fh) -> None:
"""Release any lock held on the file handle."""
portalocker.unlock(fh)
# ─────────────────────────────────────────────────────────────────────────────
# 2. High-level Lock class wrappers
# ─────────────────────────────────────────────────────────────────────────────
def locked_write(
path: str | Path,
data: str,
timeout: float = 30.0,
encoding: str = "utf-8",
) -> None:
"""
Write text to a file with an exclusive lock.
Safe for concurrent writes from multiple processes.
"""
lock = portalocker.Lock(
str(path),
mode="w",
timeout=timeout,
encoding=encoding,
flags=portalocker.LOCK_EX,
)
with lock as fh:
fh.write(data)
def locked_append(
path: str | Path,
data: str,
timeout: float = 30.0,
encoding: str = "utf-8",
) -> None:
"""
Append text to a file with an exclusive lock.
Multiple processes appending concurrently are serialized.
"""
lock = portalocker.Lock(
str(path),
mode="a",
timeout=timeout,
encoding=encoding,
flags=portalocker.LOCK_EX,
)
with lock as fh:
if not data.endswith("\n"):
data += "\n"
fh.write(data)
def locked_read(
path: str | Path,
timeout: float = 30.0,
encoding: str = "utf-8",
) -> str:
"""
Read text from a file with a shared lock.
Blocks writers while multiple readers operate concurrently.
"""
lock = portalocker.Lock(
str(path),
mode="r",
timeout=timeout,
encoding=encoding,
flags=portalocker.LOCK_SH,
)
with lock as fh:
return fh.read()
def try_locked_write(
path: str | Path,
data: str,
encoding: str = "utf-8",
) -> bool:
"""
Non-blocking exclusive write.
Returns True on success, False if file is already locked.
"""
try:
lock = portalocker.Lock(
str(path),
mode="w",
timeout=0,
encoding=encoding,
flags=portalocker.LOCK_EX | portalocker.LOCK_NB,
)
with lock as fh:
fh.write(data)
return True
except portalocker.LockException:
return False
# ─────────────────────────────────────────────────────────────────────────────
# 3. Read-modify-write (atomic JSON update)
# ─────────────────────────────────────────────────────────────────────────────
def json_update(
path: str | Path,
update_fn,
timeout: float = 30.0,
) -> dict[str, Any]:
"""
Atomically read, update, and write back a JSON file.
Uses exclusive lock + truncate to replace content safely.
"""
path = Path(path)
# Open for reading AND writing (r+ keeps existing content, a+ appends)
# We open with "a+" to create if not exists, then seek to beginning to read
with portalocker.Lock(
str(path),
mode="a+",
timeout=timeout,
encoding="utf-8",
flags=portalocker.LOCK_EX,
) as fh:
fh.seek(0)
content = fh.read()
data = json.loads(content) if content.strip() else {}
updated = update_fn(data)
fh.seek(0)
fh.truncate()
json.dump(updated, fh, indent=2, ensure_ascii=False)
return updated
# ─────────────────────────────────────────────────────────────────────────────
# 4. PID file / singleton guard
# ─────────────────────────────────────────────────────────────────────────────
class PidGuard:
"""
Prevent duplicate process instances using a PID file + exclusive lock.
The lock is held open for the lifetime of the guarded process.
When the process exits (even on crash), the OS releases the flock.
"""
def __init__(self, name: str, pid_dir: str = "/tmp") -> None:
self._pid_path = Path(pid_dir) / f"{name}.pid"
self._fh = None
def acquire(self) -> bool:
"""
Acquire the singleton guard.
Returns True if this process is now sole owner; False otherwise.
"""
try:
self._fh = open(self._pid_path, "a+")
portalocker.lock(self._fh, portalocker.LOCK_EX | portalocker.LOCK_NB)
except portalocker.LockException:
if self._fh:
self._fh.close()
self._fh = None
return False
# Write PID for diagnostics
self._fh.seek(0)
self._fh.truncate()
self._fh.write(str(os.getpid()))
self._fh.flush()
return True
def release(self) -> None:
if self._fh:
portalocker.unlock(self._fh)
self._fh.close()
self._fh = None
def __enter__(self) -> "PidGuard":
if not self.acquire():
raise RuntimeError(
f"Another instance is already running (see {self._pid_path})"
)
return self
def __exit__(self, *args) -> None:
self.release()
# ─────────────────────────────────────────────────────────────────────────────
# 5. BoundedSemaphore — limit N concurrent processes
# ─────────────────────────────────────────────────────────────────────────────
def bounded_task(name: str, sem_path: str, max_workers: int, timeout: float = 60.0):
"""
Context manager that limits concurrent executions via BoundedSemaphore.
At most `max_workers` processes can be inside the `with` block at once.
"""
return portalocker.BoundedSemaphore(
maximum=max_workers,
filename=sem_path,
timeout=timeout,
)
# ─────────────────────────────────────────────────────────────────────────────
# 6. RLock — reentrant lock
# ─────────────────────────────────────────────────────────────────────────────
def get_rlock(path: str | Path, timeout: float = 30.0) -> portalocker.RLock:
"""
Return a reentrant file lock.
The same process can acquire the lock multiple times without deadlocking.
Inner acquisitions are no-ops; the lock is released only on the outermost release.
"""
return portalocker.RLock(str(path), timeout=timeout)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import multiprocessing
import time
with tempfile.TemporaryDirectory() as tmpdir:
counter_path = Path(tmpdir) / "counter.txt"
counter_path.write_text("0")
def safe_increment(n: int) -> None:
for _ in range(n):
with portalocker.Lock(
str(counter_path),
mode="a+",
timeout=10,
flags=portalocker.LOCK_EX,
) as fh:
fh.seek(0)
val = int(fh.read() or "0")
fh.seek(0)
fh.truncate()
fh.write(str(val + 1))
time.sleep(0.001)
print("=== Concurrent counter (4 processes × 5 increments) ===")
with multiprocessing.Pool(4) as pool:
pool.map(safe_increment, [5] * 4)
print(f" Expected: 20 | Got: {counter_path.read_text()} | "
f"Correct: {counter_path.read_text() == '20'}")
print("\n=== json_update ===")
json_path = Path(tmpdir) / "store.json"
def add_entry(data: dict) -> dict:
data["count"] = data.get("count", 0) + 1
return data
for _ in range(4):
result = json_update(json_path, add_entry)
print(f" After 4 updates: {result}")
print("\n=== BoundedSemaphore (max 2 at once) ===")
sem_path = str(Path(tmpdir) / "sem.lock")
arrivals = multiprocessing.Manager().list()
def sem_task(i: int) -> None:
with bounded_task(f"worker-{i}", sem_path, max_workers=2, timeout=30):
arrivals.append(os.getpid())
time.sleep(0.05)
with multiprocessing.Pool(4) as pool:
pool.map(sem_task, range(4))
print(f" {len(arrivals)} tasks completed through semaphore")
print("\n=== PidGuard (singleton) ===")
guard = PidGuard("test", pid_dir=tmpdir)
acquired = guard.acquire()
second = PidGuard("test", pid_dir=tmpdir).acquire()
print(f" First acquire: {acquired} | Second (should be False): {second}")
guard.release()
third = PidGuard("test", pid_dir=tmpdir).acquire()
print(f" After release (should be True): {third}")
PidGuard("test", pid_dir=tmpdir).release()
For the filelock alternative — filelock and portalocker both provide cross-platform file locking; filelock is lighter (one dependency, simple API) while portalocker adds LOCK_SH shared reads, RLock for reentrant locking, and BoundedSemaphore for N-concurrency patterns; choose filelock for simple exclusive-lock use cases and portalocker when you need shared read locks or the semaphore primitive. For the fcntl.flock alternative — fcntl.flock is POSIX-only and raises NotImplementedError on Windows; portalocker abstracts this behind a platform-independent API using LockFile on Windows and flock + lockf on POSIX, so the same portalocker.lock(fh, LOCK_EX) call works on both. The Claude Skills 360 bundle includes portalocker skill sets covering portalocker.lock() with LOCK_EX/LOCK_SH/LOCK_NB flags, portalocker.Lock context manager with timeout, locked_write() and locked_append() helpers, locked_read() with shared lock, try_locked_write() non-blocking attempt, json_update() read-modify-write pattern, PidGuard singleton process guard, bounded_task() BoundedSemaphore context manager, and portalocker.RLock reentrant locking. Start with the free tier to try cross-platform file locking code generation.