Python’s concurrent.futures module provides high-level thread and process pools via a unified Executor API. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed, wait. ThreadPoolExecutor: ThreadPoolExecutor(max_workers=N) — for I/O-bound work; threads share memory; N defaults to min(32, cpu_count+4). ProcessPoolExecutor: ProcessPoolExecutor(max_workers=N) — for CPU-bound work; each worker is a separate process; avoid pickling issues. submit: future = executor.submit(fn, *args, **kwargs) → Future. map: executor.map(fn, iterable, timeout=None, chunksize=1) → iterator of ordered results; raises the first exception encountered. as_completed: for future in as_completed(futures): — yield futures as they complete (not in submission order). Future API: .result(timeout=None) — block and return result or re-raise; .exception() — return exception or None; .done() — non-blocking check; .cancel() — attempt cancellation (not possible if already running). wait: done, not_done = wait(futures, timeout=None, return_when=ALL_COMPLETED) — also FIRST_COMPLETED, FIRST_EXCEPTION. Context manager: with ThreadPoolExecutor() as ex: — auto-shutdowns and waits on exit. chunksize (ProcessPool only): batch multiple items per subprocess call for better throughput. Claude Code generates parallel downloaders, batch API callers, CPU-parallel data transformers, and scatter-gather pipelines.
CLAUDE.md for concurrent.futures
## concurrent.futures Stack
- Stdlib: from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
- from concurrent.futures import as_completed, wait, FIRST_COMPLETED
- IO: with ThreadPoolExecutor(max_workers=16) as ex: futures = [ex.submit(fn, x) for x in items]
- CPU: with ProcessPoolExecutor() as ex: results = list(ex.map(cpu_fn, items))
- Order: for future in as_completed(futures): result = future.result()  # completion order, not submission order
- Safe: future.result() # re-raises exception from worker
concurrent.futures Parallel Pipeline
# app/fututil.py — parallel fetch, retry, scatter-gather, rate-limit, progress
from __future__ import annotations

import time
from concurrent.futures import (
    ALL_COMPLETED,   # NOTE(review): unused in this module view — possibly re-exported; confirm
    FIRST_COMPLETED, # NOTE(review): unused in this module view — possibly re-exported; confirm
    Future,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
    as_completed,
    wait,            # NOTE(review): unused in this module view — possibly re-exported; confirm
)
from dataclasses import dataclass, field  # NOTE(review): `field` unused here; confirm
from typing import Any, Callable, Iterable, Iterator, TypeVar

# Generic element/result type variables shared by all helpers below.
T = TypeVar("T")
R = TypeVar("R")
# ─────────────────────────────────────────────────────────────────────────────
# 1. Parallel map helpers
# ─────────────────────────────────────────────────────────────────────────────
def parallel_map(
    fn: Callable[[T], R],
    items: Iterable[T],
    max_workers: int = 8,
    use_processes: bool = False,
    timeout: float | None = None,
) -> list[R]:
    """
    Run fn over every item concurrently and return results in input order.

    The first exception raised by any call propagates to the caller.
    Set use_processes=True for CPU-bound fn (fn and items must then be
    picklable).

    Example:
        results = parallel_map(fetch_url, urls, max_workers=16)
    """
    pool_cls = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    with pool_cls(max_workers=max_workers) as pool:
        ordered = pool.map(fn, items, timeout=timeout)
        return list(ordered)
def parallel_map_safe(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int = 8,
    use_processes: bool = False,
    default: R | None = None,
) -> list[R | None]:
    """
    Run fn over every item concurrently, substituting default on failure.

    The output list is aligned with items (same length, same order); a
    slot holds default wherever fn raised.

    Example:
        results = parallel_map_safe(fetch_url, urls, default=None)
    """
    pool_cls = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    out: list[R | None] = []
    with pool_cls(max_workers=max_workers) as pool:
        pending = [pool.submit(fn, element) for element in items]
        # Collect in submission order so output aligns with input.
        for task in pending:
            try:
                out.append(task.result())
            except Exception:
                out.append(default)
    return out
def parallel_filter(
    predicate: Callable[[T], bool],
    items: list[T],
    max_workers: int = 8,
) -> list[T]:
    """
    Return the elements of items for which predicate is True, evaluated
    concurrently on a thread pool.

    Input order is preserved: the futures dict keeps insertion order, and
    results are gathered by iterating it in that order. If predicate
    raises for any item, that exception propagates to the caller.

    Example:
        live_urls = parallel_filter(is_url_reachable, urls, max_workers=20)
    """
    # The original bound ThreadPoolExecutor to a local alias for no reason;
    # use the class directly.
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(predicate, item): item for item in items}
        # Iterate in submission order so the output order matches items.
        return [futures[fut] for fut in futures if fut.result()]
# ─────────────────────────────────────────────────────────────────────────────
# 2. As-completed processing
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class TaskResult:
    """Outcome of one submitted task: input, output-or-error, and timing."""

    item: Any                 # the input item that was submitted
    result: Any               # fn's return value, or None when the task raised
    error: Exception | None   # the exception raised by fn, or None on success
    elapsed: float            # wall-clock seconds fn spent running in the worker

    @property
    def ok(self) -> bool:
        """True when the task completed without raising."""
        return self.error is None


