Python’s _thread module is the low-level threading interface that threading is built on. import _thread. Spawn: _thread.start_new_thread(func, args[, kwargs]) — starts a new OS thread running func(*args, **kwargs); kwargs is an optional dict passed positionally, not a keyword argument; no join, no return value. Primitive lock: lock = _thread.allocate_lock() — lock.acquire(), lock.acquire(blocking=True, timeout=-1), lock.release(), lock.locked(). Thread ID: _thread.get_ident() — integer, unique per live thread. Exit current thread: _thread.exit() — raises SystemExit. Interrupt main: _thread.interrupt_main() — raises KeyboardInterrupt in main thread. Stack size: _thread.stack_size(size=0) — get/set in bytes. The module is intentionally minimal: no join, no daemon flag, no thread-local storage, no exception propagation. threading adds all of these. _thread is useful when you need the absolute minimum overhead or are implementing custom synchronisation primitives. Claude Code generates spinlocks, multi-producer queues, background I/O workers, lightweight daemon threads, signal routing helpers, and thread-local storage patterns.
CLAUDE.md for _thread
## _thread Stack
- Stdlib: import _thread
- Spawn: _thread.start_new_thread(func, args) # no join
- Lock: lock = _thread.allocate_lock()
- lock.acquire() / lock.release() / lock.locked()
- lock.acquire(blocking=True, timeout=-1)
- ID: _thread.get_ident()
- Exit: _thread.exit() # SystemExit in current thread
- Intr: _thread.interrupt_main() # KeyboardInterrupt in main
- Stack: _thread.stack_size(65536)
- Note: Use threading for join/daemon/exception handling
_thread Low-Level Threading Pipeline
# app/threadutil.py — spawn, lock, barrier, event, pool, watchdog, interrupt
from __future__ import annotations
import _thread
import time
import os
from collections import deque
from typing import Callable, Any
# ─────────────────────────────────────────────────────────────────────────────
# 1. Minimal spawn helper with error capture
# ─────────────────────────────────────────────────────────────────────────────
_lock_print = _thread.allocate_lock()
def _safe_print(*args: Any) -> None:
"""Thread-safe print."""
with _lock_print:
print(*args)
class ThreadError:
    """Thread-safe, write-once holder for the first exception raised in a worker.

    Later calls to set() are ignored, so the earliest failure wins.
    """
    __slots__ = ("exc", "_lock")

    def __init__(self) -> None:
        self.exc: BaseException | None = None
        self._lock = _thread.allocate_lock()

    def set(self, exc: BaseException) -> None:
        """Record *exc* unless an earlier exception has already been stored."""
        self._lock.acquire()
        try:
            if self.exc is None:
                self.exc = exc
        finally:
            self._lock.release()

    def get(self) -> BaseException | None:
        """Return the stored exception, or None if no worker has failed."""
        self._lock.acquire()
        try:
            return self.exc
        finally:
            self._lock.release()
