Python’s socketserver module provides a framework for building synchronous and concurrent TCP/UDP servers. import socketserver. TCPServer: socketserver.TCPServer((host, port), HandlerClass) — creates a TCP server; allow_reuse_address = True avoids Address already in use. UDPServer: socketserver.UDPServer((host, port), HandlerClass). Concurrency mixins: socketserver.ThreadingMixIn handles each request in a new thread; socketserver.ForkingMixIn uses fork() (Unix only). Combined: class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): daemon_threads = True. Handler: subclass socketserver.BaseRequestHandler and override handle(); self.request = socket (TCP) or data+socket (UDP); self.client_address = (host, port). StreamRequestHandler: self.rfile (readable file-like) and self.wfile (writable) for line-by-line TCP I/O. DatagramRequestHandler: self.packet = received bytes; self.socket = server socket for reply. Lifecycle: server.serve_forever() — blocks; server.shutdown() from another thread stops it; server.server_close() releases the port. Context manager: with socketserver.TCPServer(addr, Handler) as server:. Timeout: server.timeout = 5.0 — handle_timeout() called on idle. server.socket.setsockopt(...) for socket tuning. Claude Code generates echo servers, line-protocol daemons, UDP log receivers, and multiplexed request dispatchers.
CLAUDE.md for socketserver
## socketserver Stack
- Stdlib: import socketserver, threading
- TCP: class Handler(socketserver.StreamRequestHandler): def handle(self): ...
- Concurrent: class Srv(socketserver.ThreadingMixIn, socketserver.TCPServer):
- daemon_threads = True; allow_reuse_address = True
- Run: with Srv(("", 9000), Handler) as s: s.serve_forever()
- Stop: threading.Thread(target=s.shutdown).start() # from outside serve_forever
- UDP: class UHandler(socketserver.DatagramRequestHandler): def handle(self): ...
socketserver TCP/UDP Server Pipeline
# app/srvutil.py — TCP echo, framed protocol, UDP, managed lifecycle, dispatcher
from __future__ import annotations
import io
import json
import socketserver
import struct
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Callable
# ─────────────────────────────────────────────────────────────────────────────
# 1. Managed server lifecycle
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ManagedServer:
"""
Wraps a socketserver.BaseServer with thread-backed serve_forever() lifecycle.
Example:
srv = ManagedServer.start_tcp("127.0.0.1", 9100, EchoHandler)
# ... run tests ...
srv.stop()
"""
_server: socketserver.BaseServer
_thread: threading.Thread | None = field(default=None, repr=False)
@classmethod
def start_tcp(
cls,
host: str,
port: int,
handler: type[socketserver.BaseRequestHandler],
*,
threaded: bool = True,
) -> "ManagedServer":
"""Start a TCP server in a background thread."""
if threaded:
class _Srv(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
daemon_threads = True
else:
class _Srv(socketserver.TCPServer): # type: ignore[no-redef]
allow_reuse_address = True
srv = _Srv((host, port), handler)
return cls(_server=srv)._run()
@classmethod
def start_udp(
cls,
host: str,
port: int,
handler: type[socketserver.BaseRequestHandler],
) -> "ManagedServer":
"""Start a UDP server in a background thread."""
class _Srv(socketserver.ThreadingMixIn, socketserver.UDPServer):
allow_reuse_address = True
srv = _Srv((host, port), handler)
return cls(_server=srv)._run()
def _run(self) -> "ManagedServer":
t = threading.Thread(target=self._server.serve_forever, daemon=True)
t.start()
self._thread = t
return self
@property
def address(self) -> tuple[str, int]:
return self._server.server_address # type: ignore[return-value]
def stop(self, timeout: float = 2.0) -> None:
self._server.shutdown()
self._server.server_close()
if self._thread:
self._thread.join(timeout=timeout)
# ─────────────────────────────────────────────────────────────────────────────
# 2. TCP handlers
# ─────────────────────────────────────────────────────────────────────────────
class EchoHandler(socketserver.StreamRequestHandler):
"""
Reads lines and echoes them back with an ECHO: prefix.
Example:
srv = ManagedServer.start_tcp("127.0.0.1", 9101, EchoHandler)
"""
def handle(self) -> None:
for raw in self.rfile:
line = raw.rstrip(b"\r\n")
if not line:
break
self.wfile.write(b"ECHO: " + line + b"\n")
self.wfile.flush()
_FRAME_HDR = struct.Struct(">I")
class FramedJSONHandler(socketserver.StreamRequestHandler):
"""
Length-prefixed JSON protocol handler.
Reads [4-byte length][JSON bytes] frames, processes each message, and responds.
Subclass and override process_message() to implement business logic.
Example:
class PingHandler(FramedJSONHandler):
def process_message(self, msg):
return {"pong": msg.get("seq", 0)}
srv = ManagedServer.start_tcp("127.0.0.1", 9102, PingHandler)
"""
def handle(self) -> None:
while True:
header = self._read_exactly(4)
if not header:
break
(length,) = _FRAME_HDR.unpack(header)
body = self._read_exactly(length)
if not body:
break
try:
msg = json.loads(body.decode())
response = self.process_message(msg)
except Exception as exc:
response = {"error": str(exc)}
encoded = json.dumps(response).encode()
self.wfile.write(_FRAME_HDR.pack(len(encoded)) + encoded)
self.wfile.flush()
def process_message(self, msg: dict) -> dict:
"""Override to implement custom protocol logic."""
return {"echo": msg}
def _read_exactly(self, n: int) -> bytes:
buf = bytearray()
while len(buf) < n:
chunk = self.rfile.read(n - len(buf))
if not chunk:
return b""
buf.extend(chunk)
return bytes(buf)
# ─────────────────────────────────────────────────────────────────────────────
# 3. UDP handler
# ─────────────────────────────────────────────────────────────────────────────
class UDPEchoHandler(socketserver.DatagramRequestHandler):
"""
UDP echo — sends every datagram back with a timestamp prefix.
Example:
srv = ManagedServer.start_udp("127.0.0.1", 9103, UDPEchoHandler)
"""
def handle(self) -> None:
data = self.packet
reply = f"[{time.time():.3f}] ".encode() + data
self.socket.sendto(reply, self.client_address)
class UDPLogReceiver(socketserver.DatagramRequestHandler):
"""
UDP syslog-style receiver — appends received datagrams to an in-memory list.
Access `UDPLogReceiver.received` to read collected messages.
Example:
srv = ManagedServer.start_udp("127.0.0.1", 9104, UDPLogReceiver)
# send UDP datagrams, then:
print(UDPLogReceiver.received)
"""
received: list[bytes] = []
_lock = threading.Lock()
def handle(self) -> None:
with UDPLogReceiver._lock:
UDPLogReceiver.received.append(self.packet)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Dispatch router
# ─────────────────────────────────────────────────────────────────────────────
class DispatchHandler(socketserver.StreamRequestHandler):
"""
Reads a one-line command, dispatches to a registered handler function, and
writes the response as a JSON line.
Subclass or configure routes = {"CMD": fn} before starting.
Example:
class MyHandler(DispatchHandler):
routes = {
"PING": lambda _: "pong",
"TIME": lambda _: str(time.time()),
"UPPER": lambda args: args.upper(),
}
srv = ManagedServer.start_tcp("127.0.0.1", 9105, MyHandler)
"""
routes: dict[str, Callable[[str], Any]] = {}
def handle(self) -> None:
for raw in self.rfile:
line = raw.decode(errors="replace").strip()
if not line:
continue
parts = line.split(" ", 1)
cmd = parts[0].upper()
args = parts[1] if len(parts) > 1 else ""
handler_fn = self.routes.get(cmd)
if handler_fn:
try:
result = handler_fn(args)
reply = json.dumps({"ok": True, "result": result})
except Exception as exc:
reply = json.dumps({"ok": False, "error": str(exc)})
else:
reply = json.dumps({"ok": False, "error": f"unknown command: {cmd!r}"})
self.wfile.write(reply.encode() + b"\n")
self.wfile.flush()
# ─────────────────────────────────────────────────────────────────────────────
# 5. SSL wrapper helper
# ─────────────────────────────────────────────────────────────────────────────
def wrap_tcp_with_tls(
host: str,
port: int,
handler: type[socketserver.BaseRequestHandler],
certfile: str,
keyfile: str,
) -> ManagedServer:
"""
Start a TLS-wrapped TCP server. Requires a cert and key on disk.
Example:
srv = wrap_tcp_with_tls("0.0.0.0", 9443, FramedJSONHandler,
"server.pem", "server.key")
"""
import ssl
class TLSServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
daemon_threads = True
def get_request(self):
conn, addr = super().get_request()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(certfile, keyfile)
return ctx.wrap_socket(conn, server_side=True), addr
srv = TLSServer((host, port), handler)
return ManagedServer(_server=srv)._run()
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import socket
print("=== socketserver demo ===")
# ── Echo server ───────────────────────────────────────────────────────────
print("\n--- EchoHandler ---")
echo_srv = ManagedServer.start_tcp("127.0.0.1", 19200, EchoHandler)
time.sleep(0.05)
with socket.create_connection(echo_srv.address, timeout=2) as s:
s.sendall(b"hello world\n")
resp = s.recv(256)
echo_srv.stop()
print(f" sent: b'hello world'")
print(f" recv: {resp!r}")
# ── Framed JSON handler ───────────────────────────────────────────────────
print("\n--- FramedJSONHandler ---")
class PingHandler(FramedJSONHandler):
def process_message(self, msg: dict) -> dict:
return {"pong": msg.get("seq", 0), "ts": time.time()}
json_srv = ManagedServer.start_tcp("127.0.0.1", 19201, PingHandler)
time.sleep(0.05)
with socket.create_connection(json_srv.address, timeout=2) as s:
payload = json.dumps({"seq": 42}).encode()
s.sendall(_FRAME_HDR.pack(len(payload)) + payload)
hdr = bytearray()
while len(hdr) < 4:
hdr.extend(s.recv(4 - len(hdr)))
(length,) = _FRAME_HDR.unpack(bytes(hdr))
body = bytearray()
while len(body) < length:
body.extend(s.recv(length - len(body)))
result = json.loads(body.decode())
json_srv.stop()
print(f" ping seq=42 → {result}")
# ── UDP echo ──────────────────────────────────────────────────────────────
print("\n--- UDPEchoHandler ---")
udp_srv = ManagedServer.start_udp("127.0.0.1", 19202, UDPEchoHandler)
time.sleep(0.05)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(2.0)
s.sendto(b"test datagram", udp_srv.address)
reply, _ = s.recvfrom(512)
udp_srv.stop()
print(f" UDP reply: {reply!r}")
# ── Dispatch router ───────────────────────────────────────────────────────
print("\n--- DispatchHandler ---")
class TimeHandler(DispatchHandler):
routes = {
"PING": lambda _: "pong",
"UPPER": lambda s: s.upper(),
}
disp_srv = ManagedServer.start_tcp("127.0.0.1", 19203, TimeHandler)
time.sleep(0.05)
with socket.create_connection(disp_srv.address, timeout=2) as s:
for cmd in [b"PING\n", b"UPPER hello world\n", b"UNKNOWN\n"]:
s.sendall(cmd)
resp = s.recv(512)
print(f" {cmd.strip()!r} → {json.loads(resp)}")
disp_srv.stop()
print("\n=== done ===")
For the asyncio streams alternative — asyncio.start_server() provides the same TCP server pattern with async/await coroutines instead of threads; a single-threaded event loop can handle thousands of simultaneous idle connections efficiently, while socketserver.ThreadingMixIn creates one OS thread per connection — use asyncio for high-concurrency servers with many long-lived connections; use socketserver for modest-concurrency services, test fixtures, or when you want a synchronous handler with low framework overhead. For the http.server alternative — http.server.HTTPServer subclasses socketserver.TCPServer and pairs it with BaseHTTPRequestHandler; it adds HTTP parsing (method, path, headers) on top of the raw stream that socketserver provides — use http.server when you need to speak HTTP; use socketserver for raw TCP/UDP binary protocols, line-oriented text protocols, or when HTTP would be unnecessary overhead. The Claude Skills 360 bundle includes socketserver skill sets covering ManagedServer with start_tcp()/start_udp()/stop() lifecycle, EchoHandler line-echo server, FramedJSONHandler length-prefixed JSON protocol with subclassable process_message(), UDPEchoHandler/UDPLogReceiver UDP helpers, DispatchHandler command-router, and wrap_tcp_with_tls() TLS upgrade utility. Start with the free tier to try TCP/UDP server patterns and socketserver pipeline code generation.