Python’s asynchat module (deprecated since Python 3.6, removed in Python 3.12) builds on asyncore to add message framing: it hands incoming bytes to collect_incoming_data() and calls found_terminator() when a complete message has arrived. Usage: import asynchat, then subclass it — class Handler(asynchat.async_chat): — and implement collect_incoming_data(data) (buffer the bytes) and found_terminator() (process the complete message). Set the delimiter: self.set_terminator(b"\r\n") — byte-string terminator; self.set_terminator(4) — fixed length of 4 bytes; self.set_terminator(None) — no terminator (all data is collected). Send: self.push(data) — queue bytes for asynchronous write; self.close_when_done() — close automatically after the output drains. Server setup: create an asyncore.dispatcher subclass that accepts connections, then call asyncore.loop() to run the event loop. asynchat.simple_producer(data) wraps bytes as a producer object. The module is valuable for studying protocol handling patterns and legacy TCP proxy work, but all new code should use asyncio streams. Claude Code generates line-protocol servers, chat servers, HTTP-like parsers, SMTP relay handlers, and TCP protocol state machines.
CLAUDE.md for asynchat
## asynchat Stack
- Stdlib: import asynchat, asyncore, socket (removed Python 3.12)
- Base: class H(asynchat.async_chat):
- def collect_incoming_data(self, data): self._buf += data
- def found_terminator(self): process(self._buf); self._buf = b""
- Term: self.set_terminator(b"\r\n") # byte pattern
- self.set_terminator(4) # fixed length (int)
- self.set_terminator(None) # manual / disable
- Send: self.push(data) # queue bytes
- self.close_when_done() # flush then close
- Loop: asyncore.loop(timeout=1, count=n)
- Note: Removed 3.12; use asyncio.StreamReader / StreamWriter instead
asynchat Protocol Handler Pipeline
# app/asynchatutil.py — line protocol, fixed-length, server, echo, graceful-close
from __future__ import annotations
import socket
import threading
import time
from dataclasses import dataclass, field
from typing import Callable
# asynchat removed in Python 3.12
_ASYNCHAT_AVAILABLE = False
try:
import asynchat
import asyncore
_ASYNCHAT_AVAILABLE = True
except ImportError:
pass
# ─────────────────────────────────────────────────────────────────────────────
# 1. Line-oriented echo handler
# ─────────────────────────────────────────────────────────────────────────────
if _ASYNCHAT_AVAILABLE:

    class LineHandler(asynchat.async_chat):
        """Collect lines terminated by b"\\r\\n" and echo each one back.

        Every complete line is passed to ``on_line`` when a callback was
        supplied; otherwise it is pushed back to the peer as
        ``b"ECHO: " + line + b"\\r\\n"``.

        Args:
            sock: Connected socket handed over by the accepting server.
            on_line: Optional callback invoked with each complete line
                instead of the default echo.

        Example:
            server = SimpleServer(LineHandler, port=0)
            asyncore.loop(timeout=0.1, count=20)
            # connect to server.port via telnet or a socket
        """

        def __init__(self, sock: socket.socket,
                     on_line: Callable[[bytes], None] | None = None) -> None:
            super().__init__(sock=sock)
            # bytearray: in-place appends instead of re-copying an immutable
            # bytes object on every received chunk.
            self._buf = bytearray()
            self._on_line = on_line
            self.set_terminator(b"\r\n")

        def collect_incoming_data(self, data: bytes) -> None:
            """Append a received chunk to the pending-line buffer."""
            self._buf += data

        def found_terminator(self) -> None:
            """Dispatch the buffered line and reset the buffer."""
            line = bytes(self._buf)
            self._buf.clear()
            if self._on_line is not None:
                self._on_line(line)
            else:
                self.push(b"ECHO: " + line + b"\r\n")

        def handle_close(self) -> None:
            """Close this channel when the close event is delivered."""
            self.close()

    class FixedLengthHandler(asynchat.async_chat):
        """Receive one fixed-length frame and reply with its value doubled.

        The frame is ``FRAME`` bytes long (4 by default, i.e. a uint32) and is
        interpreted as a big-endian unsigned integer. The reply has the same
        width; the doubled value wraps modulo ``2 ** (8 * FRAME)`` so that
        large inputs cannot overflow the reply frame. After the first frame
        the connection is closed once the output has been flushed.

        Example:
            # Sends 4 bytes, receives 4 bytes back with value * 2
        """

        FRAME = 4

        def __init__(self, sock: socket.socket) -> None:
            super().__init__(sock=sock)
            self._buf = bytearray()
            self.set_terminator(self.FRAME)

        def collect_incoming_data(self, data: bytes) -> None:
            """Append a received chunk to the pending-frame buffer."""
            self._buf += data

        def found_terminator(self) -> None:
            """Reply to a complete frame, then schedule the close."""
            frame = bytes(self._buf)
            self._buf.clear()
            if len(frame) == self.FRAME:
                value = int.from_bytes(frame, "big")
                # Wrap instead of raising when the doubled value no longer
                # fits in FRAME bytes (e.g. any uint32 >= 2**31).
                doubled = (value * 2) % (1 << (8 * self.FRAME))
                self.push(doubled.to_bytes(self.FRAME, "big"))
            self.close_when_done()

    class SimpleServer(asyncore.dispatcher):
        """Minimal asyncore TCP server that builds a handler per connection.

        Args:
            handler_class: Callable invoked with each accepted socket.
            host: Address to bind to.
            port: Port to bind to; 0 lets the OS choose. The bound port is
                stored in ``self.port``.

        Example:
            server = SimpleServer(LineHandler, port=0)
            port = server.socket.getsockname()[1]
            asyncore.loop(timeout=0.1, count=20)
        """

        def __init__(self, handler_class: type,
                     host: str = "127.0.0.1",
                     port: int = 0) -> None:
            super().__init__()
            self.handler_class = handler_class
            self.create_socket()
            self.set_reuse_addr()
            self.bind((host, port))
            self.listen(5)
            # Read the port back from the socket so port=0 is resolved.
            self.port = self.socket.getsockname()[1]

        def handle_accepted(self, conn: socket.socket, addr) -> None:
            """Construct a handler for the accepted socket (not retained here)."""
            self.handler_class(conn)
