Claude Code for threading: I/O-Bound Concurrency in Python — Claude Skills 360 Blog

Published: June 28, 2028
Read time: 5 min read
By: Claude Skills 360

Python threading runs multiple OS threads inside one interpreter under the GIL — ideal for I/O-bound work (network, disk, database), because the GIL is released during blocking I/O calls. The core API at a glance:

- Thread: import threading; t = threading.Thread(target=fn, args=(x,), daemon=True); t.start(); t.join()
- Introspection: threading.current_thread().name | threading.active_count() | threading.enumerate()
- Lock: lock = threading.Lock(); with lock: shared_var += 1
- RLock: threading.RLock() — reentrant; the same thread may acquire it multiple times
- Event: e = threading.Event(); e.set(); e.wait(); e.clear(); e.is_set()
- Condition: c = threading.Condition(); with c: c.wait(); c.notify_all()
- Semaphore: sem = threading.Semaphore(5) — limit concurrent access
- Timer: t = threading.Timer(3.0, callback); t.start(); t.cancel()
- Barrier: b = threading.Barrier(n); b.wait()
- local: data = threading.local(); data.user_id = 42 — per-thread storage
- Queue: from queue import Queue; q = Queue(maxsize=100); q.put(item); item = q.get(); q.task_done(); q.join()
- PriorityQueue: from queue import PriorityQueue
- ThreadPoolExecutor: from concurrent.futures import ThreadPoolExecutor, as_completed; with ThreadPoolExecutor(max_workers=8) as ex: futures = [ex.submit(fn, x) for x in items]
- map: results = list(ex.map(fn, items, timeout=30))
- wait: from concurrent.futures import wait, ALL_COMPLETED
- daemon: t.daemon = True — the thread dies when the main thread exits
- join(timeout): t.join(timeout=5.0); is_alive: t.is_alive()

Claude Code generates thread pools, producer-consumer pipelines, rate limiters, and I/O worker patterns.
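Semaphore and Barrier appear in the cheat sheet above but not in the pipeline file later in this article, so here is a minimal sketch combining them. The worker count, sleep duration, and phase labels are illustrative assumptions, not part of any real API:

```python
import threading
import time

sem = threading.Semaphore(2)     # at most 2 workers in the "download" section at once
barrier = threading.Barrier(4)   # all 4 workers rendezvous before starting phase 2
results: list[str] = []
results_lock = threading.Lock()

def worker(i: int) -> None:
    with sem:                    # blocks while 2 workers already hold the semaphore
        time.sleep(0.01)         # stand-in for a blocking I/O call
        with results_lock:
            results.append(f"phase1-{i}")
    barrier.wait()               # wait until every worker has finished phase 1
    with results_lock:
        results.append(f"phase2-{i}")

threads = [threading.Thread(target=worker, args=(i,), daemon=True) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The barrier guarantees every phase1 entry precedes every phase2 entry.
print(results)
```

The semaphore caps how many workers run the I/O section concurrently; the barrier turns four independent threads into a two-phase computation without any polling.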

CLAUDE.md for threading

## threading Stack
- Stdlib: import threading | from concurrent.futures import ThreadPoolExecutor, as_completed
- Pool: ThreadPoolExecutor(max_workers=N) | ex.map(fn, items) | [ex.submit(fn,x) for x in items]
- Sync: threading.Lock() | threading.Event() | threading.Semaphore(N) | queue.Queue(maxsize=100)
- Safe: always use Lock/Queue for shared state — never bare shared variables across threads
- I/O only: for CPU-bound work use multiprocessing instead (GIL blocks CPU parallelism)
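The "Safe" rule above is worth demonstrating: a bare += on a shared variable is a read-modify-write that threads can interleave, so updates may be lost, while the Lock-protected version is always exact. A minimal sketch (thread and iteration counts are arbitrary):

```python
import threading

unsafe = 0
safe = 0
lock = threading.Lock()

def bump(n: int) -> None:
    global unsafe, safe
    for _ in range(n):
        unsafe += 1          # racy: load, add, store can interleave across threads
        with lock:
            safe += 1        # atomic with respect to the other bump() threads

threads = [threading.Thread(target=bump, args=(10_000,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"safe={safe} unsafe={unsafe}")  # safe is always 80000; unsafe may come up short
```

Whether the unsafe counter actually loses updates depends on the interpreter's thread-switch timing, which is exactly the problem: code that happens to work today can silently drop writes under load.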

threading Concurrency Pipeline

# app/threads.py — ThreadPoolExecutor, Lock, Event, Queue, rate limiter, worker pool
from __future__ import annotations

import logging
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Callable, Generator, TypeVar

log = logging.getLogger(__name__)
T   = TypeVar("T")
R   = TypeVar("R")


# ─────────────────────────────────────────────────────────────────────────────
# 1. ThreadPoolExecutor helpers
# ─────────────────────────────────────────────────────────────────────────────

def thread_map(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int = 8,
    timeout: float | None = None,
) -> list[R]:
    """
    Run fn(item) in a thread pool; return results in order.

    Example:
        pages = thread_map(fetch_url, urls, max_workers=16)
    """
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        return list(ex.map(fn, items, timeout=timeout))


def thread_submit(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int = 8,
    return_exceptions: bool = False,
    timeout: float | None = None,
) -> list[R]:
    """
    Submit all items; collect results in original order.
    If return_exceptions=True, exceptions are placed in results instead of raised.

    Example:
        results = thread_submit(download_file, paths, max_workers=4)
    """
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(fn, item): i for i, item in enumerate(items)}
        ordered: dict[int, Any] = {}

        for future in as_completed(futures, timeout=timeout):
            idx = futures[future]
            try:
                ordered[idx] = future.result()
            except Exception as exc:
                if return_exceptions:
                    ordered[idx] = exc
                else:
                    raise

    return [ordered[i] for i in range(len(items))]


def thread_map_progress(
    fn: Callable[[T], R],
    items: list[T],
    max_workers: int = 8,
    on_done: Callable[[int, int, R], None] | None = None,
) -> list[R]:
    """
    Submit all items; call on_done(completed, total, result) as each finishes.

    Example:
        def show(done, total, result):
            print(f"  {done}/{total} done")
        pages = thread_map_progress(fetch, urls, on_done=show)
    """
    total   = len(items)
    results: dict[int, R] = {}

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(fn, item): i for i, item in enumerate(items)}
        done_count = 0
        for future in as_completed(futures):
            idx    = futures[future]
            result = future.result()
            results[idx] = result
            done_count   += 1
            if on_done:
                on_done(done_count, total, result)

    return [results[i] for i in range(total)]


# ─────────────────────────────────────────────────────────────────────────────
# 2. Producer–consumer with queue.Queue
# ─────────────────────────────────────────────────────────────────────────────

_STOP = object()  # sentinel


def _worker(task_q: queue.Queue, result_q: queue.Queue, fn: Callable) -> None:
    while True:
        item = task_q.get()
        if item is _STOP:
            task_q.task_done()
            break
        try:
            result = fn(item)
            result_q.put((item, result, None))
        except Exception as exc:
            result_q.put((item, None, exc))
        finally:
            task_q.task_done()


def producer_consumer(
    fn: Callable[[T], R],
    items: list[T],
    n_workers: int = 8,
) -> list[tuple[T, R | None, Exception | None]]:
    """
    Feed items into a Queue; n_workers threads drain it.
    Returns [(item, result, exc)] for each item.

    Example:
        results = producer_consumer(parse_record, records, n_workers=4)
        errors  = [(item, exc) for item, _, exc in results if exc]
    """
    task_q   = queue.Queue()
    result_q = queue.Queue()

    threads = [
        threading.Thread(target=_worker, args=(task_q, result_q, fn), daemon=True)
        for _ in range(n_workers)
    ]
    for t in threads:
        t.start()

    for item in items:
        task_q.put(item)
    for _ in range(n_workers):
        task_q.put(_STOP)

    task_q.join()

    outputs = []
    while not result_q.empty():
        outputs.append(result_q.get_nowait())

    return outputs


# ─────────────────────────────────────────────────────────────────────────────
# 3. Rate limiter
# ─────────────────────────────────────────────────────────────────────────────

class RateLimiter:
    """
    Token-bucket rate limiter, thread-safe.

    Example:
        limiter = RateLimiter(rate=10, per=1.0)   # 10 calls/second
        with limiter:
            response = session.get(url)
    """

    def __init__(self, rate: int, per: float = 1.0) -> None:
        self._rate      = rate
        self._per       = per
        self._tokens    = float(rate)
        self._last      = time.monotonic()
        self._lock      = threading.Lock()

    def __enter__(self) -> None:
        self.acquire()

    def __exit__(self, *_) -> None:
        pass

    def acquire(self) -> None:
        # Note: the sleep below happens while holding the lock, so waiting
        # callers are served one at a time (a deliberate simplification).
        with self._lock:
            now = time.monotonic()
            elapsed = now - self._last
            self._tokens += elapsed * (self._rate / self._per)
            self._tokens = min(self._tokens, float(self._rate))
            self._last = now
            if self._tokens < 1.0:
                # Sleep just long enough to earn one token, then consume it.
                sleep_for = (1.0 - self._tokens) / (self._rate / self._per)
                time.sleep(sleep_for)
                self._tokens = 0.0
            else:
                self._tokens -= 1.0


# ─────────────────────────────────────────────────────────────────────────────
# 4. Shared state helpers
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class AtomicCounter:
    """
    Thread-safe integer counter.

    Example:
        counter = AtomicCounter()
        # In threads: counter.increment()
        # In main:    print(counter.value)
    """
    _value: int = field(default=0, init=False)
    _lock:  threading.Lock = field(default_factory=threading.Lock, init=False)

    def increment(self, by: int = 1) -> int:
        with self._lock:
            self._value += by
            return self._value

    def reset(self) -> None:
        with self._lock:
            self._value = 0

    @property
    def value(self) -> int:
        with self._lock:
            return self._value


class ThreadSafeDict:
    """
    Dict protected by an RLock.

    Example:
        cache = ThreadSafeDict()
        cache["key"] = "value"
        val = cache.get("key")
    """

    def __init__(self) -> None:
        self._data: dict = {}
        self._lock = threading.RLock()

    def __setitem__(self, key, val) -> None:
        with self._lock:
            self._data[key] = val

    def __getitem__(self, key):
        with self._lock:
            return self._data[key]

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

    def __contains__(self, key) -> bool:
        with self._lock:
            return key in self._data

    def items(self):
        with self._lock:
            return list(self._data.items())

    def __len__(self) -> int:
        with self._lock:
            return len(self._data)


# ─────────────────────────────────────────────────────────────────────────────
# 5. Background worker / daemon thread
# ─────────────────────────────────────────────────────────────────────────────

class BackgroundWorker:
    """
    Daemon thread that processes tasks from a queue until stopped.

    Example:
        worker = BackgroundWorker(process_event)
        worker.start()
        worker.submit(event_data)
        worker.stop(wait=True)
    """

    def __init__(self, fn: Callable, maxsize: int = 0) -> None:
        self._fn        = fn
        self._queue:    queue.Queue = queue.Queue(maxsize=maxsize)
        self._stop_evt  = threading.Event()
        self._thread    = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def submit(self, item: Any) -> None:
        if not self._stop_evt.is_set():
            self._queue.put(item)

    def stop(self, wait: bool = True) -> None:
        self._stop_evt.set()
        self._queue.put(_STOP)
        if wait:
            self._thread.join()

    def _run(self) -> None:
        while True:
            item = self._queue.get()
            if item is _STOP:
                self._queue.task_done()
                break
            try:
                self._fn(item)
            except Exception:
                log.exception("BackgroundWorker error processing %r", item)
            finally:
                self._queue.task_done()


# ─────────────────────────────────────────────────────────────────────────────
# 6. Thread-local request context
# ─────────────────────────────────────────────────────────────────────────────

_local = threading.local()


def set_request_id(request_id: str) -> None:
    """
    Store a request ID in thread-local storage.

    Example:
        set_request_id("req-abc123")
        log.info("Processing: %s", get_request_id())
    """
    _local.request_id = request_id


def get_request_id() -> str:
    return getattr(_local, "request_id", "unknown")


@contextmanager
def request_context(request_id: str) -> Generator[None, None, None]:
    """
    Context manager to bind a request_id to the current thread,
    restoring the previous value on exit.

    Example:
        with request_context("req-xyz"):
            process_request()
    """
    previous = get_request_id()
    set_request_id(request_id)
    try:
        yield
    finally:
        _local.request_id = previous


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== threading demo ===")

    print(f"\n  Active threads at start: {threading.active_count()}")

    # 1. thread_map
    def slow_square(x: int) -> int:
        time.sleep(0.02)
        return x * x

    t0 = time.perf_counter()
    results = thread_map(slow_square, list(range(10)), max_workers=5)
    elapsed = time.perf_counter() - t0
    print(f"\n--- thread_map (10 items, 5 workers) ---")
    print(f"  results: {results}")
    print(f"  elapsed: {elapsed:.3f}s (sequential would be ~0.20s)")

    # 2. producer_consumer
    print("\n--- producer_consumer ---")
    pq = producer_consumer(slow_square, list(range(6)), n_workers=3)
    print(f"  {sorted((item, res) for item, res, exc in pq if exc is None)}")

    # 3. AtomicCounter
    print("\n--- AtomicCounter ---")
    counter = AtomicCounter()
    threads = [threading.Thread(target=lambda: counter.increment()) for _ in range(100)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"  100 threads incremented: counter={counter.value}")

    # 4. ThreadSafeDict
    print("\n--- ThreadSafeDict ---")
    cache = ThreadSafeDict()
    def fill(i):
        cache[f"k{i}"] = i * 2
    threads2 = [threading.Thread(target=fill, args=(i,)) for i in range(10)]
    for t in threads2:
        t.start()
    for t in threads2:
        t.join()
    print(f"  {len(cache)} keys written")

    # 5. BackgroundWorker
    print("\n--- BackgroundWorker ---")
    received = []
    worker = BackgroundWorker(received.append)
    worker.start()
    for i in range(5):
        worker.submit(f"event-{i}")
    time.sleep(0.05)
    worker.stop(wait=True)
    print(f"  processed: {received}")

    # 6. thread-local context
    print("\n--- request_context ---")
    results_list: list[str] = []

    def handle_request(req_id):
        with request_context(req_id):
            results_list.append(get_request_id())

    threads3 = [threading.Thread(target=handle_request, args=(f"req-{i}",)) for i in range(5)]
    for t in threads3:
        t.start()
    for t in threads3:
        t.join()
    print(f"  thread-local IDs captured: {sorted(results_list)}")

    print("\n=== done ===")

The asyncio alternative: asyncio uses a single-threaded cooperative event loop where coroutines yield control at await points, making it highly scalable for thousands of concurrent I/O connections without OS-thread overhead. Python threading creates real OS threads that can run I/O in parallel (the GIL is released during I/O syscalls) but carry a higher per-thread memory cost (roughly 8 MB of default stack on Linux). Use asyncio when building servers, APIs, or any code that is async/await-native from top to bottom (aiohttp, FastAPI, httpx); use threading when integrating with synchronous libraries (requests, psycopg2, boto3) or adding concurrency to an existing sync codebase.

The concurrent.futures alternative: concurrent.futures.ThreadPoolExecutor provides Future objects, as_completed() for unordered result streams, and a unified interface shared with ProcessPoolExecutor. Raw threading.Thread gives lower-level control, including custom synchronization, daemon threads, thread-local storage, and fine-grained lifecycle management. Use ThreadPoolExecutor for most new code that wants clean future-based control flow; reach for the threading primitives when you need explicit locks, events, barriers, or long-lived daemon threads.

The Claude Skills 360 bundle includes threading skill sets covering the thread_map()/thread_submit()/thread_map_progress() pool helpers, the producer_consumer() Queue-based pipeline, the RateLimiter token bucket, the AtomicCounter/ThreadSafeDict shared-state helpers, the BackgroundWorker daemon, and request_context() thread-local storage. Start with the free tier to try I/O concurrency and threading pipeline code generation.
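To make the asyncio comparison concrete, here is a sketch of an asyncio counterpart to thread_map(): a single event loop with an asyncio.Semaphore capping in-flight coroutines instead of a pool of OS threads. The slow_square() coroutine and the concurrency limit are illustrative stand-ins for real async I/O (for example, an httpx request):

```python
import asyncio

async def slow_square(x: int) -> int:
    await asyncio.sleep(0.01)   # cooperative yield, like awaiting a network call
    return x * x

async def async_map(items: list[int], max_concurrent: int = 5) -> list[int]:
    sem = asyncio.Semaphore(max_concurrent)   # bounds in-flight coroutines

    async def bounded(x: int) -> int:
        async with sem:
            return await slow_square(x)

    # gather() preserves input order, like ThreadPoolExecutor.map()
    return await asyncio.gather(*(bounded(x) for x in items))

results = asyncio.run(async_map(list(range(10))))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Note the structural similarity to thread_map(): same bounded concurrency, same ordered results, but everything runs on one thread, so no locks are needed around shared state touched only between await points.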

