multiprocessing achieves true parallelism for Python code across CPU cores by running separate processes, each with its own interpreter and GIL. import multiprocessing as mp. cpu_count: mp.cpu_count(). Pool.map: with mp.Pool() as pool: results = pool.map(fn, items). Pool.starmap: pool.starmap(fn, [(a1,b1),(a2,b2)]). Pool.imap_unordered: for r in pool.imap_unordered(fn, items): process(r) — yields as completed. apply_async: ar = pool.apply_async(fn, args=(x,)); result = ar.get(timeout=60). Process: p = mp.Process(target=worker, args=(queue,)); p.start(); p.join(). Queue: q = mp.Queue(); q.put(item); q.get(). Pipe: parent_conn, child_conn = mp.Pipe(); parent_conn.send(data); child_conn.recv(). Manager: with mp.Manager() as mgr: d = mgr.dict(); lst = mgr.list(). Value/Array: v = mp.Value("i",0); with v.get_lock(): v.value += 1. Event: e = mp.Event(); e.set(); e.wait(). Semaphore: sem = mp.Semaphore(4). Barrier: b = mp.Barrier(n_workers). shared_memory: from multiprocessing.shared_memory import SharedMemory; shm = SharedMemory(create=True, size=4*n). ProcessPoolExecutor: from concurrent.futures import ProcessPoolExecutor; with ProcessPoolExecutor(max_workers=4) as ex: futures = [ex.submit(fn,x) for x in items]. Context: ctx = mp.get_context("spawn") — safe on all platforms. freeze_support: call in if __name__ == "__main__":. chunksize: pool.map(fn, big_list, chunksize=100) — reduces IPC overhead. Claude Code generates multiprocessing worker pools, pipelines, progress reporters, and parallel data transformers.
CLAUDE.md for multiprocessing
## multiprocessing Stack
- Stdlib: import multiprocessing as mp | from concurrent.futures import ProcessPoolExecutor
- CPU count: workers = min(mp.cpu_count(), len(items))
- Pool: with mp.Pool(workers) as pool: results = pool.map(fn, items)
- Modern: with ProcessPoolExecutor(max_workers=workers) as ex: fs = [ex.submit(fn, x) for x in items]
- IPC: mp.Queue() | mp.Pipe() | mp.Manager().dict()
- Guard: if __name__ == "__main__": (required on Windows/spawn)
multiprocessing Parallel Pipeline
# app/parallel.py — multiprocessing Pool, Process, Queue, shared state, futures
from __future__ import annotations

