gevent provides cooperative multitasking via greenlets and monkey-patching. pip install gevent. Monkey-patch: from gevent import monkey; monkey.patch_all() — call at module top before other imports. Spawn: from gevent import spawn; g = spawn(my_fn, arg). Join: g.join(). JoinAll: from gevent import joinall; joinall([spawn(fn, x) for x in items]). Pool: from gevent.pool import Pool; p = Pool(10); results = p.map(fn, items). Queue: from gevent.queue import Queue; q = Queue(); q.put(item); item = q.get(). Event: from gevent.event import Event; e = Event(); e.set(); e.wait(). AsyncResult: ar = AsyncResult(); ar.set(value); ar.get(). Timeout: from gevent import Timeout; with Timeout(5): slow_fn(). Sleep: from gevent import sleep; sleep(1). Greenlet: from gevent import Greenlet; g = Greenlet(fn, *args); g.start(); g.join(). g.value — result after completion. g.exception — unhandled exception. Hub: from gevent import get_hub; hub = get_hub(). WSGI: from gevent.pywsgi import WSGIServer; WSGIServer(("0.0.0.0",8000), app).serve_forever(). Requests: after monkey.patch_all(), stdlib urllib and socket — and libraries built on them, such as requests — become non-blocking. gevent.wait(objects, count=N). gevent.iwait(objects) — iterator. Claude Code generates gevent parallel scrapers, connection pools, producer-consumer pipelines, and cooperative I/O workers.
CLAUDE.md for gevent
## gevent Stack
- Version: gevent >= 23.0 | pip install gevent
- Patch: from gevent import monkey; monkey.patch_all() # FIRST import in main.py
- Spawn: from gevent import spawn, joinall; joinall([spawn(fn, x) for x in items])
- Pool: from gevent.pool import Pool; Pool(20).map(fn, items)
- Queue: from gevent.queue import Queue; q.put(x); q.get()
- Timeout: from gevent import Timeout; with Timeout(5.0): blocking_call()
gevent Cooperative Concurrency Pipeline
# app/concurrent.py — gevent spawn, Pool, Queue, producer-consumer, HTTP, and WSGI
from __future__ import annotations

# IMPORTANT: monkey-patch FIRST, before all other standard library imports
from gevent import monkey

monkey.patch_all()

import logging
import time
import queue as _stdlib_queue
from typing import Any, Callable, Generator, Iterator

import gevent
import gevent.event
import gevent.lock
import gevent.pool
import gevent.queue
import gevent.timeout
from gevent import Greenlet, sleep, spawn, joinall
from gevent.event import AsyncResult, Event
from gevent.pool import Pool
from gevent.queue import Empty, Full, JoinableQueue, Queue
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Spawn and coordination helpers
# ─────────────────────────────────────────────────────────────────────────────
def run_concurrent(
    fn: Callable,
    items: list,
    concurrency: int = 10,
    timeout: float | None = None,
    raise_errors: bool = True,
) -> list:
    """
    Run fn(item) for each item in items, up to `concurrency` in parallel.

    Returns a list of results in the original input order.

    Args:
        fn: callable invoked once per item.
        items: inputs; one greenlet per item, bounded by the pool.
        concurrency: maximum number of greenlets running at once.
        timeout: overall deadline in seconds for ALL work; raises
            gevent.Timeout when exceeded. Still-running greenlets are
            killed first so they do not keep running in the background.
        raise_errors: if True, the first per-item exception (in input
            order) is re-raised; if False, the exception object itself
            is stored at that item's result slot.

    Example:
        urls = ["http://api.example.com/users/1", "http://api.example.com/users/2"]
        responses = run_concurrent(fetch_url, urls, concurrency=20)
    """
    pool = Pool(concurrency)
    results: list = [None] * len(items)
    errors: list = [None] * len(items)

    def worker(idx: int, item: Any) -> None:
        try:
            results[idx] = fn(item)
        except Exception as e:  # captured per-slot; surfaced below
            errors[idx] = e

    greenlets = [pool.spawn(worker, i, item) for i, item in enumerate(items)]
    if timeout is not None:
        try:
            with gevent.timeout.Timeout(timeout):
                joinall(greenlets)
        except gevent.timeout.Timeout:
            # Don't leak greenlets that are still running past the deadline.
            pool.kill()
            raise
    else:
        joinall(greenlets)

    # Single pass: either re-raise the first error or store it in-place.
    for i, err in enumerate(errors):
        if err is not None:
            if raise_errors:
                raise err
            results[i] = err
    return results
