Claude Code for multiprocessing: CPU Parallelism in Python — Claude Skills 360 Blog
Blog / AI / Claude Code for multiprocessing: CPU Parallelism in Python
AI

Claude Code for multiprocessing: CPU Parallelism in Python

Published: June 23, 2028
Read time: 5 min
By: Claude Skills 360

multiprocessing runs Python code on multiple CPU cores by bypassing the GIL. import multiprocessing as mp. cpu_count: mp.cpu_count(). Pool.map: with mp.Pool() as pool: results = pool.map(fn, items). Pool.starmap: pool.starmap(fn, [(a1,b1),(a2,b2)]). Pool.imap_unordered: for r in pool.imap_unordered(fn, items): process(r) — yields as completed. apply_async: ar = pool.apply_async(fn, args=(x,)); result = ar.get(timeout=60). Process: p = mp.Process(target=worker, args=(queue,)); p.start(); p.join(). Queue: q = mp.Queue(); q.put(item); q.get(). Pipe: parent_conn, child_conn = mp.Pipe(); parent_conn.send(data); child_conn.recv(). Manager: with mp.Manager() as mgr: d = mgr.dict(); lst = mgr.list(). Value/Array: v = mp.Value("i",0); with v.get_lock(): v.value += 1. Event: e = mp.Event(); e.set(); e.wait(). Semaphore: sem = mp.Semaphore(4). Barrier: b = mp.Barrier(n_workers). shared_memory: from multiprocessing.shared_memory import SharedMemory; shm = SharedMemory(create=True, size=4*n). ProcessPoolExecutor: from concurrent.futures import ProcessPoolExecutor; with ProcessPoolExecutor(max_workers=4) as ex: futures = [ex.submit(fn,x) for x in items]. Context: ctx = mp.get_context("spawn") — safe on all platforms. freeze_support: call in if __name__ == "__main__":. chunksize: pool.map(fn, big_list, chunksize=100) — reduces IPC overhead. Claude Code generates multiprocessing worker pools, pipelines, progress reporters, and parallel data transformers.

CLAUDE.md for multiprocessing

## multiprocessing Stack
- Stdlib: import multiprocessing as mp | from concurrent.futures import ProcessPoolExecutor
- CPU count: workers = min(mp.cpu_count(), len(items))
- Pool: with mp.Pool(workers) as pool: results = pool.map(fn, items)
- Modern: with ProcessPoolExecutor(max_workers=workers) as ex: fs = [ex.submit(fn, x) for x in items]
- IPC: mp.Queue() | mp.Pipe() | mp.Manager().dict()
- Guard: if __name__ == "__main__": (required on Windows/spawn)

multiprocessing Parallel Pipeline

# app/parallel.py — multiprocessing Pool, Process, Queue, shared state, futures
from __future__ import annotations

import multiprocessing as mp
import os
import time
import logging
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator, TypeVar

log = logging.getLogger(__name__)  # module-level logger; handlers/levels are configured by the application
T   = TypeVar("T")  # generic input-item type accepted by the helpers below
R   = TypeVar("R")  # generic result type produced by the mapped callables


# ─────────────────────────────────────────────────────────────────────────────
# 1. Pool helpers
# ─────────────────────────────────────────────────────────────────────────────

def n_workers(max_workers: int | None = None) -> int:
    """
    Pick a sensible process count for a pool.

    Returns the machine's CPU count, optionally capped at *max_workers*.

    Example:
        workers = n_workers()       # all CPUs
        workers = n_workers(8)      # cap at 8
    """
    available = mp.cpu_count()
    return available if max_workers is None else min(available, max_workers)