def run_as_completed(
    fn: Callable[[T], R],
    items: Iterable[T],
    max_workers: int = 8,
    timeout: float | None = None,
) -> Iterator[TaskResult]:
    """
    Submit all items, yield a TaskResult as each finishes (completion order).

    Exceptions are captured per task rather than failing the whole batch.
    ``elapsed`` is measured inside the worker, so it reflects fn's actual
    run time. (Bug fix: the original started its clock only after the
    future had already completed, so elapsed always measured result
    retrieval — effectively ~0 — instead of the task duration.)

    Example:
        for r in run_as_completed(download, urls, max_workers=20):
            if r.ok:
                save(r.result)
            else:
                log_error(r.item, r.error)
    """
    def _timed(item: T) -> tuple[R | None, Exception | None, float]:
        # Run fn in the worker, capturing (result, error, duration) so
        # the timing covers the call itself and never raises out of the
        # future — as_completed consumers get a uniform triple.
        start = time.monotonic()
        try:
            return fn(item), None, time.monotonic() - start
        except Exception as exc:
            return None, exc, time.monotonic() - start

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        future_to_item: dict[Future, Any] = {
            ex.submit(_timed, item): item for item in items
        }
        for fut in as_completed(future_to_item, timeout=timeout):
            result, error, elapsed = fut.result()
            yield TaskResult(
                item=future_to_item[fut],
                result=result,
                error=error,
                elapsed=elapsed,
            )
