Python’s socket module provides low-level BSD socket network I/O. import socket. socket: s = socket.socket(family=AF_INET, type=SOCK_STREAM) — TCP socket; use SOCK_DGRAM for UDP. connect: s.connect((host, port)). bind: s.bind((host, port)) — host="" or "0.0.0.0" for all interfaces. listen: s.listen(backlog). accept: conn, addr = s.accept() — blocks until client connects. send/recv: s.send(data) → bytes sent (may be partial); s.sendall(data) → sends all or raises; s.recv(bufsize) → bytes (empty = closed). UDP: s.sendto(data, (host, port)) / data, addr = s.recvfrom(bufsize). setsockopt: s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) — allow port reuse; TCP_NODELAY disables Nagle. settimeout: s.settimeout(5.0) — raises socket.timeout on blocking ops; settimeout(0) = non-blocking. makefile: f = s.makefile("rb") — file-like wrapper. create_connection: socket.create_connection((host, port), timeout=10) — high-level convenience with getaddrinfo fallback. create_server (3.8+): socket.create_server((host, port), reuse_port=True). getaddrinfo: socket.getaddrinfo(host, port, AF_INET, SOCK_STREAM) → list of (family, type, proto, canonname, sockaddr). Context manager: with socket.socket() as s: — auto-closes. Claude Code generates TCP echo servers, protocol parsers, port scanners, and service health-check clients.
CLAUDE.md for socket
## socket Stack
- Stdlib: import socket
- TCP client: socket.create_connection((host, port), timeout=10)
- TCP server: s = socket.create_server(("", 8080), reuse_port=True)
- UDP: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- Opts: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- Recv: data = recv_all(conn, n) # loop until n bytes received
socket Network I/O Pipeline
# app/sockutil.py — TCP client/server, framing, UDP, DNS, health check
from __future__ import annotations
import socket
import struct
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Callable, Generator
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────
def tcp_connect(host: str, port: int, timeout: float = 10.0) -> socket.socket:
"""
Open a TCP connection and return the socket.
Raises socket.timeout or ConnectionRefusedError on failure.
Example:
with tcp_connect("httpbin.org", 80) as s:
s.sendall(b"GET / HTTP/1.0\\r\\nHost: httpbin.org\\r\\n\\r\\n")
print(recv_until(s, b"\\r\\n\\r\\n"))
"""
s = socket.create_connection((host, port), timeout=timeout)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
return s
@contextmanager
def tcp_client(host: str, port: int, timeout: float = 10.0) -> Generator[socket.socket, None, None]:
"""Context manager TCP client that auto-closes the socket."""
s = tcp_connect(host, port, timeout=timeout)
with s:
yield s
def recv_exactly(s: socket.socket, n: int) -> bytes:
"""
Receive exactly n bytes from socket s (loops on partial reads).
Raises ConnectionError if the connection is closed early.
Example:
header = recv_exactly(conn, 4)
"""
buf = bytearray()
while len(buf) < n:
chunk = s.recv(n - len(buf))
if not chunk:
raise ConnectionError(f"Connection closed after {len(buf)} of {n} bytes")
buf.extend(chunk)
return bytes(buf)
def recv_until(s: socket.socket, delimiter: bytes, max_bytes: int = 65536) -> bytes:
"""
Receive bytes until delimiter is found; return data including delimiter.
Raises ValueError if max_bytes exceeded.
Example:
response_header = recv_until(s, b"\\r\\n\\r\\n")
"""
buf = bytearray()
while True:
if len(buf) > max_bytes:
raise ValueError(f"recv_until: exceeded {max_bytes} bytes without finding delimiter")
chunk = s.recv(1024)
if not chunk:
break
buf.extend(chunk)
pos = buf.find(delimiter)
if pos != -1:
return bytes(buf[: pos + len(delimiter)])
return bytes(buf)
# ─────────────────────────────────────────────────────────────────────────────
# 2. Length-prefixed framing
# ─────────────────────────────────────────────────────────────────────────────
_FRAME_HEADER = struct.Struct(">I") # 4-byte big-endian length
def send_frame(s: socket.socket, data: bytes) -> None:
"""
Send data with a 4-byte length prefix.
Example:
send_frame(conn, json.dumps({"action": "ping"}).encode())
"""
s.sendall(_FRAME_HEADER.pack(len(data)) + data)
def recv_frame(s: socket.socket) -> bytes:
"""
Receive a length-prefixed frame.
Example:
data = recv_frame(conn)
msg = json.loads(data)
"""
header = recv_exactly(s, _FRAME_HEADER.size)
(length,) = _FRAME_HEADER.unpack(header)
return recv_exactly(s, length)
# ─────────────────────────────────────────────────────────────────────────────
# 3. TCP server
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class TCPServer:
"""
Simple threaded TCP server. Dispatches each connection to a handler function.
Example:
def handler(conn, addr):
data = recv_frame(conn)
send_frame(conn, b"ACK:" + data)
server = TCPServer("127.0.0.1", 9000, handler)
server.start()
# ... run client ...
server.stop()
"""
host: str
port: int
handler: Callable[[socket.socket, Any], None]
_sock: socket.socket | None = None
_thread: threading.Thread | None = None
_running: bool = False
def start(self) -> "TCPServer":
self._sock = socket.create_server((self.host, self.port), reuse_port=False)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.settimeout(0.5)
self._running = True
self._thread = threading.Thread(target=self._serve, daemon=True)
self._thread.start()
return self
def _serve(self) -> None:
while self._running and self._sock:
try:
conn, addr = self._sock.accept()
except socket.timeout:
continue
except OSError:
break
t = threading.Thread(target=self._handle_one, args=(conn, addr), daemon=True)
t.start()
def _handle_one(self, conn: socket.socket, addr: Any) -> None:
with conn:
try:
self.handler(conn, addr)
except Exception:
pass
def stop(self) -> None:
self._running = False
if self._sock:
self._sock.close()
if self._thread:
self._thread.join(timeout=2.0)
# ─────────────────────────────────────────────────────────────────────────────
# 4. UDP helpers
# ─────────────────────────────────────────────────────────────────────────────
def udp_send(host: str, port: int, data: bytes, timeout: float = 2.0) -> None:
"""
Send a UDP datagram.
Example:
udp_send("localhost", 514, b"<14>test log message")
"""
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(timeout)
s.sendto(data, (host, port))
def udp_request(host: str, port: int, data: bytes, bufsize: int = 4096, timeout: float = 2.0) -> bytes:
"""
Send a UDP datagram and receive a response.
Example:
response = udp_request("8.8.8.8", 53, dns_query_bytes)
"""
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(timeout)
s.sendto(data, (host, port))
data, _ = s.recvfrom(bufsize)
return data
# ─────────────────────────────────────────────────────────────────────────────
# 5. DNS and health-check utilities
# ─────────────────────────────────────────────────────────────────────────────
def resolve(host: str, port: int = 80, family: int = socket.AF_INET) -> list[str]:
"""
Resolve a hostname to a list of IP addresses.
Example:
resolve("google.com") # ['142.250.x.x', ...]
"""
addrs = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM)
return list({addr[4][0] for addr in addrs})
def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
"""
Return True if a TCP connection to host:port succeeds within timeout.
Example:
is_port_open("localhost", 5432) # True if postgres is up
"""
try:
with socket.create_connection((host, port), timeout=timeout):
return True
except (socket.timeout, ConnectionRefusedError, OSError):
return False
@dataclass
class ServiceStatus:
host: str
port: int
latency_ms: float | None # None if unreachable
reachable: bool
def __str__(self) -> str:
if self.reachable:
return f"{self.host}:{self.port} UP {self.latency_ms:.1f}ms"
return f"{self.host}:{self.port} DOWN"
def check_service(host: str, port: int, timeout: float = 2.0) -> ServiceStatus:
"""
Check if a TCP service is reachable and measure connect latency.
Example:
status = check_service("db.example.com", 5432)
print(status)
"""
t0 = time.monotonic()
try:
with socket.create_connection((host, port), timeout=timeout):
latency = (time.monotonic() - t0) * 1000
return ServiceStatus(host=host, port=port, latency_ms=latency, reachable=True)
except (socket.timeout, ConnectionRefusedError, OSError):
return ServiceStatus(host=host, port=port, latency_ms=None, reachable=False)
def check_services(endpoints: list[tuple[str, int]], timeout: float = 2.0) -> list[ServiceStatus]:
"""
Check multiple services in parallel; return list of ServiceStatus.
Example:
statuses = check_services([("localhost", 5432), ("localhost", 6379), ("api.svc", 8080)])
for s in statuses:
print(s)
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=len(endpoints)) as ex:
futures = {ex.submit(check_service, h, p, timeout): (h, p) for h, p in endpoints}
return [fut.result() for fut in as_completed(futures)]
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import json
print("=== socket demo ===")
# ── Framed echo server ────────────────────────────────────────────────────
print("\n--- TCPServer frame echo ---")
def echo_handler(conn: socket.socket, addr: Any) -> None:
while True:
try:
data = recv_frame(conn)
send_frame(conn, b"ECHO:" + data)
except (ConnectionError, EOFError):
break
server = TCPServer("127.0.0.1", 19100, echo_handler)
server.start()
time.sleep(0.05)
with tcp_client("127.0.0.1", 19100) as s:
msg = json.dumps({"hello": "world"}).encode()
send_frame(s, msg)
response = recv_frame(s)
server.stop()
print(f" sent: {msg[:30]}")
print(f" echo: {response[:35]}")
# ── DNS resolution ────────────────────────────────────────────────────────
print("\n--- resolve ---")
try:
ips = resolve("localhost")
print(f" localhost → {ips}")
except Exception as e:
print(f" resolve: {e}")
# ── Service check ─────────────────────────────────────────────────────────
print("\n--- check_service ---")
endpoints = [("127.0.0.1", 19100), ("127.0.0.1", 19101)] # second should be closed
for status in check_services(endpoints):
print(f" {status}")
# ── is_port_open ──────────────────────────────────────────────────────────
print("\n--- is_port_open ---")
print(f" localhost:65000 (unlikely open): {is_port_open('127.0.0.1', 65000, timeout=0.1)}")
# ── UDP send ──────────────────────────────────────────────────────────────
print("\n--- UDP (fire and forget) ---")
try:
udp_send("127.0.0.1", 19200, b"hello udp", timeout=0.1)
print(" udp_send: ok (packet sent)")
except Exception as e:
print(f" udp_send: {type(e).__name__} (expected if no listener)")
print("\n=== done ===")
For the http.client alternative — http.client.HTTPConnection / HTTPSConnection implement the HTTP protocol on top of raw TCP sockets and handle chunked transfer, headers, and status parsing; use http.client or higher-level urllib.request / requests for HTTP; use socket for non-HTTP protocols (SMTP, custom binary protocols, TCP health checks) or when you need raw byte-level control. For the asyncio.streams alternative — asyncio.open_connection() and asyncio.start_server() provide the same TCP client/server pattern but with async / await coroutines instead of threads; they are more efficient for high-concurrency scenarios where many connections are idle simultaneously — use asyncio.streams for building async servers that hold many simultaneous long-lived connections; use socket with threading for simpler low-concurrency services or when you need synchronous blocking I/O. The Claude Skills 360 bundle includes socket skill sets covering tcp_connect()/tcp_client()/recv_exactly()/recv_until() connection helpers, send_frame()/recv_frame() length-prefixed framing, TCPServer threaded server class, udp_send()/udp_request() UDP helpers, and resolve()/is_port_open()/check_service()/check_services() DNS and health-check utilities. Start with the free tier to try TCP/UDP networking patterns and socket pipeline code generation.