Python’s selectors module provides a high-level, cross-platform I/O multiplexing API that automatically selects the best available backend (epoll on Linux, kqueue on macOS, poll or select as fallbacks). import selectors. Create: sel = selectors.DefaultSelector() — picks the platform-optimal implementation. Register: sel.register(fileobj, events, data=None) — events is selectors.EVENT_READ | selectors.EVENT_WRITE; data is any Python object attached to the key (e.g., a callback or state object); returns a SelectorKey(fileobj, fd, events, data). Wait: ready = sel.select(timeout=None) → list[(SelectorKey, events)] — blocks until at least one fd is ready, or timeout expires (None = block forever, 0 = poll). Modify: sel.modify(fileobj, events, data=None) — change mask/data without unregister+register. Unregister: sel.unregister(fileobj). Lookup: sel.get_key(fileobj) → SelectorKey; sel.get_map() → {fd: SelectorKey}. Close: sel.close() — use as context manager. Constants: selectors.EVENT_READ = 1, selectors.EVENT_WRITE = 4. Other selectors: EpollSelector, KqueueSelector, PollSelector, SelectSelector — use DefaultSelector in all new code. Claude Code generates cross-platform echo servers, callback-driven event loops, non-blocking TCP connects, and multiplexed client pools.
CLAUDE.md for selectors
## selectors Stack
- Stdlib: import selectors, socket
- Create: sel = selectors.DefaultSelector()
- Register: sel.register(sock, selectors.EVENT_READ, data=callback)
- Wait: ready = sel.select(timeout=1.0) # [(SelectorKey, events), ...]
- for key, events in ready:
- key.data(key.fileobj) # call attached callback
- Modify: sel.modify(sock, selectors.EVENT_WRITE, data=write_cb)
- Note: Always close selector; use as context manager
selectors Cross-Platform I/O Multiplexing Pipeline
# app/selectorsutil.py — helpers, echo server, event loop, connect, pool
from __future__ import annotations
import selectors
import socket
import threading
import time
from dataclasses import dataclass, field
from typing import Callable
# ─────────────────────────────────────────────────────────────────────────────
# 1. Portable readiness helpers
# ─────────────────────────────────────────────────────────────────────────────
def wait_readable(
sockets: list[socket.socket],
timeout: float = 1.0,
) -> list[socket.socket]:
"""
Return the subset of sockets that have data ready to read.
Uses DefaultSelector — works on all platforms.
Example:
ready = wait_readable([sock1, sock2], timeout=0.5)
for s in ready:
data = s.recv(4096)
"""
if not sockets:
return []
with selectors.DefaultSelector() as sel:
for s in sockets:
sel.register(s, selectors.EVENT_READ)
events = sel.select(timeout=timeout)
return [key.fileobj for key, _ in events] # type: ignore[misc]
def wait_writable(
sockets: list[socket.socket],
timeout: float = 1.0,
) -> list[socket.socket]:
"""
Return the subset of sockets ready for writing.
Example:
writable = wait_writable([sock], timeout=2.0)
if sock in writable:
sock.send(data)
"""
if not sockets:
return []
with selectors.DefaultSelector() as sel:
for s in sockets:
sel.register(s, selectors.EVENT_WRITE)
events = sel.select(timeout=timeout)
return [key.fileobj for key, _ in events] # type: ignore[misc]
# ─────────────────────────────────────────────────────────────────────────────
# 2. Callback-based SelectorEventLoop
# ─────────────────────────────────────────────────────────────────────────────
class SelectorEventLoop:
"""
Minimal callback-driven event loop backed by selectors.DefaultSelector.
Example:
loop = SelectorEventLoop()
loop.register_read(sock, lambda s: print(s.recv(64)))
loop.run(timeout=5.0, max_iterations=100)
loop.close()
"""
def __init__(self) -> None:
self._sel = selectors.DefaultSelector()
self._running = False
def register_read(
self,
sock: socket.socket,
callback: Callable[[socket.socket], None],
) -> None:
"""Register sock for read events; callback(sock) fired on readiness."""
self._sel.register(sock, selectors.EVENT_READ, data=callback)
def register_write(
self,
sock: socket.socket,
callback: Callable[[socket.socket], None],
) -> None:
"""Register sock for write events; callback(sock) fired on readiness."""
self._sel.register(sock, selectors.EVENT_WRITE, data=callback)
def unregister(self, sock: socket.socket) -> None:
"""Remove sock from the selector (ignore if not registered)."""
try:
self._sel.unregister(sock)
except KeyError:
pass
def run_once(self, timeout: float = 1.0) -> int:
"""
Run one iteration: fire callbacks for all ready events.
Returns the number of events handled.
"""
ready = self._sel.select(timeout=timeout)
for key, _ in ready:
cb = key.data
if cb is not None:
cb(key.fileobj)
return len(ready)
def run(
self,
timeout: float = 1.0,
max_iterations: int | None = None,
) -> None:
"""
Run the event loop until stop() is called or max_iterations reached.
Example:
loop.run(timeout=0.5, max_iterations=200)
"""
self._running = True
iters = 0
while self._running:
if max_iterations is not None and iters >= max_iterations:
break
self.run_once(timeout=timeout)
iters += 1
def stop(self) -> None:
"""Signal the loop to stop after the current iteration."""
self._running = False
def close(self) -> None:
"""Close the underlying selector."""
self._sel.close()
def __enter__(self) -> "SelectorEventLoop":
return self
def __exit__(self, *_: object) -> None:
self.close()
# ─────────────────────────────────────────────────────────────────────────────
# 3. selectors-based single-threaded echo server
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class EchoServer:
"""
Non-blocking echo server using selectors.DefaultSelector.
Handles multiple clients in a single thread via callbacks.
Example:
server = EchoServer("127.0.0.1", 9200)
t = server.run_in_thread(max_iters=100)
t.join(timeout=5)
"""
host: str = "127.0.0.1"
port: int = 9200
bufsize: int = 4096
_sel: selectors.BaseSelector = field(default=None, repr=False) # type: ignore
_server: socket.socket = field(default=None, repr=False) # type: ignore
_running: bool = field(default=False, repr=False)
def _accept(self, server: socket.socket) -> None:
conn, _ = server.accept()
conn.setblocking(False)
self._sel.register(conn, selectors.EVENT_READ, data=self._echo)
def _echo(self, conn: socket.socket) -> None:
try:
data = conn.recv(self.bufsize)
except OSError:
data = b""
if data:
conn.sendall(data)
else:
self._sel.unregister(conn)
conn.close()
def start(self) -> None:
"""Bind, listen, and set up selector."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((self.host, self.port))
s.listen(50)
s.setblocking(False)
self._server = s
self._sel = selectors.DefaultSelector()
self._sel.register(s, selectors.EVENT_READ, data=self._accept)
self._running = True
def run(self, max_iters: int | None = None) -> None:
"""
Run the event loop: accept + echo until stop() or max_iters.
"""
if self._server is None:
self.start()
iters = 0
while self._running:
if max_iters is not None and iters >= max_iters:
break
iters += 1
try:
ready = self._sel.select(timeout=1.0)
except OSError:
break
for key, _ in ready:
key.data(key.fileobj)
for key in list(self._sel.get_map().values()):
key.fileobj.close() # type: ignore[union-attr]
self._sel.close()
def stop(self) -> None:
self._running = False
def run_in_thread(self, max_iters: int | None = None) -> threading.Thread:
self.start()
t = threading.Thread(target=self.run, args=(max_iters,), daemon=True)
t.start()
return t
# ─────────────────────────────────────────────────────────────────────────────
# 4. Non-blocking connect with timeout
# ─────────────────────────────────────────────────────────────────────────────
def connect_with_timeout(
host: str,
port: int,
timeout: float = 5.0,
) -> socket.socket:
"""
Open a non-blocking TCP connection, waiting up to timeout seconds.
Returns the connected socket (in blocking mode).
Raises TimeoutError if not ready within timeout, OSError on failure.
Example:
sock = connect_with_timeout("127.0.0.1", 9200, timeout=3.0)
sock.sendall(b"hello")
print(sock.recv(64))
sock.close()
"""
import errno as _errno
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(False)
try:
s.connect((host, port))
except BlockingIOError:
pass
except OSError as exc:
if exc.errno not in (_errno.EINPROGRESS, _errno.EWOULDBLOCK):
s.close()
raise
with selectors.DefaultSelector() as sel:
sel.register(s, selectors.EVENT_WRITE)
ready = sel.select(timeout=timeout)
if not ready:
s.close()
raise TimeoutError(f"connect to {host}:{port} timed out after {timeout}s")
err = s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
s.close()
raise OSError(err, f"connect to {host}:{port} failed")
s.setblocking(True)
return s
# ─────────────────────────────────────────────────────────────────────────────
# 5. Timeout-aware read
# ─────────────────────────────────────────────────────────────────────────────
def read_with_timeout(sock: socket.socket, n: int, timeout: float = 5.0) -> bytes:
"""
Read exactly n bytes from a non-blocking socket using selectors.
Raises TimeoutError or ConnectionError on failure.
Example:
sock.setblocking(False)
data = read_with_timeout(sock, 1024, timeout=3.0)
"""
buf = bytearray()
deadline = time.monotonic() + timeout
with selectors.DefaultSelector() as sel:
sel.register(sock, selectors.EVENT_READ)
while len(buf) < n:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise TimeoutError(f"read_with_timeout: got {len(buf)} of {n} bytes")
ready = sel.select(timeout=remaining)
if not ready:
raise TimeoutError(f"read_with_timeout: got {len(buf)} of {n} bytes")
chunk = sock.recv(n - len(buf))
if not chunk:
raise ConnectionError("Connection closed")
buf.extend(chunk)
return bytes(buf)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== selectors demo ===")
print(f" DefaultSelector backend: {selectors.DefaultSelector().__class__.__name__}")
# ── EchoServer ────────────────────────────────────────────────────────────
print("\n--- EchoServer ---")
with socket.socket() as tmp:
tmp.bind(("127.0.0.1", 0))
port = tmp.getsockname()[1]
server = EchoServer("127.0.0.1", port)
t = server.run_in_thread(max_iters=60)
time.sleep(0.05)
with socket.create_connection(("127.0.0.1", port)) as client:
for msg in [b"hello selectors", b"cross-platform", b"echo test"]:
client.sendall(msg)
client.settimeout(2.0)
echo = client.recv(4096)
print(f" sent={msg!r} echo={echo!r} match={msg == echo}")
server.stop()
t.join(timeout=3)
# ── wait_readable ─────────────────────────────────────────────────────────
print("\n--- wait_readable (timeout=0) ---")
with socket.socketpair() as (a, b):
before = wait_readable([a], timeout=0)
print(f" before send: {len(before)} ready")
b.send(b"ping")
after = wait_readable([a], timeout=0.1)
print(f" after send: {len(after)} ready")
if after:
print(f" recv: {after[0].recv(64)!r}")
# ── wait_writable ─────────────────────────────────────────────────────────
print("\n--- wait_writable ---")
with socket.socketpair() as (a, b):
writable = wait_writable([a], timeout=0.1)
print(f" writable immediately: {len(writable)} socket(s)")
# ── SelectorEventLoop callback ────────────────────────────────────────────
print("\n--- SelectorEventLoop callback ---")
received: list[bytes] = []
with socket.socketpair() as (a, b):
b.send(b"callback-data")
with SelectorEventLoop() as loop:
def on_read(s: socket.socket) -> None:
received.append(s.recv(64))
loop.stop()
loop.register_read(a, on_read)
loop.run(timeout=1.0, max_iterations=10)
print(f" received via callback: {received}")
# ── connect_with_timeout pattern ──────────────────────────────────────────
print("\n--- connect_with_timeout ---")
print(" (live connect skipped in demo; pattern shown)")
print(" sock = connect_with_timeout('127.0.0.1', port, timeout=3.0)")
print("\n=== done ===")
For the select module alternative — select.select(), select.poll(), and select.epoll() are the lower-level platform-specific primitives that selectors wraps; selectors is always preferred over raw select because it normalises the API, picks the best available mechanism, and works on all platforms — use select only when you need direct access to epoll flags like EPOLLET (edge-triggered mode) or kqueue-specific filter parameters that selectors abstracts away. For the asyncio alternative — asyncio.start_server(), asyncio.open_connection(), and await asyncio.wait_for(coro, timeout) build high-level coroutine-based servers that use selectors (or uvloop) under the hood — use asyncio for all new concurrent I/O code where coroutines and async/await fit naturally; use selectors when writing a custom event loop, embedding Python I/O into a C extension, or retrofitting callback-style multiplexing into existing synchronous code that cannot adopt coroutines. The Claude Skills 360 bundle includes selectors skill sets covering wait_readable()/wait_writable() portable helpers, SelectorEventLoop with register_read()/register_write()/run_once()/run()/stop(), EchoServer callback-driven multi-client echo server, connect_with_timeout() non-blocking TCP connect, and read_with_timeout() selector-backed timed read. Start with the free tier to try cross-platform I/O multiplexing patterns and selectors pipeline code generation.