# ─────────────────────────────────────────────────────────────────────────────
# 3. Batch processing with retry
# ─────────────────────────────────────────────────────────────────────────────
def with_retry(
    fn: Callable[[T], R],
    item: T,
    max_attempts: int = 3,
    backoff: float = 0.5,
    backoff_factor: float = 2.0,
) -> R:
    """
    Call fn(item), retrying with exponential backoff on exception.

    Sleeps `backoff` seconds after the first failure, multiplying the
    delay by `backoff_factor` after each subsequent failure. After
    max_attempts failures the final exception propagates (with its
    original traceback, via bare ``raise``).

    Raises:
        ValueError: if max_attempts < 1. (Bug fix: the original fell
            through to ``raise last_exc`` with last_exc still None,
            producing a confusing TypeError.)

    Example:
        result = with_retry(fetch_url, url, max_attempts=4)
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")
    delay = backoff
    for attempt in range(max_attempts):
        try:
            return fn(item)
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: propagate the final failure
            time.sleep(delay)
            delay *= backoff_factor
    raise AssertionError("unreachable")  # loop always returns or raises
def parallel_with_retry(
    fn: Callable[[T], R],
    items: Iterable[T],
    max_workers: int = 8,
    max_attempts: int = 3,
    backoff: float = 0.5,
) -> list[TaskResult]:
    """
    Run fn on items in parallel, retrying each item on failure.

    Returns TaskResult records (in completion order, as produced by
    run_as_completed).

    Example:
        results = parallel_with_retry(flaky_api_call, ids, max_attempts=4)
    """
    def _retrying(item: T) -> R:
        # Wrap fn with the retry policy for each submitted item.
        return with_retry(fn, item, max_attempts=max_attempts, backoff=backoff)

    return list(run_as_completed(_retrying, items, max_workers=max_workers))
# ─────────────────────────────────────────────────────────────────────────────
# 4. Scatter-gather pattern
# ─────────────────────────────────────────────────────────────────────────────
def scatter_gather(
    fns: dict[str, Callable[[], R]],
    max_workers: int | None = None,
    timeout: float | None = None,
) -> dict[str, R]:
    """
    Execute several named zero-argument callables concurrently.

    Returns {name: result}. The first exception raised by any task
    propagates to the caller.

    Example:
        results = scatter_gather({
            "users": lambda: db.query("SELECT * FROM users"),
            "products": lambda: db.query("SELECT * FROM products"),
            "stats": lambda: compute_stats(),
        })
        users = results["users"]
    """
    # Default pool size: one worker per task (so everything runs at once).
    worker_count = max_workers or len(fns)
    gathered: dict[str, R] = {}
    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        names = {pool.submit(task): label for label, task in fns.items()}
        for finished in as_completed(names, timeout=timeout):
            gathered[names[finished]] = finished.result(timeout=timeout)
    return gathered
def first_success(
    fns: dict[str, Callable[[], R]],
    timeout: float | None = None,
) -> tuple[str, R]:
    """
    Race the named callables in parallel; return (name, result) of the
    first one to succeed, attempting to cancel the rest.

    Failed tasks are skipped; if every task fails, RuntimeError is raised.
    (Note: .cancel() cannot stop a future that is already running — the
    pool still waits for running tasks on exit.)

    Example:
        name, data = first_success({
            "primary": lambda: fetch("https://primary.api/data"),
            "fallback": lambda: fetch("https://fallback.api/data"),
        })
    """
    with ThreadPoolExecutor(max_workers=len(fns)) as pool:
        names: dict[Future, str] = {
            pool.submit(task): label for label, task in fns.items()
        }
        for finished in as_completed(names, timeout=timeout):
            # finished is done, so .exception() returns immediately.
            if finished.exception() is not None:
                continue
            winner = finished.result()
            for pending in names:
                if pending is not finished:
                    pending.cancel()
            return names[finished], winner
    raise RuntimeError("All parallel tasks failed")
# ─────────────────────────────────────────────────────────────────────────────
# 5. CPU-bound process pool helpers
# ─────────────────────────────────────────────────────────────────────────────
def cpu_parallel_map(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
    chunksize: int = 1,
) -> list[R]:
    """
    Map fn over items across worker processes; results in input order.

    Both fn and the items must be picklable (they cross process
    boundaries). Raise chunksize above 1 to batch items per subprocess
    call and cut IPC overhead for cheap fn.

    Example:
        results = cpu_parallel_map(compress_image, image_paths, chunksize=4)
    """
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        result_iter = pool.map(fn, items, chunksize=chunksize)
        return list(result_iter)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke-test / demo of every helper in this module. Output includes
    # timings, so exact text varies run to run.
    print("=== concurrent.futures demo ===")

    # Demo worker: short sleep stands in for an I/O-bound call.
    def slow_square(x: int) -> int:
        time.sleep(0.01)  # simulate I/O
        return x * x

    # Demo worker: raises for every third input to exercise error paths.
    def occasionally_fails(x: int) -> int:
        if x % 3 == 0:
            raise ValueError(f"failed on {x}")
        return x * 2

    print("\n--- parallel_map ---")
    t0 = time.monotonic()
    results = parallel_map(slow_square, range(20), max_workers=10)
    elapsed = time.monotonic() - t0
    print(f" parallel_map(slow_square, 0..19): {results[:5]}... elapsed={elapsed:.2f}s")

    print("\n--- parallel_map_safe ---")
    # Failures are replaced by the default (-1) instead of raising.
    safe = parallel_map_safe(occasionally_fails, list(range(10)), default=-1)
    print(f" safe results: {safe}")

    print("\n--- run_as_completed ---")
    # Tally per-task outcomes as they complete; range(9) has 3 multiples
    # of 3, so occasionally_fails fails on those.
    successes = failures = 0
    for r in run_as_completed(occasionally_fails, range(9), max_workers=4):
        if r.ok:
            successes += 1
        else:
            failures += 1
    print(f" successes={successes} failures={failures}")

    print("\n--- scatter_gather ---")
    # Three independent named computations fanned out in parallel.
    results_sg = scatter_gather({
        "squares": lambda: [x**2 for x in range(5)],
        "cubes": lambda: [x**3 for x in range(5)],
        "sum100": lambda: sum(range(100)),
    })
    for name, val in sorted(results_sg.items()):
        print(f" {name}: {val}")

    print("\n--- parallel_with_retry ---")
    call_counts: dict[int, int] = {}

    # Fails on the first call per item, succeeds on the second — shows the
    # retry wrapper recovering from transient errors.
    # NOTE(review): call_counts is mutated from pool threads; appears safe
    # here since each key is only touched by its own item's worker — confirm.
    def flaky(x: int) -> int:
        call_counts[x] = call_counts.get(x, 0) + 1
        if call_counts[x] < 2:
            raise RuntimeError(f"transient error for {x}")
        return x * 10

    retry_results = parallel_with_retry(flaky, [1, 2, 3], max_attempts=3, backoff=0.01)
    for r in sorted(retry_results, key=lambda r: r.item):
        print(f" item={r.item} ok={r.ok} result={r.result} attempts={call_counts.get(r.item, '?')}")

    print("\n=== done ===")
For the asyncio alternative — asyncio provides cooperative concurrency via coroutines and event loops; it is ideal for managing thousands of concurrent I/O-bound tasks on a single thread (HTTP, websocket, database); ThreadPoolExecutor creates OS threads which can run truly in parallel for blocking I/O calls that don’t support async — use asyncio with an async HTTP library (aiohttp, httpx) for high-concurrency network I/O in a new codebase; use ThreadPoolExecutor to parallelize calls to blocking (non-async) client libraries without rewriting them. For the joblib alternative — joblib (PyPI) wraps multiprocessing with memory-mapped array sharing, intelligent joblib caching (Memory), and the Parallel / delayed API used by scikit-learn; it handles array sharing between processes more efficiently than pickling — use joblib for scientific computing pipelines, machine learning preprocessing, and any workflow that passes large NumPy arrays between workers; use ProcessPoolExecutor for general-purpose CPU-bound Python code that doesn’t involve large NumPy arrays. The Claude Skills 360 bundle includes concurrent.futures skill sets covering parallel_map()/parallel_map_safe()/parallel_filter() batch helpers, TaskResult dataclass with run_as_completed() iterator, with_retry()/parallel_with_retry() retry helpers, scatter_gather()/first_success() fan-out patterns, and cpu_parallel_map() process pool wrapper. Start with the free tier to try high-level concurrency patterns and concurrent.futures pipeline code generation.