Python’s asyncore module (deprecated Python 3.6, removed Python 3.12) is a pre-asyncio single-threaded event loop for non-blocking socket I/O. import asyncore. Subclass asyncore.dispatcher: override handle_read(), handle_write(), handle_connect(), handle_accept(), handle_close(), readable(), writable(). Start: asyncore.loop(timeout=30.0) — runs until the socket map is empty; asyncore.loop(count=N) — run N iterations then return; asyncore.loop(use_poll=True) — use poll() instead of select(). Create connection: self.create_socket(); self.connect((host, port)). Accept: conn, addr = self.accept(); handler = MyHandler(conn). Send: self.send(data) in handle_write(). Receive: data = self.recv(4096) in handle_read(). Map: asyncore.socket_map is the global dict of active dispatchers. Every asyncore program can be translated line-for-line to asyncio coroutines; the module is preserved here as a reference for legacy codebases and protocol-design study. Claude Code generates non-blocking TCP clients, connection pools, port scanners, logging aggregators, and custom protocol dispatchers.
CLAUDE.md for asyncore
## asyncore Stack
- Stdlib: import asyncore, socket (removed Python 3.12)
- Base: class D(asyncore.dispatcher):
- def handle_read(self): data = self.recv(4096)
- def handle_write(self): self.send(self._obuf)
- def handle_connect(self): ...
- def handle_close(self): self.close()
- def readable(self): return True
- def writable(self): return bool(self._obuf)
- Loop: asyncore.loop(timeout=1.0) # until map empty
- asyncore.loop(count=10) # N iterations
- asyncore.loop(use_poll=True) # poll() backend
- Map: asyncore.socket_map # active dispatchers
- Note: Removed 3.12; use asyncio instead
asyncore Event-Loop Pipeline
# app/asyncoreutil.py — client, server, conn pool, port scanner, asyncio equiv
from __future__ import annotations
import socket
import threading
import time
from dataclasses import dataclass, field
from typing import Callable
_ASYNCORE_AVAILABLE = False
try:
import asyncore
_ASYNCORE_AVAILABLE = True
except ImportError:
pass
# ─────────────────────────────────────────────────────────────────────────────
# 1. Low-level TCP client dispatcher
# ─────────────────────────────────────────────────────────────────────────────
if _ASYNCORE_AVAILABLE:
class TCPClient(asyncore.dispatcher):
"""
A non-blocking TCP client that sends a request and collects the reply.
Example:
client = TCPClient("httpbin.org", 80, b"GET / HTTP/1.0\\r\\n\\r\\n")
asyncore.loop(timeout=1.0, count=50)
print(client.response[:200])
"""
def __init__(self, host: str, port: int, request: bytes) -> None:
super().__init__()
self._request = request
self._sent = False
self.response: bytes = b""
self.create_socket()
self.connect((host, port))
def handle_connect(self) -> None:
pass # connection complete; send in handle_write
def handle_write(self) -> None:
if not self._sent:
sent = self.send(self._request)
self._request = self._request[sent:]
if not self._request:
self._sent = True
def handle_read(self) -> None:
chunk = self.recv(4096)
if chunk:
self.response += chunk
def handle_close(self) -> None:
self.close()
def readable(self) -> bool:
return True
def writable(self) -> bool:
return not self._sent
class EchoServer(asyncore.dispatcher):
"""
A minimal asyncore TCP echo server.
Example:
srv = EchoServer("127.0.0.1", 0)
port = srv.socket.getsockname()[1]
asyncore.loop(timeout=0.05, count=30)
"""
def __init__(self, host: str = "127.0.0.1", port: int = 0) -> None:
super().__init__()
self.create_socket()
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
self.port = self.socket.getsockname()[1]
def handle_accepted(self, conn: socket.socket, addr) -> None:
EchoHandler(conn)
class EchoHandler(asyncore.dispatcher):
def __init__(self, sock: socket.socket) -> None:
super().__init__(sock=sock)
self._outbuf: bytes = b""
def handle_read(self) -> None:
data = self.recv(4096)
if data:
self._outbuf += data
def handle_write(self) -> None:
if self._outbuf:
sent = self.send(self._outbuf)
self._outbuf = self._outbuf[sent:]
def handle_close(self) -> None:
self.close()
def readable(self) -> bool:
return True
def writable(self) -> bool:
return bool(self._outbuf)
class ConnectionPool(asyncore.dispatcher):
"""
Manage multiple outbound connections via a shared asyncore loop.
Example:
pool = ConnectionPool()
c = pool.connect("example.com", 80, b"GET / HTTP/1.0\\r\\n\\r\\n")
asyncore.loop(timeout=1.0, count=100)
print(c.response[:100])
"""
def __init__(self) -> None:
super().__init__() # no socket; used only as namespace
self._clients: list[TCPClient] = []
# Don't call create_socket — this is a controller, not a socket
# Override dispatcher so __init__ doesn't try to register
self._map: dict = {}
def connect(self, host: str, port: int, request: bytes) -> TCPClient:
c = TCPClient(host, port, request)
self._clients.append(c)
return c
def all_done(self) -> bool:
return all(not c._map for c in self._clients) # type: ignore
# ─────────────────────────────────────────────────────────────────────────────
# 2. Non-blocking port scanner (asyncore)
# ─────────────────────────────────────────────────────────────────────────────
if _ASYNCORE_AVAILABLE:
class PortProbe(asyncore.dispatcher):
"""
Probe one TCP port; records open/closed result in .result.
Example:
probe = PortProbe("127.0.0.1", 22)
asyncore.loop(timeout=0.5, count=10)
print(probe.result) # "open" or "closed"
"""
def __init__(self, host: str, port: int) -> None:
super().__init__()
self.host = host
self.port = port
self.result: str = "unknown"
self.create_socket()
try:
self.connect((host, port))
except OSError:
self.result = "closed"
self.close()
def handle_connect(self) -> None:
self.result = "open"
self.close()
def handle_close(self) -> None:
if self.result == "unknown":
self.result = "closed"
self.close()
def handle_error(self) -> None:
self.result = "closed"
self.close()
def readable(self) -> bool:
return False
def writable(self) -> bool:
return True
def scan_ports_asyncore(host: str, ports: list[int],
timeout: float = 1.0) -> dict[int, str]:
"""
Scan a list of ports using asyncore PortProbe dispatchers.
Returns {port: "open"|"closed"|"unknown"}.
Example:
results = scan_ports_asyncore("127.0.0.1", [22, 80, 443, 8080])
for port, status in results.items():
print(f" {port}: {status}")
"""
if not _ASYNCORE_AVAILABLE:
return {p: "unknown" for p in ports}
probes = {p: PortProbe(host, p) for p in ports}
asyncore.loop(timeout=timeout, count=20)
return {p: probe.result for p, probe in probes.items()}
# ─────────────────────────────────────────────────────────────────────────────
# 3. asyncio modern equivalent (always available)
# ─────────────────────────────────────────────────────────────────────────────
import asyncio
async def asyncio_echo_server(host: str = "127.0.0.1",
port: int = 0) -> asyncio.Server:
"""
asyncio equivalent of EchoServer above — use for new code.
Example:
server = asyncio.run(asyncio_echo_server(port=9100))
"""
async def echo(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
try:
while True:
data = await reader.read(4096)
if not data:
break
writer.write(data)
await writer.drain()
except ConnectionResetError:
pass
finally:
writer.close()
return await asyncio.start_server(echo, host, port)
async def asyncio_scan_port(host: str, port: int,
timeout: float = 0.5) -> str:
"""
asyncio equivalent of PortProbe — returns "open" or "closed".
Example:
status = asyncio.run(asyncio_scan_port("127.0.0.1", 22))
"""
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(host, port), timeout=timeout
)
writer.close()
return "open"
except (OSError, asyncio.TimeoutError):
return "closed"
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("=== asyncore demo ===")
print(f" asyncore available: {_ASYNCORE_AVAILABLE}")
if _ASYNCORE_AVAILABLE:
# ── EchoServer ─────────────────────────────────────────────────────────
print("\n--- asyncore EchoServer ---")
srv = EchoServer()
port = srv.port
print(f" server port: {port}")
responses: list[bytes] = []
def run_loop():
asyncore.loop(timeout=0.05, count=40)
t = threading.Thread(target=run_loop, daemon=True)
t.start()
with socket.create_connection(("127.0.0.1", port)) as s:
s.sendall(b"hello asyncore")
time.sleep(0.3)
s.settimeout(0.3)
try:
responses.append(s.recv(256))
except Exception:
pass
t.join(timeout=2.0)
print(f" echo response: {responses}")
# ── asyncio echo server ────────────────────────────────────────────────────
print("\n--- asyncio echo server (modern) ---")
async def demo_asyncio():
srv = await asyncio_echo_server()
port = srv.sockets[0].getsockname()[1]
print(f" asyncio echo on port {port}")
r, w = await asyncio.open_connection("127.0.0.1", port)
w.write(b"hello asyncio")
await w.drain()
data = await asyncio.wait_for(r.read(256), timeout=1.0)
print(f" echo: {data!r}")
w.close()
srv.close()
await srv.wait_closed()
asyncio.run(demo_asyncio())
# ── port scan ──────────────────────────────────────────────────────────────
print("\n--- asyncio port scan (127.0.0.1:22,80,9999) ---")
async def scan():
results = {}
for p in [22, 80, 9999]:
results[p] = await asyncio_scan_port("127.0.0.1", p, timeout=0.3)
return results
scan_results = asyncio.run(scan())
for p, status in scan_results.items():
print(f" port {p}: {status}")
print("\n=== done ===")
For the asyncio stdlib replacement — asyncio.start_server(), asyncio.open_connection(), asyncio.StreamReader, and asyncio.Protocol provide a coroutine-native, TLS-capable, high-performance replacement for the entire asyncore/asynchat stack — use asyncio for all new event-driven network code; asyncore was removed in Python 3.12. For the selectors stdlib alternative — selectors.DefaultSelector with register(fd, EVENT_READ | EVENT_WRITE) and select(timeout) gives direct access to the OS multiplexer (epoll/kqueue/select) without the dispatcher class hierarchy — use selectors when you need a thin, dependency-free event loop that works in Python 3.4+ and want full control of dispatch logic without asyncio overhead. The Claude Skills 360 bundle includes asyncore skill sets covering TCPClient non-blocking outbound connector, EchoServer/EchoHandler dispatcher pair, ConnectionPool multi-client manager, PortProbe/scan_ports_asyncore() port scanner, and asyncio_echo_server()/asyncio_scan_port() modern async equivalents. Start with the free tier to try event-loop networking patterns and asyncore pipeline code generation.