Python’s smtpd module implements an asynchronous SMTP server using the older asynchat / asyncore I/O model. Import it with import smtpd. Base classes: smtpd.SMTPServer(localaddr, remoteaddr) — override process_message(peer, mailfrom, rcpttos, data, **kwargs) to handle received mail; smtpd.DebuggingServer(localaddr, remoteaddr) — prints each message to stdout; smtpd.PureProxy(localaddr, remoteaddr) — forwards to an upstream relay. Start a server: server = smtpd.DebuggingServer(('127.0.0.1', 8025), None), then call asyncore.loop(). process_message parameters: peer = (ip, port) tuple; mailfrom = sender envelope address string; rcpttos = list of recipient strings; data = raw message bytes (RFC 2822); kwargs may include mail_options and rcpt_options from ESMTP extensions. Return None to accept the message; return a string to reject it (an SMTP error reply in RFC 5321 format, e.g. "550 Spam rejected"). The module also defines the smtpd.NEWLINE constant. Note: smtpd was deprecated in Python 3.6 and removed in 3.12 — for Python 3.12+ use aiosmtpd (PyPI) or asyncio-based raw TCP listeners, and always include a compatibility guard around the import. Claude Code generates local mail capture servers, test SMTP sinks, development mail interceptors, and forwarding relays.
CLAUDE.md for smtpd
## smtpd Stack
- Stdlib: import smtpd, asyncore (deprecated 3.6, removed 3.12 — guard!)
- Quick:  s = smtpd.DebuggingServer(('127.0.0.1', 8025), None)
-         asyncore.loop()  # blocks, prints all mail to stdout
- Custom: class MySink(smtpd.SMTPServer):
-             def process_message(self, peer, mailfrom, rcpttos, data, **kw):
-                 print(data.decode())  # then return None to accept, or a str to reject
- Modern: use aiosmtpd (PyPI) for Python 3.12+
smtpd SMTP Server Pipeline
# app/smtpdutil.py — sink, capture, filter, aiosmtpd bridge
from __future__ import annotations
import asyncio
import email
import email.message
import email.policy
import io
import logging
import re
import socket
import threading
import time
from dataclasses import dataclass, field
from typing import Callable
logger = logging.getLogger(__name__)
# Guard for Python 3.12+ where smtpd is removed
try:
import smtpd as _smtpd
import asyncore as _asyncore
_SMTPD_AVAILABLE = True
except ImportError:
_SMTPD_AVAILABLE = False
# ─────────────────────────────────────────────────────────────────────────────
# 1. Message capture dataclass
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class CapturedMessage:
    """
    One received mail: the SMTP envelope values plus the raw message bytes.

    The ``message``, ``subject`` and ``body`` properties re-parse ``data`` on
    every access; nothing is cached on the instance.
    """

    peer: tuple[str, int]  # (ip, port)
    mailfrom: str
    rcpttos: list[str]
    data: bytes
    # Defaults to time.time() at construction.
    received_at: float = field(default_factory=time.time)

    @property
    def message(self) -> email.message.EmailMessage:
        """Return ``data`` parsed with the modern ``email.policy.default``."""
        parsed = email.message_from_bytes(self.data, policy=email.policy.default)
        return parsed

    @property
    def subject(self) -> str:
        """Return the Subject header, or "(no subject)" when it is absent."""
        parsed = self.message
        return parsed.get("subject", "(no subject)")

    @property
    def body(self) -> str:
        """
        Return the message text.

        Single-part messages yield their content as-is. Multipart messages
        yield the first ``text/plain`` part found by ``walk()``, or an empty
        string when there is none.
        """
        parsed = self.message
        if not parsed.is_multipart():
            return parsed.get_content()
        plain_parts = (
            part for part in parsed.walk()
            if part.get_content_type() == "text/plain"
        )
        first_plain = next(plain_parts, None)
        if first_plain is None:
            return ""
        return first_plain.get_content()

    def __str__(self) -> str:
        """One-line summary: sender, recipients, first 50 subject characters."""
        recipients = ", ".join(self.rcpttos)
        short_subject = self.subject[:50]
        return f"From: {self.mailfrom} To: {recipients} Subject: {short_subject}"
