Python’s select module wraps OS-level I/O readiness notification APIs: select.select, select.poll (Linux), select.epoll (Linux), select.kqueue/kevent (BSD/macOS). import select. select.select(rlist, wlist, xlist, timeout=None) → (readable, writable, exceptional) — blocks until at least one descriptor in any list is ready, or timeout expires (timeout=0 for instant, None to block forever); accepts sockets, pipes, and any object with a fileno() method. select.poll() → poll object — avoids the 1024-fd limit of select; p.register(fd, eventmask) with select.POLLIN | select.POLLOUT | select.POLLERR | select.POLLHUP | select.POLLNVAL; p.poll(timeout_ms=-1) → [(fd, event), ...]; p.unregister(fd). select.epoll(sizehint=-1, flags=0) → epoll — e.register(fd, eventmask) using select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP | select.EPOLLET (edge-triggered); e.poll(timeout=-1, maxevents=-1) → [(fd, event), ...]; e.modify(fd, eventmask); e.unregister(fd). All three support use as context managers. Claude Code generates echo servers, multiplexed chat servers, non-blocking I/O loops, and event-driven pipelines.
CLAUDE.md for select
## select Stack
- Stdlib: import select, socket
- select: r, w, x = select.select(rlist, wlist, [], timeout)
- poll: p = select.poll() # Linux only
- p.register(fd, select.POLLIN | select.POLLHUP)
- events = p.poll(timeout_ms) # [(fd, event), ...]
- epoll: e = select.epoll() # Linux only
- e.register(fd, select.EPOLLIN | select.EPOLLET)
- events = e.poll(timeout=-1)
- Note: Use selectors module for cross-platform abstraction
select I/O Multiplexing Pipeline
# app/selectutil.py — select-based server, poll, epoll, event loop, echo
from __future__ import annotations
import errno
import os
import platform
import select
import socket
import threading
from dataclasses import dataclass, field
# ─────────────────────────────────────────────────────────────────────────────
# 1. select.select-based multi-socket reader
# ─────────────────────────────────────────────────────────────────────────────
def read_available(
sockets: list[socket.socket],
timeout: float = 1.0,
) -> list[socket.socket]:
"""
Return the subset of sockets that have data ready to read.
Uses select.select — portable across all platforms.
Example:
ready = read_available([sock1, sock2], timeout=0.5)
for s in ready:
data = s.recv(4096)
"""
try:
readable, _, _ = select.select(sockets, [], [], timeout)
return readable
except select.error:
return []
def wait_writable(
sockets: list[socket.socket],
timeout: float = 1.0,
) -> list[socket.socket]:
"""
Return the subset of sockets ready for writing (send buffer has space).
Example:
writable = wait_writable([sock], timeout=2.0)
if sock in writable:
sock.send(data)
"""
try:
_, writable, _ = select.select([], sockets, [], timeout)
return writable
except select.error:
return []
# ─────────────────────────────────────────────────────────────────────────────
# 2. select-based single-threaded echo server
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class SelectEchoServer:
"""
Non-blocking echo server using select.select.
Handles multiple clients in a single thread.
Example:
server = SelectEchoServer("127.0.0.1", 9000)
# run in thread for testing:
t = threading.Thread(target=server.run, daemon=True)
t.start()
"""
host: str = "127.0.0.1"
port: int = 9000
bufsize: int = 4096
running: bool = field(default=False, repr=False)
_server_sock: socket.socket = field(default=None, repr=False) # type: ignore
def start(self) -> None:
"""Bind and listen. Call run() or run_in_thread()."""
self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._server_sock.bind((self.host, self.port))
self._server_sock.listen(50)
self._server_sock.setblocking(False)
self.running = True
def run(self, max_iters: int | None = None) -> None:
"""
Event loop: accept new connections, echo data, close on empty read.
Stops after max_iters iterations (None = run forever).
"""
if self._server_sock is None:
self.start()
clients: list[socket.socket] = []
iters = 0
while self.running:
if max_iters is not None and iters >= max_iters:
break
iters += 1
all_socks = [self._server_sock] + clients
try:
readable, _, exceptional = select.select(all_socks, [], all_socks, 1.0)
except select.error:
break
for s in readable:
if s is self._server_sock:
conn, _addr = s.accept()
conn.setblocking(False)
clients.append(conn)
else:
data = self._recv(s)
if data:
s.sendall(data)
else:
s.close()
if s in clients:
clients.remove(s)
for s in exceptional:
s.close()
if s in clients:
clients.remove(s)
for c in clients:
c.close()
if self._server_sock:
self._server_sock.close()
def _recv(self, s: socket.socket) -> bytes:
try:
return s.recv(self.bufsize)
except OSError:
return b""
def stop(self) -> None:
self.running = False
def run_in_thread(self, max_iters: int | None = None) -> threading.Thread:
self.start()
t = threading.Thread(
target=self.run, args=(max_iters,), daemon=True
)
t.start()
return t
# ─────────────────────────────────────────────────────────────────────────────
# 3. poll-based server (Linux)
# ─────────────────────────────────────────────────────────────────────────────
_HAS_POLL = hasattr(select, "poll")
def poll_read_events(
sockets: list[socket.socket],
timeout_ms: int = 1000,
) -> list[tuple[socket.socket, int]]:
"""
Use select.poll to find readable/error sockets.
Returns [(socket, event_mask), ...].
Falls back to select.select on non-Linux platforms.
Example:
events = poll_read_events([sock1, sock2])
for s, event in events:
if event & select.POLLIN:
data = s.recv(4096)
"""
if not _HAS_POLL:
readable = read_available(sockets, timeout=timeout_ms / 1000)
return [(s, select.POLLIN if hasattr(select, "POLLIN") else 1) # type: ignore[attr-defined]
for s in readable]
p = select.poll()
fd_to_sock = {}
for s in sockets:
fd = s.fileno()
fd_to_sock[fd] = s
p.register(fd, select.POLLIN | select.POLLHUP | select.POLLERR)
try:
events = p.poll(timeout_ms)
except select.error:
return []
result = []
for fd, event in events:
s = fd_to_sock.get(fd)
if s:
result.append((s, event))
return result
# ─────────────────────────────────────────────────────────────────────────────
# 4. epoll-based server (Linux only)
# ─────────────────────────────────────────────────────────────────────────────
_HAS_EPOLL = hasattr(select, "epoll")
class EpollEventLoop:
"""
Edge-triggered epoll event loop for Linux.
Manages a server socket plus client connections.
Example:
loop = EpollEventLoop("127.0.0.1", 9100)
loop.start()
loop.run(max_events=100)
loop.close()
"""
def __init__(self, host: str = "127.0.0.1", port: int = 9100):
if not _HAS_EPOLL:
raise RuntimeError("epoll is only available on Linux")
self.host = host
self.port = port
self._epoll: select.epoll | None = None
self._server: socket.socket | None = None
self._connections: dict[int, socket.socket] = {}
self._buffers: dict[int, bytes] = {}
def start(self) -> None:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((self.host, self.port))
s.listen(100)
s.setblocking(False)
self._server = s
self._epoll = select.epoll()
self._epoll.register(s.fileno(), select.EPOLLIN) # type: ignore[attr-defined]
def run(self, max_events: int = 1000) -> None:
"""Process up to max_events, echoing all received data."""
if self._epoll is None or self._server is None:
self.start()
handled = 0
try:
while handled < max_events:
events = self._epoll.poll(timeout=1.0) # type: ignore[union-attr]
for fd, event in events:
handled += 1
if fd == self._server.fileno():
conn, _ = self._server.accept()
conn.setblocking(False)
self._epoll.register(conn.fileno(), # type: ignore[union-attr]
select.EPOLLIN | select.EPOLLET) # type: ignore[attr-defined]
self._connections[conn.fileno()] = conn
self._buffers[conn.fileno()] = b""
elif event & select.EPOLLIN: # type: ignore[attr-defined]
conn = self._connections.get(fd)
if conn:
try:
data = conn.recv(4096)
if data:
conn.sendall(data)
else:
self._close(fd)
except OSError:
self._close(fd)
elif event & (select.EPOLLHUP | select.EPOLLERR): # type: ignore[attr-defined]
self._close(fd)
except Exception:
pass
def _close(self, fd: int) -> None:
conn = self._connections.pop(fd, None)
if conn:
if self._epoll:
try:
self._epoll.unregister(fd) # type: ignore[union-attr]
except Exception:
pass
conn.close()
self._buffers.pop(fd, None)
def close(self) -> None:
for fd in list(self._connections):
self._close(fd)
if self._epoll:
self._epoll.close()
if self._server:
self._server.close()
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== select demo ===")
print(f" platform: {platform.system()}")
print(f" has poll: {_HAS_POLL}")
print(f" has epoll: {_HAS_EPOLL}")
# ── select.select echo server ─────────────────────────────────────────────
print("\n--- SelectEchoServer ---")
import time
with socket.socket() as s:
s.bind(("127.0.0.1", 0))
port = s.getsockname()[1]
server = SelectEchoServer("127.0.0.1", port)
t = server.run_in_thread(max_iters=50)
time.sleep(0.05)
# Connect client
with socket.create_connection(("127.0.0.1", port)) as client:
messages = [b"hello select", b"multiple clients", b"echo test"]
for msg in messages:
client.sendall(msg)
client.settimeout(2.0)
echo = client.recv(4096)
print(f" sent={msg!r} echo={echo!r} match={msg == echo}")
server.stop()
t.join(timeout=3)
# ── read_available (non-blocking check) ───────────────────────────────────
print("\n--- read_available (timeout=0) ---")
with socket.socketpair() as (a, b):
# Nothing ready initially
ready = read_available([a], timeout=0)
print(f" before send: {len(ready)} ready")
# Send data, then check
b.send(b"ping")
ready = read_available([a], timeout=0.1)
print(f" after send: {len(ready)} ready")
if ready:
print(f" recv: {ready[0].recv(64)!r}")
# ── wait_writable ──────────────────────────────────────────────────────────
print("\n--- wait_writable ---")
with socket.socketpair() as (a, b):
writable = wait_writable([a], timeout=0.1)
print(f" writable immediately: {len(writable)} socket(s)")
print("\n=== done ===")
For the selectors alternative — selectors.DefaultSelector() automatically picks the best available multiplexer (epoll on Linux, kqueue on macOS, select everywhere else), with a unified register(fileobj, events, data) / select(timeout) API — use selectors for new cross-platform I/O multiplexing code; use select when you need direct access to OS-specific features (epoll edge-triggered mode, kqueue filters, poll event masks) that selectors abstracts away. For the asyncio alternative — asyncio.get_event_loop(), asyncio.start_server(), and await asyncio.wait_for(coro, timeout) build high-level async servers backed by selectors or uvloop under the hood — use asyncio for all new concurrent I/O code; use select / poll / epoll only when writing a custom event loop, embedding Python’s I/O into a C framework, or working at the raw fd level where asyncio’s coroutine model is too high-level. The Claude Skills 360 bundle includes select skill sets covering read_available()/wait_writable() portable helpers, SelectEchoServer single-threaded multi-client echo server, poll_read_events() Linux poll interface, and EpollEventLoop edge-triggered epoll event loop. Start with the free tier to try I/O multiplexing patterns and select pipeline code generation.