Claude Code for select: Python I/O Multiplexing

Published: November 25, 2028
Read time: 5 min
By: Claude Skills 360

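Python’s select module wraps the OS-level I/O readiness notification APIs: select.select (all platforms, sockets only on Windows), select.poll (most Unix systems, not Windows), select.epoll (Linux), and select.kqueue/kevent (BSD/macOS). Import it with import select.

select.select(rlist, wlist, xlist, timeout=None) returns (readable, writable, exceptional). It blocks until at least one descriptor in any list is ready or the timeout expires (timeout=0 polls without blocking, None blocks forever). It accepts integer file descriptors and any object with a fileno() method, such as sockets and, on Unix, pipes.

select.poll() returns a poll object, which avoids the FD_SETSIZE limit of select (typically 1024 descriptors). Register interest with p.register(fd, eventmask), combining flags such as select.POLLIN | select.POLLOUT; the kernel reports select.POLLERR, select.POLLHUP, and select.POLLNVAL whether or not they were requested. p.poll(timeout) takes its timeout in milliseconds (None or a negative value blocks) and returns [(fd, event), ...]; p.unregister(fd) removes a descriptor.

select.epoll(sizehint=-1, flags=0) returns an epoll object. Use e.register(fd, eventmask) with select.EPOLLIN | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP | select.EPOLLET (edge-triggered); e.poll(timeout, maxevents=-1) takes its timeout in seconds and returns [(fd, event), ...]; e.modify(fd, eventmask) and e.unregister(fd) update the registration.

Of these three, only epoll objects support use as context managers: select.select is a plain function, and poll objects have nothing to close. Claude Code can generate echo servers, multiplexed chat servers, non-blocking I/O loops, and event-driven pipelines on top of these calls.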
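
As a quick illustration of the select.select call described above, here is a minimal sketch that uses a local socket pair rather than a real network connection. The helper name ready_to_read is hypothetical and chosen only for this example.

```python
import select
import socket


def ready_to_read(socks, timeout=0.0):
    """Return the sockets in `socks` that select.select reports as readable."""
    readable, _writable, _exceptional = select.select(socks, [], [], timeout)
    return readable


# socketpair() returns two connected sockets, handy for a local demonstration.
a, b = socket.socketpair()
with a, b:
    before = ready_to_read([a])        # nothing has been sent yet
    b.sendall(b"ping")
    after = ready_to_read([a], 1.0)    # data is now pending on `a`
    payload = after[0].recv(16) if after else b""

print(len(before), len(after), payload)
```

With timeout=0 the first call returns immediately with an empty list; the second call returns as soon as the pending bytes make the socket readable.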

CLAUDE.md for select

## select Stack
- Stdlib: import select, socket
- select: r, w, x = select.select(rlist, wlist, [], timeout)
- poll:   p = select.poll()  # most Unix systems; not available on Windows
-         p.register(fd, select.POLLIN | select.POLLHUP)
-         events = p.poll(timeout_ms)  # [(fd, event), ...]
- epoll:  e = select.epoll()  # Linux only
-         e.register(fd, select.EPOLLIN | select.EPOLLET)
-         events = e.poll(timeout=-1)
- Note:   Use selectors module for cross-platform abstraction
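
The poll entries in the list above can be exercised with a short sketch like the following. It assumes a platform where select.poll exists and skips the demonstration otherwise; poll_once is a hypothetical helper name used only here.

```python
import select
import socket


def poll_once(sock, timeout_ms=1000):
    """Register `sock` for read events and return poll()'s [(fd, mask), ...]."""
    poller = select.poll()             # not available on Windows
    poller.register(sock.fileno(), select.POLLIN | select.POLLHUP)
    try:
        return poller.poll(timeout_ms)  # timeout is in milliseconds
    finally:
        poller.unregister(sock.fileno())


poll_result = None
if hasattr(select, "poll"):
    a, b = socket.socketpair()
    with a, b:
        b.sendall(b"x")
        # Record whether each event is for `a` and whether POLLIN is set.
        poll_result = [
            (fd == a.fileno(), bool(mask & select.POLLIN))
            for fd, mask in poll_once(a)
        ]

print(poll_result)
```

Note that poll() reports integer file descriptors, not socket objects, so real code usually keeps an fd-to-socket mapping alongside the poll object.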

select I/O Multiplexing Pipeline

# app/selectutil.py — select-based server, poll, epoll, event loop, echo
from __future__ import annotations

import platform
import select
import socket
import threading
from dataclasses import dataclass, field


# ─────────────────────────────────────────────────────────────────────────────
# 1. select.select-based multi-socket reader
# ─────────────────────────────────────────────────────────────────────────────

def read_available(
    sockets: list[socket.socket],
    timeout: float = 1.0,
) -> list[socket.socket]:
    """
    Return the subset of sockets that have data ready to read.
    Uses select.select — portable across all platforms.

    Example:
        ready = read_available([sock1, sock2], timeout=0.5)
        for s in ready:
            data = s.recv(4096)
    """
    try:
        readable, _, _ = select.select(sockets, [], [], timeout)
        return readable
    except (OSError, ValueError):  # ValueError: a socket was already closed (fd == -1)
        return []


def wait_writable(
    sockets: list[socket.socket],
    timeout: float = 1.0,
) -> list[socket.socket]:
    """
    Return the subset of sockets ready for writing (send buffer has space).

    Example:
        writable = wait_writable([sock], timeout=2.0)
        if sock in writable:
            sock.send(data)
    """
    try:
        _, writable, _ = select.select([], sockets, [], timeout)
        return writable
    except (OSError, ValueError):  # ValueError: a socket was already closed (fd == -1)
        return []


# ─────────────────────────────────────────────────────────────────────────────
# 2. select-based single-threaded echo server
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class SelectEchoServer:
    """
    Non-blocking echo server using select.select.
    Handles multiple clients in a single thread.

    Example:
        server = SelectEchoServer("127.0.0.1", 9000)
        # run in thread for testing:
        t = threading.Thread(target=server.run, daemon=True)
        t.start()
    """
    host:    str   = "127.0.0.1"
    port:    int   = 9000
    bufsize: int   = 4096
    running: bool  = field(default=False, repr=False)
    _server_sock: socket.socket = field(default=None, repr=False)  # type: ignore

    def start(self) -> None:
        """Bind and listen. Call run() or run_in_thread()."""
        self._server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._server_sock.bind((self.host, self.port))
        self._server_sock.listen(50)
        self._server_sock.setblocking(False)
        self.running = True

    def run(self, max_iters: int | None = None) -> None:
        """
        Event loop: accept new connections, echo data, close on empty read.
        Stops after max_iters iterations (None = run forever).
        """
        if self._server_sock is None:
            self.start()
        clients: list[socket.socket] = []
        iters = 0
        while self.running:
            if max_iters is not None and iters >= max_iters:
                break
            iters += 1
            all_socks = [self._server_sock] + clients
            try:
                readable, _, exceptional = select.select(all_socks, [], all_socks, 1.0)
            except select.error:
                break
            for s in readable:
                if s is self._server_sock:
                    conn, _addr = s.accept()
                    conn.setblocking(False)
                    clients.append(conn)
                else:
                    data = self._recv(s)
                    if data:
                        s.sendall(data)
                    else:
                        s.close()
                        if s in clients:
                            clients.remove(s)
            for s in exceptional:
                s.close()
                if s in clients:
                    clients.remove(s)
        for c in clients:
            c.close()
        if self._server_sock:
            self._server_sock.close()

    def _recv(self, s: socket.socket) -> bytes:
        try:
            return s.recv(self.bufsize)
        except OSError:
            return b""

    def stop(self) -> None:
        self.running = False

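    def run_in_thread(self, max_iters: int | None = None) -> threading.Thread:
        # Bind before the thread starts so callers can connect right away,
        # but only once: a second bind() on the same port would fail.
        if self._server_sock is None:
            self.start()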
        t = threading.Thread(
            target=self.run, args=(max_iters,), daemon=True
        )
        t.start()
        return t


# ─────────────────────────────────────────────────────────────────────────────
# 3. poll-based server (Unix; select.poll is unavailable on Windows)
# ─────────────────────────────────────────────────────────────────────────────

_HAS_POLL = hasattr(select, "poll")


def poll_read_events(
    sockets: list[socket.socket],
    timeout_ms: int = 1000,
) -> list[tuple[socket.socket, int]]:
    """
    Use select.poll to find readable/error sockets.
    Returns [(socket, event_mask), ...].
    Falls back to select.select on platforms without poll (such as Windows).

    Example:
        events = poll_read_events([sock1, sock2])
        for s, event in events:
            if event & select.POLLIN:
                data = s.recv(4096)
    """
    if not _HAS_POLL:
        readable = read_available(sockets, timeout=timeout_ms / 1000)
        return [(s, select.POLLIN if hasattr(select, "POLLIN") else 1)  # type: ignore[attr-defined]
                for s in readable]

    p = select.poll()
    fd_to_sock = {}
    for s in sockets:
        fd = s.fileno()
        fd_to_sock[fd] = s
        p.register(fd, select.POLLIN | select.POLLHUP | select.POLLERR)

    try:
        events = p.poll(timeout_ms)
    except select.error:
        return []

    result = []
    for fd, event in events:
        s = fd_to_sock.get(fd)
        if s:
            result.append((s, event))
    return result


# ─────────────────────────────────────────────────────────────────────────────
# 4. epoll-based server (Linux only)
# ─────────────────────────────────────────────────────────────────────────────

_HAS_EPOLL = hasattr(select, "epoll")


class EpollEventLoop:
    """
    Edge-triggered epoll event loop for Linux.
    Manages a server socket plus client connections.

    Example:
        loop = EpollEventLoop("127.0.0.1", 9100)
        loop.start()
        loop.run(max_events=100)
        loop.close()
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 9100):
        if not _HAS_EPOLL:
            raise RuntimeError("epoll is only available on Linux")
        self.host = host
        self.port = port
        self._epoll: select.epoll | None = None
        self._server: socket.socket | None = None
        self._connections: dict[int, socket.socket] = {}
        self._buffers: dict[int, bytes] = {}

    def start(self) -> None:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((self.host, self.port))
        s.listen(100)
        s.setblocking(False)
        self._server = s
        self._epoll = select.epoll()
        self._epoll.register(s.fileno(), select.EPOLLIN)  # type: ignore[attr-defined]

    def run(self, max_events: int = 1000) -> None:
        """Process up to max_events, echoing all received data."""
        if self._epoll is None or self._server is None:
            self.start()
        handled = 0
        try:
            while handled < max_events:
                events = self._epoll.poll(timeout=1.0)  # type: ignore[union-attr]
                for fd, event in events:
                    handled += 1
                    if fd == self._server.fileno():
                        conn, _ = self._server.accept()
                        conn.setblocking(False)
                        self._epoll.register(conn.fileno(),  # type: ignore[union-attr]
                                             select.EPOLLIN | select.EPOLLET)  # type: ignore[attr-defined]
                        self._connections[conn.fileno()] = conn
                        self._buffers[conn.fileno()] = b""
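                    elif event & select.EPOLLIN:  # type: ignore[attr-defined]
                        conn = self._connections.get(fd)
                        if conn:
                            # Edge-triggered mode reports readiness only once, so
                            # drain the socket until recv() would block.
                            try:
                                while True:
                                    try:
                                        data = conn.recv(4096)
                                    except BlockingIOError:
                                        break  # kernel buffer is empty
                                    if not data:
                                        self._close(fd)  # peer closed
                                        break
                                    conn.sendall(data)
                            except OSError:
                                self._close(fd)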
                    elif event & (select.EPOLLHUP | select.EPOLLERR):  # type: ignore[attr-defined]
                        self._close(fd)
        except Exception:
            pass

    def _close(self, fd: int) -> None:
        conn = self._connections.pop(fd, None)
        if conn:
            if self._epoll:
                try:
                    self._epoll.unregister(fd)  # type: ignore[union-attr]
                except Exception:
                    pass
            conn.close()
        self._buffers.pop(fd, None)

    def close(self) -> None:
        for fd in list(self._connections):
            self._close(fd)
        if self._epoll:
            self._epoll.close()
        if self._server:
            self._server.close()


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== select demo ===")
    print(f"  platform: {platform.system()}")
    print(f"  has poll:  {_HAS_POLL}")
    print(f"  has epoll: {_HAS_EPOLL}")

    # ── select.select echo server ─────────────────────────────────────────────
    print("\n--- SelectEchoServer ---")
    import time

    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))
        port = s.getsockname()[1]

    server = SelectEchoServer("127.0.0.1", port)
    t = server.run_in_thread(max_iters=50)
    time.sleep(0.05)

    # Connect client
    with socket.create_connection(("127.0.0.1", port)) as client:
        messages = [b"hello select", b"multiple clients", b"echo test"]
        for msg in messages:
            client.sendall(msg)
            client.settimeout(2.0)
            echo = client.recv(4096)
            print(f"  sent={msg!r}  echo={echo!r}  match={msg == echo}")

    server.stop()
    t.join(timeout=3)

    # ── read_available (non-blocking check) ───────────────────────────────────
    print("\n--- read_available (timeout=0) ---")
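    # socketpair() returns a plain tuple, which is not a context manager,
    # so unpack it first and manage each socket separately.
    a, b = socket.socketpair()
    with a, b: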
        # Nothing ready initially
        ready = read_available([a], timeout=0)
        print(f"  before send: {len(ready)} ready")
        # Send data, then check
        b.send(b"ping")
        ready = read_available([a], timeout=0.1)
        print(f"  after send:  {len(ready)} ready")
        if ready:
            print(f"  recv: {ready[0].recv(64)!r}")

    # ── wait_writable ──────────────────────────────────────────────────────────
    print("\n--- wait_writable ---")
    a, b = socket.socketpair()
    with a, b:
        writable = wait_writable([a], timeout=0.1)
        print(f"  writable immediately: {len(writable)} socket(s)")

    print("\n=== done ===")
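
The demo above does not exercise epoll. The sketch below shows, under the assumption of a Linux host, how an epoll object can be used as a context manager and why edge-triggered registrations should be drained until recv() would block. The drain helper and the tiny 4-byte buffer are illustrative choices, not part of the module above.

```python
import select
import socket


def drain(sock, bufsize=4):
    """Read from a non-blocking socket until it would block; return all bytes."""
    chunks = []
    while True:
        try:
            chunk = sock.recv(bufsize)
        except BlockingIOError:
            break                      # kernel buffer is empty
        if not chunk:
            break                      # peer closed the connection
        chunks.append(chunk)
    return b"".join(chunks)


received = None
second_wait = None
if hasattr(select, "epoll"):           # Linux only
    a, b = socket.socketpair()
    with a, b, select.epoll() as ep:   # epoll objects are context managers
        a.setblocking(False)
        ep.register(a.fileno(), select.EPOLLIN | select.EPOLLET)
        b.sendall(b"edge-triggered")
        events = ep.poll(timeout=1.0)  # seconds, unlike poll()'s milliseconds
        if events:
            received = drain(a)
        second_wait = ep.poll(timeout=0)  # no new data, so no new event

print(received, second_wait)
```

Because the registration is edge-triggered, the second poll reports nothing until new data arrives, which is why a single recv() per event can leave bytes stranded in the buffer.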

For the selectors alternative: selectors.DefaultSelector() picks the most efficient multiplexer available on the platform (epoll on Linux, kqueue on BSD/macOS, /dev/poll on Solaris, otherwise poll or select) behind a unified register(fileobj, events, data) / select(timeout) API. Use selectors for new cross-platform I/O multiplexing code; use select when you need direct access to OS-specific features (epoll edge-triggered mode, kqueue filters, raw poll event masks) that selectors abstracts away.

For the asyncio alternative: asyncio.run(), asyncio.start_server(), and await asyncio.wait_for(coro, timeout) build high-level async servers that are backed by selectors on Unix, or by a third-party loop such as uvloop. Prefer asyncio for most new concurrent I/O code; reach for select / poll / epoll when writing a custom event loop, embedding Python’s I/O into a C framework, or working at the raw fd level where asyncio’s coroutine model is too high-level.

The Claude Skills 360 bundle includes select skill sets covering the read_available()/wait_writable() portable helpers, the SelectEchoServer single-threaded multi-client echo server, the poll_read_events() poll interface, and the EpollEventLoop edge-triggered epoll event loop. Start with the free tier to try I/O multiplexing patterns and select pipeline code generation.
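
To make the selectors comparison concrete, here is a minimal sketch of the register/select API handling one echo round trip over a local socket pair. The echo_once helper and the "demo" data tag are hypothetical names used only for illustration.

```python
import selectors
import socket


def echo_once(sel, timeout=1.0):
    """Run one select() pass and echo whatever arrived on ready sockets."""
    handled = 0
    for key, _events in sel.select(timeout):
        sock = key.fileobj             # the object passed to register()
        data = sock.recv(4096)
        if data:
            sock.sendall(data)
            handled += 1
    return handled


server_side, client_side = socket.socketpair()
with server_side, client_side, selectors.DefaultSelector() as sel:
    sel.register(server_side, selectors.EVENT_READ, data="demo")
    client_side.sendall(b"hello")
    handled = echo_once(sel)
    reply = client_side.recv(4096)
    backend = type(sel).__name__       # e.g. EpollSelector on Linux

print(backend, handled, reply)
```

The same code runs unchanged on Linux, macOS, and Windows; only the backend class printed on the last line differs.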