def run_with_timeout(
    fn: Callable,
    *args: Any,
    timeout: float = 10.0,
    default: Any = None,
    **kwargs: Any,
) -> Any:
    """
    Run fn(*args, **kwargs) with a timeout.

    Returns fn's result, or `default` if the deadline expires.

    A dedicated Timeout instance is started and identity-checked, so a
    gevent.Timeout raised *inside* fn (e.g. by a nested `with Timeout(...)`)
    propagates instead of being mistaken for our own deadline — the bare
    `except Timeout` form would silently return `default` for it.

    Example:
        data = run_with_timeout(fetch_slow_api, url, timeout=5.0, default={})
    """
    timer = gevent.timeout.Timeout(timeout)
    timer.start()
    try:
        return fn(*args, **kwargs)
    except gevent.timeout.Timeout as t:
        if t is not timer:
            raise  # somebody else's timeout — not our deadline
        log.warning("Timeout after %.1fs calling %s", timeout, getattr(fn, "__name__", fn))
        return default
    finally:
        timer.close()  # cancel the pending timer in every exit path
def map_parallel(
    fn: Callable,
    items: list,
    size: int = 10,
    ordered: bool = True,
) -> list:
    """
    Pool.map() equivalent with a configurable pool size.

    ordered=True  -> results preserve the input order.
    ordered=False -> imap_unordered: results arrive as tasks finish, which
                     can complete slightly sooner when task durations vary.

    Example:
        prices = map_parallel(fetch_price, sku_list, size=50)
    """
    worker_pool = Pool(size)
    mapper = worker_pool.map if ordered else worker_pool.imap_unordered
    return list(mapper(fn, items))
# ─────────────────────────────────────────────────────────────────────────────
# 2. Producer-consumer queue patterns
# ─────────────────────────────────────────────────────────────────────────────
def producer_consumer(
    producer_fn: Callable[[], Iterator],
    consumer_fn: Callable[[Any], Any],
    n_consumers: int = 5,
    queue_size: int = 100,
    sentinel: Any = None,
) -> list:
    """
    Classic producer-consumer pipeline using a bounded gevent Queue.

    Args:
        producer_fn: generator or callable that yields items.
        consumer_fn: called with each item; return values are collected
            (completion order, not input order).
        n_consumers: number of consumer greenlets.
        queue_size: queue bound; the producer blocks when full (backpressure).
        sentinel: value placed in the queue to signal consumers to stop.

    Raises:
        Whatever producer_fn raises, re-raised after the consumers drain.
        The stop sentinels are delivered even on producer failure, so the
        consumers can never deadlock waiting on q.get().

    Example:
        def read_urls(): yield from open("urls.txt")
        def fetch(url): return requests.get(url.strip()).status_code
        results = producer_consumer(read_urls, fetch, n_consumers=20)
    """
    q = Queue(queue_size)
    results: list = []

    def producer() -> None:
        try:
            for item in producer_fn():
                q.put(item)
        finally:
            # Always send one sentinel per consumer — even when producer_fn
            # raises — otherwise every consumer blocks on q.get() forever.
            for _ in range(n_consumers):
                q.put(sentinel)

    def consumer() -> None:
        while True:
            item = q.get()
            if item is sentinel:
                break
            try:
                results.append(consumer_fn(item))
            except Exception as e:
                log.error("Consumer error: %s", e)

    consumers = [spawn(consumer) for _ in range(n_consumers)]
    producer_greenlet = spawn(producer)
    joinall(consumers)
    # get() re-raises the producer's exception instead of silently losing it.
    producer_greenlet.get()
    return results