def spawn(func: Callable[..., Any], *args: Any,
errors: ThreadError | None = None,
**kwargs: Any) -> None:
"""
Start a new thread running func(*args, **kwargs).
If errors is provided, exceptions are captured there rather than crashing.
Example:
def worker(n):
time.sleep(0.1)
print(f"done {n}")
spawn(worker, 1)
spawn(worker, 2)
time.sleep(0.3)
"""
def _wrapper():
try:
func(*args, **kwargs)
except SystemExit:
pass
except BaseException as e:
if errors is not None:
errors.set(e)
else:
_safe_print(f"[_thread] unhandled: {e!r}")
_thread.start_new_thread(_wrapper, ())
# ─────────────────────────────────────────────────────────────────────────────
# 2. Counting semaphore built from primitive locks
# ─────────────────────────────────────────────────────────────────────────────
class Semaphore:
    """
    A counting semaphore built from _thread.allocate_lock().

    The permit counter is guarded by a primitive lock; waiting threads poll
    with a short sleep (there is no OS-level wait queue at this layer).

    Example:
        sem = Semaphore(3)  # allow 3 concurrent
        def task(i):
            sem.acquire()
            try:
                time.sleep(0.1)
            finally:
                sem.release()
        for i in range(10):
            spawn(task, i)
    """

    def __init__(self, count: int = 1) -> None:
        """Create a semaphore with *count* initial permits (must be >= 0)."""
        if count < 0:
            raise ValueError("count must be >= 0")
        self._count = count                   # remaining permits
        self._lock = _thread.allocate_lock()  # guards _count

    def acquire(self, timeout: float | None = None) -> bool:
        """
        Take one permit, blocking until one is available.

        If *timeout* (seconds) is given, give up after that long.
        Returns True on success, False on timeout.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            with self._lock:
                if self._count > 0:
                    self._count -= 1
                    return True
            if deadline is not None and time.monotonic() >= deadline:
                return False
            time.sleep(0.001)  # brief yield to avoid a hot spin

    def release(self) -> None:
        """Return one permit. Like threading.Semaphore, this is not bounded."""
        with self._lock:
            self._count += 1

    def __enter__(self) -> "Semaphore":
        self.acquire()
        return self

    def __exit__(self, *_: Any) -> None:
        self.release()
# ─────────────────────────────────────────────────────────────────────────────
# 3. Manual event flag
# ─────────────────────────────────────────────────────────────────────────────
class Event:
    """
    Manual-reset event flag built from a primitive lock.

    set() latches the flag until clear(); wait() polls with a short sleep.

    Example:
        ready = Event()
        def producer():
            time.sleep(0.2)
            ready.set()
        spawn(producer)
        ready.wait()
        print("producer signalled")
    """

    def __init__(self) -> None:
        self._lock = _thread.allocate_lock()  # guards _flag
        self._flag = False

    def set(self) -> None:
        """Latch the flag; wake all current and future waiters."""
        self._lock.acquire()
        self._flag = True
        self._lock.release()

    def clear(self) -> None:
        """Reset the flag so wait() blocks again."""
        self._lock.acquire()
        self._flag = False
        self._lock.release()

    def is_set(self) -> bool:
        """Return the current flag value."""
        self._lock.acquire()
        flag = self._flag
        self._lock.release()
        return flag

    def wait(self, timeout: float | None = None) -> bool:
        """Block until set; return True, or False if *timeout* seconds elapse."""
        end = None if timeout is None else time.monotonic() + timeout
        while not self.is_set():
            if end is not None and time.monotonic() >= end:
                return False
            time.sleep(0.002)
        return True
# ─────────────────────────────────────────────────────────────────────────────
# 4. Thread-safe bounded queue
# ─────────────────────────────────────────────────────────────────────────────
class BoundedQueue:
    """
    Thread-safe FIFO with an optional maximum size, built on _thread locks.

    maxsize <= 0 means unbounded. Waiters poll with a short sleep.

    Example:
        q = BoundedQueue(maxsize=5)
        spawn(lambda: q.put("hello"))
        print(q.get())
    """

    def __init__(self, maxsize: int = 0) -> None:
        self._maxsize = maxsize
        self._lock = _thread.allocate_lock()  # guards _buf
        self._buf: deque = deque()

    def put(self, item: Any, timeout: float = 5.0) -> bool:
        """Append *item*; block while full. False if *timeout* seconds pass."""
        limit = time.monotonic() + timeout
        while True:
            self._lock.acquire()
            try:
                if self._maxsize <= 0 or len(self._buf) < self._maxsize:
                    self._buf.append(item)
                    return True
            finally:
                self._lock.release()
            if time.monotonic() >= limit:
                return False
            time.sleep(0.002)

    def get(self, timeout: float = 5.0) -> Any:
        """Pop the oldest item; block while empty. Raises TimeoutError."""
        limit = time.monotonic() + timeout
        while True:
            self._lock.acquire()
            try:
                if self._buf:
                    return self._buf.popleft()
            finally:
                self._lock.release()
            if time.monotonic() >= limit:
                raise TimeoutError("get() timed out")
            time.sleep(0.002)

    def qsize(self) -> int:
        """Snapshot of the current item count."""
        self._lock.acquire()
        try:
            return len(self._buf)
        finally:
            self._lock.release()

    def empty(self) -> bool:
        """True when the queue currently holds no items."""
        return self.qsize() == 0
# ─────────────────────────────────────────────────────────────────────────────
# 5. interrupt_main watchdog
# ─────────────────────────────────────────────────────────────────────────────
def watchdog(timeout_s: float) -> Callable[[], None]:
    """
    Start a background thread that calls _thread.interrupt_main() after
    timeout_s seconds, raising KeyboardInterrupt in the main thread.
    Useful as a hard-kill fallback for stuck processes.

    Returns a zero-argument cancel() callable: invoking it before the
    deadline disarms the watchdog (calling it more than once, or after
    the watchdog has fired, is harmless).

    Example:
        cancel = watchdog(10.0)  # KeyboardInterrupt in main after 10s
        ...do work...
        cancel()                 # finished in time — disarm
    """
    armed = _thread.allocate_lock()
    armed.acquire()  # held => watchdog is armed

    def _watcher() -> None:
        # A timed acquire doubles as an interruptible sleep: it returns
        # True when cancel() releases the lock, or times out and fires.
        if not armed.acquire(True, timeout_s):
            _thread.interrupt_main()

    _thread.start_new_thread(_watcher, ())

    def cancel() -> None:
        try:
            armed.release()
        except RuntimeError:
            pass  # already cancelled

    return cancel
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo: exercise each primitive in turn. Output within a section depends
    # on thread scheduling; the Event waits bound how long each section runs.
    print("=== _thread demo ===")
    print(f" _thread.get_ident() = {_thread.get_ident()}")
    # ── spawn + error capture ─────────────────────────────────────────────────
    print("\n--- spawn ---")
    done = Event()                          # signalled once all 3 workers finish
    results: list[int] = []                 # completion order (not launch order)
    lock_results = _thread.allocate_lock()  # guards results
    def worker(n: int) -> None:
        # Sleep proportional to n so completion order becomes 1, 2, 3.
        time.sleep(0.05 * n)
        with lock_results:
            results.append(n)
            if len(results) >= 3:
                done.set()
    for i in [3, 1, 2]:
        spawn(worker, i)
    done.wait(timeout=2.0)  # bounded wait — _thread has no join()
    print(f" completion order: {results}")
    # ── Semaphore ─────────────────────────────────────────────────────────────
    print("\n--- Semaphore(2) ---")
    sem = Semaphore(2)
    active_count = [0]  # one-element lists: mutable cells shared with workers
    max_active = [0]    # high-water mark of concurrent holders
    ac_lock = _thread.allocate_lock()  # guards the counters and sem_results
    sem_done = Event()
    sem_results: list[int] = []
    def sem_worker(i: int) -> None:
        with sem:  # at most 2 threads inside this block at once
            with ac_lock:
                active_count[0] += 1
                max_active[0] = max(max_active[0], active_count[0])
            time.sleep(0.05)
            with ac_lock:
                active_count[0] -= 1
                sem_results.append(i)
                if len(sem_results) >= 5:
                    sem_done.set()
    for i in range(5):
        spawn(sem_worker, i)
    sem_done.wait(timeout=2.0)
    print(f" max concurrent: {max_active[0]} (limit=2) ok={max_active[0] <= 2}")
    # ── BoundedQueue ──────────────────────────────────────────────────────────
    print("\n--- BoundedQueue ---")
    q: BoundedQueue = BoundedQueue(maxsize=3)
    for item in ["a", "b", "c"]:  # fills the queue exactly to maxsize
        q.put(item)
    print(f" qsize={q.qsize()}")
    print(f" items: {q.get()} {q.get()} {q.get()}")
    print(f" empty: {q.empty()}")
    print("\n=== done ===")
For the threading stdlib replacement — threading.Thread(target=func, args=args, daemon=True) adds .join(), .is_alive(), .daemon flag, a name, and exception propagation via threading.excepthook — use threading for all production multi-threaded code; _thread lacks join, daemon management, and thread-local storage. For the concurrent.futures.ThreadPoolExecutor alternative — ThreadPoolExecutor(max_workers=8).submit(func, *args) returns a Future with .result(), automatic exception re-raising, and integrated done callbacks — use ThreadPoolExecutor when you need managed worker pools, task submission, futures, and clean shutdown; use _thread only when you need the absolute minimum abstraction or are studying Python internals. The Claude Skills 360 bundle includes _thread skill sets covering ThreadError exception container, spawn() safe spawner, Semaphore counting semaphore, Event manual-reset event flag, BoundedQueue thread-safe FIFO, and watchdog() interrupt-main timeout guard. Start with the free tier to try low-level threading patterns and _thread pipeline code generation.