Python’s fcntl module manipulates file descriptor properties and implements advisory file locking on Unix (Linux, macOS). Import it with import fcntl. fcntl: fcntl.fcntl(fd, cmd, arg=0) → int or bytes; commands: F_GETFL / F_SETFL (file status flags), F_GETFD / F_SETFD (fd flags, notably FD_CLOEXEC), F_DUPFD (duplicate fd onto the lowest free descriptor ≥ arg). ioctl: fcntl.ioctl(fd, request, arg=0) — device control. flock: fcntl.flock(fd, operation) — BSD-style whole-file advisory locks; operations: LOCK_SH (shared), LOCK_EX (exclusive), LOCK_UN (unlock), LOCK_NB (non-blocking, combine with |). lockf: fcntl.lockf(fd, cmd, len=0, start=0, whence=0) — POSIX byte-range locking; cmd: LOCK_SH, LOCK_EX, LOCK_UN (LOCK_SH and LOCK_EX may be OR-ed with LOCK_NB). Non-blocking: read the current flags with flags = fcntl.fcntl(fd, fcntl.F_GETFL), then call fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK). Close-on-exec: fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC). flock is advisory: the lock is held until the fd is closed or explicitly unlocked, and it does not prevent other processes from reading or writing if they ignore the lock. Windows: not available — use msvcrt.locking() or portalocker (PyPI). Claude Code generates PID file guards, single-instance daemons, non-blocking pipe readers, and concurrent file access managers.
CLAUDE.md for fcntl
## fcntl Stack
- Stdlib: import fcntl, os
- NonBlocking: flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
- CloseOnExec: fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
- ExLock: fcntl.flock(fd, fcntl.LOCK_EX) # block until locked
- TryLock: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # raises BlockingIOError if the lock is already held
- Unlock: fcntl.flock(fd, fcntl.LOCK_UN)
fcntl File Control Pipeline
# app/fcntlutil.py — non-blocking, close-on-exec, flock, lockf, pid file, single-instance
from __future__ import annotations
import contextlib
import errno
import os
import platform
import struct
import sys
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Generator
_FCNTL_AVAILABLE = platform.system() != "Windows"
if _FCNTL_AVAILABLE:
import fcntl
# ─────────────────────────────────────────────────────────────────────────────
# 1. File descriptor flag helpers
# ─────────────────────────────────────────────────────────────────────────────
def set_nonblocking(fd: int) -> None:
    """
    Switch a file descriptor into non-blocking mode.

    The current file status flags are fetched with F_GETFL and written back
    with O_NONBLOCK OR-ed in, so every other status flag is left untouched.
    Afterwards a read/write that cannot proceed raises BlockingIOError
    instead of waiting.

    Raises:
        OSError: when fcntl is unavailable (Windows), or when the underlying
            fcntl() call fails.

    Example:
        set_nonblocking(pipe_read_fd)
        try:
            data = os.read(pipe_read_fd, 1024)
        except BlockingIOError:
            data = b""   # no data available yet
    """
    if _FCNTL_AVAILABLE:
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
        return
    raise OSError("fcntl not available on Windows")
def set_blocking(fd: int) -> None:
    """
    Put a file descriptor back into blocking mode.

    Reads the status flags with F_GETFL, masks out O_NONBLOCK and writes the
    result back with F_SETFL.

    Raises:
        OSError: when fcntl is unavailable (Windows), or when the underlying
            fcntl() call fails.
    """
    if not _FCNTL_AVAILABLE:
        raise OSError("fcntl not available on Windows")
    status = fcntl.fcntl(fd, fcntl.F_GETFL)
    without_nonblock = status & ~os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, without_nonblock)
def is_nonblocking(fd: int) -> bool:
    """
    Report whether O_NONBLOCK is set in the descriptor's status flags.

    Always returns False when fcntl is unavailable (Windows).
    """
    if _FCNTL_AVAILABLE:
        status = fcntl.fcntl(fd, fcntl.F_GETFL)
        return (status & os.O_NONBLOCK) != 0
    return False
def set_cloexec(fd: int) -> None:
    """
    Turn on the close-on-exec flag for a file descriptor.

    The descriptor flags are read with F_GETFD and written back with
    FD_CLOEXEC OR-ed in, so the descriptor is not leaked across exec() calls.

    Raises:
        OSError: when fcntl is unavailable (Windows), or when the underlying
            fcntl() call fails.

    Example:
        set_cloexec(socket.fileno())   # don't inherit socket in child
    """
    if _FCNTL_AVAILABLE:
        descriptor_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, descriptor_flags | fcntl.FD_CLOEXEC)
        return
    raise OSError("fcntl not available on Windows")
