Claude Code for asyncore: Python Async Socket Event Loop — Claude Skills 360 Blog

Published: January 18, 2029
Read time: 5 min read
By: Claude Skills 360

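Python’s asyncore module (deprecated since Python 3.6, removed in Python 3.12) is a pre-asyncio, single-threaded event loop for non-blocking socket I/O. After import asyncore, subclass asyncore.dispatcher and override the callbacks you need: handle_read(), handle_write(), handle_connect(), handle_accepted(), handle_close(), readable(), and writable(). Start the loop with asyncore.loop(timeout=30.0), which runs until the socket map is empty; timeout is the per-iteration select()/poll() wait in seconds, not a limit on total run time. asyncore.loop(count=N) runs at most N iterations and then returns, and asyncore.loop(use_poll=True) uses poll() instead of select() where available. To open a connection, call self.create_socket() and then self.connect((host, port)). To accept on a listening dispatcher, override handle_accepted(sock, addr) and wrap the new socket: handler = MyHandler(sock). The older pattern, conn, addr = self.accept() inside handle_accept(), still works, but accept() can return None and handle_accept() has been deprecated since Python 3.2. Send with self.send(data) in handle_write(); send() returns the number of bytes written, so keep the unsent remainder for the next call. Receive with data = self.recv(4096) in handle_read(). asyncore.socket_map is the global dict that maps file descriptors to active dispatchers. Most asyncore programs map naturally onto asyncio protocols or stream coroutines, although the translation is callback-by-callback rather than line-for-line; the module is preserved here as a reference for legacy codebases and protocol-design study. Claude Code generates non-blocking TCP clients, connection pools, port scanners, logging aggregators, and custom protocol dispatchers.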
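
To make the asyncore-to-asyncio correspondence concrete, here is a minimal sketch using asyncio.Protocol. The names UpperEchoProtocol and roundtrip are hypothetical and not part of the article's module; the comments mark which asyncore callback each method roughly stands in for.

```python
import asyncio


class UpperEchoProtocol(asyncio.Protocol):
    """Hypothetical example: echo each received chunk back upper-cased."""

    def connection_made(self, transport):
        # Rough counterpart of handle_connect() / handle_accepted().
        self.transport = transport

    def data_received(self, data):
        # Rough counterpart of handle_read(). The transport buffers writes,
        # so no writable()/handle_write() pair is needed.
        self.transport.write(data.upper())

    def connection_lost(self, exc):
        # Rough counterpart of handle_close().
        self.transport = None


async def roundtrip(payload: bytes) -> bytes:
    """Start the server on an ephemeral port, send payload, return the reply."""
    loop = asyncio.get_running_loop()
    server = await loop.create_server(UpperEchoProtocol, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(payload)
    await writer.drain()
    reply = await asyncio.wait_for(reader.readexactly(len(payload)), timeout=2.0)
    writer.close()
    await writer.wait_closed()
    server.close()
    return reply
```

Calling asyncio.run(roundtrip(b"hello")) exercises the whole path on the loopback interface. The buffer bookkeeping that an asyncore dispatcher does by hand moves into the transport.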

CLAUDE.md for asyncore

## asyncore Stack
- Stdlib: import asyncore, socket  (removed Python 3.12)
- Base:   class D(asyncore.dispatcher):
-             def handle_read(self): data = self.recv(4096)
-             def handle_write(self): n = self.send(self._obuf); self._obuf = self._obuf[n:]
-             def handle_connect(self): ...
-             def handle_close(self): self.close()
-             def readable(self): return True
-             def writable(self): return bool(self._obuf)
- Loop:   asyncore.loop(timeout=1.0)         # until map empty
-         asyncore.loop(count=10)            # N iterations
-         asyncore.loop(use_poll=True)       # poll() backend
- Map:    asyncore.socket_map               # active dispatchers
- Note:   Removed 3.12; use asyncio instead
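
The loop() call forms listed above can be combined into a deadline-bounded runner. This is a minimal sketch under stated assumptions: run_until is a hypothetical helper, not part of asyncore. It relies on the map and count parameters of asyncore.loop() and raises if it is asked to drive a non-empty map on an interpreter where the module no longer exists.

```python
import time

try:
    import asyncore  # removed in Python 3.12
except ImportError:
    asyncore = None


def run_until(deadline_s: float, sock_map: dict, tick: float = 0.05) -> bool:
    """Hypothetical helper: drive sock_map for at most deadline_s seconds.

    Returns True if every dispatcher in sock_map closed before the deadline.
    """
    end = time.monotonic() + deadline_s
    while sock_map and time.monotonic() < end:
        if asyncore is None:
            raise RuntimeError("asyncore is not available on this Python")
        # One select()/poll() pass per call; tick bounds each wait.
        asyncore.loop(timeout=tick, map=sock_map, count=1)
    return not sock_map
```

A private map keeps unrelated dispatchers out of the run: pass the same dict as the map argument of asyncore.dispatcher(sock, map) when constructing each dispatcher, then hand it to run_until().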

asyncore Event-Loop Pipeline

# app/asyncoreutil.py — client, server, conn pool, port scanner, asyncio equiv
from __future__ import annotations

import socket
import threading
import time

_ASYNCORE_AVAILABLE = False
try:
    import asyncore
    _ASYNCORE_AVAILABLE = True
except ImportError:
    pass


# ─────────────────────────────────────────────────────────────────────────────
# 1. Low-level TCP client dispatcher
# ─────────────────────────────────────────────────────────────────────────────

if _ASYNCORE_AVAILABLE:

    class TCPClient(asyncore.dispatcher):
        """
        A non-blocking TCP client that sends a request and collects the reply.

        Example:
            client = TCPClient("httpbin.org", 80, b"GET / HTTP/1.0\\r\\n\\r\\n")
            asyncore.loop(timeout=1.0, count=50)
            print(client.response[:200])
        """

        def __init__(self, host: str, port: int, request: bytes) -> None:
            super().__init__()
            self._request = request
            self._sent = False
            self.response: bytes = b""
            self.create_socket()
            self.connect((host, port))

        def handle_connect(self) -> None:
            pass  # connection complete; send in handle_write

        def handle_write(self) -> None:
            if not self._sent:
                sent = self.send(self._request)
                self._request = self._request[sent:]
                if not self._request:
                    self._sent = True

        def handle_read(self) -> None:
            chunk = self.recv(4096)
            if chunk:
                self.response += chunk

        def handle_close(self) -> None:
            self.close()

        def readable(self) -> bool:
            return True

        def writable(self) -> bool:
            return not self._sent


    class EchoServer(asyncore.dispatcher):
        """
        A minimal asyncore TCP echo server.

        Example:
            srv = EchoServer("127.0.0.1", 0)
            port = srv.socket.getsockname()[1]
            asyncore.loop(timeout=0.05, count=30)
        """

        def __init__(self, host: str = "127.0.0.1", port: int = 0) -> None:
            super().__init__()
            self.create_socket()
            self.set_reuse_addr()
            self.bind((host, port))
            self.listen(5)
            self.port = self.socket.getsockname()[1]

        def handle_accepted(self, conn: socket.socket, addr) -> None:
            EchoHandler(conn)


    class EchoHandler(asyncore.dispatcher):
        def __init__(self, sock: socket.socket) -> None:
            super().__init__(sock=sock)
            self._outbuf: bytes = b""

        def handle_read(self) -> None:
            data = self.recv(4096)
            if data:
                self._outbuf += data

        def handle_write(self) -> None:
            if self._outbuf:
                sent = self.send(self._outbuf)
                self._outbuf = self._outbuf[sent:]

        def handle_close(self) -> None:
            self.close()

        def readable(self) -> bool:
            return True

        def writable(self) -> bool:
            return bool(self._outbuf)


    class ConnectionPool:
        """
        Track multiple outbound TCPClient connections on the shared asyncore loop.

        Example:
            pool = ConnectionPool()
            c = pool.connect("example.com", 80, b"GET / HTTP/1.0\\r\\n\\r\\n")
            asyncore.loop(timeout=1.0, count=100)
            print(c.response[:100])
        """

        def __init__(self) -> None:
            # A plain controller object: it owns no socket, so it does not
            # need to subclass asyncore.dispatcher or appear in the socket map.
            self._clients: list[TCPClient] = []

        def connect(self, host: str, port: int, request: bytes) -> TCPClient:
            c = TCPClient(host, port, request)
            self._clients.append(c)
            return c

        def all_done(self) -> bool:
            # dispatcher.close() clears both flags, so a client is finished
            # once it is neither connected nor still connecting.
            return all(not (c.connected or c.connecting) for c in self._clients)


# ─────────────────────────────────────────────────────────────────────────────
# 2. Non-blocking port scanner (asyncore)
# ─────────────────────────────────────────────────────────────────────────────

if _ASYNCORE_AVAILABLE:

    class PortProbe(asyncore.dispatcher):
        """
        Probe one TCP port; records open/closed result in .result.

        Example:
            probe = PortProbe("127.0.0.1", 22)
            asyncore.loop(timeout=0.5, count=10)
            print(probe.result)   # "open" or "closed"
        """

        def __init__(self, host: str, port: int) -> None:
            super().__init__()
            self.host = host
            self.port = port
            self.result: str = "unknown"
            self.create_socket()
            try:
                self.connect((host, port))
            except OSError:
                self.result = "closed"
                self.close()

        def handle_connect(self) -> None:
            self.result = "open"
            self.close()

        def handle_close(self) -> None:
            if self.result == "unknown":
                self.result = "closed"
            self.close()

        def handle_error(self) -> None:
            self.result = "closed"
            self.close()

        def readable(self) -> bool:
            return False

        def writable(self) -> bool:
            return True


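def scan_ports_asyncore(host: str, ports: list[int],
                         timeout: float = 1.0) -> dict[int, str]:
    """
    Scan a list of ports using asyncore PortProbe dispatchers.
    Returns {port: "open"|"closed"|"unknown"}; "unknown" means the probe got
    no answer within `timeout` seconds (for example, a filtered port).

    Example:
        results = scan_ports_asyncore("127.0.0.1", [22, 80, 443, 8080])
        for port, status in results.items():
            print(f"  {port}: {status}")
    """
    if not _ASYNCORE_AVAILABLE:
        return {p: "unknown" for p in ports}
    probes = {p: PortProbe(host, p) for p in ports}
    # Poll in short slices so `timeout` bounds the whole scan rather than
    # each loop iteration.
    deadline = time.monotonic() + timeout
    while (time.monotonic() < deadline
           and any(pr.result == "unknown" for pr in probes.values())):
        asyncore.loop(timeout=0.05, count=1)
    for pr in probes.values():
        if pr.result == "unknown":
            pr.close()  # remove unanswered probes from asyncore.socket_map
    return {p: probe.result for p, probe in probes.items()}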


# ─────────────────────────────────────────────────────────────────────────────
# 3. asyncio modern equivalent (always available)
# ─────────────────────────────────────────────────────────────────────────────

import asyncio


async def asyncio_echo_server(host: str = "127.0.0.1",
                               port: int = 0) -> asyncio.Server:
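    """
    asyncio equivalent of EchoServer above; use for new code.

    The returned server only lives as long as the running event loop, so
    start it and serve from inside the same coroutine.

    Example:
        async def main():
            server = await asyncio_echo_server(port=9100)
            async with server:
                await server.serve_forever()

        asyncio.run(main())
    """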
    async def echo(reader: asyncio.StreamReader,
                   writer: asyncio.StreamWriter) -> None:
        try:
            while True:
                data = await reader.read(4096)
                if not data:
                    break
                writer.write(data)
                await writer.drain()
        except ConnectionResetError:
            pass
        finally:
            writer.close()

    return await asyncio.start_server(echo, host, port)


async def asyncio_scan_port(host: str, port: int,
                             timeout: float = 0.5) -> str:
    """
    asyncio equivalent of PortProbe — returns "open" or "closed".

    Example:
        status = asyncio.run(asyncio_scan_port("127.0.0.1", 22))
    """
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
        writer.close()
        return "open"
    except (OSError, asyncio.TimeoutError):
        return "closed"


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=== asyncore demo ===")
    print(f"  asyncore available: {_ASYNCORE_AVAILABLE}")

    if _ASYNCORE_AVAILABLE:
        # ── EchoServer ─────────────────────────────────────────────────────────
        print("\n--- asyncore EchoServer ---")
        srv = EchoServer()
        port = srv.port
        print(f"  server port: {port}")

        responses: list[bytes] = []

        def run_loop():
            asyncore.loop(timeout=0.05, count=40)

        t = threading.Thread(target=run_loop, daemon=True)
        t.start()

        with socket.create_connection(("127.0.0.1", port)) as s:
            s.sendall(b"hello asyncore")
            time.sleep(0.3)
            s.settimeout(0.3)
            try:
                responses.append(s.recv(256))
            except OSError:
                pass  # timed out or reset before the echo arrived

        t.join(timeout=2.0)
        print(f"  echo response: {responses}")

    # ── asyncio echo server ────────────────────────────────────────────────────
    print("\n--- asyncio echo server (modern) ---")

    async def demo_asyncio():
        srv = await asyncio_echo_server()
        port = srv.sockets[0].getsockname()[1]
        print(f"  asyncio echo on port {port}")
        r, w = await asyncio.open_connection("127.0.0.1", port)
        w.write(b"hello asyncio")
        await w.drain()
        data = await asyncio.wait_for(r.read(256), timeout=1.0)
        print(f"  echo: {data!r}")
        w.close()
        srv.close()
        await srv.wait_closed()

    asyncio.run(demo_asyncio())

    # ── port scan ──────────────────────────────────────────────────────────────
    print("\n--- asyncio port scan (127.0.0.1:22,80,9999) ---")

    async def scan():
        results = {}
        for p in [22, 80, 9999]:
            results[p] = await asyncio_scan_port("127.0.0.1", p, timeout=0.3)
        return results

    scan_results = asyncio.run(scan())
    for p, status in scan_results.items():
        print(f"  port {p}: {status}")

    print("\n=== done ===")
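
The port-scan demo above awaits each port in turn, so the worst case is the sum of the per-port timeouts. A concurrent variant can be sketched with asyncio.gather. The names probe and scan_many are hypothetical; probe repeats the logic of asyncio_scan_port so that the block runs on its own.

```python
import asyncio


async def probe(host: str, port: int, timeout: float = 0.5) -> str:
    """Same idea as asyncio_scan_port: "open" if connect succeeds in time."""
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
    except (OSError, asyncio.TimeoutError):
        return "closed"
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # peer reset during shutdown; the port was still open
    return "open"


async def scan_many(host: str, ports: list[int],
                    timeout: float = 0.5) -> dict[int, str]:
    # gather() runs all probes concurrently and preserves argument order.
    statuses = await asyncio.gather(*(probe(host, p, timeout) for p in ports))
    return dict(zip(ports, statuses))
```

With this shape the total wall time is roughly one timeout regardless of how many ports are scanned, at the cost of opening all connections at once. For large port lists, an asyncio.Semaphore around probe() would cap the concurrency.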

For new event-driven network code, use asyncio: asyncio.start_server(), asyncio.open_connection(), asyncio.StreamReader, and asyncio.Protocol provide a coroutine-native, TLS-capable replacement for the entire asyncore/asynchat stack, and asyncore itself was removed in Python 3.12. For a lower-level alternative, the selectors module offers selectors.DefaultSelector with register(fileobj, EVENT_READ | EVENT_WRITE) and select(timeout), which gives direct access to the OS multiplexer (such as epoll, kqueue, or select) without the dispatcher class hierarchy. Use selectors when you need a thin, dependency-free event loop that works on Python 3.4+ and want full control of dispatch logic without asyncio's machinery. The Claude Skills 360 bundle includes asyncore skill sets covering the TCPClient non-blocking outbound connector, the EchoServer/EchoHandler dispatcher pair, the ConnectionPool multi-client manager, the PortProbe/scan_ports_asyncore() port scanner, and the asyncio_echo_server()/asyncio_scan_port() modern equivalents. Start with the free tier to try event-loop networking patterns and asyncore pipeline code generation.
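
The selectors approach described above can be sketched as a tiny echo loop. This is an illustration under stated assumptions, not a production server: make_listener and run_echo are hypothetical names, the loop runs a fixed number of select() passes, and it writes replies with sendall(), which is only safe for small payloads on a non-blocking socket.

```python
import selectors
import socket


def make_listener(host: str = "127.0.0.1", port: int = 0) -> socket.socket:
    """Create a non-blocking listening socket (port 0 picks a free port)."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen()
    srv.setblocking(False)
    return srv


def run_echo(sel: selectors.BaseSelector, listener: socket.socket,
             ticks: int, timeout: float = 0.1) -> None:
    """Hypothetical helper: run `ticks` select() passes of an echo server."""
    sel.register(listener, selectors.EVENT_READ, data="accept")
    for _ in range(ticks):
        for key, _mask in sel.select(timeout):
            if key.data == "accept":
                conn, _addr = key.fileobj.accept()
                conn.setblocking(False)
                sel.register(conn, selectors.EVENT_READ, data="echo")
            else:
                conn = key.fileobj
                data = conn.recv(4096)
                if data:
                    # Sketch only: a real loop would buffer the reply and
                    # wait for EVENT_WRITE instead of calling sendall().
                    conn.sendall(data)
                else:
                    sel.unregister(conn)
                    conn.close()
    # Unregister everything; close accepted connections but leave the
    # listener for the caller to close.
    for key in list(sel.get_map().values()):
        sel.unregister(key.fileobj)
        if key.fileobj is not listener:
            key.fileobj.close()
```

The data argument of register() plays the role that the dispatcher subclass plays in asyncore: it tells the loop which handler owns each file object.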
