Python threading runs multiple threads sharing one GIL — ideal for I/O-bound work (network, disk, database). import threading. Thread: t = threading.Thread(target=fn, args=(x,), daemon=True); t.start(); t.join(). name: threading.current_thread().name. active_count: threading.active_count(). enumerate: threading.enumerate(). Lock: lock = threading.Lock(); with lock: shared_var += 1. RLock: threading.RLock() — reentrant (same thread may acquire multiple times). Event: e = threading.Event(); e.set(); e.wait(); e.clear(); e.is_set(). Condition: c = threading.Condition(); with c: c.wait(); c.notify_all(). Semaphore: sem = threading.Semaphore(5) — limit concurrent access. Timer: t = threading.Timer(3.0, callback); t.start(); t.cancel(). Barrier: b = threading.Barrier(n); b.wait(). local: data = threading.local(); data.user_id = 42 — per-thread storage. Queue: from queue import Queue; q = Queue(maxsize=100); q.put(item); item = q.get(); q.task_done(); q.join(). PriorityQueue: from queue import PriorityQueue. ThreadPoolExecutor: from concurrent.futures import ThreadPoolExecutor, as_completed; with ThreadPoolExecutor(max_workers=8) as ex: futures = [ex.submit(fn, x) for x in items]. map: results = list(ex.map(fn, items, timeout=30)). wait: from concurrent.futures import wait, ALL_COMPLETED. daemon: t.daemon = True — dies when main exits. join(timeout): t.join(timeout=5.0). is_alive: t.is_alive(). Claude Code generates thread pools, producer-consumer pipelines, rate limiters, and I/O worker patterns.
CLAUDE.md for threading
## threading Stack
- Stdlib: import threading | from concurrent.futures import ThreadPoolExecutor, as_completed
- Pool: ThreadPoolExecutor(max_workers=N) | ex.map(fn, items) | [ex.submit(fn,x) for x in items]
- Sync: threading.Lock() | threading.Event() | threading.Semaphore(N) | queue.Queue(maxsize=100)
- Safe: always use Lock/Queue for shared state — never bare shared variables across threads
- I/O only: threads suit I/O-bound work; for CPU-bound work use multiprocessing instead (the GIL prevents CPU-bound Python bytecode from running in parallel across threads)
threading Concurrency Pipeline
# app/threads.py — ThreadPoolExecutor, Lock, Event, Queue, rate limiter, worker pool
from __future__ import annotations
import logging
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, ALL_COMPLETED
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Callable, Generator, Iterator, TypeVar
log = logging.getLogger(__name__)
T = TypeVar("T")
R = TypeVar("R")
# ─────────────────────────────────────────────────────────────────────────────
# 1. ThreadPoolExecutor helpers
# ─────────────────────────────────────────────────────────────────────────────
def thread_map(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int = 8,
    timeout: float | None = None,
) -> list[R]:
    """
    Apply fn to every item concurrently; return results in input order.

    Thin wrapper around ThreadPoolExecutor.map. The optional timeout (seconds)
    covers the whole iteration and raises concurrent.futures.TimeoutError
    when exceeded.

    Example:
        pages = thread_map(fetch_url, urls, max_workers=16)
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        result_iter = pool.map(fn, items, timeout=timeout)
        return list(result_iter)
def thread_submit(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int = 8,
    return_exceptions: bool = False,
    timeout: float | None = None,
) -> list[R]:
    """
    Fan fn out over items via submit(); gather results back in input order.

    When return_exceptions is True, a failing call contributes its exception
    object to the result list; otherwise the first failure observed is
    re-raised.

    Example:
        results = thread_submit(download_file, paths, max_workers=4)
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        index_of = {pool.submit(fn, item): pos for pos, item in enumerate(items)}
        collected: dict[int, Any] = {}
        for fut in as_completed(index_of, timeout=timeout):
            pos = index_of[fut]
            try:
                collected[pos] = fut.result()
            except Exception as exc:
                if not return_exceptions:
                    raise
                collected[pos] = exc
        return [collected[pos] for pos in range(len(items))]
