Python’s queue module provides thread-safe FIFO, LIFO, and priority queues for concurrent programming (import queue).

- Queue: q = queue.Queue(maxsize=0) — FIFO; maxsize=0 means unbounded.
- LifoQueue: queue.LifoQueue() — LIFO (stack).
- PriorityQueue: queue.PriorityQueue() — min-heap; items should be (priority, value) tuples.
- SimpleQueue: queue.SimpleQueue() — lightweight; no maxsize, no task_done/join.
- put: q.put(item, block=True, timeout=None) — add an item; raises Full if the queue is bounded and the timeout is exceeded.
- get: q.get(block=True, timeout=None) — remove and return an item; raises Empty on timeout.
- put_nowait/get_nowait: non-blocking; raise Full/Empty immediately.
- task_done: q.task_done() — signal that a previously gotten item has been processed.
- join: q.join() — block until task_done has been called for every item put.
- qsize: q.qsize() — approximate size; not reliable for flow control.
- empty/full: q.empty() / q.full() — approximate; don't base multi-threaded logic on them.
- Empty/Full: the queue.Empty and queue.Full exceptions.
- asyncio.Queue: asyncio.Queue(maxsize=0) — same API shape, but await q.put() / await q.get().
- asyncio.PriorityQueue / asyncio.LifoQueue: async variants.

Claude Code generates worker pools, pipeline stages, log aggregators, retry queues, and async producer-consumer systems.
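A minimal sketch of that core API in action, before the larger patterns below:

```python
import queue

q: queue.Queue = queue.Queue(maxsize=2)   # bounded FIFO
q.put("a")
q.put("b")
try:
    q.put_nowait("c")                     # full: raises immediately
except queue.Full:
    print("queue full")
print(q.get())                            # "a" (FIFO order)
print(q.get())                            # "b"
try:
    q.get_nowait()                        # empty: raises immediately
except queue.Empty:
    print("queue empty")

pq: queue.PriorityQueue = queue.PriorityQueue()
pq.put((2, "low"))
pq.put((1, "high"))
print(pq.get())                           # (1, "high") — min-heap order
```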
CLAUDE.md for queue
## queue Stack
- Stdlib: import queue
- FIFO: q = queue.Queue(maxsize=100) # bounded with backpressure
- Priority: q = queue.PriorityQueue() # (priority_int, item) tuples
- Worker: threading.Thread(target=worker, daemon=True).start()
- task_done: q.task_done() after each item processed; q.join() to await drain
- Async: asyncio.Queue / asyncio.PriorityQueue for async pipelines
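A minimal sketch of the worker pattern those bullets describe, assuming a trivial illustrative handle() job function:

```python
import queue
import threading

q: queue.Queue = queue.Queue(maxsize=100)  # bounded: put() blocks when full

def handle(item: int) -> None:             # illustrative job function
    print(f"processed {item}")

def worker() -> None:
    while True:
        item = q.get()                      # blocks until an item arrives
        try:
            handle(item)
        finally:
            q.task_done()                   # always ack, even on error

for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for i in range(10):
    q.put(i)                                # backpressure if workers lag
q.join()                                    # wait until every item is acked
```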
## queue Producer-Consumer Pipeline
# app/queueutil.py — worker pool, pipeline, priority queue, async queue
from __future__ import annotations
import asyncio
import queue
import threading
import time
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar
T = TypeVar("T")
R = TypeVar("R")
# ─────────────────────────────────────────────────────────────────────────────
# 1. Worker pool
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class WorkResult(Generic[T]):
item: Any
result: T | None = None
error: Exception | None = None
ok: bool = True
class WorkerPool(Generic[T, R]):
"""
A fixed-size thread pool driven by a bounded Queue.
Submit items; collect WorkResult objects from the results queue.
Example:
def process(url: str) -> str:
return fetch(url)
pool = WorkerPool(process, workers=4, maxsize=50)
pool.start()
for url in urls:
pool.submit(url)
pool.stop()
for result in pool.drain():
if result.ok:
store(result.result)
"""
def __init__(
self,
        fn: Callable[[T], R],
workers: int = 4,
maxsize: int = 0,
) -> None:
self._fn = fn
self._workers = workers
self._work_q: queue.Queue = queue.Queue(maxsize=maxsize)
self._result_q: queue.Queue = queue.Queue()
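        # Bounded work queue gives submit() backpressure; the result queue
        # stays unbounded so workers never block while reporting results.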
self._threads: list[threading.Thread] = []
self._stopped = threading.Event()
def start(self) -> "WorkerPool":
for _ in range(self._workers):
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
self._threads.append(t)
return self
def _worker_loop(self) -> None:
while True:
try:
item = self._work_q.get(timeout=0.1)
except queue.Empty:
if self._stopped.is_set():
break
continue
try:
result = self._fn(item)
self._result_q.put(WorkResult(item=item, result=result, ok=True))
except Exception as exc:
self._result_q.put(WorkResult(item=item, error=exc, ok=False))
finally:
self._work_q.task_done()
    def submit(self, item: T, timeout: float | None = None) -> None:
"""Submit work item; blocks if queue is full (backpressure)."""
self._work_q.put(item, timeout=timeout)
def wait_drain(self) -> None:
"""Block until all submitted items have been processed."""
self._work_q.join()
def stop(self, wait: bool = True) -> None:
"""Signal workers to stop; optionally wait for them to finish."""
self._stopped.set()
if wait:
for t in self._threads:
t.join()
def drain(self) -> Iterator[WorkResult]:
"""Yield all pending results without blocking."""
while True:
try:
yield self._result_q.get_nowait()
except queue.Empty:
break
def __enter__(self) -> "WorkerPool":
return self.start()
def __exit__(self, *_: Any) -> None:
self.stop()
# ─────────────────────────────────────────────────────────────────────────────
# 2. Pipeline of queues (multi-stage)
# ─────────────────────────────────────────────────────────────────────────────
class Stage:
"""One stage in a multi-stage processing pipeline."""
def __init__(
self,
name: str,
fn: Callable[[Any], Any],
workers: int = 1,
maxsize: int = 0,
) -> None:
self.name = name
self._fn = fn
self._workers = workers
self._in_q: queue.Queue = queue.Queue(maxsize=maxsize)
self._out_q: queue.Queue | None = None
self._threads: list[threading.Thread] = []
self._sentinel = object()
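        # Per-stage sentinel; close() enqueues one per worker as a
        # shutdown signal.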
    def _loop(self) -> None:
        while True:
            item = self._in_q.get()
            if item is self._sentinel:
                # Each worker consumes exactly one sentinel and exits;
                # sentinels are not forwarded downstream, because every
                # stage only recognizes its own sentinel object.
                self._in_q.task_done()
                break
            try:
                result = self._fn(item)
                if self._out_q is not None:
                    self._out_q.put(result)
            except Exception:
                # Failed items are dropped and never reach the next stage.
                pass
            finally:
                self._in_q.task_done()
def start(self, out_q: "queue.Queue | None" = None) -> None:
self._out_q = out_q
for _ in range(self._workers):
t = threading.Thread(target=self._loop, daemon=True)
t.start()
self._threads.append(t)
def put(self, item: Any) -> None:
self._in_q.put(item)
    def close(self) -> None:
        # One sentinel per worker: each worker consumes one shutdown signal;
        # with fewer sentinels, the join() below would block forever on the
        # workers that never receive one.
        for _ in self._threads:
            self._in_q.put(self._sentinel)
        for t in self._threads:
            t.join()
def run_pipeline(
stages: list[Stage],
items: list[Any],
) -> list[Any]:
"""
Wire stages together and run items through the pipeline.
Returns items from the final queue.
Example:
results = run_pipeline(
stages=[
Stage("parse", parse_line),
Stage("validate", validate_record),
Stage("enrich", enrich_record, workers=2),
],
items=raw_lines,
)
"""
# Wire: each stage's out_q is the next stage's in_q
# Last stage writes to result_q
result_q: queue.Queue = queue.Queue()
out_queues = [s._in_q for s in stages[1:]] + [result_q]
for stage, out_q in zip(stages, out_queues):
stage.start(out_q=out_q)
# Feed items to first stage
for item in items:
stages[0].put(item)
stages[0].close()
    # Close stages in order: close() joins that stage's workers, so all of
    # its output is enqueued downstream before the next stage shuts down
    for stage in stages[1:]:
        stage.close()
    # Collect results: every stage has joined, so result_q is already complete
    results = []
    while True:
        try:
            results.append(result_q.get_nowait())
        except queue.Empty:
            break
    return results
# ─────────────────────────────────────────────────────────────────────────────
# 3. Priority queue dispatcher
# ─────────────────────────────────────────────────────────────────────────────
@dataclass(order=True)
class PrioritizedTask:
priority: int
item: Any = field(compare=False)
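# order=True generates comparison methods from the fields; compare=False on
# item keeps arbitrary payloads out of those comparisons, so priority ties
# never raise TypeError on unorderable values.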
class PriorityDispatcher:
"""
Dispatch tasks by integer priority (lower = higher priority).
Example:
disp = PriorityDispatcher(workers=2)
disp.start()
disp.submit("low-priority report", priority=10)
disp.submit("critical alert", priority=1)
disp.stop()
"""
    def __init__(self, fn: Callable[[Any], Any] = lambda x: x, workers: int = 2) -> None:
self._fn = fn
self._q: queue.PriorityQueue = queue.PriorityQueue()
self._workers = workers
self._threads: list[threading.Thread] = []
self._done = threading.Event()
self._results: list[WorkResult] = []
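        # list.append is atomic under CPython's GIL, so workers append to
        # _results without a lock; use a queue.Queue for results if you need
        # that guarantee to be explicit.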
def start(self) -> "PriorityDispatcher":
for _ in range(self._workers):
t = threading.Thread(target=self._loop, daemon=True)
t.start()
self._threads.append(t)
return self
def _loop(self) -> None:
while True:
try:
task: PrioritizedTask = self._q.get(timeout=0.1)
except queue.Empty:
if self._done.is_set():
break
continue
try:
r = self._fn(task.item)
self._results.append(WorkResult(item=task.item, result=r, ok=True))
except Exception as exc:
self._results.append(WorkResult(item=task.item, error=exc, ok=False))
finally:
self._q.task_done()
def submit(self, item: Any, priority: int = 5) -> None:
self._q.put(PrioritizedTask(priority=priority, item=item))
def stop(self, wait: bool = True) -> list[WorkResult]:
self._q.join()
self._done.set()
if wait:
for t in self._threads:
t.join()
return list(self._results)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Retry queue
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class RetryItem:
payload: Any
attempts: int = 0
last_error: str = ""
def run_with_retry(
fn: Callable[[Any], Any],
items: list[Any],
max_attempts: int = 3,
delay: float = 0.0,
) -> tuple[list[WorkResult], list[RetryItem]]:
"""
Process items with a retry queue; items that fail are requeued up to max_attempts.
Returns (successes, permanent_failures).
Example:
ok, failed = run_with_retry(post_to_api, payloads, max_attempts=3, delay=0.5)
"""
q: queue.Queue = queue.Queue()
for item in items:
q.put(RetryItem(payload=item))
successes: list[WorkResult] = []
failures: list[RetryItem] = []
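    # Single-threaded drain: q.empty() is reliable here, unlike in
    # multi-threaded code where it is only advisory.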
while not q.empty():
try:
ri = q.get_nowait()
except queue.Empty:
break
ri.attempts += 1
try:
result = fn(ri.payload)
successes.append(WorkResult(item=ri.payload, result=result, ok=True))
except Exception as exc:
ri.last_error = str(exc)
if ri.attempts < max_attempts:
if delay > 0:
time.sleep(delay)
q.put(ri)
else:
failures.append(ri)
finally:
q.task_done()
return successes, failures
# ─────────────────────────────────────────────────────────────────────────────
# 5. Async queue producer-consumer
# ─────────────────────────────────────────────────────────────────────────────
async def async_worker_pool(
items: list[Any],
fn: Callable[[Any], Any],
workers: int = 4,
maxsize: int = 0,
) -> list[WorkResult]:
"""
Async producer-consumer: put items into asyncio.Queue, process with coroutines.
Example:
async def fetch(url): ...
results = await async_worker_pool(urls, fetch, workers=10)
"""
q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
results: list[WorkResult] = []
lock = asyncio.Lock()
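    # The lock is defensive: these appends never straddle an await, so a
    # single event loop would not interleave them anyway.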
async def worker() -> None:
while True:
item = await q.get()
if item is None:
q.task_done()
break
try:
                # asyncio.coroutine() was removed in Python 3.11; branch on
                # iscoroutinefunction and await the function directly.
                if asyncio.iscoroutinefunction(fn):
                    r = await fn(item)
                else:
                    r = fn(item)
async with lock:
results.append(WorkResult(item=item, result=r, ok=True))
except Exception as exc:
async with lock:
results.append(WorkResult(item=item, error=exc, ok=False))
finally:
q.task_done()
# Start workers
tasks = [asyncio.create_task(worker()) for _ in range(workers)]
# Produce
for item in items:
await q.put(item)
# Poison pills
for _ in range(workers):
await q.put(None)
await q.join()
await asyncio.gather(*tasks)
return results
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== queue demo ===")
print("\n--- WorkerPool ---")
import math
def heavy(n: int) -> float:
return math.sqrt(n) * math.log(n + 1)
with WorkerPool(heavy, workers=4, maxsize=20) as pool:
for i in range(1, 21):
pool.submit(i)
pool.wait_drain()
results = list(pool.drain())
ok_count = sum(1 for r in results if r.ok)
print(f" processed {len(results)} items, {ok_count} ok")
print("\n--- Pipeline ---")
stages = [
Stage("parse", lambda s: int(s.strip()), workers=1),
Stage("double", lambda x: x * 2, workers=2),
Stage("to_str", lambda x: f"result:{x}", workers=1),
]
output = run_pipeline(stages, ["1", "2", "3", "4", "5"])
print(f" pipeline output (unsorted): {sorted(output)}")
print("\n--- PriorityDispatcher ---")
order: list[int] = []
def record(x: int) -> int:
order.append(x)
return x
disp = PriorityDispatcher(fn=record, workers=1)
disp.start()
for pri, val in [(10, 100), (1, 1), (5, 50), (2, 20), (3, 30)]:
disp.submit(val, priority=pri)
disp.stop()
print(f" processed order: {order} (lower priority = processed first)")
print("\n--- run_with_retry ---")
call_counts: dict[int, int] = {}
def flaky(n: int) -> int:
call_counts[n] = call_counts.get(n, 0) + 1
if call_counts[n] < 2:
raise RuntimeError(f"item {n} failed attempt {call_counts[n]}")
return n * 10
ok_res, fail_res = run_with_retry(flaky, [1, 2, 3], max_attempts=3)
print(f" successes: {[r.result for r in ok_res]}")
print(f" failures: {[f.payload for f in fail_res]}")
print("\n--- SimpleQueue ---")
sq: queue.SimpleQueue = queue.SimpleQueue()
for v in range(5):
sq.put(v)
drained = []
while not sq.empty():
drained.append(sq.get_nowait())
print(f" SimpleQueue drain: {drained}")
print("\n=== done ===")
For the collections.deque alternative — collections.deque offers O(1) append/popleft with maxlen for bounded circular buffers. Its individual append/popleft calls are atomic in CPython, but it has no blocking get, no backpressure, and compound check-then-pop sequences race without an external lock; queue.Queue wraps a deque internally with a mutex and condition variables for safe blocking multi-thread access. Use deque for fast FIFO operations confined to a single thread; use queue.Queue whenever multiple threads share a queue.

For the multiprocessing.Queue alternative — multiprocessing.Queue bridges separate OS processes using a pipe plus pickle serialization, and multiprocessing.JoinableQueue adds the task_done/join API on top; queue.Queue operates within a single process (shared memory). Use multiprocessing.Queue or multiprocessing.JoinableQueue when workers run in separate processes to bypass the GIL for CPU-bound work; use queue.Queue for I/O-bound threading within one process.

The Claude Skills 360 bundle includes queue skill sets covering WorkerPool (a generic multi-thread pool with WorkResult and drain()), Stage/run_pipeline() multi-stage pipeline wiring, PriorityDispatcher with the PrioritizedTask dataclass, run_with_retry() retry queues with configurable delays, and the async_worker_pool() asyncio producer-consumer pattern. Start with the free tier to try concurrent work dispatch and queue pipeline code generation.
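Two minimal sketches of the alternatives compared above (worker counts and payloads are illustrative). First, deque as a single-threaded bounded ring buffer:

```python
from collections import deque

buf: deque[int] = deque(maxlen=5)   # bounded ring buffer, single-threaded use
for i in range(8):
    buf.append(i)                   # oldest items fall off the left
print(list(buf))                    # [3, 4, 5, 6, 7]
print(buf.popleft())                # 3 — O(1), but no blocking get
```

Second, a JoinableQueue feeding a worker in a separate process:

```python
import multiprocessing as mp

def worker(q: "mp.JoinableQueue") -> None:
    while True:
        n = q.get()                 # items arrive pickled over a pipe
        if n is None:               # poison pill: stop the worker
            q.task_done()
            break
        print(n * n)
        q.task_done()

if __name__ == "__main__":
    q: "mp.JoinableQueue" = mp.JoinableQueue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    for n in range(5):
        q.put(n)
    q.put(None)
    q.join()                        # blocks until every item is acked
    p.join()
```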