def clear_cloexec(fd: int) -> None:
    """
    Turn off the close-on-exec flag so the descriptor survives exec().

    Reads the descriptor flags with F_GETFD, masks out FD_CLOEXEC and writes
    the result back with F_SETFD.

    Raises:
        OSError: when fcntl is unavailable (Windows), or when the underlying
            fcntl() call fails.
    """
    if not _FCNTL_AVAILABLE:
        raise OSError("fcntl not available on Windows")
    descriptor_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    inheritable = descriptor_flags & ~fcntl.FD_CLOEXEC
    fcntl.fcntl(fd, fcntl.F_SETFD, inheritable)
def is_cloexec(fd: int) -> bool:
    """
    Report whether FD_CLOEXEC is set in the descriptor flags.

    Always returns False when fcntl is unavailable (Windows).
    """
    if _FCNTL_AVAILABLE:
        descriptor_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        return (descriptor_flags & fcntl.FD_CLOEXEC) != 0
    return False
# ─────────────────────────────────────────────────────────────────────────────
# 2. Advisory file locking (flock)
# ─────────────────────────────────────────────────────────────────────────────
@contextmanager
def exclusive_lock(
    file_obj,
    blocking: bool = True,
) -> Generator[None, None, None]:
    """
    Hold an exclusive (write) advisory flock on a file for the ``with`` body.

    Args:
        file_obj: anything fcntl.flock() accepts (an open file or a raw fd).
        blocking: when False, LOCK_NB is added and BlockingIOError is raised
            if the lock cannot be taken immediately.

    The lock is released with LOCK_UN when the body exits, whether normally
    or through an exception. When fcntl is unavailable (Windows) this is a
    no-op: the body runs without any lock.

    Example:
        with open("data.json", "r+") as f, exclusive_lock(f):
            data = json.load(f)
            data["count"] += 1
            f.seek(0); json.dump(data, f); f.truncate()
    """
    if _FCNTL_AVAILABLE:
        if blocking:
            operation = fcntl.LOCK_EX
        else:
            operation = fcntl.LOCK_EX | fcntl.LOCK_NB
        fcntl.flock(file_obj, operation)
        try:
            yield
        finally:
            fcntl.flock(file_obj, fcntl.LOCK_UN)
    else:
        # No fcntl on this platform: run the body unprotected.
        yield
@contextmanager
def shared_lock(
    file_obj,
    blocking: bool = True,
) -> Generator[None, None, None]:
    """
    Hold a shared (read) advisory flock on a file for the ``with`` body.

    Several holders may own a shared lock at once.

    Args:
        file_obj: anything fcntl.flock() accepts (an open file or a raw fd).
        blocking: when False, LOCK_NB is added and BlockingIOError is raised
            if the lock cannot be taken immediately.

    The lock is released with LOCK_UN when the body exits. When fcntl is
    unavailable (Windows) this is a no-op: the body runs without any lock.

    Example:
        with open("config.json") as f, shared_lock(f):
            data = json.load(f)
    """
    if _FCNTL_AVAILABLE:
        if blocking:
            operation = fcntl.LOCK_SH
        else:
            operation = fcntl.LOCK_SH | fcntl.LOCK_NB
        fcntl.flock(file_obj, operation)
        try:
            yield
        finally:
            fcntl.flock(file_obj, fcntl.LOCK_UN)
    else:
        # No fcntl on this platform: run the body unprotected.
        yield