def thread_map_progress(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int = 8,
    on_done: Callable[[int, int, R], None] | None = None,
) -> list[R]:
    """
    Like thread_map, but reports progress: on_done(completed, total, result)
    fires after each item finishes (in completion order, not input order).

    Example:
        def show(done, total, result):
            print(f" {done}/{total} done")
        pages = thread_map_progress(fetch, urls, on_done=show)
    """
    total = len(items)
    by_index: dict[int, R] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(fn, item): pos for pos, item in enumerate(items)}
        for finished, fut in enumerate(as_completed(pending), start=1):
            value = fut.result()
            by_index[pending[fut]] = value
            if on_done:
                on_done(finished, total, value)
    return [by_index[pos] for pos in range(total)]
# ─────────────────────────────────────────────────────────────────────────────
# 2. Producer–consumer with queue.Queue
# ─────────────────────────────────────────────────────────────────────────────
_STOP = object()  # unique sentinel telling worker threads to shut down
def _worker(task_q: queue.Queue, result_q: queue.Queue, fn: Callable) -> None:
    """Drain task_q forever, pushing (item, result, exc) triples onto result_q."""
    while True:
        item = task_q.get()
        if item is _STOP:
            task_q.task_done()
            return
        try:
            result_q.put((item, fn(item), None))
        except Exception as exc:
            result_q.put((item, None, exc))
        finally:
            task_q.task_done()
def producer_consumer(
    fn: Callable[[T], R],
    items: list[T],
    n_workers: int = 8,
) -> list[tuple[T, R | None, Exception | None]]:
    """
    Push items through a Queue drained by n_workers daemon threads.

    Returns one (item, result, exc) triple per item (completion order);
    exc is None on success, result is None on failure.

    Example:
        results = producer_consumer(parse_record, records, n_workers=4)
        errors = [(item, exc) for item, _, exc in results if exc]
    """
    task_q: queue.Queue = queue.Queue()
    result_q: queue.Queue = queue.Queue()
    workers = [
        threading.Thread(target=_worker, args=(task_q, result_q, fn), daemon=True)
        for _ in range(n_workers)
    ]
    for w in workers:
        w.start()
    for item in items:
        task_q.put(item)
    for _ in range(n_workers):
        task_q.put(_STOP)  # one shutdown sentinel per worker
    # Each worker puts its result BEFORE calling task_done(), so once join()
    # returns every result is already in result_q.
    task_q.join()
    collected = []
    while True:
        try:
            collected.append(result_q.get_nowait())
        except queue.Empty:
            return collected
# ─────────────────────────────────────────────────────────────────────────────
# 3. Rate limiter
# ─────────────────────────────────────────────────────────────────────────────
class RateLimiter:
    """
    Token-bucket rate limiter, thread-safe.

    Permits at most `rate` acquisitions per `per` seconds; tokens refill
    continuously at rate/per per second, capped at a burst size of `rate`.

    Example:
        limiter = RateLimiter(rate=10, per=1.0)  # 10 calls/second
        with limiter:
            response = session.get(url)
    """
    def __init__(self, rate: int, per: float = 1.0) -> None:
        # rate: max acquisitions per `per` seconds (doubles as burst capacity).
        # per: length of the rate window, in seconds.
        self._rate = rate
        self._per = per
        self._tokens = float(rate)      # bucket starts full: initial burst allowed
        self._last = time.monotonic()   # monotonic clock: immune to wall-clock jumps
        self._lock = threading.Lock()
    def __enter__(self) -> None:
        self.acquire()
    def __exit__(self, *_) -> None:
        # Nothing to release on exit; exceptions propagate unchanged.
        pass
    def acquire(self) -> None:
        """Block until a token is available, then consume it."""
        with self._lock:
            now = time.monotonic()
            elapsed = now - self._last
            # Refill in proportion to elapsed time, capped at the burst size.
            self._tokens += elapsed * (self._rate / self._per)
            self._tokens = min(self._tokens, float(self._rate))
            self._last = now
            if self._tokens < 1.0:
                # Not enough tokens: sleep exactly long enough for one to refill.
                # NOTE(review): sleeping while holding the lock serializes all
                # waiters behind this thread — this enforces the global rate,
                # but acquire() latency stacks up under heavy contention.
                sleep_for = (1.0 - self._tokens) / (self._rate / self._per)
                time.sleep(sleep_for)
                # The token that refilled during the sleep is consumed here.
                self._tokens = 0.0
            else:
                self._tokens -= 1.0
