Python’s dummy_threading module (removed in Python 3.9) is a stub that mirrors the threading API but executes everything synchronously in the calling thread. It exists for Python interpreters compiled without thread support (--without-threads) and is imported with `import dummy_threading`. All classes — Thread, Lock, RLock, Condition, Semaphore, BoundedSemaphore, Event, Timer, Barrier — have the same constructor signatures and method names as threading, but: Lock.acquire() always returns immediately; Thread.start() calls target() inline; Event.wait() returns without blocking; Condition.wait() is a no-op. Idiom: `try: import threading` / `except ImportError: import dummy_threading as threading` — this pattern makes a library work on both threaded and threadless CPython builds. current_thread() returns a _DummyThread object; active_count() returns 1; enumerate() returns a single-item list. In Python 3.9+, CPython always compiles with thread support, and dummy_threading was removed. Claude Code generates platform-portable libraries, embedded extension modules, and threadless CPython adapters.
CLAUDE.md for dummy_threading
## dummy_threading Stack
- Stdlib: import dummy_threading (removed Python 3.9)
- Compat import pattern:
  ```python
  try:
      import threading
  except ImportError:
      import dummy_threading as threading
  ```
- API: Same as threading: Thread, Lock, RLock, Condition, Semaphore, Event
- Behavior: All operations are synchronous / no-ops
- Thread.start() → runs target() inline
- Lock.acquire() → always succeeds immediately
- Event.wait() → returns True immediately
- Use: Portable code for threadless CPython builds (<= 3.8)
dummy_threading Portability Pipeline
# app/dummythreadutil.py — compat import, portable primitives, thread-optional library
from __future__ import annotations
import time
from typing import Any, Callable
# ─────────────────────────────────────────────────────────────────────────────
# 1. Portable threading import
# ─────────────────────────────────────────────────────────────────────────────
try:
import threading as _threading
_HAS_REAL_THREADS = True
except ImportError:
try:
import dummy_threading as _threading # type: ignore
_HAS_REAL_THREADS = False
except ImportError:
# Python 3.9+ removed dummy_threading; real threading always available
import threading as _threading
_HAS_REAL_THREADS = True
def has_real_threads() -> bool:
"""
Return True if the interpreter was compiled with thread support.
Example:
if has_real_threads():
print("multi-threaded execution available")
else:
print("single-threaded fallback active")
"""
return _HAS_REAL_THREADS
# ─────────────────────────────────────────────────────────────────────────────
# 2. Thread-optional worker runner
# ─────────────────────────────────────────────────────────────────────────────
def run_workers(tasks: list[Callable[[], Any]],
threaded: bool = True) -> list[Any]:
"""
Run a list of callables, optionally in separate threads.
On threadless platforms all tasks run sequentially in the caller.
Example:
import time
results = run_workers([
lambda: (time.sleep(0.1), "a")[1],
lambda: (time.sleep(0.1), "b")[1],
])
print(results)
"""
if not threaded or not has_real_threads():
return [t() for t in tasks]
results: list[Any] = [None] * len(tasks)
errors: list[BaseException | None] = [None] * len(tasks)
lock = _threading.Lock()
def _wrapper(idx: int, fn: Callable[[], Any]) -> None:
try:
val = fn()
with lock:
results[idx] = val
except BaseException as e:
with lock:
errors[idx] = e
threads = [
_threading.Thread(target=_wrapper, args=(i, fn), daemon=True)
for i, fn in enumerate(tasks)
]
for t in threads:
t.start()
for t in threads:
t.join()
# Re-raise first error if any
for e in errors:
if e is not None:
raise e
return results
# ─────────────────────────────────────────────────────────────────────────────
# 3. Portable thread-safe cache
# ─────────────────────────────────────────────────────────────────────────────
class ThreadSafeCache:
"""
A simple dict cache protected by a Lock (real or dummy).
Works identically in threaded and threadless environments;
on threadless platforms the Lock is a no-op.
Example:
cache = ThreadSafeCache()
cache.set("key", 42)
print(cache.get("key")) # 42
print(cache.get("missing")) # None
"""
def __init__(self, maxsize: int = 256) -> None:
self._data: dict[str, Any] = {}
self._lock = _threading.Lock()
self._maxsize = maxsize
def get(self, key: str, default: Any = None) -> Any:
with self._lock:
return self._data.get(key, default)
def set(self, key: str, value: Any) -> None:
with self._lock:
if len(self._data) >= self._maxsize:
# Evict oldest key (Python 3.7+ insertion order)
oldest = next(iter(self._data))
del self._data[oldest]
self._data[key] = value
def delete(self, key: str) -> bool:
with self._lock:
if key in self._data:
del self._data[key]
return True
return False
def clear(self) -> None:
with self._lock:
self._data.clear()
def size(self) -> int:
with self._lock:
return len(self._data)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Portable periodic background task
# ─────────────────────────────────────────────────────────────────────────────
class PeriodicTask:
"""
Schedule a callable to run every interval_s seconds.
On threaded platforms it runs in a daemon thread.
On threadless platforms it can only be triggered manually via tick().
Example:
task = PeriodicTask(lambda: print("tick"), interval_s=1.0)
task.start()
time.sleep(3.5)
task.stop()
"""
def __init__(self, func: Callable[[], Any], interval_s: float) -> None:
self._func = func
self._interval = interval_s
self._stop_event = _threading.Event()
self._thread: Any = None
def start(self) -> None:
self._stop_event.clear()
if has_real_threads():
self._thread = _threading.Thread(
target=self._run, daemon=True)
self._thread.start()
def _run(self) -> None:
while not self._stop_event.wait(timeout=self._interval):
try:
self._func()
except Exception as e:
print(f"[PeriodicTask] error: {e!r}")
def stop(self) -> None:
self._stop_event.set()
if self._thread is not None:
self._thread.join(timeout=self._interval + 0.5)
def tick(self) -> None:
"""Manually trigger one execution (useful in threadless mode)."""
self._func()
# ─────────────────────────────────────────────────────────────────────────────
# 5. Thread environment inspector
# ─────────────────────────────────────────────────────────────────────────────
def thread_env_info() -> dict[str, Any]:
"""
Return a summary of the current thread environment.
Example:
info = thread_env_info()
print(info)
"""
try:
current = _threading.current_thread()
active = _threading.active_count()
all_threads = _threading.enumerate()
main = _threading.main_thread()
return {
"has_real_threads": has_real_threads(),
"current_thread": current.name,
"current_ident": getattr(current, "ident", None),
"active_count": active,
"threads": [t.name for t in all_threads],
"main_thread": main.name,
"is_main": current is main,
}
except Exception as e:
return {"error": str(e)}
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== dummy_threading demo ===")
print(f" has_real_threads: {has_real_threads()}")
# ── thread env ────────────────────────────────────────────────────────────
print("\n--- thread_env_info ---")
info = thread_env_info()
for k, v in info.items():
print(f" {k:20s}: {v!r}")
# ── run_workers ───────────────────────────────────────────────────────────
print("\n--- run_workers (3 tasks) ---")
import time as _time
start = _time.monotonic()
results = run_workers([
lambda: (_time.sleep(0.1), "task-A")[1],
lambda: (_time.sleep(0.1), "task-B")[1],
lambda: (_time.sleep(0.1), "task-C")[1],
])
elapsed = _time.monotonic() - start
print(f" results : {results}")
parallel = elapsed < 0.2 # would be ~0.1s if parallel
print(f" elapsed : {elapsed:.3f}s "
f"({'parallel' if parallel else 'sequential'})")
# ── ThreadSafeCache ───────────────────────────────────────────────────────
print("\n--- ThreadSafeCache ---")
cache = ThreadSafeCache(maxsize=4)
for k, v in [("a", 1), ("b", 2), ("c", 3)]:
cache.set(k, v)
print(f" get('a') = {cache.get('a')}")
print(f" get('miss') = {cache.get('miss', 'N/A')}")
print(f" size = {cache.size()}")
# ── PeriodicTask ──────────────────────────────────────────────────────────
print("\n--- PeriodicTask (0.1s interval, 2 ticks) ---")
tick_count = [0]
def _tick() -> None:
tick_count[0] += 1
task = PeriodicTask(_tick, interval_s=0.1)
if has_real_threads():
task.start()
_time.sleep(0.35)
task.stop()
print(f" ticks (threaded): {tick_count[0]}")
else:
task.tick()
task.tick()
print(f" ticks (manual): {tick_count[0]}")
print("\n=== done ===")
For the threading stdlib replacement — Python 3.9+ always compiles with thread support and dummy_threading was removed; simply import threading and use Thread, Lock, Event, Condition — use threading for all new code. For the concurrent.futures stdlib alternative — ThreadPoolExecutor and ProcessPoolExecutor provide higher-level worker pools with Future objects, timeouts, and automatic exception propagation — use concurrent.futures when you have a batch of independent tasks to distribute across threads or processes; use threading for long-lived threads with complex inter-thread communication. The Claude Skills 360 bundle includes dummy_threading skill sets covering has_real_threads() capability check, run_workers() thread-optional task executor, ThreadSafeCache portable dict cache, PeriodicTask optional-background periodic runner, and thread_env_info() environment inspector. Start with the free tier to try portable threading patterns and dummy_threading pipeline code generation.