Claude Code for selectors: Python Cross-Platform I/O Multiplexing — Claude Skills 360 Blog
Blog / AI / Claude Code for selectors: Python Cross-Platform I/O Multiplexing
AI

Claude Code for selectors: Python Cross-Platform I/O Multiplexing

Published: November 27, 2028
Read time: 5 min read
By: Claude Skills 360

Python’s selectors module provides a high-level, cross-platform I/O multiplexing API that automatically selects the best available backend (epoll on Linux, kqueue on macOS, poll or select as fallbacks). import selectors. Create: sel = selectors.DefaultSelector() — picks the platform-optimal implementation. Register: sel.register(fileobj, events, data=None)events is selectors.EVENT_READ | selectors.EVENT_WRITE; data is any Python object attached to the key (e.g., a callback or state object); returns a SelectorKey(fileobj, fd, events, data). Wait: ready = sel.select(timeout=None)list[(SelectorKey, events)] — blocks until at least one fd is ready, or timeout expires (None = block forever, 0 = poll). Modify: sel.modify(fileobj, events, data=None) — change mask/data without unregister+register. Unregister: sel.unregister(fileobj). Lookup: sel.get_key(fileobj)SelectorKey; sel.get_map(){fd: SelectorKey}. Close: sel.close() — use as context manager. Constants: selectors.EVENT_READ = 1, selectors.EVENT_WRITE = 4. Other selectors: EpollSelector, KqueueSelector, PollSelector, SelectSelector — use DefaultSelector in all new code. Claude Code generates cross-platform echo servers, callback-driven event loops, non-blocking TCP connects, and multiplexed client pools.

CLAUDE.md for selectors

## selectors Stack
- Stdlib: import selectors, socket
- Create: sel = selectors.DefaultSelector()
- Register: sel.register(sock, selectors.EVENT_READ, data=callback)
- Wait:   ready = sel.select(timeout=1.0)  # [(SelectorKey, events), ...]
-         for key, events in ready:
-             key.data(key.fileobj)         # call attached callback
- Modify: sel.modify(sock, selectors.EVENT_WRITE, data=write_cb)
- Note:   Always close selector; use as context manager

selectors Cross-Platform I/O Multiplexing Pipeline

# app/selectorsutil.py — helpers, echo server, event loop, connect, pool
from __future__ import annotations

import selectors
import socket
import threading
import time
from dataclasses import dataclass, field
from typing import Callable


# ─────────────────────────────────────────────────────────────────────────────
# 1. Portable readiness helpers
# ─────────────────────────────────────────────────────────────────────────────

def wait_readable(
    sockets: list[socket.socket],
    timeout: float = 1.0,
) -> list[socket.socket]:
    """
    Return the subset of sockets that have data ready to read.
    Uses DefaultSelector — works on all platforms.

    Example:
        ready = wait_readable([sock1, sock2], timeout=0.5)
        for s in ready:
            data = s.recv(4096)
    """
    if not sockets:
        return []
    with selectors.DefaultSelector() as sel:
        for s in sockets:
            sel.register(s, selectors.EVENT_READ)
        events = sel.select(timeout=timeout)
        return [key.fileobj for key, _ in events]  # type: ignore[misc]


def wait_writable(
    sockets: list[socket.socket],
    timeout: float = 1.0,
) -> list[socket.socket]:
    """
    Return the subset of sockets ready for writing.

    Example:
        writable = wait_writable([sock], timeout=2.0)
        if sock in writable:
            sock.send(data)
    """
    if not sockets:
        return []
    with selectors.DefaultSelector() as sel:
        for s in sockets:
            sel.register(s, selectors.EVENT_WRITE)
        events = sel.select(timeout=timeout)
        return [key.fileobj for key, _ in events]  # type: ignore[misc]


# ─────────────────────────────────────────────────────────────────────────────
# 2. Callback-based SelectorEventLoop
# ─────────────────────────────────────────────────────────────────────────────

