
Claude Code for queue: Thread-Safe Queues in Python

Published: August 3, 2028
Read time: 5 min read
By: Claude Skills 360

Python’s queue module provides thread-safe FIFO, LIFO, and priority queues for concurrent programming (import queue):

- Queue: q = queue.Queue(maxsize=0) — FIFO; maxsize=0 means unbounded.
- LifoQueue: queue.LifoQueue() — LIFO (stack).
- PriorityQueue: queue.PriorityQueue() — min-heap; items should be (priority, value) tuples.
- SimpleQueue: queue.SimpleQueue() — lightweight FIFO; no maxsize, no task_done/join.
- put: q.put(item, block=True, timeout=None) — add an item; raises queue.Full if the queue is bounded and the timeout is exceeded.
- get: q.get(block=True, timeout=None) — remove and return an item; raises queue.Empty on timeout.
- put_nowait / get_nowait: non-blocking variants; raise Full/Empty immediately.
- task_done: q.task_done() — signal that a previously fetched item has been processed.
- join: q.join() — block until task_done has been called for every item put on the queue.
- qsize: q.qsize() — approximate size; not reliable for flow control.
- empty / full: q.empty() / q.full() — approximate; don’t base multi-thread logic on them.
- Empty / Full: the queue.Empty and queue.Full exceptions.
- asyncio.Queue: asyncio.Queue(maxsize=0) — same API, but await q.put() / await q.get().
- asyncio.PriorityQueue / asyncio.LifoQueue: async variants.

Claude Code generates worker pools, pipeline stages, log aggregators, retry queues, and async producer-consumer systems.
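
Before the full utility module below, here is a minimal sketch of the core handshake (put, get, task_done, join) with a bounded queue and one daemon worker; the worker body is illustrative:

# minimal producer-consumer: bounded queue, daemon worker, join() to drain
import queue
import threading

q: queue.Queue = queue.Queue(maxsize=10)    # bounded: put() blocks when full

def worker() -> None:
    while True:
        item = q.get()                      # blocks until an item arrives
        try:
            print(f"processing {item}")
        finally:
            q.task_done()                   # exactly one task_done() per get()

threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)                                # backpressure if 10 items are pending

q.join()                                    # returns once every item is task_done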

CLAUDE.md for queue

## queue Stack
- Stdlib: import queue
- FIFO:       q = queue.Queue(maxsize=100)   # bounded with backpressure
- Priority:   q = queue.PriorityQueue()       # (priority_int, item) tuples
- Worker:     threading.Thread(target=worker, daemon=True).start()
- task_done:  q.task_done() after each item processed; q.join() to await drain
- Async:      asyncio.Queue / asyncio.PriorityQueue for async pipelines

queue Producer-Consumer Pipeline

# app/queueutil.py — worker pool, pipeline, priority queue, async queue
from __future__ import annotations

import asyncio
import queue
import threading
import time
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")


# ─────────────────────────────────────────────────────────────────────────────
# 1. Worker pool
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class WorkResult(Generic[T]):
    item:      Any
    result:    T       | None = None
    error:     Exception | None = None
    ok:        bool           = True


class WorkerPool(Generic[T, R]):
    """
    A fixed-size thread pool driven by a bounded Queue.
    Submit items; collect WorkResult objects from the results queue.

    Example:
        def process(url: str) -> str:
            return fetch(url)

        pool = WorkerPool(process, workers=4, maxsize=50)
        pool.start()
        for url in urls:
            pool.submit(url)
        pool.stop()
        for result in pool.drain():
            if result.ok:
                store(result.result)
    """

    def __init__(
        self,
        fn:      Callable[[T], R],
        workers: int = 4,
        maxsize: int = 0,
    ) -> None:
        self._fn       = fn
        self._workers  = workers
        self._work_q:    queue.Queue = queue.Queue(maxsize=maxsize)
        self._result_q:  queue.Queue = queue.Queue()
        self._threads:   list[threading.Thread] = []
        self._stopped    = threading.Event()

    def start(self) -> "WorkerPool":
        for _ in range(self._workers):
            t = threading.Thread(target=self._worker_loop, daemon=True)
            t.start()
            self._threads.append(t)
        return self

    def _worker_loop(self) -> None:
        while True:
            try:
                item = self._work_q.get(timeout=0.1)
            except queue.Empty:
                if self._stopped.is_set():
                    break
                continue
            try:
                result = self._fn(item)
                self._result_q.put(WorkResult(item=item, result=result, ok=True))
            except Exception as exc:
                self._result_q.put(WorkResult(item=item, error=exc, ok=False))
            finally:
                self._work_q.task_done()

    def submit(self, item: T, timeout: float | None = None) -> None:
        """Submit work item; blocks if queue is full (backpressure)."""
        self._work_q.put(item, timeout=timeout)

    def wait_drain(self) -> None:
        """Block until all submitted items have been processed."""
        self._work_q.join()

    def stop(self, wait: bool = True) -> None:
        """Signal workers to stop; optionally wait for them to finish."""
        self._stopped.set()
        if wait:
            for t in self._threads:
                t.join()

    def drain(self) -> Iterator[WorkResult[R]]:
        """Yield all pending results without blocking."""
        while True:
            try:
                yield self._result_q.get_nowait()
            except queue.Empty:
                break

    def __enter__(self) -> "WorkerPool":
        return self.start()

    def __exit__(self, *_: Any) -> None:
        self.stop()


# ─────────────────────────────────────────────────────────────────────────────
# 2. Pipeline of queues (multi-stage)
# ─────────────────────────────────────────────────────────────────────────────

class Stage:
    """One stage in a multi-stage processing pipeline."""

    def __init__(
        self,
        name:    str,
        fn:      Callable[[Any], Any],
        workers: int = 1,
        maxsize: int = 0,
    ) -> None:
        self.name       = name
        self._fn        = fn
        self._workers   = workers
        self._in_q:     queue.Queue = queue.Queue(maxsize=maxsize)
        self._out_q:    queue.Queue | None = None
        self._threads:  list[threading.Thread] = []
        self._sentinel  = object()

    def _loop(self) -> None:
        while True:
            item = self._in_q.get()
            if item is self._sentinel:
                self._in_q.task_done()
                break
            try:
                result = self._fn(item)
                if self._out_q is not None:
                    self._out_q.put(result)
            except Exception:
                pass  # failed items are dropped silently; log or collect in production
            finally:
                self._in_q.task_done()

    def start(self, out_q: "queue.Queue | None" = None) -> None:
        self._out_q = out_q
        for _ in range(self._workers):
            t = threading.Thread(target=self._loop, daemon=True)
            t.start()
            self._threads.append(t)

    def put(self, item: Any) -> None:
        self._in_q.put(item)

    def close(self) -> None:
        """Send one sentinel per worker, then wait for this stage to finish."""
        for _ in range(self._workers):
            self._in_q.put(self._sentinel)
        for t in self._threads:
            t.join()


def run_pipeline(
    stages: list[Stage],
    items: list[Any],
) -> list[Any]:
    """
    Wire stages together and run items through the pipeline.
    Returns items from the final queue.

    Example:
        results = run_pipeline(
            stages=[
                Stage("parse", parse_line),
                Stage("validate", validate_record),
                Stage("enrich", enrich_record, workers=2),
            ],
            items=raw_lines,
        )
    """
    # Wire: each stage's out_q is the next stage's in_q
    # Last stage writes to result_q
    result_q: queue.Queue = queue.Queue()
    out_queues = [s._in_q for s in stages[1:]] + [result_q]
    for stage, out_q in zip(stages, out_queues):
        stage.start(out_q=out_q)

    # Feed items to first stage
    for item in items:
        stages[0].put(item)
    stages[0].close()

    # Close stages in order: close() joins a stage's workers, so all of its
    # output has reached the next stage before that stage is closed in turn
    for stage in stages[1:]:
        stage.close()

    # Collect results (all stages have finished, so no blocking is needed)
    results = []
    while True:
        try:
            results.append(result_q.get_nowait())
        except queue.Empty:
            break
    return results


# ─────────────────────────────────────────────────────────────────────────────
# 3. Priority queue dispatcher
# ─────────────────────────────────────────────────────────────────────────────

@dataclass(order=True)
class PrioritizedTask:
    priority: int
    item:     Any = field(compare=False)


class PriorityDispatcher:
    """
    Dispatch tasks by integer priority (lower = higher priority).

    Example:
        disp = PriorityDispatcher(workers=2)
        disp.start()
        disp.submit("low-priority report", priority=10)
        disp.submit("critical alert",      priority=1)
        disp.stop()
    """

    def __init__(self, fn: Callable = lambda x: x, workers: int = 2) -> None:
        self._fn     = fn
        self._q:     queue.PriorityQueue = queue.PriorityQueue()
        self._workers = workers
        self._threads: list[threading.Thread] = []
        self._done    = threading.Event()
        self._results: list[WorkResult] = []

    def start(self) -> "PriorityDispatcher":
        for _ in range(self._workers):
            t = threading.Thread(target=self._loop, daemon=True)
            t.start()
            self._threads.append(t)
        return self

    def _loop(self) -> None:
        while True:
            try:
                task: PrioritizedTask = self._q.get(timeout=0.1)
            except queue.Empty:
                if self._done.is_set():
                    break
                continue
            try:
                r = self._fn(task.item)
                self._results.append(WorkResult(item=task.item, result=r, ok=True))
            except Exception as exc:
                self._results.append(WorkResult(item=task.item, error=exc, ok=False))
            finally:
                self._q.task_done()

    def submit(self, item: Any, priority: int = 5) -> None:
        self._q.put(PrioritizedTask(priority=priority, item=item))

    def stop(self, wait: bool = True) -> list[WorkResult]:
        self._q.join()
        self._done.set()
        if wait:
            for t in self._threads:
                t.join()
        return list(self._results)


# ─────────────────────────────────────────────────────────────────────────────
# 4. Retry queue
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class RetryItem:
    payload:   Any
    attempts:  int = 0
    last_error: str = ""


def run_with_retry(
    fn:           Callable[[Any], Any],
    items:        list[Any],
    max_attempts: int = 3,
    delay:        float = 0.0,
) -> tuple[list[WorkResult], list[RetryItem]]:
    """
    Process items with a retry queue; items that fail are requeued up to max_attempts.
    Returns (successes, permanent_failures).

    Example:
        ok, failed = run_with_retry(post_to_api, payloads, max_attempts=3, delay=0.5)
    """
    q: queue.Queue = queue.Queue()
    for item in items:
        q.put(RetryItem(payload=item))

    successes: list[WorkResult]  = []
    failures:  list[RetryItem]   = []

    while not q.empty():  # single consumer thread, so empty() is reliable here
        try:
            ri = q.get_nowait()
        except queue.Empty:
            break
        ri.attempts += 1
        try:
            result = fn(ri.payload)
            successes.append(WorkResult(item=ri.payload, result=result, ok=True))
        except Exception as exc:
            ri.last_error = str(exc)
            if ri.attempts < max_attempts:
                if delay > 0:
                    time.sleep(delay)
                q.put(ri)
            else:
                failures.append(ri)
        finally:
            q.task_done()

    return successes, failures


# ─────────────────────────────────────────────────────────────────────────────
# 5. Async queue producer-consumer
# ─────────────────────────────────────────────────────────────────────────────

async def async_worker_pool(
    items:   list[Any],
    fn:      Callable[[Any], Any],
    workers: int = 4,
    maxsize: int = 0,
) -> list[WorkResult]:
    """
    Async producer-consumer: put items into asyncio.Queue, process with coroutines.

    Example:
        async def fetch(url): ...
        results = await async_worker_pool(urls, fetch, workers=10)
    """
    q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    results: list[WorkResult] = []
    lock = asyncio.Lock()

    async def worker() -> None:
        while True:
            item = await q.get()
            if item is None:
                q.task_done()
                break
            try:
                if asyncio.iscoroutinefunction(fn):
                    r = await fn(item)
                else:
                    r = fn(item)  # sync callables run inline; fine for quick work
                async with lock:
                    results.append(WorkResult(item=item, result=r, ok=True))
            except Exception as exc:
                async with lock:
                    results.append(WorkResult(item=item, error=exc, ok=False))
            finally:
                q.task_done()

    # Start workers
    tasks = [asyncio.create_task(worker()) for _ in range(workers)]

    # Produce
    for item in items:
        await q.put(item)

    # Poison pills
    for _ in range(workers):
        await q.put(None)

    await q.join()
    await asyncio.gather(*tasks)
    return results


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== queue demo ===")

    print("\n--- WorkerPool ---")
    import math

    def heavy(n: int) -> float:
        return math.sqrt(n) * math.log(n + 1)

    with WorkerPool(heavy, workers=4, maxsize=20) as pool:
        for i in range(1, 21):
            pool.submit(i)
        pool.wait_drain()
        results = list(pool.drain())

    ok_count = sum(1 for r in results if r.ok)
    print(f"  processed {len(results)} items, {ok_count} ok")

    print("\n--- Pipeline ---")
    stages = [
        Stage("parse",    lambda s: int(s.strip()),         workers=1),
        Stage("double",   lambda x: x * 2,                  workers=2),
        Stage("to_str",   lambda x: f"result:{x}",          workers=1),
    ]
    output = run_pipeline(stages, ["1", "2", "3", "4", "5"])
    print(f"  pipeline output (unsorted): {sorted(output)}")

    print("\n--- PriorityDispatcher ---")
    order: list[int] = []

    def record(x: int) -> int:
        order.append(x)
        return x

    disp = PriorityDispatcher(fn=record, workers=1)
    # Submit before start() so the worker sees all items and pops by priority
    for pri, val in [(10, 100), (1, 1), (5, 50), (2, 20), (3, 30)]:
        disp.submit(val, priority=pri)
    disp.start()
    disp.stop()
    print(f"  processed order: {order}  (lower priority number = processed first)")

    print("\n--- run_with_retry ---")
    call_counts: dict[int, int] = {}

    def flaky(n: int) -> int:
        call_counts[n] = call_counts.get(n, 0) + 1
        if call_counts[n] < 2:
            raise RuntimeError(f"item {n} failed attempt {call_counts[n]}")
        return n * 10

    ok_res, fail_res = run_with_retry(flaky, [1, 2, 3], max_attempts=3)
    print(f"  successes: {[r.result for r in ok_res]}")
    print(f"  failures: {[f.payload for f in fail_res]}")

    print("\n--- SimpleQueue ---")
    sq: queue.SimpleQueue = queue.SimpleQueue()
    for v in range(5):
        sq.put(v)
    drained = []
    while not sq.empty():
        drained.append(sq.get_nowait())
    print(f"  SimpleQueue drain: {drained}")

    print("\n=== done ===")

For the collections.deque alternative: collections.deque offers O(1) append/popleft and a maxlen option for bounded circular buffers, but it is not thread-safe for concurrent producers and consumers without an external lock; queue.Queue uses a deque internally and wraps it with threading.Condition objects for safe multi-thread access. Use deque for fast FIFO operations confined to a single thread; use queue.Queue whenever multiple threads share a queue.

For the multiprocessing.Queue alternative: multiprocessing.Queue bridges separate OS processes over a pipe with pickle serialization and offers the familiar put/get API, while multiprocessing.JoinableQueue adds task_done/join; queue.Queue operates within a single process via shared memory. Use multiprocessing queues when workers run in separate processes to bypass the GIL for CPU-bound work; use queue.Queue for I/O-bound threading within one process.

The Claude Skills 360 bundle includes queue skill sets covering the WorkerPool generic multi-thread pool with WorkResult and drain(), Stage/run_pipeline() multi-stage pipeline wiring, PriorityDispatcher with the PrioritizedTask dataclass, run_with_retry() with configurable delays, and the async_worker_pool() asyncio producer-consumer pattern. Start with the free tier to try concurrent work dispatch and queue pipeline code generation.
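
To make the multiprocessing comparison above concrete, here is a minimal sketch of the cross-process pattern with one poison pill per worker; the cpu_bound function is an illustrative stand-in:

# CPU-bound fan-out across processes with multiprocessing.JoinableQueue
import multiprocessing as mp

def cpu_bound(n: int) -> int:
    return sum(i * i for i in range(n))        # stand-in for real CPU work

def worker(q: "mp.JoinableQueue", out: "mp.Queue") -> None:
    while True:
        n = q.get()
        if n is None:                          # poison pill: one per worker
            q.task_done()
            break
        out.put(cpu_bound(n))
        q.task_done()

if __name__ == "__main__":
    q = mp.JoinableQueue()
    out = mp.Queue()
    procs = [mp.Process(target=worker, args=(q, out)) for _ in range(4)]
    for p in procs:
        p.start()
    jobs = [100_000, 200_000, 300_000]
    for n in jobs:
        q.put(n)
    for _ in procs:
        q.put(None)
    q.join()                                   # every item and pill task_done'd
    print([out.get() for _ in jobs])
    for p in procs:
        p.join()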