def parallel_map(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
    chunksize: int | None = None,
    timeout: float | None = None,
) -> list[R]:
    """
    Apply *fn* to every item across a process pool, preserving input order.

    Args:
        fn: Picklable callable applied to each item.
        items: Inputs to process.
        max_workers: Optional cap on worker processes (default: all CPUs).
        chunksize: Items handed to a worker per IPC round-trip; derived
            from the input size when not given, to amortise pickling cost.
        timeout: Seconds to wait for all results before TimeoutError.

    Example:
        results = parallel_map(compress_file, file_paths, max_workers=4)
    """
    cpus = mp.cpu_count()
    workers = cpus if max_workers is None else min(cpus, max_workers)
    # Aim for ~4 chunks per worker when the caller did not choose a size.
    per_task = chunksize if chunksize else max(1, len(items) // (workers * 4))

    with mp.Pool(processes=workers) as pool:
        pending = pool.map_async(fn, items, chunksize=per_task)
        return pending.get(timeout=timeout)


def parallel_starmap(
    fn: Callable[..., R],
    args_list: list[tuple],
    max_workers: int | None = None,
    chunksize: int | None = None,
) -> list[R]:
    """
    Parallel map where each element of *args_list* is an argument tuple:
    the pool calls fn(*args) for every tuple, preserving input order.

    Example:
        results = parallel_starmap(encode, [(text, "utf-8") for text in texts])
    """
    cpus = mp.cpu_count()
    workers = cpus if max_workers is None else min(cpus, max_workers)
    # Default to ~4 chunks per worker to keep IPC overhead low.
    per_task = chunksize if chunksize else max(1, len(args_list) // (workers * 4))

    with mp.Pool(processes=workers) as pool:
        return pool.starmap(fn, args_list, chunksize=per_task)


def parallel_imap(
    fn: Callable[[T], R],
    items: Iterable[T],
    max_workers: int | None = None,
    chunksize: int = 1,
) -> Iterator[R]:
    """
    Lazily yield results in completion order (not input order).

    Useful when downstream work can start before the whole batch finishes.
    The pool lives for the lifetime of the generator.

    Example:
        for result in parallel_imap(process_chunk, data_chunks):
            save(result)
    """
    cpus = mp.cpu_count()
    workers = cpus if max_workers is None else min(cpus, max_workers)

    with mp.Pool(processes=workers) as pool:
        for outcome in pool.imap_unordered(fn, items, chunksize=chunksize):
            yield outcome


# ─────────────────────────────────────────────────────────────────────────────
# 2. concurrent.futures ProcessPoolExecutor
# ─────────────────────────────────────────────────────────────────────────────

def pool_submit(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
    timeout: float | None = None,
    return_exceptions: bool = False,
) -> list[R]:
    """
    Fan fn(item) out over a ProcessPoolExecutor and collect ordered results.

    Args:
        fn: Picklable callable applied to each item.
        items: Inputs to process.
        max_workers: Optional cap on worker processes.
        timeout: Overall deadline for as_completed; TimeoutError if exceeded.
        return_exceptions: When True, a raised exception is stored in the
            result slot instead of propagating (mirrors asyncio.gather).

    Example:
        results = pool_submit(parse_log_file, log_paths, max_workers=4)
    """
    cpus = mp.cpu_count()
    workers = cpus if max_workers is None else min(cpus, max_workers)
    # Pre-sized slot list: results arrive unordered but land by index.
    slots: list[Any] = [None] * len(items)

    with ProcessPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {executor.submit(fn, item): i for i, item in enumerate(items)}
        for fut in as_completed(future_to_idx, timeout=timeout):
            idx = future_to_idx[fut]
            try:
                slots[idx] = fut.result()
            except Exception as exc:
                if not return_exceptions:
                    raise
                slots[idx] = exc

    return slots


def pool_map_progress(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
    on_done: Callable[[int, int, R], None] | None = None,
) -> list[R]:
    """
    Run fn over all items, invoking on_done(completed, total, result) in the
    parent process as each task finishes.  Results come back in input order.

    Example:
        def show_progress(done, total, result):
            print(f"  {done}/{total}: {result!r}")

        results = pool_map_progress(crunch, items, on_done=show_progress)
    """
    cpus = mp.cpu_count()
    workers = cpus if max_workers is None else min(cpus, max_workers)
    total = len(items)
    collected: dict[int, R] = {}

    with ProcessPoolExecutor(max_workers=workers) as executor:
        future_to_idx = {executor.submit(fn, item): i for i, item in enumerate(items)}
        # enumerate(..., 1) doubles as the completion counter.
        for finished, fut in enumerate(as_completed(future_to_idx), start=1):
            outcome = fut.result()
            collected[future_to_idx[fut]] = outcome
            if on_done is not None:
                on_done(finished, total, outcome)

    return [collected[i] for i in range(total)]


# ─────────────────────────────────────────────────────────────────────────────
# 3. Producer–consumer with Queue
# ─────────────────────────────────────────────────────────────────────────────

_SENTINEL = None  # poison pill to stop workers


def _consumer_worker(task_queue: mp.Queue, result_queue: mp.Queue, fn: Callable) -> None:
    """Worker process: consume tasks until sentinel, put results."""
    while True:
        item = task_queue.get()
        if item is _SENTINEL:
            break
        try:
            result = fn(item)
            result_queue.put((item, result, None))
        except Exception as exc:
            result_queue.put((item, None, exc))


def producer_consumer(
    fn: Callable[[T], R],
    items: list[T],
    n_consumers: int | None = None,
) -> list[tuple[T, R | None, Exception | None]]:
    """
    Fan work out over a Queue: the main process produces, workers consume.

    Returns one (item, result, exception) triple per input, in completion
    order (not input order); exception is None on success.

    Example:
        results = producer_consumer(fetch_url, urls, n_consumers=4)
        errors  = [(item, exc) for item, res, exc in results if exc]
    """
    consumer_count = n_consumers or n_workers()
    tasks   = mp.Queue()
    results = mp.Queue()

    procs = [
        mp.Process(target=_consumer_worker, args=(tasks, results, fn), daemon=True)
        for _ in range(consumer_count)
    ]
    for proc in procs:
        proc.start()

    for item in items:
        tasks.put(item)
    # One poison pill per consumer so every worker exits its loop.
    for _ in range(consumer_count):
        tasks.put(_SENTINEL)

    # Drain exactly len(items) results before joining; joining first could
    # deadlock while a worker blocks on a full result queue.
    collected = [results.get() for _ in range(len(items))]

    for proc in procs:
        proc.join()

    return collected


# ─────────────────────────────────────────────────────────────────────────────
# 4. Shared state helpers
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class SharedCounter:
    """
    Cross-process counter backed by mp.Value (a lock-protected C int).

    Processes that inherit this object (fork, or passed at Process creation)
    see the same underlying value; increments are made atomic via the
    Value's built-in lock.

    Example:
        counter = SharedCounter()
        # In worker: counter.increment()
        # In main:   print(counter.value)
    """
    _value: mp.Value = None  # type: ignore  # placeholder; replaced in __post_init__

    def __post_init__(self):
        # "i" = signed C int, starting at zero.
        self._value = mp.Value("i", 0)

    def increment(self, by: int = 1) -> None:
        """Atomically add *by* (default 1) to the counter."""
        lock = self._value.get_lock()
        with lock:
            self._value.value += by

    @property
    def value(self) -> int:
        """Current count (plain read; concurrent writers may race past it)."""
        return self._value.value


def parallel_map_with_progress(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
) -> list[R]:
    """
    Like parallel_map, but logs a debug progress line as each item finishes.

    Bug fix: the previous version wrapped *fn* in a local closure and handed
    it to mp.Pool.  Pool pickles the callable to ship it to workers, and
    local closures are not picklable, so every call raised PicklingError
    before any work ran (the SharedCounter's mp.Value cannot ride through a
    Pool task either).  Progress is now counted in the parent process with
    ProcessPoolExecutor/as_completed, which only pickles *fn* itself.

    Args:
        fn: Picklable callable applied to each item.
        items: Inputs to process.
        max_workers: Optional cap on worker processes.

    Returns:
        Results in the same order as items.

    Example:
        results = parallel_map_with_progress(process, items)
    """
    total = len(items)
    if total == 0:
        return []

    cpus = mp.cpu_count()
    workers = cpus if max_workers is None else min(cpus, max_workers)
    ordered: dict[int, R] = {}

    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(fn, item): i for i, item in enumerate(items)}
        # as_completed yields in finish order; enumerate counts completions.
        for done, future in enumerate(as_completed(futures), start=1):
            ordered[futures[future]] = future.result()
            logging.getLogger(__name__).debug("Progress: %d/%d", done, total)

    return [ordered[i] for i in range(total)]


# ─────────────────────────────────────────────────────────────────────────────
# 5. Chunked processing (for large iterables)
# ─────────────────────────────────────────────────────────────────────────────

def chunks(lst: list[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive slices of *lst*, each at most *size* elements long."""
    yield from (lst[offset : offset + size] for offset in range(0, len(lst), size))


def parallel_batch(
    fn: Callable[[list[T]], list[R]],
    items: list[T],
    batch_size: int = 1000,
    max_workers: int | None = None,
) -> list[R]:
    """
    Fan *items* out to workers in whole batches rather than one at a time.

    Each worker call receives a list of up to *batch_size* items and must
    return a list of results; batch outputs are flattened back into one list
    in input order.  This amortises pickling/IPC cost for cheap items.

    Example:
        # fn receives a list of 500 items and returns a list of results
        all_results = parallel_batch(encode_batch, texts, batch_size=500)
    """
    per_batch = parallel_map(fn, list(chunks(items, batch_size)), max_workers=max_workers)
    flattened: list[R] = []
    for batch_output in per_batch:
        flattened.extend(batch_output)
    return flattened


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

def _square(x: int) -> int:
    """Simple CPU-bound function for demo."""
    return x * x


def _slow_square(x: int) -> int:
    """Simulate work."""
    time.sleep(0.01)
    return x * x


if __name__ == "__main__":
    # Required on Windows (spawn context) — safe no-op on Linux/macOS
    mp.freeze_support()

    print("=== multiprocessing demo ===")
    print(f"  CPU count: {mp.cpu_count()}")
    print(f"  n_workers(): {n_workers()}")

    items = list(range(20))

    print("\n--- parallel_map ---")
    t0      = time.perf_counter()
    results = parallel_map(_square, items, max_workers=4)
    print(f"  {items[:5]} -> {results[:5]} ... ({time.perf_counter()-t0:.3f}s)")

    print("\n--- parallel_starmap ---")
    pairs   = [(x, x+1) for x in range(5)]
    sums    = parallel_starmap(lambda a, b: a + b, pairs)
    print(f"  pairs={pairs} -> sums={sums}")

    print("\n--- pool_submit (futures) ---")
    small = list(range(8))
    evens = pool_submit(lambda x: x % 2 == 0, small)
    print(f"  is_even: {list(zip(small, evens))}")

    print("\n--- pool_map_progress ---")
    log_msgs = []
    def _on_done(done, total, result):
        if done % 5 == 0 or done == total:
            log_msgs.append(f"  {done}/{total}")

    results2 = pool_map_progress(_square, list(range(10)), on_done=_on_done)
    print(f"  progress milestones: {log_msgs}")
    print(f"  results: {results2}")

    print("\n--- producer_consumer ---")
    pq_results = producer_consumer(_square, list(range(6)), n_consumers=2)
    print(f"  {sorted((item, res) for item, res, exc in pq_results if exc is None)}")

    print("\n=== done ===")

For the concurrent.futures alternative — concurrent.futures.ProcessPoolExecutor provides a higher-level API over multiprocessing.Pool with Future objects, as_completed() for unordered result streams, and a unified interface shared with ThreadPoolExecutor; raw multiprocessing.Pool gives lower-level control including imap_unordered, apply_async, and callback hooks — use ProcessPoolExecutor for most new code wanting clean future-based control flow, multiprocessing.Pool when you need imap_unordered streaming or fine-grained callbacks. For the ray alternative — Ray is a distributed execution framework that scales multiprocessing beyond a single machine to clusters of hundreds of nodes, supports actors (stateful workers), and integrates with ML libraries (RLlib, Tune, Serve); Python’s stdlib multiprocessing is limited to a single machine and offers no cluster management — use multiprocessing for single-machine CPU parallelism where no external dependencies are acceptable, Ray when you outgrow one machine or need distributed task scheduling, actor-based state, or ML workflow orchestration. The Claude Skills 360 bundle includes multiprocessing skill sets covering n_workers()/parallel_map()/parallel_starmap()/parallel_imap(), pool_submit()/pool_map_progress() futures API, producer_consumer() Queue-based pipeline, SharedCounter shared state, parallel_batch() large-iterable chunking, and Windows-safe freeze_support() patterns. Start with the free tier to try CPU parallelism and multiprocessing pipeline code generation.

Keep Reading

AI

Claude Code for email.contentmanager: Python Email Content Accessors

Read and write EmailMessage body content with Python's email.contentmanager module and Claude Code — email contentmanager ContentManager for the class that maps content types to get and set handler functions allowing EmailMessage to support get_content and set_content with type-specific behaviour, email contentmanager raw_data_manager for the ContentManager instance that handles raw bytes and str payloads without any conversion, email contentmanager content_manager for the standard ContentManager instance used by email.policy.default that intelligently handles text plain text html multipart and binary content types, email contentmanager get_content_text for the handler that returns the decoded text payload of a text-star message part as a str, email contentmanager get_content_binary for the handler that returns the raw decoded bytes payload of a non-text message part, email contentmanager get_data_manager for the get-handler lookup used by EmailMessage get_content to find the right reader function for the content type, email contentmanager set_content text for the handler that creates and sets a text part correctly choosing charset and transfer encoding, email contentmanager set_content bytes for the handler that creates and sets a binary part with base64 encoding and optional filename Content-Disposition, email contentmanager EmailMessage get_content for the method that reads the message body using the registered content manager handlers, email contentmanager EmailMessage set_content for the method that sets the message body and MIME headers in one call, email contentmanager EmailMessage make_alternative make_mixed make_related for the methods that convert a simple message into a multipart container, email contentmanager EmailMessage add_attachment for the method that attaches a file or bytes to a multipart message, and email contentmanager integration with email.message and email.policy and email.mime and io for building high-level email readers attachment 
extractors text body accessors HTML readers and policy-aware MIME construction pipelines.

5 min read Feb 12, 2029
AI

Claude Code for email.charset: Python Email Charset Encoding

Control header and body encoding for international email with Python's email.charset module and Claude Code — email charset Charset for the class that wraps a character set name with the encoding rules for header encoding and body encoding describing how to encode text for that charset in email messages, email charset Charset header_encoding for the attribute specifying whether headers using this charset should use QP quoted-printable encoding BASE64 encoding or no encoding, email charset Charset body_encoding for the attribute specifying the Content-Transfer-Encoding to use for message bodies in this charset such as QP or BASE64, email charset Charset output_codec for the attribute giving the Python codec name used to encode the string to bytes for the wire format, email charset Charset input_codec for the attribute giving the Python codec name used to decode incoming bytes to str, email charset Charset get_output_charset for returning the output charset name, email charset Charset header_encode for encoding a header string using the charset's header_encoding method, email charset Charset body_encode for encoding body content using the charset's body_encoding, email charset Charset convert for converting a string from the input_codec to the output_codec, email charset add_charset for registering a new charset with custom encoding rules in the global charset registry, email charset add_alias for adding an alias name that maps to an existing registered charset, email charset add_codec for registering a codec name mapping for use by the charset machinery, and email charset integration with email.message and email.mime and email.policy and email.encoders for building international email senders non-ASCII header encoders Content-Transfer-Encoding selectors charset-aware message constructors and MIME encoding pipelines.

5 min read Feb 11, 2029
AI

Claude Code for email.utils: Python Email Address and Header Utilities

Parse and format RFC 2822 email addresses and dates with Python's email.utils module and Claude Code — email utils parseaddr for splitting a display-name plus angle-bracket address string into a realname and email address tuple, email utils formataddr for combining a realname and address string into a properly quoted RFC 2822 address with angle brackets, email utils getaddresses for parsing a list of raw address header strings each potentially containing multiple comma-separated addresses into a list of realname address tuples, email utils parsedate for parsing an RFC 2822 date string into a nine-tuple compatible with time.mktime, email utils parsedate_tz for parsing an RFC 2822 date string into a ten-tuple that includes the UTC offset timezone in seconds, email utils parsedate_to_datetime for parsing an RFC 2822 date string into an aware datetime object with timezone, email utils formatdate for formatting a POSIX timestamp or the current time as an RFC 2822 date string with optional usegmt and localtime flags, email utils format_datetime for formatting a datetime object as an RFC 2822 date string, email utils make_msgid for generating a globally unique Message-ID string with optional idstring and domain components, email utils decode_rfc2231 for decoding an RFC 2231 encoded parameter value into a tuple of charset language and value, email utils encode_rfc2231 for encoding a string as an RFC 2231 encoded parameter value, email utils collapse_rfc2231_value for collapsing a decoded RFC 2231 tuple to a Unicode string, and email utils integration with email.message and email.headerregistry and datetime and time for building address parsers date formatters message-id generators header extractors and RFC-compliant email construction utilities.

5 min read Feb 10, 2029

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.

Back to Blog

Get 360 skills free