# ─────────────────────────────────────────────────────────────────────────────
# 2. smtpd-based servers (Python < 3.12)
# ─────────────────────────────────────────────────────────────────────────────
if _SMTPD_AVAILABLE:
class CaptureSink(_smtpd.SMTPServer):
    """
    smtpd-based sink that keeps every received message in memory.

    Appends, snapshots and clears all go through one lock, so the accessor
    methods may be called from a thread other than the one driving the
    asyncore loop.

    Example:
        sink = CaptureSink(('127.0.0.1', 8025))
        # Run asyncore.loop() in a thread, then send mail via smtplib
    """

    def __init__(self, localaddr: tuple[str, int]) -> None:
        # No remote address: this server only receives, it never relays.
        super().__init__(localaddr, None)
        self._lock = threading.Lock()
        self.messages: list[CapturedMessage] = []

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Store the message and accept it (returning None means accept)."""
        captured = CapturedMessage(
            peer=peer,
            mailfrom=mailfrom,
            rcpttos=[*rcpttos],  # copy so later mutation by the caller is not shared
            data=data,
        )
        with self._lock:
            self.messages.append(captured)
        logger.debug("Captured message from %s", mailfrom)
        return None

    def get_messages(self) -> list[CapturedMessage]:
        """Return a snapshot copy of the messages captured so far."""
        with self._lock:
            snapshot = self.messages.copy()
        return snapshot

    def clear(self) -> None:
        """Discard all captured messages."""
        with self._lock:
            del self.messages[:]
class FilterSink(_smtpd.SMTPServer):
    """
    smtpd-based server that delegates the accept/reject decision to a callback.

    The callback receives a CapturedMessage and returns None to accept the
    mail or a rejection string. Without a callback, everything is accepted.

    Example:
        def my_handler(msg: CapturedMessage):
            if "spam" in msg.subject.lower():
                return "550 Spam rejected"
            print(msg)
            return None
        sink = FilterSink(('127.0.0.1', 8025), handler=my_handler)
    """

    def __init__(
        self,
        localaddr: tuple[str, int],
        handler: Callable[[CapturedMessage], str | None] | None = None,
    ) -> None:
        super().__init__(localaddr, None)
        # Fall back to the accept-everything callback when none is supplied.
        self._handler = handler if handler else self._accept_all

    @staticmethod
    def _accept_all(_message):
        """Default callback: accept every message."""
        return None

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Wrap the envelope in a CapturedMessage and return the callback's verdict."""
        captured = CapturedMessage(
            peer=peer,
            mailfrom=mailfrom,
            rcpttos=[*rcpttos],
            data=data,
        )
        verdict = self._handler(captured)
        return verdict
def start_sink_in_thread(
    addr: tuple[str, int],
) -> "tuple[CaptureSink, threading.Thread]":
    """
    Create a CaptureSink bound to *addr* and drive it from a daemon thread.

    The thread (named "smtpd-sink") calls asyncore.loop() with a 1 second
    timeout, poll-based when requested via use_poll=True.

    Returns:
        (sink, thread) — the sink to inspect and the already-started thread.

    Example:
        sink, thread = start_sink_in_thread(('127.0.0.1', 8025))
        # send mail...
        msgs = sink.get_messages()
    """
    capture = CaptureSink(addr)
    worker = threading.Thread(
        target=_asyncore.loop,
        kwargs={"timeout": 1.0, "count": None, "use_poll": True},
        daemon=True,
        name="smtpd-sink",
    )
    worker.start()
    return capture, worker