class SelectorEventLoop:
    """
    Minimal callback-driven event loop backed by selectors.DefaultSelector.

    Example:
        loop = SelectorEventLoop()
        loop.register_read(sock, lambda s: print(s.recv(64)))
        loop.run(timeout=5.0, max_iterations=100)
        loop.close()
    """

    def __init__(self) -> None:
        self._sel = selectors.DefaultSelector()
        self._running = False

    def register_read(
        self,
        sock: socket.socket,
        callback: Callable[[socket.socket], None],
    ) -> None:
        """Register sock for read events; callback(sock) fired on readiness."""
        self._sel.register(sock, selectors.EVENT_READ, data=callback)

    def register_write(
        self,
        sock: socket.socket,
        callback: Callable[[socket.socket], None],
    ) -> None:
        """Register sock for write events; callback(sock) fired on readiness."""
        self._sel.register(sock, selectors.EVENT_WRITE, data=callback)

    def unregister(self, sock: socket.socket) -> None:
        """Remove sock from the selector (ignore if not registered)."""
        try:
            self._sel.unregister(sock)
        except KeyError:
            pass

    def run_once(self, timeout: float = 1.0) -> int:
        """
        Run one iteration: fire callbacks for all ready events.
        Returns the number of events handled.
        """
        ready = self._sel.select(timeout=timeout)
        for key, _ in ready:
            cb = key.data
            if cb is not None:
                cb(key.fileobj)
        return len(ready)

    def run(
        self,
        timeout: float = 1.0,
        max_iterations: int | None = None,
    ) -> None:
        """
        Run the event loop until stop() is called or max_iterations reached.

        Example:
            loop.run(timeout=0.5, max_iterations=200)
        """
        self._running = True
        iters = 0
        while self._running:
            if max_iterations is not None and iters >= max_iterations:
                break
            self.run_once(timeout=timeout)
            iters += 1

    def stop(self) -> None:
        """Signal the loop to stop after the current iteration."""
        self._running = False

    def close(self) -> None:
        """Close the underlying selector."""
        self._sel.close()

    def __enter__(self) -> "SelectorEventLoop":
        return self

    def __exit__(self, *_: object) -> None:
        self.close()


