
Claude Code for concurrent.futures: Python High-Level Concurrency

Published: September 1, 2028
Read time: 5 min read
By: Claude Skills 360

Python's concurrent.futures module provides high-level thread and process pools behind a unified Executor API: from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed, wait.

- ThreadPoolExecutor(max_workers=N) — for I/O-bound work; threads share memory; max_workers defaults to min(32, os.cpu_count() + 4).
- ProcessPoolExecutor(max_workers=N) — for CPU-bound work; each worker is a separate process, so the function and its arguments must be picklable.
- submit: future = executor.submit(fn, *args, **kwargs) — returns a Future immediately.
- map: executor.map(fn, iterable, timeout=None, chunksize=1) — returns an iterator of results in input order; iterating re-raises the first exception encountered.
- as_completed: for future in as_completed(futures): — yields futures as they finish, not in submission order.
- Future API: .result(timeout=None) blocks and returns the result or re-raises the worker's exception; .exception() returns the exception or None; .done() is a non-blocking check; .cancel() attempts cancellation (impossible once the task is already running).
- wait: done, not_done = wait(futures, timeout=None, return_when=ALL_COMPLETED) — return_when also accepts FIRST_COMPLETED and FIRST_EXCEPTION.
- Context manager: with ThreadPoolExecutor() as ex: — shuts down the pool and waits for pending work on exit.
- chunksize (ProcessPoolExecutor only): batches multiple items per subprocess call to reduce inter-process overhead.

Claude Code generates parallel downloaders, batch API callers, CPU-parallel data transformers, and scatter-gather pipelines on top of these primitives.
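A minimal sketch of the core API before the full pipeline below (slow_double is an illustrative stand-in for an I/O-bound call):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED

def slow_double(x: int) -> int:
    """Stand-in for an I/O-bound call."""
    time.sleep(0.05)
    return x * 2

with ThreadPoolExecutor(max_workers=4) as ex:
    # map: results come back in input order
    ordered = list(ex.map(slow_double, range(5)))

    # submit + as_completed: results arrive in completion order
    futures = [ex.submit(slow_double, x) for x in range(5)]
    completed = [f.result() for f in as_completed(futures)]

    # wait: block only until the first future finishes
    more = [ex.submit(slow_double, x) for x in range(3)]
    done, not_done = wait(more, return_when=FIRST_COMPLETED)

print(ordered)            # [0, 2, 4, 6, 8]
print(sorted(completed))  # [0, 2, 4, 6, 8]
```

Note that `completed` may arrive in any order; only `ex.map` preserves input order.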

CLAUDE.md for concurrent.futures

## concurrent.futures Stack
- Stdlib: from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
-          from concurrent.futures import as_completed, wait, FIRST_COMPLETED
- IO:     with ThreadPoolExecutor(max_workers=16) as ex: futures = [ex.submit(fn, x) for x in items]
- CPU:    with ProcessPoolExecutor() as ex: results = list(ex.map(cpu_fn, items))
- Order:  for future in as_completed(futures): result = future.result()
- Safe:   future.result()  # re-raises exception from worker

concurrent.futures Parallel Pipeline

# app/fututil.py — parallel fetch, retry, scatter-gather, rate-limit, progress
from __future__ import annotations

import time
from concurrent.futures import (
    ALL_COMPLETED,
    FIRST_COMPLETED,
    Future,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
    as_completed,
    wait,
)
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


# ─────────────────────────────────────────────────────────────────────────────
# 1. Parallel map helpers
# ─────────────────────────────────────────────────────────────────────────────

def parallel_map(
    fn: Callable[[T], R],
    items: Iterable[T],
    max_workers: int = 8,
    use_processes: bool = False,
    timeout: float | None = None,
) -> list[R]:
    """
    Apply fn to each item in parallel; collect results in input order.
    Raises the first exception encountered.

    Example:
        results = parallel_map(fetch_url, urls, max_workers=16)
    """
    Executor = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    with Executor(max_workers=max_workers) as ex:
        return list(ex.map(fn, items, timeout=timeout))


def parallel_map_safe(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int = 8,
    use_processes: bool = False,
    default: R | None = None,
) -> list[R | None]:
    """
    Apply fn to each item in parallel; replace failures with default.
    Returns a list aligned to items (same order).

    Example:
        results = parallel_map_safe(fetch_url, urls, default=None)
    """
    Executor = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    with Executor(max_workers=max_workers) as ex:
        futures = [ex.submit(fn, item) for item in items]
    # Exiting the with-block waits for all futures, so results are ready here.
    results: list[R | None] = []
    for fut in futures:
        try:
            results.append(fut.result())
        except Exception:
            results.append(default)
    return results


def parallel_filter(
    predicate: Callable[[T], bool],
    items: list[T],
    max_workers: int = 8,
) -> list[T]:
    """
    Return elements of items for which predicate is True, evaluated in parallel.

    Example:
        live_urls = parallel_filter(is_url_reachable, urls, max_workers=20)
    """
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(predicate, item): item for item in items}
    # Exiting the with-block waits for all futures, so results are ready here.
    return [item for fut, item in futures.items() if fut.result()]


# ─────────────────────────────────────────────────────────────────────────────
# 2. As-completed processing
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class TaskResult:
    item:    Any
    result:  Any
    error:   Exception | None
    elapsed: float

    @property
    def ok(self) -> bool:
        return self.error is None


def run_as_completed(
    fn: Callable[[T], R],
    items: Iterable[T],
    max_workers: int = 8,
    timeout: float | None = None,
) -> Iterator[TaskResult]:
    """
    Submit all items, yield TaskResult as each finishes (completion order).
    Captures exceptions per task rather than failing the whole batch.

    Example:
        for r in run_as_completed(download, urls, max_workers=20):
            if r.ok:
                save(r.result)
            else:
                log_error(r.item, r.error)
    """
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # Record submission time per future; elapsed therefore includes any
        # time the task spent queued behind other work, not just fn's runtime.
        future_to_item: dict[Future, tuple[Any, float]] = {}
        for item in items:
            future_to_item[ex.submit(fn, item)] = (item, time.monotonic())

        for fut in as_completed(future_to_item, timeout=timeout):
            item, t0 = future_to_item[fut]
            try:
                result = fut.result()
                yield TaskResult(item=item, result=result, error=None,
                                 elapsed=time.monotonic() - t0)
            except Exception as exc:
                yield TaskResult(item=item, result=None, error=exc,
                                 elapsed=time.monotonic() - t0)


# ─────────────────────────────────────────────────────────────────────────────
# 3. Batch processing with retry
# ─────────────────────────────────────────────────────────────────────────────

def with_retry(
    fn: Callable[[T], R],
    item: T,
    max_attempts: int = 3,
    backoff: float = 0.5,
    backoff_factor: float = 2.0,
) -> R:
    """
    Call fn(item) with exponential-backoff retry on exception.

    Example:
        result = with_retry(fetch_url, url, max_attempts=4)
    """
    delay = backoff
    last_exc: BaseException | None = None
    for attempt in range(max_attempts):
        try:
            return fn(item)
        except Exception as exc:
            last_exc = exc
            if attempt < max_attempts - 1:
                time.sleep(delay)
                delay *= backoff_factor
    raise last_exc  # type: ignore[misc]


def parallel_with_retry(
    fn: Callable[[T], R],
    items: Iterable[T],
    max_workers: int = 8,
    max_attempts: int = 3,
    backoff: float = 0.5,
) -> list[TaskResult]:
    """
    Run fn on items in parallel, with per-item retry on failure.

    Example:
        results = parallel_with_retry(flaky_api_call, ids, max_attempts=4)
    """
    import functools
    retry_fn = functools.partial(with_retry, fn, max_attempts=max_attempts, backoff=backoff)
    return list(run_as_completed(retry_fn, items, max_workers=max_workers))


# ─────────────────────────────────────────────────────────────────────────────
# 4. Scatter-gather pattern
# ─────────────────────────────────────────────────────────────────────────────

def scatter_gather(
    fns: dict[str, Callable[[], R]],
    max_workers: int | None = None,
    timeout: float | None = None,
) -> dict[str, R]:
    """
    Run multiple named callables in parallel; return {name: result}.
    Raises the first exception on any task.

    Example:
        results = scatter_gather({
            "users":    lambda: db.query("SELECT * FROM users"),
            "products": lambda: db.query("SELECT * FROM products"),
            "stats":    lambda: compute_stats(),
        })
        users = results["users"]
    """
    n = max_workers or len(fns)
    with ThreadPoolExecutor(max_workers=n) as ex:
        future_to_name = {ex.submit(fn): name for name, fn in fns.items()}
        return {
            # as_completed enforces the overall timeout; each yielded future
            # is already done, so .result() won't block here.
            future_to_name[fut]: fut.result()
            for fut in as_completed(future_to_name, timeout=timeout)
        }


def first_success(
    fns: dict[str, Callable[[], R]],
    timeout: float | None = None,
) -> tuple[str, R]:
    """
    Run fns in parallel; return the name and result of the first to succeed.
    Cancels remaining futures.

    Example:
        name, data = first_success({
            "primary":  lambda: fetch("https://primary.api/data"),
            "fallback": lambda: fetch("https://fallback.api/data"),
        })
    """
    with ThreadPoolExecutor(max_workers=len(fns)) as ex:
        future_to_name = {ex.submit(fn): name for name, fn in fns.items()}
        for fut in as_completed(future_to_name, timeout=timeout):
            try:
                result = fut.result()
                # Cancel futures that haven't started yet. Note: .cancel()
                # cannot stop tasks already running, and the with-block still
                # waits for those on exit.
                for other in future_to_name:
                    if other is not fut:
                        other.cancel()
                return future_to_name[fut], result
            except Exception:
                continue
    raise RuntimeError("All parallel tasks failed")


# ─────────────────────────────────────────────────────────────────────────────
# 5. CPU-bound process pool helpers
# ─────────────────────────────────────────────────────────────────────────────

def cpu_parallel_map(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int | None = None,
    chunksize: int = 1,
) -> list[R]:
    """
    CPU-parallel map using ProcessPoolExecutor.
    fn and items must be picklable.

    Example:
        results = cpu_parallel_map(compress_image, image_paths, chunksize=4)
    """
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        return list(ex.map(fn, items, chunksize=chunksize))


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== concurrent.futures demo ===")

    def slow_square(x: int) -> int:
        time.sleep(0.01)   # simulate I/O
        return x * x

    def occasionally_fails(x: int) -> int:
        if x % 3 == 0:
            raise ValueError(f"failed on {x}")
        return x * 2

    print("\n--- parallel_map ---")
    t0 = time.monotonic()
    results = parallel_map(slow_square, range(20), max_workers=10)
    elapsed = time.monotonic() - t0
    print(f"  parallel_map(slow_square, 0..19): {results[:5]}... elapsed={elapsed:.2f}s")

    print("\n--- parallel_map_safe ---")
    safe = parallel_map_safe(occasionally_fails, list(range(10)), default=-1)
    print(f"  safe results: {safe}")

    print("\n--- run_as_completed ---")
    successes = failures = 0
    for r in run_as_completed(occasionally_fails, range(9), max_workers=4):
        if r.ok:
            successes += 1
        else:
            failures += 1
    print(f"  successes={successes}  failures={failures}")

    print("\n--- scatter_gather ---")
    results_sg = scatter_gather({
        "squares": lambda: [x**2 for x in range(5)],
        "cubes":   lambda: [x**3 for x in range(5)],
        "sum100":  lambda: sum(range(100)),
    })
    for name, val in sorted(results_sg.items()):
        print(f"  {name}: {val}")

    print("\n--- parallel_with_retry ---")
    call_counts: dict[int, int] = {}

    def flaky(x: int) -> int:
        call_counts[x] = call_counts.get(x, 0) + 1
        if call_counts[x] < 2:
            raise RuntimeError(f"transient error for {x}")
        return x * 10

    retry_results = parallel_with_retry(flaky, [1, 2, 3], max_attempts=3, backoff=0.01)
    for r in sorted(retry_results, key=lambda r: r.item):
        print(f"  item={r.item}  ok={r.ok}  result={r.result}  attempts={call_counts.get(r.item, '?')}")

    print("\n=== done ===")

For the asyncio alternative: asyncio provides cooperative concurrency via coroutines and an event loop, and is ideal for managing thousands of concurrent I/O-bound tasks (HTTP, websockets, databases) on a single thread. ThreadPoolExecutor creates OS threads, which run truly in parallel for blocking I/O calls that don't support async. Use asyncio with an async HTTP library (aiohttp, httpx) for high-concurrency network I/O in a new codebase; use ThreadPoolExecutor to parallelize calls into blocking (non-async) client libraries without rewriting them.

For the joblib alternative: joblib (PyPI) wraps multiprocessing with memory-mapped array sharing, result caching (joblib.Memory), and the Parallel / delayed API used by scikit-learn; it shares large arrays between processes more efficiently than pickling. Use joblib for scientific-computing pipelines, machine-learning preprocessing, and any workflow that passes large NumPy arrays between workers; use ProcessPoolExecutor for general-purpose CPU-bound Python code that doesn't involve large arrays.

The Claude Skills 360 bundle includes concurrent.futures skill sets covering the parallel_map() / parallel_map_safe() / parallel_filter() batch helpers, the TaskResult dataclass with the run_as_completed() iterator, the with_retry() / parallel_with_retry() retry helpers, the scatter_gather() / first_success() fan-out patterns, and the cpu_parallel_map() process-pool wrapper. Start with the free tier to try high-level concurrency patterns and concurrent.futures pipeline code generation.
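The two approaches also combine: an asyncio program can push blocking calls onto a thread pool with loop.run_in_executor. A minimal sketch, where blocking_fetch is an illustrative stand-in for a non-async client-library call:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_fetch(x: int) -> int:
    """Stand-in for a blocking client-library call."""
    time.sleep(0.05)
    return x * 10

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Each run_in_executor call wraps a pool task in an awaitable,
        # so gather can fan out blocking work without blocking the loop.
        tasks = [loop.run_in_executor(pool, blocking_fetch, x) for x in range(4)]
        return list(await asyncio.gather(*tasks))

results = asyncio.run(main())
print(results)  # [0, 10, 20, 30]
```

asyncio.gather preserves input order, so this behaves like parallel_map for blocking callables inside an async application.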