# ─────────────────────────────────────────────────────────────────────────────
# 3. aiosmtpd bridge (Python 3.12+ — requires: pip install aiosmtpd)
# ─────────────────────────────────────────────────────────────────────────────
try:
from aiosmtpd.controller import Controller as _AioController
from aiosmtpd.handlers import Message as _AioMessage
_AIOSMTPD_AVAILABLE = True
except ImportError:
_AIOSMTPD_AVAILABLE = False
if _AIOSMTPD_AVAILABLE:
class AioCaptureHandler(_AioMessage):
    """
    aiosmtpd handler that captures all incoming messages.

    aiosmtpd's ``Message`` base handler records the SMTP envelope on the
    parsed message as ``X-MailFrom``, ``X-RcptTo`` and ``X-Peer`` headers.
    Those are preferred here because the From/To display headers are not the
    envelope: they may carry display names and never list Bcc recipients.
    When an envelope header is missing, the From/To headers (and a loopback
    placeholder peer) are used instead.

    Example:
        handler = AioCaptureHandler()
        ctrl = Controller(handler, hostname='127.0.0.1', port=8025)
        ctrl.start()
        # send mail ...
        msgs = handler.get_messages()
        ctrl.stop()
    """

    def __init__(self) -> None:
        super().__init__(message_class=email.message.EmailMessage)
        self._messages: list[CapturedMessage] = []
        self._lock = threading.Lock()

    @staticmethod
    def _parse_peer(header_value) -> "tuple[str, int]":
        """
        Turn an ``X-Peer`` header (the ``str()`` of the peer tuple, e.g.
        "('127.0.0.1', 50000)") back into ``(host, port)``.

        Returns ``("127.0.0.1", 0)`` when the header is missing or unparsable.
        """
        import ast  # local import: only needed by this helper

        fallback = ("127.0.0.1", 0)
        if not header_value:
            return fallback
        try:
            parsed = ast.literal_eval(str(header_value))
        except (ValueError, SyntaxError, TypeError):
            return fallback
        # IPv6 peers are 4-tuples; only host and port are kept.
        if (
            isinstance(parsed, tuple)
            and len(parsed) >= 2
            and isinstance(parsed[0], str)
            and isinstance(parsed[1], int)
        ):
            return (parsed[0], parsed[1])
        return fallback

    def handle_message(self, message: email.message.EmailMessage) -> None:
        """Convert the parsed message into a CapturedMessage and store it."""
        # Re-serialised form of the parsed message (includes the X-* headers
        # the base handler added).
        raw = message.as_bytes()

        envelope_from = message.get("X-MailFrom")
        if envelope_from is not None:
            sender = str(envelope_from)
        else:
            sender = message.get("from", "")

        envelope_rcpts = message.get("X-RcptTo")
        if envelope_rcpts:
            # The base handler joins the envelope recipients with ", ".
            rcptlist = [
                addr.strip()
                for addr in str(envelope_rcpts).split(",")
                if addr.strip()
            ]
        else:
            rcptlist = list(message.get_all("to", []))

        msg = CapturedMessage(
            peer=self._parse_peer(message.get("X-Peer")),
            mailfrom=sender,
            rcpttos=rcptlist,
            data=raw,
        )
        with self._lock:
            self._messages.append(msg)

    def get_messages(self) -> list[CapturedMessage]:
        """Return a snapshot copy of the messages captured so far."""
        with self._lock:
            return list(self._messages)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Pure asyncio minimal SMTP sink (zero extra deps, Python 3.12+ compatible)