import logging
import multiprocessing as mp
import operator
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, TypeVar
log = logging.getLogger(__name__)
T = TypeVar("T")
R = TypeVar("R")
# ─────────────────────────────────────────────────────────────────────────────
# 1. Pool helpers
# ─────────────────────────────────────────────────────────────────────────────
def n_workers(max_workers: int | None = None) -> int:
"""
Sensible worker count: min(cpu_count, max_workers or cpu_count).
Example:
workers = n_workers() # all CPUs
workers = n_workers(8) # cap at 8
"""
cpus = mp.cpu_count()
if max_workers is None:
return cpus
return min(cpus, max_workers)
def parallel_map(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
    chunksize: int | None = None,
    timeout: float | None = None,
) -> list[R]:
    """
    Apply fn to every item in parallel via a worker Pool.

    Results come back in input order.  An explicit *chunksize* overrides
    the default heuristic of roughly four chunks per worker.
    Example:
        results = parallel_map(compress_file, file_paths, max_workers=4)
    """
    workers = n_workers(max_workers)
    batch = chunksize or max(1, len(items) // (workers * 4))
    with mp.Pool(processes=workers) as pool:
        # map_async + get() so the optional timeout can be honoured.
        pending = pool.map_async(fn, items, chunksize=batch)
        return pending.get(timeout=timeout)
def parallel_starmap(
    fn: Callable[..., R],
    args_list: list[tuple],
    max_workers: int | None = None,
    chunksize: int | None = None,
) -> list[R]:
    """
    Parallel map where each element of *args_list* is an argument tuple,
    invoked as fn(*args).  Results come back in input order.
    Example:
        results = parallel_starmap(encode, [(text, "utf-8") for text in texts])
    """
    workers = n_workers(max_workers)
    batch = chunksize or max(1, len(args_list) // (workers * 4))
    with mp.Pool(processes=workers) as pool:
        out = pool.starmap(fn, args_list, chunksize=batch)
    return out
def parallel_imap(
    fn: Callable[[T], R],
    items: Iterable[T],
    max_workers: int | None = None,
    chunksize: int = 1,
) -> Iterator[R]:
    """
    Lazily yield results in completion order (not input order).

    The pool lives for the lifetime of the generator; abandoning the
    iterator early tears the pool down via the context manager.
    Example:
        for result in parallel_imap(process_chunk, data_chunks):
            save(result)
    """
    with mp.Pool(processes=n_workers(max_workers)) as pool:
        for completed in pool.imap_unordered(fn, items, chunksize=chunksize):
            yield completed
# ─────────────────────────────────────────────────────────────────────────────
# 2. concurrent.futures ProcessPoolExecutor
# ─────────────────────────────────────────────────────────────────────────────
def pool_submit(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
    timeout: float | None = None,
    return_exceptions: bool = False,
) -> list[R]:
    """
    Run fn(item) for each item on a ProcessPoolExecutor.

    Results are returned in input order even though tasks complete out of
    order.  Fix: removed the dead `results` list the original allocated and
    never used.

    Args:
        fn: Picklable callable executed in worker processes.
        items: Inputs, one task each.
        max_workers: Cap on worker processes (defaults to CPU count).
        timeout: Overall seconds to wait for all results.
        return_exceptions: When True, a worker's exception is stored at that
            item's position instead of being raised.

    Raises:
        Exception: The first worker exception, when return_exceptions is False.
        TimeoutError: If all results are not available within *timeout*.

    Example:
        results = pool_submit(parse_log_file, log_paths, max_workers=4)
    """
    workers = n_workers(max_workers)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # Map each future back to its input index so order can be restored.
        futures = {executor.submit(fn, item): i for i, item in enumerate(items)}
        ordered: dict[int, Any] = {}
        for future in as_completed(futures, timeout=timeout):
            idx = futures[future]
            try:
                ordered[idx] = future.result()
            except Exception as exc:
                if not return_exceptions:
                    raise
                ordered[idx] = exc
    return [ordered[i] for i in range(len(items))]
def pool_map_progress(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
    on_done: Callable[[int, int, R], None] | None = None,
) -> list[R]:
    """
    Fan fn out over items, invoking on_done(completed, total, result) in the
    parent process after each task finishes; results are returned in input
    order.
    Example:
        def show_progress(done, total, result):
            print(f"  {done}/{total}: {result!r}")
        results = pool_map_progress(crunch, items, on_done=show_progress)
    """
    total = len(items)
    by_index: dict[int, R] = {}
    with ProcessPoolExecutor(max_workers=n_workers(max_workers)) as executor:
        index_of = {executor.submit(fn, item): i for i, item in enumerate(items)}
        for finished, future in enumerate(as_completed(index_of), start=1):
            outcome = future.result()
            by_index[index_of[future]] = outcome
            if on_done is not None:
                on_done(finished, total, outcome)
    return [by_index[i] for i in range(total)]
# ─────────────────────────────────────────────────────────────────────────────
# 3. Producer–consumer with Queue
# ─────────────────────────────────────────────────────────────────────────────
# Poison pill: None is a singleton, so identity survives pickling through a Queue.
_SENTINEL = None

def _consumer_worker(task_queue: mp.Queue, result_queue: mp.Queue, fn: Callable) -> None:
    """Drain task_queue, applying fn to each item, until the sentinel arrives.

    For every task one (item, result, exception) triple is placed on
    result_queue; exactly one of result/exception is non-None.
    """
    while True:
        item = task_queue.get()
        if item is _SENTINEL:
            return
        try:
            result_queue.put((item, fn(item), None))
        except Exception as exc:
            result_queue.put((item, None, exc))
def producer_consumer(
    fn: Callable[[T], R],
    items: list[T],
    n_consumers: int | None = None,
) -> list[tuple[T, R | None, Exception | None]]:
    """
    Producer–consumer pattern: main process feeds a Queue, workers drain it.
    Returns [(item, result, exception)] for each item, in completion order
    (not input order).

    NOTE(review): the sentinel is None, so None must not appear in *items*
    or a worker will shut down early and the result loop will block.
    Example:
        results = producer_consumer(fetch_url, urls, n_consumers=4)
        errors = [(item, exc) for item, res, exc in results if exc]
    """
    # n_consumers=0 also falls back to the CPU-count default (falsy check).
    n = n_consumers or n_workers()
    task_q = mp.Queue()
    result_q = mp.Queue()
    # daemon=True: stray workers die with the parent instead of leaking.
    workers = [
        mp.Process(target=_consumer_worker, args=(task_q, result_q, fn), daemon=True)
        for _ in range(n)
    ]
    for w in workers:
        w.start()
    for item in items:
        task_q.put(item)
    # One sentinel per worker so every consumer loop terminates.
    for _ in range(n):
        task_q.put(_SENTINEL)  # poison pills
    outputs = []
    # Drain ALL results before join(): a child still flushing a full Queue
    # can deadlock against a parent that joins first.
    for _ in range(len(items)):
        outputs.append(result_q.get())
    for w in workers:
        w.join()
    return outputs
# ─────────────────────────────────────────────────────────────────────────────
# 4. Shared state helpers
# ─────────────────────────────────────────────────────────────────────────────
class SharedCounter:
    """
    Process-safe integer counter backed by mp.Value.

    Fix: the previous @dataclass form declared `_value: mp.Value = None`,
    but mp.Value is a factory function, not a type, and the generated
    __init__ argument was discarded by __post_init__ anyway.  A plain class
    makes the single construction path explicit.

    NOTE(review): workers see the same counter only when it is inherited at
    fork time or passed to them explicitly — confirm the start method before
    relying on cross-process visibility.

    Example:
        counter = SharedCounter()
        # In worker: counter.increment()
        # In main:   print(counter.value)
    """

    def __init__(self) -> None:
        # "i" = C signed int, initialised to zero; mp.Value carries its own lock.
        self._value = mp.Value("i", 0)

    def increment(self, by: int = 1) -> None:
        """Atomically add *by* (default 1) under the Value's lock."""
        with self._value.get_lock():
            self._value.value += by

    @property
    def value(self) -> int:
        """Current count as a plain int snapshot (no lock needed for a read)."""
        return self._value.value
def parallel_map_with_progress(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
) -> list[R]:
    """
    Parallel map that logs completion progress.

    Bug fix: the previous version wrapped *fn* in a nested closure and handed
    it to mp.Pool, but local functions cannot be pickled, so every call
    failed with "Can't pickle local object".  Progress is now counted in the
    parent process as futures complete, which needs no shared state at all.

    Args:
        fn: Picklable callable run in worker processes.
        items: Inputs, one task each.
        max_workers: Cap on worker processes (defaults to CPU count).

    Returns:
        Results in the same order as *items*.

    Example:
        results = parallel_map_with_progress(process, items)
    """
    cpus = mp.cpu_count()
    workers = cpus if max_workers is None else min(cpus, max_workers)
    total = len(items)
    ordered: dict[int, R] = {}
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(fn, item): i for i, item in enumerate(items)}
        for done, future in enumerate(as_completed(futures), start=1):
            ordered[futures[future]] = future.result()
            logging.getLogger(__name__).debug("Progress: %d/%d", done, total)
    return [ordered[i] for i in range(total)]
# ─────────────────────────────────────────────────────────────────────────────
# 5. Chunked processing (for large iterables)
# ─────────────────────────────────────────────────────────────────────────────
def chunks(lst: list[T], size: int) -> Iterator[list[T]]:
    """Yield successive slices of *lst*, each at most *size* elements long."""
    start = 0
    while start < len(lst):
        yield lst[start : start + size]
        start += size
def parallel_batch(
    fn: Callable[[list[T]], list[R]],
    items: list[T],
    batch_size: int = 1000,
    max_workers: int | None = None,
) -> list[R]:
    """
    Parallel-process *items* in chunks of *batch_size*: each worker call
    receives a whole list, which amortises IPC cost for many small items.
    Example:
        # fn receives a list of 500 items and returns a list of results
        all_results = parallel_batch(encode_batch, texts, batch_size=500)
    """
    per_batch = parallel_map(fn, list(chunks(items, batch_size)), max_workers=max_workers)
    flattened: list[R] = []
    for batch in per_batch:
        flattened.extend(batch)
    return flattened
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
def _square(x: int) -> int:
    """Demo helper: CPU-bound squaring."""
    return x ** 2
def _slow_square(x: int) -> int:
    """Demo helper: square x after a 10 ms pause to simulate real work."""
    time.sleep(0.01)
    return x ** 2
if __name__ == "__main__":
    # freeze_support() is required for frozen Windows executables (spawn
    # start method); it is a no-op everywhere else.
    mp.freeze_support()
    print("=== multiprocessing demo ===")
    print(f"  CPU count: {mp.cpu_count()}")
    print(f"  n_workers(): {n_workers()}")
    items = list(range(20))
    print("\n--- parallel_map ---")
    t0 = time.perf_counter()
    results = parallel_map(_square, items, max_workers=4)
    print(f"  {items[:5]} -> {results[:5]} ... ({time.perf_counter()-t0:.3f}s)")
    print("\n--- parallel_starmap ---")
    # Bug fix: the original passed a lambda, which cannot be pickled and
    # crashes any process pool; operator.add is an importable, picklable
    # callable with the same behaviour.
    pairs = [(x, x + 1) for x in range(5)]
    sums = parallel_starmap(operator.add, pairs)
    print(f"  pairs={pairs} -> sums={sums}")
    print("\n--- pool_submit (futures) ---")
    # Bug fix: likewise replaced an unpicklable lambda with a module-level fn.
    small = list(range(8))
    squares = pool_submit(_square, small)
    print(f"  squares: {list(zip(small, squares))}")
    print("\n--- pool_map_progress ---")
    log_msgs = []
    # _on_done runs in the parent process, so a local function is fine here.
    def _on_done(done, total, result):
        if done % 5 == 0 or done == total:
            log_msgs.append(f"  {done}/{total}")
    results2 = pool_map_progress(_square, list(range(10)), on_done=_on_done)
    print(f"  progress milestones: {log_msgs}")
    print(f"  results: {results2}")
    print("\n--- producer_consumer ---")
    pq_results = producer_consumer(_square, list(range(6)), n_consumers=2)
    print(f"  {sorted((item, res) for item, res, exc in pq_results if exc is None)}")
    print("\n=== done ===")
For the concurrent.futures alternative — concurrent.futures.ProcessPoolExecutor provides a higher-level API over multiprocessing.Pool with Future objects, as_completed() for unordered result streams, and a unified interface shared with ThreadPoolExecutor; raw multiprocessing.Pool gives lower-level control including imap_unordered, apply_async, and callback hooks — use ProcessPoolExecutor for most new code wanting clean future-based control flow, multiprocessing.Pool when you need imap_unordered streaming or fine-grained callbacks. For the ray alternative — Ray is a distributed execution framework that scales multiprocessing beyond a single machine to clusters of hundreds of nodes, supports actors (stateful workers), and integrates with ML libraries (RLlib, Tune, Serve); Python’s stdlib multiprocessing is limited to a single machine and offers no cluster management — use multiprocessing for single-machine CPU parallelism where no external dependencies are acceptable, Ray when you outgrow one machine or need distributed task scheduling, actor-based state, or ML workflow orchestration. The Claude Skills 360 bundle includes multiprocessing skill sets covering n_workers()/parallel_map()/parallel_starmap()/parallel_imap(), pool_submit()/pool_map_progress() futures API, producer_consumer() Queue-based pipeline, SharedCounter shared state, parallel_batch() large-iterable chunking, and Windows-safe freeze_support() patterns. Start with the free tier to try CPU parallelism and multiprocessing pipeline code generation.