Claude Code for _thread: Python Low-Level Threading Primitives — Claude Skills 360 Blog

Claude Code for _thread: Python Low-Level Threading Primitives

Published: January 24, 2029
Read time: 5 min read
By: Claude Skills 360

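Python’s _thread module is the low-level threading interface that threading is built on. Import it with import _thread. Spawn: _thread.start_new_thread(func, args[, kwargs]) starts a new OS thread running func(*args, **kwargs) and returns the new thread's identifier; there is no join, and the function's return value is discarded. Primitive lock: lock = _thread.allocate_lock(), then lock.acquire(), lock.acquire(blocking=True, timeout=-1), lock.release(), and lock.locked(); a lock also works as a context manager. Thread ID: _thread.get_ident() returns a nonzero integer that is unique among live threads but may be reused after a thread exits. Exit current thread: _thread.exit() raises SystemExit. Interrupt main: _thread.interrupt_main() simulates a signal (SIGINT by default) arriving in the main thread, which normally surfaces there as KeyboardInterrupt. Stack size: _thread.stack_size([size]) returns the stack size used for new threads and, when size is given, sets it in bytes for threads created afterwards (0 selects the platform default; any other value must be at least 32 KiB). The module is intentionally minimal: no join, no daemon flag, no public thread-local storage class, and no way to deliver a worker's exception to the code that started it. threading adds join, daemon threads, threading.local, and a threading.excepthook for uncaught exceptions. _thread is useful when you need the absolute minimum overhead or are implementing custom synchronisation primitives. Claude Code generates spinlocks, multi-producer queues, background I/O workers, lightweight daemon threads, signal routing helpers, and thread-local storage patterns.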

CLAUDE.md for _thread

## _thread Stack
- Stdlib: import _thread
- Spawn:  _thread.start_new_thread(func, args)   # no join
- Lock:   lock = _thread.allocate_lock()
-         lock.acquire() / lock.release() / lock.locked()
-         lock.acquire(blocking=True, timeout=-1)
- ID:     _thread.get_ident()
- Exit:   _thread.exit()          # SystemExit in current thread
- Intr:   _thread.interrupt_main()  # KeyboardInterrupt in main
- Stack:  _thread.stack_size(65536)
- Note:   Use threading for join/daemon/exception handling
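As a quick illustration of the lock and stack-size entries in the cheat sheet, the following sketch exercises each acquire mode from a single thread. The variable names are illustrative only, and the stack-size call is wrapped in try/except because some platforms reject a change.

```python
import _thread

lock = _thread.allocate_lock()

got_first = lock.acquire()                    # free lock: acquired, returns True
got_nonblocking = lock.acquire(blocking=False)  # already held: returns False at once
got_timed = lock.acquire(timeout=0.05)        # waits about 50 ms, then returns False
state_held = lock.locked()                    # True while the lock is held
lock.release()
state_free = lock.locked()                    # False after release

# Locks are context managers: acquire on entry, release on exit.
with lock:
    held_inside_with = lock.locked()

# stack_size(size) returns the previous setting; restore it afterwards.
try:
    previous = _thread.stack_size(64 * 1024)
    _thread.stack_size(previous)
except (ValueError, RuntimeError):
    previous = None                           # platform refused the change
```

Primitive locks are not reentrant, which is why the second and third acquire calls fail here instead of succeeding for the thread that already holds the lock.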

_thread Low-Level Threading Pipeline

# app/threadutil.py: spawn, semaphore, event, bounded queue, interrupt_main watchdog
from __future__ import annotations

import _thread
import time
from collections import deque
from typing import Callable, Any


# ─────────────────────────────────────────────────────────────────────────────
# 1. Minimal spawn helper with error capture
# ─────────────────────────────────────────────────────────────────────────────

_lock_print = _thread.allocate_lock()


def _safe_print(*args: Any) -> None:
    """Thread-safe print."""
    with _lock_print:
        print(*args)


class ThreadError:
    """Mutable container to capture exceptions from worker threads."""
    __slots__ = ("exc", "_lock")

    def __init__(self) -> None:
        self.exc: BaseException | None = None
        self._lock = _thread.allocate_lock()

    def set(self, exc: BaseException) -> None:
        with self._lock:
            if self.exc is None:
                self.exc = exc

    def get(self) -> BaseException | None:
        with self._lock:
            return self.exc


def spawn(func: Callable[..., Any], *args: Any,
          errors: ThreadError | None = None,
          **kwargs: Any) -> None:
    """
    Start a new thread running func(*args, **kwargs).
    If errors is provided, exceptions are captured there rather than crashing.

    Example:
        def worker(n):
            time.sleep(0.1)
            print(f"done {n}")
        spawn(worker, 1)
        spawn(worker, 2)
        time.sleep(0.3)
    """
    def _wrapper():
        try:
            func(*args, **kwargs)
        except SystemExit:
            pass
        except BaseException as e:
            if errors is not None:
                errors.set(e)
            else:
                _safe_print(f"[_thread] unhandled: {e!r}")

    _thread.start_new_thread(_wrapper, ())


# ─────────────────────────────────────────────────────────────────────────────
# 2. Counting semaphore built from primitive locks
# ─────────────────────────────────────────────────────────────────────────────

class Semaphore:
    """
    A counting semaphore built from _thread.allocate_lock().

    Example:
        sem = Semaphore(3)   # allow 3 concurrent
        def task(i):
            sem.acquire()
            try:
                time.sleep(0.1)
            finally:
                sem.release()
        for i in range(10):
            spawn(task, i)
    """

    def __init__(self, count: int = 1) -> None:
        if count < 0:
            raise ValueError("count must be >= 0")
        self._count = count
        self._lock = _thread.allocate_lock()

    def acquire(self) -> None:
        # Poll for a free permit; sleeping between attempts gives other
        # threads a chance to call release().
        while True:
            with self._lock:
                if self._count > 0:
                    self._count -= 1
                    return
            time.sleep(0.001)   # brief yield

    def release(self) -> None:
        with self._lock:
            self._count += 1

    def __enter__(self) -> "Semaphore":
        self.acquire()
        return self

    def __exit__(self, *_: Any) -> None:
        self.release()


# ─────────────────────────────────────────────────────────────────────────────
# 3. Manual event flag
# ─────────────────────────────────────────────────────────────────────────────

class Event:
    """
    Manual-reset event flag built from primitive locks.

    Example:
        ready = Event()
        def producer():
            time.sleep(0.2)
            ready.set()
        spawn(producer)
        ready.wait()
        print("producer signalled")
    """

    def __init__(self) -> None:
        self._flag = False
        self._lock = _thread.allocate_lock()

    def set(self) -> None:
        with self._lock:
            self._flag = True

    def clear(self) -> None:
        with self._lock:
            self._flag = False

    def is_set(self) -> bool:
        with self._lock:
            return self._flag

    def wait(self, timeout: float | None = None) -> bool:
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            if self.is_set():
                return True
            if deadline is not None and time.monotonic() >= deadline:
                return False
            time.sleep(0.002)


# ─────────────────────────────────────────────────────────────────────────────
# 4. Thread-safe bounded queue
# ─────────────────────────────────────────────────────────────────────────────

class BoundedQueue:
    """
    A thread-safe FIFO queue with a maximum size, built on _thread locks.

    Example:
        q = BoundedQueue(maxsize=5)
        spawn(lambda: q.put("hello"))
        print(q.get())
    """

    def __init__(self, maxsize: int = 0) -> None:
        self._buf: deque = deque()
        self._lock = _thread.allocate_lock()
        self._maxsize = maxsize

    def put(self, item: Any, timeout: float = 5.0) -> bool:
        deadline = time.monotonic() + timeout
        while True:
            with self._lock:
                if self._maxsize <= 0 or len(self._buf) < self._maxsize:
                    self._buf.append(item)
                    return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.002)

    def get(self, timeout: float = 5.0) -> Any:
        deadline = time.monotonic() + timeout
        while True:
            with self._lock:
                if self._buf:
                    return self._buf.popleft()
            if time.monotonic() >= deadline:
                raise TimeoutError("get() timed out")
            time.sleep(0.002)

    def qsize(self) -> int:
        with self._lock:
            return len(self._buf)

    def empty(self) -> bool:
        return self.qsize() == 0


# ─────────────────────────────────────────────────────────────────────────────
# 5. interrupt_main watchdog
# ─────────────────────────────────────────────────────────────────────────────

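def watchdog(timeout_s: float) -> Callable[[], None]:
    """
    Start a background thread that calls _thread.interrupt_main() after
    timeout_s seconds unless cancelled first. Returns a cancel() callable.

    interrupt_main() simulates SIGINT, so the main thread only sees
    KeyboardInterrupt once it is back to running Python code. Treat this as
    a soft timeout rather than a hard kill.

    Example:
        cancel = watchdog(10.0)   # KeyboardInterrupt in main after 10s
        ...                       # long-running work
        cancel()                  # finished in time: disarm the watchdog
    """
    cancelled = Event()

    def _watcher():
        # Event.wait() returns False only when the timeout expires.
        if not cancelled.wait(timeout_s):
            _thread.interrupt_main()

    _thread.start_new_thread(_watcher, ())
    return cancelled.set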


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== _thread demo ===")
    print(f"  _thread.get_ident() = {_thread.get_ident()}")

    # ── spawn ─────────────────────────────────────────────────────────────────
    print("\n--- spawn ---")
    done = Event()
    results: list[int] = []
    lock_results = _thread.allocate_lock()

    def worker(n: int) -> None:
        time.sleep(0.05 * n)
        with lock_results:
            results.append(n)
            finished = len(results) >= 3   # read the length while holding the lock
        if finished:
            done.set()

    for i in [3, 1, 2]:
        spawn(worker, i)

    done.wait(timeout=2.0)
    print(f"  completion order: {results}")

    # ── Semaphore ─────────────────────────────────────────────────────────────
    print("\n--- Semaphore(2) ---")
    sem = Semaphore(2)
    active_count = [0]
    max_active = [0]
    ac_lock = _thread.allocate_lock()
    sem_done = Event()
    sem_results: list[int] = []

    def sem_worker(i: int) -> None:
        with sem:
            with ac_lock:
                active_count[0] += 1
                max_active[0] = max(max_active[0], active_count[0])
            time.sleep(0.05)
            with ac_lock:
                active_count[0] -= 1
                sem_results.append(i)
                if len(sem_results) >= 5:
                    sem_done.set()

    for i in range(5):
        spawn(sem_worker, i)
    sem_done.wait(timeout=2.0)
    print(f"  max concurrent: {max_active[0]} (limit=2)  ok={max_active[0] <= 2}")

    # ── BoundedQueue ──────────────────────────────────────────────────────────
    print("\n--- BoundedQueue ---")
    q: BoundedQueue = BoundedQueue(maxsize=3)
    for item in ["a", "b", "c"]:
        q.put(item)
    print(f"  qsize={q.qsize()}")
    print(f"  items: {q.get()} {q.get()} {q.get()}")
    print(f"  empty: {q.empty()}")

    print("\n=== done ===")
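
The Semaphore, Event, and BoundedQueue classes in the listing wait by polling with time.sleep(). A blocking alternative can be built from the same primitive locks by giving each waiter its own locked "gate" and letting release() open it. This is a sketch of that technique, not the listing's implementation; BlockingSemaphore and demo are hypothetical names.

```python
import _thread
import time
from collections import deque


class BlockingSemaphore:
    """Counting semaphore whose waiters block on a lock instead of polling."""

    def __init__(self, count=1):
        self._count = count
        self._mutex = _thread.allocate_lock()
        self._waiters = deque()            # one locked gate per blocked thread

    def acquire(self):
        with self._mutex:
            if self._count > 0:
                self._count -= 1
                return
            gate = _thread.allocate_lock()
            gate.acquire()                 # lock the gate before queueing it
            self._waiters.append(gate)
        gate.acquire()                     # blocks until release() opens the gate

    def release(self):
        with self._mutex:
            if self._waiters:
                # Hand the permit straight to the oldest waiter.
                self._waiters.popleft().release()
            else:
                self._count += 1


def demo(workers=5, limit=2):
    """Run workers tasks under a limit and return the peak concurrency seen."""
    sem = BlockingSemaphore(limit)
    state = {"active": 0, "peak": 0, "finished": 0}
    state_lock = _thread.allocate_lock()
    all_done = _thread.allocate_lock()
    all_done.acquire()                     # released by the last worker

    def task():
        sem.acquire()
        try:
            with state_lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(0.02)
            with state_lock:
                state["active"] -= 1
        finally:
            sem.release()
            with state_lock:
                state["finished"] += 1
                if state["finished"] == workers:
                    all_done.release()

    for _ in range(workers):
        _thread.start_new_thread(task, ())
    all_done.acquire()                     # wait for every worker to finish
    return state["peak"]


peak = demo()
print(f"peak concurrency: {peak}")
```

Handing the permit directly to a waiter, rather than incrementing the count and letting threads race for it, keeps wake-ups in FIFO order and means a woken thread never has to retry.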

For the stdlib replacement, use threading: threading.Thread(target=func, args=args, daemon=True) adds .join(), .is_alive(), the daemon flag, a name, and reporting of uncaught exceptions through threading.excepthook. Use threading for all production multi-threaded code; _thread has no join, no daemon management, and no public thread-local storage class. For managed worker pools, use concurrent.futures.ThreadPoolExecutor: ThreadPoolExecutor(max_workers=8).submit(func, *args) returns a Future whose .result() returns the task's value or re-raises its exception, and which supports done callbacks. Use ThreadPoolExecutor when you need task submission, futures, and clean shutdown; use _thread only when you need the absolute minimum abstraction or are studying Python internals. The Claude Skills 360 bundle includes _thread skill sets covering the ThreadError exception container, the spawn() safe spawner, the Semaphore counting semaphore, the Event manual-reset event flag, the BoundedQueue thread-safe FIFO, and the watchdog() interrupt-main timeout guard. Start with the free tier to try low-level threading patterns and _thread pipeline code generation.
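
For comparison, the same kind of work expressed with the higher-level modules might look like the sketch below. The square function and the variable names are illustrative only.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def square(n):
    return n * n


# threading.Thread: explicit start() and join(), no return value channel.
results = []
t = threading.Thread(target=lambda: results.append(square(4)), daemon=True)
t.start()
t.join()                                  # wait for the thread to finish

# ThreadPoolExecutor: submit() returns a Future carrying value or exception.
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(square, 5)
    value = future.result()               # blocks, then returns 25

    failing = pool.submit(int, "not a number")
    try:
        failing.result()                  # re-raises the worker's ValueError
    except ValueError as exc:
        caught = type(exc).__name__

print(results, value, caught)
```

The with block also waits for outstanding tasks and shuts the pool down on exit, which is the clean-shutdown behaviour that _thread leaves to the caller.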