# ─────────────────────────────────────────────────────────────────────────────
class AsyncioSmtpSink:
    """
    Minimal SMTP sink using asyncio streams — no external dependencies.
    Supports HELO/EHLO, MAIL FROM, RCPT TO, DATA, RSET, NOOP, QUIT.
    Does NOT support AUTH or TLS (development use only).

    Every accepted message is appended to ``messages`` and, if a handler was
    supplied, passed to it before the 250 reply is sent.

    Example:
        sink = AsyncioSmtpSink()
        asyncio.run(sink.run_forever('127.0.0.1', 8025))
    """

    def __init__(self, handler: "Callable[[CapturedMessage], None] | None" = None):
        self._handler = handler
        self.messages: list[CapturedMessage] = []
        # Guards ``messages`` when the sink runs via start_in_thread() and is
        # read from another thread through get_messages().
        self._lock = threading.Lock()

    def get_messages(self) -> "list[CapturedMessage]":
        """Return a snapshot copy of the messages captured so far."""
        with self._lock:
            return list(self.messages)

    @staticmethod
    def _extract_address(argument: str) -> str:
        """
        Return the bare address from a MAIL FROM / RCPT TO argument.

        "<a@b> SIZE=42" -> "a@b": text after the closing bracket (ESMTP
        parameters) is dropped. Arguments without a bracket pair just have
        any stray "<" / ">" removed.
        """
        argument = argument.strip()
        start = argument.find("<")
        end = argument.find(">", start + 1)
        if start != -1 and end != -1:
            return argument[start + 1:end].strip()
        return re.sub(r"[<>]", "", argument)

    @staticmethod
    async def _read_data(reader: asyncio.StreamReader) -> "bytes | None":
        """
        Read a DATA payload up to the terminating "." line.

        Returns the payload with lines joined by "\\n", or None when the
        connection reaches EOF before the terminator arrives.
        """
        lines: list[bytes] = []
        while True:
            chunk = await asyncio.wait_for(reader.readline(), timeout=300)
            if not chunk:
                # readline() keeps returning b"" at EOF; without this check
                # the loop would spin forever on a dropped connection.
                return None
            if chunk in (b".\r\n", b".\n"):
                return b"\n".join(lines)
            # Un-dot-stuffing
            if chunk.startswith(b".."):
                chunk = chunk[1:]
            lines.append(chunk.rstrip(b"\r\n"))

    async def _handle(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> None:
        """Serve one client connection until QUIT, EOF, timeout or reset."""
        peer = writer.get_extra_info("peername")

        async def send(line: str) -> None:
            writer.write((line + "\r\n").encode())
            await writer.drain()

        mailfrom = ""
        rcpttos: list[str] = []
        try:
            # Greeting is inside the try so a failed write still closes the writer.
            await send("220 localhost ESMTP AsyncioSmtpSink")
            while True:
                raw = await asyncio.wait_for(reader.readline(), timeout=60)
                if not raw:
                    break
                line = raw.decode("utf-8", errors="replace").rstrip("\r\n")
                upper = line.upper()
                if upper.startswith(("HELO", "EHLO")):
                    await send("250 Hello")
                elif upper.startswith("MAIL FROM:"):
                    mailfrom = self._extract_address(line[10:])
                    rcpttos = []
                    await send("250 OK")
                elif upper.startswith("RCPT TO:"):
                    rcpttos.append(self._extract_address(line[8:]))
                    await send("250 OK")
                elif upper == "DATA":
                    await send("354 End data with <CR><LF>.<CR><LF>")
                    # Each DATA starts from an empty payload, so a second DATA
                    # in the same transaction cannot inherit earlier lines.
                    payload = await self._read_data(reader)
                    if payload is None:
                        break  # client vanished mid-message: drop the partial mail
                    msg = CapturedMessage(
                        peer=peer, mailfrom=mailfrom,
                        rcpttos=list(rcpttos),
                        data=payload,
                    )
                    with self._lock:
                        self.messages.append(msg)
                    if self._handler:
                        self._handler(msg)
                    await send("250 Message accepted")
                elif upper == "QUIT":
                    await send("221 Bye")
                    break
                elif upper == "RSET":
                    mailfrom = ""
                    rcpttos = []
                    await send("250 OK")
                elif upper == "NOOP":
                    await send("250 OK")
                else:
                    await send("502 Not implemented")
        except (asyncio.TimeoutError, ConnectionError):
            # Idle or reset client: deliberately best-effort, just hang up.
            pass
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                # The peer may already have reset the connection.
                pass

    async def _serve(
        self,
        host: str,
        port: int,
        ready: "threading.Event | None" = None,
    ) -> None:
        """Bind, announce, optionally signal *ready*, then serve until cancelled."""
        server = await asyncio.start_server(self._handle, host, port)
        print(f"AsyncioSmtpSink listening on {host}:{port}")
        if ready is not None:
            ready.set()
        async with server:
            await server.serve_forever()

    async def run_forever(self, host: str = "127.0.0.1", port: int = 8025) -> None:
        """Start the asyncio SMTP sink and run until cancelled."""
        await self._serve(host, port)

    def start_in_thread(self, host: str = "127.0.0.1", port: int = 8025) -> None:
        """
        Start the SMTP sink in a background daemon thread.

        Blocks until the server socket is listening (or startup failed, or
        5 seconds elapsed) instead of sleeping for a fixed interval.
        """
        ready = threading.Event()

        def _run() -> None:
            try:
                asyncio.run(self._serve(host, port, ready))
            finally:
                # Unblock the caller even when binding the port failed.
                ready.set()

        t = threading.Thread(target=_run, daemon=True, name="asyncio-smtp-sink")
        t.start()
        ready.wait(timeout=5.0)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo: start whichever sink this interpreter supports, push one message
    # through it with smtplib, then print what was captured.
    import smtplib

    print("=== smtpd demo ===")
    HOST, PORT = "127.0.0.1", 18025

    # ── pick the best available server ────────────────────────────────────────
    if _SMTPD_AVAILABLE:
        print("\n--- using smtpd.CaptureSink ---")
        sink, _thread = start_sink_in_thread((HOST, PORT))
        time.sleep(0.1)  # let asyncore start
    else:
        print("\n--- smtpd removed (Python 3.12+); using AsyncioSmtpSink ---")
        aio_sink = AsyncioSmtpSink()
        aio_sink.start_in_thread(HOST, PORT)
        sink = aio_sink

    # ── send a test message via smtplib ────────────────────────────────────────
    print(f"\n--- sending test email to {HOST}:{PORT} ---")
    try:
        with smtplib.SMTP(HOST, PORT, timeout=5) as s:
            s.sendmail(
                "sender@example.com",
                ["alice@example.com", "bob@example.com"],
                b"From: sender@example.com\r\n"
                b"To: alice@example.com\r\n"
                b"Subject: Test from smtpd demo\r\n"
                b"\r\n"
                b"Hello from the smtpd demo!\r\n",
            )
        print(" message sent successfully")
    except (OSError, smtplib.SMTPException) as e:
        # Connection/timeout problems and SMTP-level refusals are reported,
        # not fatal: the rest of the demo still runs.
        print(f" send error: {e}")
    time.sleep(0.2)  # give the server thread time to finish storing the message

    # ── inspect captured messages ──────────────────────────────────────────────
    print("\n--- captured messages ---")
    # CaptureSink exposes get_messages(); the asyncio sink is read through its
    # public ``messages`` list.
    msgs = (sink.get_messages() if _SMTPD_AVAILABLE
            else getattr(sink, "messages", []))
    for m in msgs:
        print(f" {m}")
        print(f" body: {m.body[:80]!r}")
    if not msgs:
        print(" (no messages captured — network or timing issue)")

    # ── CapturedMessage.message parsing demo ──────────────────────────────────
    print("\n--- CapturedMessage email parsing ---")
    raw = (
        b"From: a@example.com\r\n"
        b"To: b@example.com\r\n"
        b"Subject: Hello world\r\n"
        b"\r\n"
        b"This is the body.\r\n"
    )
    cm = CapturedMessage(peer=("127.0.0.1", 9000),
                         mailfrom="a@example.com",
                         rcpttos=["b@example.com"],
                         data=raw)
    print(f" subject: {cm.subject!r}")
    print(f" body: {cm.body!r}")
    print("\n=== done ===")
For the aiosmtpd (PyPI) alternative — aiosmtpd.controller.Controller(handler, hostname, port) is the official replacement for smtpd, built on asyncio with support for AUTH, STARTTLS, SIZE, and SMTPUTF8 extensions — use aiosmtpd for all Python 3.12+ SMTP server code; the AioCaptureHandler above shows how to bridge from aiosmtpd to the same CapturedMessage dataclass. For the AsyncioSmtpSink (zero deps) alternative — the AsyncioSmtpSink class above implements a minimal SMTP server using only asyncio.start_server(), covering HELO/EHLO, MAIL FROM, RCPT TO, DATA (with dot-stuffing), QUIT, RSET, and NOOP — use this approach when you want a development mail sink with no external dependencies, need Python 3.12+ compatibility, and don’t need ESMTP extensions like AUTH or STARTTLS. The Claude Skills 360 bundle includes smtpd skill sets covering CapturedMessage with message/subject/body parsing, CaptureSink/FilterSink smtpd-based servers with start_sink_in_thread(), AioCaptureHandler aiosmtpd bridge, and AsyncioSmtpSink zero-dependency asyncio implementation with start_in_thread(). Start with the free tier to try SMTP server patterns and smtpd pipeline code generation.