def try_lock(file_obj) -> bool:
    """
    Attempt a non-blocking exclusive flock on a file.

    Returns:
        True when the lock was taken (the caller is then responsible for
        releasing it), False when flock() reported BlockingIOError because
        the lock is held elsewhere. Always True when fcntl is unavailable
        (Windows), where no lock is actually taken.

    Example:
        with open("resource.lock", "w") as f:
            if not try_lock(f):
                print("Another process holds the lock")
    """
    if not _FCNTL_AVAILABLE:
        return True
    acquired = True
    try:
        fcntl.flock(file_obj, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        acquired = False
    return acquired
# ─────────────────────────────────────────────────────────────────────────────
# 3. PID file / single-instance guard
# ─────────────────────────────────────────────────────────────────────────────
class AlreadyRunningError(Exception):
    """Signals that the single-instance lock is already held by another instance."""
class PidFile:
"""
Single-instance guard using a PID file with exclusive lock.
Holding the lock proves this process owns the PID file.
Example:
with PidFile("/var/run/myapp.pid") as pf:
print(f"Running with PID {pf.pid}")
main()
"""
def __init__(self, path: str | Path) -> None:
self._path = Path(path)
self._file = None
self.pid = os.getpid()
def __enter__(self) -> "PidFile":
self._path.parent.mkdir(parents=True, exist_ok=True)
self._file = open(self._path, "w")
if not try_lock(self._file):
self._file.close()
raise AlreadyRunningError(
f"Another instance is running (lock held on {self._path})"
)
self._file.write(str(self.pid) + "\n")
self._file.flush()
return self
def __exit__(self, *args) -> None:
if self._file:
fcntl.flock(self._file, fcntl.LOCK_UN)
self._file.close()
try:
self._path.unlink(missing_ok=True)
except OSError:
pass
@staticmethod
def read_pid(path: str | Path) -> int | None:
"""Read the PID from an existing PID file, or None if absent/invalid."""
try:
text = Path(path).read_text().strip()
return int(text)
except (OSError, ValueError):
return None
# ─────────────────────────────────────────────────────────────────────────────
# 4. Non-blocking pipe / socket reader
# ─────────────────────────────────────────────────────────────────────────────
def drain_nonblocking(fd: int, chunk: int = 4096) -> bytes:
    """
    Read everything currently available from a non-blocking fd.

    Reads in pieces of at most *chunk* bytes until either the descriptor
    reports that no more data is ready (BlockingIOError) or end-of-file is
    reached (os.read returns b""). Both cases end the drain, so an empty
    result means "nothing ready" OR "EOF"; callers that need to tell them
    apart must track EOF separately.

    Args:
        fd: descriptor to read; expected to already be in non-blocking mode.
        chunk: maximum number of bytes requested per os.read() call.

    Returns:
        The concatenated bytes read, possibly empty.

    Raises:
        ValueError: if *chunk* is less than 1 (a zero-byte read would look
            like "no data" even when data is pending).
        OSError: any read error other than "would block".

    Example:
        set_nonblocking(pipe_fd)
        while True:
            data = drain_nonblocking(pipe_fd)
            if data: process(data)
    """
    if chunk < 1:
        raise ValueError(f"chunk must be a positive integer, got {chunk!r}")
    pieces: list[bytes] = []
    try:
        # os.read returns b"" at EOF, which ends the loop.
        while piece := os.read(fd, chunk):
            pieces.append(piece)
    except BlockingIOError:
        # EAGAIN/EWOULDBLOCK always surface as BlockingIOError, so no
        # separate errno check is needed: nothing more is ready right now.
        pass
    return b"".join(pieces)
@contextmanager
def nonblocking_fd(fd: int) -> Generator[int, None, None]:
    """
    Make a descriptor non-blocking for the duration of the ``with`` body.

    Yields the same *fd*. On exit the descriptor is switched back to
    blocking mode only if it was blocking on entry; a descriptor that was
    already non-blocking is left as it was. When fcntl is unavailable
    (Windows) the descriptor is yielded unchanged.

    Example:
        with nonblocking_fd(pipe_read) as fd:
            data = drain_nonblocking(fd)
    """
    if _FCNTL_AVAILABLE:
        restore_blocking = not is_nonblocking(fd)
        set_nonblocking(fd)
        try:
            yield fd
        finally:
            if restore_blocking:
                set_blocking(fd)
    else:
        yield fd
# ─────────────────────────────────────────────────────────────────────────────
# 5. FD info inspector
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class FdInfo:
    """Snapshot of a file descriptor's state, with a compact one-line rendering."""

    fd: int            # descriptor number
    is_open: bool      # whether the descriptor could be queried
    is_nonblock: bool  # O_NONBLOCK present in the status flags
    is_cloexec: bool   # FD_CLOEXEC present in the descriptor flags
    flags_hex: str     # status flags rendered as a hex string

    def __str__(self) -> str:
        """Render as ``fd=N open=B flags=HEX [names]`` ('default' if no flag is set)."""
        labelled = (
            (self.is_nonblock, "nonblock"),
            (self.is_cloexec, "cloexec"),
        )
        names = [label for enabled, label in labelled if enabled]
        summary = ", ".join(names) or "default"
        return f"fd={self.fd} open={self.is_open} flags={self.flags_hex} [{summary}]"
def fd_info(fd: int) -> FdInfo:
    """
    Describe a file descriptor as an FdInfo record.

    Queries the status flags (F_GETFL) and descriptor flags (F_GETFD). If
    either query raises OSError the descriptor is reported as not open with
    all flags cleared. When fcntl is unavailable (Windows) a placeholder
    record is returned that reports the descriptor as open with no flags.

    Example:
        print(fd_info(sys.stdout.fileno()))
    """
    if not _FCNTL_AVAILABLE:
        return FdInfo(fd=fd, is_open=True, is_nonblock=False, is_cloexec=False, flags_hex="0x0")
    try:
        status_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        descriptor_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    except OSError:
        return FdInfo(fd=fd, is_open=False, is_nonblock=False, is_cloexec=False, flags_hex="0x0")
    nonblock = bool(status_flags & os.O_NONBLOCK)
    cloexec = bool(descriptor_flags & fcntl.FD_CLOEXEC)
    return FdInfo(
        fd=fd,
        is_open=True,
        is_nonblock=nonblock,
        is_cloexec=cloexec,
        flags_hex=f"{status_flags:#010x}",
    )
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Self-contained walkthrough of every helper in this module.
    import json
    import tempfile

    print("=== fcntl demo ===")

    if not _FCNTL_AVAILABLE:
        print(" fcntl not available on Windows — skipping")
        raise SystemExit(0)

    # ── fd_info: inspect the three standard streams ────────────────────────
    print("\n--- fd_info ---")
    for std_fd, label in enumerate(("stdin", "stdout", "stderr")):
        print(f" {label}: {fd_info(std_fd)}")

    # ── non-blocking pipe: write once, drain twice ─────────────────────────
    print("\n--- non-blocking pipe ---")
    pipe_r, pipe_w = os.pipe()
    try:
        os.write(pipe_w, b"hello fcntl")

        # First drain picks up everything that was written.
        with nonblocking_fd(pipe_r) as nb_fd:
            data = drain_nonblocking(nb_fd)
        print(f" drained: {data!r}")

        # Second drain finds the pipe empty and returns immediately.
        with nonblocking_fd(pipe_r) as nb_fd:
            empty = drain_nonblocking(nb_fd)
        print(f" second drain (empty): {empty!r}")
    finally:
        os.close(pipe_r)
        os.close(pipe_w)

    # ── close-on-exec flag before and after set_cloexec ────────────────────
    print("\n--- cloexec ---")
    cx_r, cx_w = os.pipe()
    try:
        print(f" pipe read fd before: cloexec={is_cloexec(cx_r)}")
        set_cloexec(cx_r)
        print(f" pipe read fd after: cloexec={is_cloexec(cx_r)}")
    finally:
        os.close(cx_r)
        os.close(cx_w)

    # ── exclusive_lock: read-modify-write of a JSON counter ────────────────
    print("\n--- exclusive_lock ---")
    with tempfile.TemporaryDirectory() as workdir:
        counter_path = os.path.join(workdir, "counter.json")

        # Seed the counter file.
        with open(counter_path, "w") as out:
            json.dump({"count": 0}, out)

        # Increment it while holding the exclusive lock.
        with open(counter_path, "r+") as rw, exclusive_lock(rw):
            data = json.load(rw)
            data["count"] += 1
            rw.seek(0)
            json.dump(data, rw)
            rw.truncate()

        with open(counter_path) as src:
            result = json.load(src)
        print(f" counter after locked update: {result['count']}")

        # try_lock with nobody else competing for the file.
        with open(counter_path, "w") as lock_file:
            locked = try_lock(lock_file)
            print(f" try_lock (no competition): {locked}")
            if locked:
                fcntl.flock(lock_file, fcntl.LOCK_UN)

    # ── PidFile: acquire, read back, then try a second instance ────────────
    print("\n--- PidFile ---")
    with tempfile.TemporaryDirectory() as workdir:
        pid_path = os.path.join(workdir, "app.pid")
        try:
            with PidFile(pid_path) as guard:
                print(f" acquired PID file: pid={guard.pid}")
                read_pid = PidFile.read_pid(pid_path)
                print(f" PidFile.read_pid: {read_pid}")

                # A second guard on the same path must be refused.
                try:
                    with PidFile(pid_path):
                        print(" ERROR: second instance should not succeed")
                except AlreadyRunningError as err:
                    print(f" second instance blocked: {type(err).__name__}")
        except AlreadyRunningError:
            print(" first instance already running?")

    print("\n=== done ===")
Alternative: portalocker — portalocker (PyPI) provides a comparable advisory locking API (portalocker.lock(), portalocker.unlock()) with cross-platform support, including Windows via msvcrt.locking(). Use portalocker when you need file locking on Windows or want a single API that works everywhere; use fcntl.flock() or fcntl.lockf() on Unix when you want zero dependencies and are confident you’re only targeting Linux/macOS. Alternative: threading.Lock / multiprocessing.Lock — these provide in-process and inter-process synchronization through kernel semaphores or pipes, but they do not prevent a separate, unrelated process (e.g. a daemon script restarted by cron) from accessing a shared file. Use fcntl.flock() as the shared file guard between independently launched Unix processes; use threading/multiprocessing locks to coordinate concurrent access within a single program. The Claude Skills 360 bundle includes fcntl skill sets covering the set_nonblocking()/set_blocking()/is_nonblocking()/set_cloexec()/clear_cloexec()/is_cloexec() fd flag helpers, the exclusive_lock()/shared_lock() advisory lock context managers and the try_lock() helper, the AlreadyRunningError/PidFile single-instance daemon guard, the drain_nonblocking()/nonblocking_fd() non-blocking pipe reader, and FdInfo with the fd_info() inspector. Start with the free tier to try file locking patterns and fcntl pipeline code generation.