# ─────────────────────────────────────────────────────────────────────────────
# 2. asyncio modern equivalent (always available)
# ─────────────────────────────────────────────────────────────────────────────
import asyncio
async def asyncio_echo_server(host: str = "127.0.0.1",
port: int = 0) -> asyncio.Server:
"""
asyncio equivalent of asynchat echo server.
Reads lines terminated by '\\n' and echoes them back.
Example:
server = asyncio.run(asyncio_echo_server(port=9000))
# connect: echo "hello" | nc 127.0.0.1 9000
"""
async def handle(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
try:
while True:
line = await reader.readline()
if not line:
break
writer.write(b"ECHO: " + line)
await writer.drain()
except (asyncio.IncompleteReadError, ConnectionResetError):
pass
finally:
writer.close()
server = await asyncio.start_server(handle, host, port)
return server
async def asyncio_fixed_length_server(frame_size: int = 4,
host: str = "127.0.0.1",
port: int = 0) -> asyncio.Server:
"""
asyncio equivalent of FixedLengthHandler — reads fixed-size frames.
Example:
server = asyncio.run(asyncio_fixed_length_server(frame_size=4))
"""
import struct
async def handle(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
try:
frame = await reader.readexactly(frame_size)
val = struct.unpack(">I", frame)[0]
writer.write(struct.pack(">I", val * 2))
await writer.drain()
except asyncio.IncompleteReadError:
pass
finally:
writer.close()
server = await asyncio.start_server(handle, host, port)
return server
# ─────────────────────────────────────────────────────────────────────────────
# 3. Protocol state machine helper (asyncio-based, modern)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ProtocolStats:
    """Mutable counters describing a server's traffic; all start at zero."""

    # Number of connections counted by the server.
    connections: int = 0
    # Number of messages counted by the server.
    messages: int = 0
    # Running total of bytes received.
    bytes_recv: int = 0
    # Running total of bytes sent.
    bytes_sent: int = 0
async def run_stats_server(host: str = "127.0.0.1",
port: int = 0,
stats: ProtocolStats | None = None,
max_connections: int = 5) -> asyncio.Server:
"""
A stats-collecting echo server using asyncio streams.
Example:
stats = ProtocolStats()
server = asyncio.run(run_stats_server(port=9001, stats=stats))
"""
if stats is None:
stats = ProtocolStats()
sem = asyncio.Semaphore(max_connections)
async def handle(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
async with sem:
stats.connections += 1
try:
while True:
line = await reader.readline()
if not line:
break
stats.bytes_recv += len(line)
stats.messages += 1
response = b"ACK: " + line
writer.write(response)
await writer.drain()
stats.bytes_sent += len(response)
except (asyncio.IncompleteReadError, ConnectionResetError):
pass
finally:
writer.close()
return await asyncio.start_server(handle, host, port)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo: exercise the legacy asynchat server (when the module exists) and
    # then the asyncio equivalent, printing what each one answers.
    print("=== asynchat demo ===")
    print(f" asynchat available: {_ASYNCHAT_AVAILABLE}")
    if _ASYNCHAT_AVAILABLE:
        # ── asyncore + asynchat line-echo server ──────────────────────────────
        print("\n--- asyncore/asynchat line-echo server ---")
        server = SimpleServer(LineHandler)
        port = server.port
        print(f" server listening on port {port}")
        received: list[bytes] = []

        def bg_loop():
            # Bounded run: at most 30 polls of up to 0.05 s each.
            asyncore.loop(timeout=0.05, count=30)

        t = threading.Thread(target=bg_loop, daemon=True)
        t.start()
        with socket.create_connection(("127.0.0.1", port)) as s:
            s.sendall(b"hello\r\n")
            time.sleep(0.2)  # give the background loop time to answer
            s.settimeout(0.3)
            try:
                received.append(s.recv(256))
            except OSError as exc:
                # Includes the recv timeout; the demo just reports it.
                print(f" recv failed: {exc!r}")
        # Let the bounded loop finish before closing the server so asyncore's
        # socket map is not modified from this thread while it is polled.
        t.join(timeout=2.0)
        server.close()
        print(f" received: {received}")
    # ── asyncio echo server ──────────────────────────────────────────────────
    print("\n--- asyncio echo server (modern equivalent) ---")

    async def demo_asyncio():
        """Start the asyncio echo server, send one line and print the reply."""
        server = await asyncio_echo_server()
        port = server.sockets[0].getsockname()[1]
        print(f" asyncio server on port {port}")
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"world\n")
        await writer.drain()
        response = await asyncio.wait_for(reader.readline(), timeout=1.0)
        print(f" response: {response!r}")
        writer.close()
        server.close()
        await server.wait_closed()

    asyncio.run(demo_asyncio())
    print("\n=== done ===")
For the asyncio stdlib replacement — asyncio.StreamReader / StreamReader.readline() / StreamReader.readexactly(n) plus asyncio.StreamWriter.write() provide coroutine-based, non-blocking protocol handling without the single-threaded polling loop — use asyncio streams for all new network protocol code; asynchat and asyncore were removed in Python 3.12. For the twisted (PyPI) alternative — twisted.internet.protocol.Protocol and LineReceiver provide the same collect/found pattern as asynchat but on a reactor that supports TLS, HTTP, SSH, DNS, and thousands of concurrent connections — use Twisted for production line-protocol servers that need battle-tested TLS and connection management; use asyncio for new greenfield async servers. The Claude Skills 360 bundle includes asynchat skill sets covering LineHandler/FixedLengthHandler async_chat subclasses, SimpleServer asyncore dispatcher, asyncio_echo_server()/asyncio_fixed_length_server() modern equivalents, and ProtocolStats/run_stats_server() stats-collecting asyncio server. Start with the free tier to try async protocol patterns and asynchat pipeline code generation.