# ─────────────────────────────────────────────────────────────────────────────
# 4. Shared state helpers
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class AtomicCounter:
    """
    Lock-protected integer counter that is safe to share across threads.

    Example:
        counter = AtomicCounter()
        # In threads: counter.increment()
        # In main: print(counter.value)
    """
    # Both fields are internal state, excluded from __init__.
    _value: int = field(default=0, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def increment(self, by: int = 1) -> int:
        """Atomically add `by` and return the updated count."""
        with self._lock:
            updated = self._value + by
            self._value = updated
            return updated

    def reset(self) -> None:
        """Atomically set the count back to zero."""
        with self._lock:
            self._value = 0

    @property
    def value(self) -> int:
        """Current count, read under the lock for a consistent snapshot."""
        with self._lock:
            return self._value
class ThreadSafeDict:
    """
    Minimal dict wrapper whose every operation runs under a shared RLock.

    Example:
        cache = ThreadSafeDict()
        cache["key"] = "value"
        val = cache.get("key")
    """

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._data: dict = {}

    def __setitem__(self, key, val) -> None:
        with self._lock:
            self._data[key] = val

    def __getitem__(self, key):
        with self._lock:
            return self._data[key]

    def __contains__(self, key) -> bool:
        with self._lock:
            return key in self._data

    def __len__(self) -> int:
        with self._lock:
            return len(self._data)

    def get(self, key, default=None):
        """Return the value for key, or default when the key is absent."""
        with self._lock:
            try:
                return self._data[key]
            except KeyError:
                return default

    def items(self):
        """Snapshot of (key, value) pairs as a list, taken under the lock."""
        with self._lock:
            return [(k, v) for k, v in self._data.items()]
# ─────────────────────────────────────────────────────────────────────────────
# 5. Background worker / daemon thread
# ─────────────────────────────────────────────────────────────────────────────
class BackgroundWorker:
    """
    Daemon thread that processes tasks from a queue until stopped.

    Items submitted before stop() is called are still processed; the worker
    exits when it dequeues the shutdown sentinel. Exceptions raised by the
    task function are logged and do not kill the thread.

    Example:
        worker = BackgroundWorker(process_event)
        worker.start()
        worker.submit(event_data)
        worker.stop(wait=True)
    """

    # Class-private sentinel: removes the dependency on the module-global
    # _STOP object and can never collide with a real task item.
    _SENTINEL = object()

    def __init__(self, fn: Callable, maxsize: int = 0) -> None:
        # fn: callable invoked once per submitted item.
        # maxsize: bound for the internal queue (0 = unbounded); submit()
        # blocks while the queue is full.
        self._fn = fn
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._stop_evt = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        """Start the background thread (call exactly once)."""
        self._thread.start()

    def submit(self, item: Any) -> None:
        """Enqueue an item for processing; ignored after stop() was called."""
        if not self._stop_evt.is_set():
            self._queue.put(item)

    def stop(self, wait: bool = True) -> None:
        """Reject new work, enqueue the sentinel, optionally join the thread."""
        self._stop_evt.set()
        self._queue.put(self._SENTINEL)
        if wait:
            self._thread.join()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._SENTINEL:
                # Balance this get() too, so self._queue.join() cannot hang
                # (the original skipped task_done() for the sentinel).
                self._queue.task_done()
                break
            try:
                self._fn(item)
            except Exception:
                log.exception("BackgroundWorker error processing %r", item)
            finally:
                self._queue.task_done()
# ─────────────────────────────────────────────────────────────────────────────
# 6. Thread-local request context
# ─────────────────────────────────────────────────────────────────────────────
_local = threading.local()  # per-thread storage for the current request ID

def set_request_id(request_id: str) -> None:
    """
    Store a request ID in the current thread's local storage.
    Example:
        set_request_id("req-abc123")
        log.info("Processing: %s", get_request_id())
    """
    _local.request_id = request_id

def get_request_id() -> str:
    """Return the current thread's request ID, or "unknown" if none is set."""
    return getattr(_local, "request_id", "unknown")

@contextmanager
def request_context(request_id: str) -> Generator[None, None, None]:
    """
    Bind request_id to the current thread for the duration of the block.

    Restores the previous binding on exit, so nested contexts work: leaving
    an inner context re-exposes the outer request ID instead of clobbering
    it with "unknown" (the previous implementation's bug). With no prior
    binding, exit removes the attribute so get_request_id() falls back to
    its "unknown" default, matching the old observable behavior.

    Example:
        with request_context("req-xyz"):
            process_request()
    """
    # None safely marks "no previous binding": set_request_id stores strings
    # per its annotation, so a stored value is never None.
    previous = getattr(_local, "request_id", None)
    set_request_id(request_id)
    try:
        yield
    finally:
        if previous is None:
            try:
                del _local.request_id
            except AttributeError:
                pass
        else:
            _local.request_id = previous
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("=== threading demo ===")
    print(f"\n Active threads at start: {threading.active_count()}")

    # 1. thread_map — time 10 slow calls fanned across 5 workers.
    def slow_square(x: int) -> int:
        time.sleep(0.02)
        return x * x

    # `time` is already imported at module level; the redundant
    # `import time as _time` alias was removed.
    t0 = time.perf_counter()
    results = thread_map(slow_square, list(range(10)), max_workers=5)
    elapsed = time.perf_counter() - t0
    print("\n--- thread_map (10 items, 5 workers) ---")
    print(f" results: {results}")
    print(f" elapsed: {elapsed:.3f}s (sequential would be ~0.20s)")

    # 2. producer_consumer — same work through a Queue-backed worker pool.
    print("\n--- producer_consumer ---")
    pq = producer_consumer(slow_square, list(range(6)), n_workers=3)
    print(f" {sorted((item, res) for item, res, exc in pq if exc is None)}")

    # 3. AtomicCounter — 100 threads each bump the counter once.
    print("\n--- AtomicCounter ---")
    counter = AtomicCounter()
    threads = [threading.Thread(target=counter.increment) for _ in range(100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f" 100 threads incremented: counter={counter.value}")

    # 4. ThreadSafeDict — concurrent writers, no lost updates.
    print("\n--- ThreadSafeDict ---")
    cache = ThreadSafeDict()

    def fill(i):
        cache[f"k{i}"] = i * 2

    threads2 = [threading.Thread(target=fill, args=(i,)) for i in range(10)]
    for t in threads2:
        t.start()
    for t in threads2:
        t.join()
    print(f" {len(cache)} keys written")

    # 5. BackgroundWorker — fire-and-forget event processing.
    print("\n--- BackgroundWorker ---")
    received = []
    worker = BackgroundWorker(received.append)
    worker.start()
    for i in range(5):
        worker.submit(f"event-{i}")
    time.sleep(0.05)  # give the daemon thread time to drain the queue
    worker.stop(wait=True)
    print(f" processed: {received}")

    # 6. thread-local context — each thread sees only its own request ID.
    print("\n--- request_context ---")
    results_list: list[str] = []  # defined before the closure that appends to it

    def handle_request(req_id):
        with request_context(req_id):
            results_list.append(get_request_id())

    threads3 = [threading.Thread(target=handle_request, args=(f"req-{i}",)) for i in range(5)]
    for t in threads3:
        t.start()
    for t in threads3:
        t.join()
    print(f" thread-local IDs captured: {sorted(results_list)}")

    print("\n=== done ===")
For the asyncio alternative — asyncio uses a single-threaded cooperative event loop where coroutines yield control at await points, making it highly scalable for thousands of concurrent I/O connections without OS thread overhead; Python threading creates real OS threads that can run I/O in parallel (GIL released during I/O syscalls) but have higher per-thread memory cost (~8 MB stack) — use asyncio when building servers, APIs, or any code that is async/await-native from top to bottom (aiohttp, FastAPI, httpx), threading when integrating with synchronous libraries (requests, psycopg2, boto3) or adding concurrency to an existing sync codebase. For the concurrent.futures alternative — concurrent.futures.ThreadPoolExecutor provides Future objects, as_completed() for unordered result streams, and a unified interface shared with ProcessPoolExecutor; raw threading.Thread gives lower-level control including custom synchronization, daemon threads, thread-local storage, and fine-grained lifecycle management — use ThreadPoolExecutor for most new code wanting clean future-based control flow, threading primitives when you need explicit locks, events, barriers, or long-lived daemon threads. The Claude Skills 360 bundle includes threading skill sets covering thread_map()/thread_submit()/thread_map_progress() pool helpers, producer_consumer() Queue-based pipeline, RateLimiter token bucket, AtomicCounter/ThreadSafeDict shared state, BackgroundWorker daemon, and request_context() thread-local storage. Start with the free tier to try I/O concurrency and threading pipeline code generation.