class WorkQueue:
    """
    gevent work queue backed by a JoinableQueue and a fixed worker pool.

    Items are put()'d from any greenlet; `workers` greenlets process them
    concurrently and collect handler return values (in completion order).

    Usage:
        def handle(item):
            data = requests.get(item).json()
            return data["id"]
        wq = WorkQueue(handle, workers=20)
        wq.start()
        for url in urls: wq.put(url)
        results = wq.wait()
        wq.stop()
    """

    # Private stop sentinel — unlike the previous `None` convention, callers
    # may enqueue None as a legitimate work item.
    _STOP = object()

    def __init__(
        self,
        handler: Callable[[Any], Any],
        workers: int = 10,
        maxsize: int = 0,
    ) -> None:
        self._handler = handler
        self._workers = workers
        self._q = JoinableQueue(maxsize)  # provides task_done()/join()
        self._results: list = []
        self._pool: Pool | None = None
        self._greenlets: list = []

    def start(self) -> None:
        """Spawn the worker greenlets. Call once, before put()."""
        self._pool = Pool(self._workers)
        self._greenlets = [self._pool.spawn(self._worker) for _ in range(self._workers)]

    def _worker(self) -> None:
        # Loop until a stop sentinel arrives. Every get() is balanced by a
        # task_done() (including the sentinel) so join() can complete.
        while True:
            item = self._q.get()
            if item is self._STOP:
                self._q.task_done()
                break
            try:
                self._results.append(self._handler(item))
            except Exception as e:
                log.error("Worker error: %s", e)
            finally:
                self._q.task_done()

    def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:
        """Enqueue one work item (blocks if the queue is bounded and full)."""
        self._q.put(item, block=block, timeout=timeout)

    def wait(self, timeout: float | None = None) -> list:
        """
        Block until all queued items are processed, or until `timeout`
        seconds elapse (the argument was previously accepted but ignored).
        Returns a snapshot of the results collected so far.
        """
        self._q.join(timeout=timeout)
        return list(self._results)

    def stop(self) -> None:
        """Send one stop sentinel per worker and wait for workers to exit."""
        for _ in self._greenlets:
            self._q.put(self._STOP)
        joinall(self._greenlets)
# ─────────────────────────────────────────────────────────────────────────────
# 3. Rate-limited fetcher
# ─────────────────────────────────────────────────────────────────────────────
class RateLimitedFetcher:
    """
    Fetch URLs concurrently with a configurable rate limit (requests/second).

    A single semaphore serializes a sleep of 1/rps before each request, so
    at most `rps` requests start per second regardless of pool size.

    Usage:
        fetcher = RateLimitedFetcher(rps=10, concurrency=20)
        results = fetcher.fetch_all(url_list)
    """

    def __init__(
        self,
        rps: float = 10.0,
        concurrency: int = 20,
        timeout: float = 10.0,
    ) -> None:
        # Explicit import: `import gevent` alone does not guarantee that the
        # gevent.lock submodule attribute is populated.
        from gevent.lock import Semaphore

        # Fail fast with a clear message instead of ZeroDivisionError below.
        if rps <= 0:
            raise ValueError(f"rps must be positive, got {rps!r}")
        self.rps = rps
        self.concurrency = concurrency
        self.timeout = timeout
        self._interval = 1.0 / rps
        self._lock = Semaphore(1)

    def _fetch(self, url: str) -> dict:
        """Fetch one URL; returns {"url", "status", "body_len"} on success
        or {"url", "error"} on any failure (including timeout)."""
        import urllib.request

        with self._lock:
            sleep(self._interval)  # rate-limit token bucket
        try:
            with gevent.timeout.Timeout(self.timeout):
                with urllib.request.urlopen(url) as resp:
                    return {"url": url, "status": resp.status, "body_len": len(resp.read())}
        except Exception as e:
            return {"url": url, "error": str(e)}

    def fetch_all(self, urls: list[str]) -> list[dict]:
        """Fetch all URLs concurrently (bounded by `concurrency`), order preserved."""
        return map_parallel(self._fetch, urls, size=self.concurrency)