# ─────────────────────────────────────────────────────────────────────────────
# 3. selectors-based single-threaded echo server
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class EchoServer:
    """
    Non-blocking echo server using selectors.DefaultSelector.
    Handles multiple clients in a single thread via callbacks.

    Example:
        server = EchoServer("127.0.0.1", 9200)
        t = server.run_in_thread(max_iters=100)
        t.join(timeout=5)
    """
    host:    str = "127.0.0.1"
    port:    int = 9200
    bufsize: int = 4096
    _sel:    selectors.BaseSelector = field(default=None, repr=False)  # type: ignore
    _server: socket.socket          = field(default=None, repr=False)  # type: ignore
    _running: bool                  = field(default=False, repr=False)

    def _accept(self, server: socket.socket) -> None:
        conn, _ = server.accept()
        conn.setblocking(False)
        self._sel.register(conn, selectors.EVENT_READ, data=self._echo)

    def _echo(self, conn: socket.socket) -> None:
        try:
            data = conn.recv(self.bufsize)
        except OSError:
            data = b""
        if data:
            conn.sendall(data)
        else:
            self._sel.unregister(conn)
            conn.close()

    def start(self) -> None:
        """Bind, listen, and set up selector."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((self.host, self.port))
        s.listen(50)
        s.setblocking(False)
        self._server = s
        self._sel = selectors.DefaultSelector()
        self._sel.register(s, selectors.EVENT_READ, data=self._accept)
        self._running = True

    def run(self, max_iters: int | None = None) -> None:
        """
        Run the event loop: accept + echo until stop() or max_iters.
        """
        if self._server is None:
            self.start()
        iters = 0
        while self._running:
            if max_iters is not None and iters >= max_iters:
                break
            iters += 1
            try:
                ready = self._sel.select(timeout=1.0)
            except OSError:
                break
            for key, _ in ready:
                key.data(key.fileobj)
        for key in list(self._sel.get_map().values()):
            key.fileobj.close()  # type: ignore[union-attr]
        self._sel.close()

    def stop(self) -> None:
        self._running = False

    def run_in_thread(self, max_iters: int | None = None) -> threading.Thread:
        self.start()
        t = threading.Thread(target=self.run, args=(max_iters,), daemon=True)
        t.start()
        return t


# ─────────────────────────────────────────────────────────────────────────────
# 4. Non-blocking connect with timeout
# ─────────────────────────────────────────────────────────────────────────────

def connect_with_timeout(
    host: str,
    port: int,
    timeout: float = 5.0,
) -> socket.socket:
    """
    Open a non-blocking TCP connection, waiting up to timeout seconds.
    Returns the connected socket (in blocking mode).
    Raises TimeoutError if not ready within timeout, OSError on failure.

    Example:
        sock = connect_with_timeout("127.0.0.1", 9200, timeout=3.0)
        sock.sendall(b"hello")
        print(sock.recv(64))
        sock.close()
    """
    import errno as _errno
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setblocking(False)
    try:
        s.connect((host, port))
    except BlockingIOError:
        pass
    except OSError as exc:
        if exc.errno not in (_errno.EINPROGRESS, _errno.EWOULDBLOCK):
            s.close()
            raise

    with selectors.DefaultSelector() as sel:
        sel.register(s, selectors.EVENT_WRITE)
        ready = sel.select(timeout=timeout)
        if not ready:
            s.close()
            raise TimeoutError(f"connect to {host}:{port} timed out after {timeout}s")

    err = s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        s.close()
        raise OSError(err, f"connect to {host}:{port} failed")
    s.setblocking(True)
    return s


# ─────────────────────────────────────────────────────────────────────────────
# 5. Timeout-aware read
# ─────────────────────────────────────────────────────────────────────────────

def read_with_timeout(sock: socket.socket, n: int, timeout: float = 5.0) -> bytes:
    """
    Read exactly n bytes from a non-blocking socket using selectors.
    Raises TimeoutError or ConnectionError on failure.

    Example:
        sock.setblocking(False)
        data = read_with_timeout(sock, 1024, timeout=3.0)
    """
    buf = bytearray()
    deadline = time.monotonic() + timeout
    with selectors.DefaultSelector() as sel:
        sel.register(sock, selectors.EVENT_READ)
        while len(buf) < n:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"read_with_timeout: got {len(buf)} of {n} bytes")
            ready = sel.select(timeout=remaining)
            if not ready:
                raise TimeoutError(f"read_with_timeout: got {len(buf)} of {n} bytes")
            chunk = sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("Connection closed")
            buf.extend(chunk)
    return bytes(buf)


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== selectors demo ===")
    print(f"  DefaultSelector backend: {selectors.DefaultSelector().__class__.__name__}")

    # ── EchoServer ────────────────────────────────────────────────────────────
    print("\n--- EchoServer ---")
    with socket.socket() as tmp:
        tmp.bind(("127.0.0.1", 0))
        port = tmp.getsockname()[1]

    server = EchoServer("127.0.0.1", port)
    t = server.run_in_thread(max_iters=60)
    time.sleep(0.05)

    with socket.create_connection(("127.0.0.1", port)) as client:
        for msg in [b"hello selectors", b"cross-platform", b"echo test"]:
            client.sendall(msg)
            client.settimeout(2.0)
            echo = client.recv(4096)
            print(f"  sent={msg!r}  echo={echo!r}  match={msg == echo}")

    server.stop()
    t.join(timeout=3)

    # ── wait_readable ─────────────────────────────────────────────────────────
    print("\n--- wait_readable (timeout=0) ---")
    with socket.socketpair() as (a, b):
        before = wait_readable([a], timeout=0)
        print(f"  before send: {len(before)} ready")
        b.send(b"ping")
        after = wait_readable([a], timeout=0.1)
        print(f"  after send:  {len(after)} ready")
        if after:
            print(f"  recv: {after[0].recv(64)!r}")

    # ── wait_writable ─────────────────────────────────────────────────────────
    print("\n--- wait_writable ---")
    with socket.socketpair() as (a, b):
        writable = wait_writable([a], timeout=0.1)
        print(f"  writable immediately: {len(writable)} socket(s)")

    # ── SelectorEventLoop callback ────────────────────────────────────────────
    print("\n--- SelectorEventLoop callback ---")
    received: list[bytes] = []
    with socket.socketpair() as (a, b):
        b.send(b"callback-data")
        with SelectorEventLoop() as loop:
            def on_read(s: socket.socket) -> None:
                received.append(s.recv(64))
                loop.stop()
            loop.register_read(a, on_read)
            loop.run(timeout=1.0, max_iterations=10)
    print(f"  received via callback: {received}")

    # ── connect_with_timeout pattern ──────────────────────────────────────────
    print("\n--- connect_with_timeout ---")
    print("  (live connect skipped in demo; pattern shown)")
    print("  sock = connect_with_timeout('127.0.0.1', port, timeout=3.0)")

    print("\n=== done ===")

For the select module alternative — select.select(), select.poll(), and select.epoll() are the lower-level platform-specific primitives that selectors wraps; selectors is always preferred over raw select because it normalises the API, picks the best available mechanism, and works on all platforms — use select only when you need direct access to epoll flags like EPOLLET (edge-triggered mode) or kqueue-specific filter parameters that selectors abstracts away. For the asyncio alternative — asyncio.start_server(), asyncio.open_connection(), and await asyncio.wait_for(coro, timeout) build high-level coroutine-based servers that use selectors (or uvloop) under the hood — use asyncio for all new concurrent I/O code where coroutines and async/await fit naturally; use selectors when writing a custom event loop, embedding Python I/O into a C extension, or retrofitting callback-style multiplexing into existing synchronous code that cannot adopt coroutines. The Claude Skills 360 bundle includes selectors skill sets covering wait_readable()/wait_writable() portable helpers, SelectorEventLoop with register_read()/register_write()/run_once()/run()/stop(), EchoServer callback-driven multi-client echo server, connect_with_timeout() non-blocking TCP connect, and read_with_timeout() selector-backed timed read. Start with the free tier to try cross-platform I/O multiplexing patterns and selectors pipeline code generation.

Keep Reading

AI

Claude Code for email.contentmanager: Python Email Content Accessors

Read and write EmailMessage body content with Python's email.contentmanager module and Claude Code — email contentmanager ContentManager for the class that maps content types to get and set handler functions allowing EmailMessage to support get_content and set_content with type-specific behaviour, email contentmanager raw_data_manager for the ContentManager instance that handles raw bytes and str payloads without any conversion, email contentmanager content_manager for the standard ContentManager instance used by email.policy.default that intelligently handles text plain text html multipart and binary content types, email contentmanager get_content_text for the handler that returns the decoded text payload of a text-star message part as a str, email contentmanager get_content_binary for the handler that returns the raw decoded bytes payload of a non-text message part, email contentmanager get_data_manager for the get-handler lookup used by EmailMessage get_content to find the right reader function for the content type, email contentmanager set_content text for the handler that creates and sets a text part correctly choosing charset and transfer encoding, email contentmanager set_content bytes for the handler that creates and sets a binary part with base64 encoding and optional filename Content-Disposition, email contentmanager EmailMessage get_content for the method that reads the message body using the registered content manager handlers, email contentmanager EmailMessage set_content for the method that sets the message body and MIME headers in one call, email contentmanager EmailMessage make_alternative make_mixed make_related for the methods that convert a simple message into a multipart container, email contentmanager EmailMessage add_attachment for the method that attaches a file or bytes to a multipart message, and email contentmanager integration with email.message and email.policy and email.mime and io for building high-level email readers attachment extractors text body accessors HTML readers and policy-aware MIME construction pipelines.

5 min read Feb 12, 2029
AI

Claude Code for email.charset: Python Email Charset Encoding

Control header and body encoding for international email with Python's email.charset module and Claude Code — email charset Charset for the class that wraps a character set name with the encoding rules for header encoding and body encoding describing how to encode text for that charset in email messages, email charset Charset header_encoding for the attribute specifying whether headers using this charset should use QP quoted-printable encoding BASE64 encoding or no encoding, email charset Charset body_encoding for the attribute specifying the Content-Transfer-Encoding to use for message bodies in this charset such as QP or BASE64, email charset Charset output_codec for the attribute giving the Python codec name used to encode the string to bytes for the wire format, email charset Charset input_codec for the attribute giving the Python codec name used to decode incoming bytes to str, email charset Charset get_output_charset for returning the output charset name, email charset Charset header_encode for encoding a header string using the charset's header_encoding method, email charset Charset body_encode for encoding body content using the charset's body_encoding, email charset Charset convert for converting a string from the input_codec to the output_codec, email charset add_charset for registering a new charset with custom encoding rules in the global charset registry, email charset add_alias for adding an alias name that maps to an existing registered charset, email charset add_codec for registering a codec name mapping for use by the charset machinery, and email charset integration with email.message and email.mime and email.policy and email.encoders for building international email senders non-ASCII header encoders Content-Transfer-Encoding selectors charset-aware message constructors and MIME encoding pipelines.

5 min read Feb 11, 2029
AI

Claude Code for email.utils: Python Email Address and Header Utilities

Parse and format RFC 2822 email addresses and dates with Python's email.utils module and Claude Code — email utils parseaddr for splitting a display-name plus angle-bracket address string into a realname and email address tuple, email utils formataddr for combining a realname and address string into a properly quoted RFC 2822 address with angle brackets, email utils getaddresses for parsing a list of raw address header strings each potentially containing multiple comma-separated addresses into a list of realname address tuples, email utils parsedate for parsing an RFC 2822 date string into a nine-tuple compatible with time.mktime, email utils parsedate_tz for parsing an RFC 2822 date string into a ten-tuple that includes the UTC offset timezone in seconds, email utils parsedate_to_datetime for parsing an RFC 2822 date string into an aware datetime object with timezone, email utils formatdate for formatting a POSIX timestamp or the current time as an RFC 2822 date string with optional usegmt and localtime flags, email utils format_datetime for formatting a datetime object as an RFC 2822 date string, email utils make_msgid for generating a globally unique Message-ID string with optional idstring and domain components, email utils decode_rfc2231 for decoding an RFC 2231 encoded parameter value into a tuple of charset language and value, email utils encode_rfc2231 for encoding a string as an RFC 2231 encoded parameter value, email utils collapse_rfc2231_value for collapsing a decoded RFC 2231 tuple to a Unicode string, and email utils integration with email.message and email.headerregistry and datetime and time for building address parsers date formatters message-id generators header extractors and RFC-compliant email construction utilities.

5 min read Feb 10, 2029

Put these ideas into practice

Claude Skills 360 gives you production-ready skills for everything in this article — and 2,350+ more. Start free or go all-in.

Back to Blog

Get 360 skills free