# ─────────────────────────────────────────────────────────────────────────────
# 4. WSGI server pattern
# ─────────────────────────────────────────────────────────────────────────────
WSGI_EXAMPLE = '''
# gevent WSGI server — handles concurrent requests via greenlets
from gevent import monkey; monkey.patch_all()
from gevent.pywsgi import WSGIServer
def application(environ, start_response):
import time, json
# This sleep is non-blocking — other requests continue concurrently
time.sleep(0.01)
body = json.dumps({"status": "ok", "workers": "greenlets"}).encode()
start_response("200 OK", [
("Content-Type", "application/json"),
("Content-Length", str(len(body))),
])
return [body]
if __name__ == "__main__":
print("Serving on http://0.0.0.0:8000")
WSGIServer(("0.0.0.0", 8000), application, log=None).serve_forever()
# For Flask/Django with gevent:
# WSGIServer(("0.0.0.0", 8000), flask_app).serve_forever()
'''
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo 1: bounded parallel map — 20 tasks of 50ms on a 10-greenlet pool
    # should take ~2 "waves" (~100ms), not the 1000ms a serial run would.
    print("=== run_concurrent ===")
    start = time.perf_counter()
    def slow_task(n: int) -> int:
        sleep(0.05)  # simulate I/O
        return n * n
    results = run_concurrent(slow_task, list(range(20)), concurrency=10)
    elapsed = time.perf_counter() - start
    print(f" 20 tasks (50ms each) in {elapsed*1000:.0f}ms (serial would be 1000ms)")
    print(f" First 5 results: {results[:5]}")

    # Demo 2: producer-consumer pipeline; results arrive in completion
    # order, hence the sorted() when printing.
    print("\n=== producer_consumer ===")
    def gen_items():
        for i in range(10):
            yield i
    def double(x):
        sleep(0.01)
        return x * 2
    pc_results = producer_consumer(gen_items, double, n_consumers=5)
    print(f" Processed {len(pc_results)} items: {sorted(pc_results)}")

    # Demo 3: timeout wrapper — the fast call returns normally, the slow
    # one is cut off and falls back to the provided default.
    print("\n=== run_with_timeout ===")
    def sometimes_slow(n):
        sleep(n)
        return f"done after {n}s"
    result = run_with_timeout(sometimes_slow, 0.1, timeout=1.0)
    print(f" Fast call: {result}")
    result2 = run_with_timeout(sometimes_slow, 5.0, timeout=0.5, default="timed out")
    print(f" Slow call: {result2}")

    # Demo 4: WorkQueue lifecycle — start, feed, wait for drain, stop.
    print("\n=== WorkQueue ===")
    processed = []  # NOTE(review): unused by the demo; handle() returns instead
    def handle(item):
        sleep(0.01)
        return item ** 2
    wq = WorkQueue(handle, workers=5)
    wq.start()
    for i in range(20):
        wq.put(i)
    final = wq.wait()
    wq.stop()
    print(f" Processed {len(final)} items via WorkQueue")
For the asyncio alternative — asyncio is Python’s built-in async framework using explicit async/await syntax with tasks and event loops; gevent achieves the same cooperative scheduling transparently — existing synchronous code (including requests, socket, stdlib http) becomes concurrent after monkey.patch_all() without changing any of its source — use gevent for quick concurrency wins on codebases that can’t be rewritten to async/await, use asyncio for new greenfield async applications. For the threading alternative — Python threads share a GIL, limiting CPU parallelism while incurring thread-creation and context-switch overhead; gevent greenlets are lighter (microseconds to switch vs milliseconds for threads) and work well for I/O-bound concurrency with thousands of concurrent connections — use gevent for high-concurrency I/O tasks, threading for CPU-bound work that releases the GIL (C extensions). The Claude Skills 360 bundle includes gevent skill sets covering monkey.patch_all() positioning, run_concurrent() Pool-bounded parallel map, run_with_timeout() cancellable I/O, map_parallel() ordered and unordered, producer_consumer() queue pattern, WorkQueue class with start/put/wait/stop, RateLimitedFetcher token bucket, gevent WSGI server with Flask integration, and JoinableQueue task_done() synchronization. Start with the free tier to try gevent cooperative concurrency code generation.