Python’s email.parser module provides classes for parsing raw RFC 5322 email messages into email.message.Message or EmailMessage objects. Import them with from email.parser import BytesParser, Parser, BytesHeaderParser, HeaderParser, BytesFeedParser, FeedParser. BytesParser(policy=policy.default).parsebytes(raw) — parse bytes in one shot (the preferred form for production code). Parser(policy=policy.default).parsestr(text) — parse str. BytesHeaderParser / HeaderParser — parse headers only, skip body (fast path for metadata). FeedParser / BytesFeedParser — push-mode streaming: call fp.feed(chunk) repeatedly, then msg = fp.close(). All parsers accept an optional policy argument; always pass policy.default (or policy.SMTP) for modern EmailMessage output with typed headers — the legacy default is policy.compat32. File-based parsing: with open("msg.eml", "rb") as f: msg = BytesParser(policy=policy.default).parse(f). Defect collection: after parsing, msg.defects and per-part part.defects list any RFC violations found. Claude Code generates standards-compliant email processors, header extractors, streaming message readers, mbox file parsers, and email pipeline stages.
CLAUDE.md for email.parser
## email.parser Stack
- Stdlib: from email.parser import BytesParser, Parser
- from email.parser import BytesHeaderParser, HeaderParser
- from email.parser import BytesFeedParser, FeedParser
- from email import policy
- Bytes: msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)
- Str: msg = Parser(policy=policy.default).parsestr(raw_text)
- File: with open("x.eml","rb") as f: msg = BytesParser(policy=policy.default).parse(f)
- HdrOnly: hdr = BytesHeaderParser(policy=policy.default).parsebytes(raw)
- Stream: fp = BytesFeedParser(policy=policy.default)
- for chunk in ...: fp.feed(chunk)
- msg = fp.close()
- Defects: msg.defects # list; empty if RFC-compliant
email.parser RFC 5322 Parsing Pipeline
# app/emailparserutil.py — bytes, str, header-only, streaming, defects, batch
from __future__ import annotations
import io
import os
from dataclasses import dataclass, field
from email import policy as _policy
from email.message import EmailMessage, Message
from email.parser import (
BytesFeedParser,
BytesHeaderParser,
BytesParser,
FeedParser,
HeaderParser,
Parser,
)
from typing import Any, Iterator
# ─────────────────────────────────────────────────────────────────────────────
# 1. One-shot parsing helpers
# ─────────────────────────────────────────────────────────────────────────────
def parse_bytes(raw: bytes,
                pol: Any = _policy.default) -> EmailMessage:
    """
    Turn the complete raw bytes of one message into a message object.

    The bytes are handed to a BytesParser built with *pol* in a single
    call; no streaming is involved.

    Example:
        msg = parse_bytes(b"From: alice@example.com\\r\\nSubject: Hi\\r\\n\\r\\nBody")
        print(msg["Subject"])
    """
    parser = BytesParser(policy=pol)
    message = parser.parsebytes(raw)
    return message  # type: ignore[return-value]
def parse_str(text: str,
              pol: Any = _policy.default) -> EmailMessage:
    """
    Turn the complete text of one message into a message object.

    The string is handed to a Parser built with *pol* in a single call.

    Example:
        msg = parse_str("From: alice@example.com\\r\\nSubject: Hi\\r\\n\\r\\nBody")
    """
    parser = Parser(policy=pol)
    message = parser.parsestr(text)
    return message  # type: ignore[return-value]
def parse_file(path: str,
               pol: Any = _policy.default) -> EmailMessage:
    """
    Read the file at *path* in binary mode and parse it as one message.

    The file is closed before the parsed message is returned.

    Example:
        msg = parse_file("/tmp/message.eml")
        print(msg["From"])
    """
    parser = BytesParser(policy=pol)
    with open(path, "rb") as handle:
        parsed = parser.parse(handle)
    return parsed  # type: ignore[return-value]
# ─────────────────────────────────────────────────────────────────────────────
# 2. Header-only parsers (fast path)
# ─────────────────────────────────────────────────────────────────────────────
def parse_headers_only(raw: "bytes | str",
                       pol: Any = _policy.default) -> EmailMessage:
    """
    Parse just the header section of a message; the body is not processed.

    Bytes input goes through BytesHeaderParser, anything else through
    HeaderParser, both configured with *pol*.

    Example:
        hdr = parse_headers_only(raw_bytes)
        print(hdr["Subject"], hdr["From"])
    """
    # Text input is the fall-through case in the original; handle it first here.
    if not isinstance(raw, bytes):
        text_parser = HeaderParser(policy=pol)
        return text_parser.parsestr(raw)  # type: ignore[return-value]
    bytes_parser = BytesHeaderParser(policy=pol)
    return bytes_parser.parsebytes(raw)  # type: ignore[return-value]
def quick_subject(raw: "bytes | str") -> str:
    """
    Return the Subject header of *raw* without parsing the message body.

    Returns "" when the message has no Subject header.

    Example:
        subj = quick_subject(raw_bytes)
    """
    # Coerce to a plain str, as quick_from() does: .get() hands back whatever
    # object the policy uses to represent the header, not necessarily a str.
    return str(parse_headers_only(raw).get("Subject", ""))
def quick_from(raw: "bytes | str") -> str:
    """
    Return the From header of *raw* as a string, skipping the body.

    Returns "" when the message has no From header.

    Example:
        sender = quick_from(raw_bytes)
    """
    headers = parse_headers_only(raw)
    sender = headers.get("From", "")
    return str(sender)
# ─────────────────────────────────────────────────────────────────────────────
# 3. Streaming / incremental parser
# ─────────────────────────────────────────────────────────────────────────────
def parse_stream(stream: "io.RawIOBase | io.BufferedIOBase",
                 chunk_size: int = 65536,
                 pol: Any = _policy.default) -> EmailMessage:
    """
    Incrementally parse one message read from a binary stream.

    Reads up to *chunk_size* bytes at a time and feeds each chunk to a
    BytesFeedParser; reading stops at the first empty (falsy) read.

    Example:
        with open("big.eml", "rb") as f:
            msg = parse_stream(f, chunk_size=16384)
    """
    feeder = BytesFeedParser(policy=pol)
    piece = stream.read(chunk_size)
    while piece:
        feeder.feed(piece)
        piece = stream.read(chunk_size)
    return feeder.close()  # type: ignore[return-value]
def parse_chunks(chunks: "Iterator[bytes]",
                 pol: Any = _policy.default) -> EmailMessage:
    """
    Incrementally parse one message supplied as successive byte chunks.

    Every item drawn from *chunks* is fed, in order, to a BytesFeedParser
    configured with *pol*; the parser is closed once the iterator ends.

    Example:
        msg = parse_chunks(iter([b"Subject: Hi\\r\\n", b"\\r\\n", b"body"]))
    """
    feeder = BytesFeedParser(policy=pol)
    push = feeder.feed
    for piece in chunks:
        push(piece)
    parsed = feeder.close()
    return parsed  # type: ignore[return-value]
# ─────────────────────────────────────────────────────────────────────────────
# 4. Defect collection
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ParseReport:
    """Outcome of parse_with_report()."""

    # True when parsing succeeded and no defects were recorded.
    ok: bool
    # One readable entry per recorded defect, or a single "fatal: ..." entry.
    defects: list[str] = field(default_factory=list)
    # The parsed message; None when parsing itself raised.
    msg: Any = None


def _describe_defect(defect: Exception) -> str:
    """Return a non-empty label for *defect*: its class name plus any message."""
    # Defects created without a message stringify to "", so lead with the class name.
    name = type(defect).__name__
    detail = str(defect)
    return f"{name}: {detail}" if detail else name


def parse_with_report(raw: "bytes | str",
                      pol: Any = _policy.default) -> ParseReport:
    """
    Parse a message and collect the defects recorded on it and on every sub-part.

    Args:
        raw: The full message as bytes (parsed with BytesParser) or as text
            (parsed with Parser).
        pol: Policy handed to the parser.

    Returns:
        A ParseReport. ``ok`` is True only when no defects were recorded. If
        the parser raises, ``ok`` is False, ``msg`` is None and ``defects``
        holds a single ``"fatal: ..."`` entry.

    Example:
        report = parse_with_report(raw_bytes)
        if not report.ok:
            for d in report.defects:
                print("DEFECT:", d)
    """
    # NOTE(review): only the ``defects`` lists of message parts are gathered;
    # defects attached to individual header objects are not - confirm whether
    # they should be included.
    try:
        if isinstance(raw, bytes):
            msg = BytesParser(policy=pol).parsebytes(raw)
        else:
            msg = Parser(policy=pol).parsestr(raw)
    except Exception as e:
        # Report boundary: any parser failure becomes a "fatal" entry, not a crash.
        return ParseReport(ok=False, defects=[f"fatal: {e}"])
    # walk() yields the root message first, so one pass covers the root and
    # every sub-part exactly once.
    defects: list[str] = [
        _describe_defect(d) for part in msg.walk() for d in part.defects
    ]
    return ParseReport(ok=not defects, defects=defects, msg=msg)
# ─────────────────────────────────────────────────────────────────────────────
# 5. Batch directory parser
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class BatchResult:
    """Per-file outcome produced by parse_directory()."""

    # Full path of the .eml file.
    path: str
    # True when the file parsed and no defects were recorded.
    ok: bool
    # Subject and From header values ("" when absent or when parsing failed).
    subject: str
    from_: str
    # Readable defect entries for this file.
    defects: list[str] = field(default_factory=list)
    # Error text when the file could not be read or parsed at all.
    error: str = ""


def parse_directory(directory: str,
                    pol: Any = _policy.default,
                    headers_only: bool = False) -> list[BatchResult]:
    """
    Parse every ``.eml`` file (case-insensitive suffix) in *directory*.

    Files are visited in sorted name order and yield one BatchResult each.
    A file that cannot be read or parsed produces a result with ``ok=False``
    and ``error`` set, instead of aborting the batch.

    Args:
        directory: Directory to scan (not recursive).
        pol: Policy handed to the parsers.
        headers_only: When True, use BytesHeaderParser instead of a full
            parse via parse_with_report().

    Example:
        results = parse_directory("/var/mail/inbox")
        for r in results:
            print(r.subject, r.from_, r.ok)
    """
    results: list[BatchResult] = []
    for fname in sorted(os.listdir(directory)):
        if not fname.lower().endswith(".eml"):
            continue
        fpath = os.path.join(directory, fname)
        try:
            with open(fpath, "rb") as f:
                raw = f.read()
            if headers_only:
                msg = BytesHeaderParser(policy=pol).parsebytes(raw)
                # Keep what the header parse recorded, so ``ok`` means the same
                # thing in both modes. Defects created without a message
                # stringify to "", hence the class-name prefix.
                defects: list[str] = [
                    f"{type(d).__name__}: {d}" if str(d) else type(d).__name__
                    for d in msg.defects
                ]
            else:
                report = parse_with_report(raw, pol)
                if report.msg is None:
                    # The parse failed outright: surface the reported failure
                    # rather than tripping over the missing message below.
                    results.append(BatchResult(
                        path=fpath, ok=False, subject="", from_="",
                        defects=list(report.defects),
                        error="; ".join(report.defects),
                    ))
                    continue
                msg = report.msg
                defects = report.defects
            results.append(BatchResult(
                path=fpath,
                ok=not defects,
                subject=str(msg.get("Subject", "")),
                from_=str(msg.get("From", "")),
                defects=defects,
            ))
        except Exception as e:
            # Batch boundary: one unreadable or broken file must not stop the run.
            results.append(BatchResult(
                path=fpath, ok=False, subject="", from_="", error=str(e)
            ))
    return results
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo script: runs each helper above on two in-memory sample messages.
    print("=== email.parser demo ===")
    # Well-formed sample: real-looking addresses and a weekday that matches
    # the date (3 Feb 2029 falls on a Saturday).
    clean = (
        b"From: Alice <alice@example.com>\r\n"
        b"To: Bob <bob@example.com>\r\n"
        b"Subject: Parser demo\r\n"
        b"Date: Sat, 03 Feb 2029 09:00:00 +0000\r\n"
        b"Message-ID: <demo-001@example.com>\r\n"
        b"Content-Type: text/plain; charset=utf-8\r\n"
        b"\r\n"
        b"Hello from email.parser!\r\n"
    )
    # Deliberately sloppy sample: bare-word sender, empty To and Subject.
    # NOTE(review): these problems may only be recorded on the individual
    # header objects, in which case parse_with_report() lists nothing - confirm.
    defective = (
        b"From: notavalidemail\r\n"
        b"To:\r\n"
        b"Subject:\r\n"
        b"\r\n"
        b"sparse body"
    )
    # ── parse_bytes ────────────────────────────────────────────────────────
    print("\n--- parse_bytes ---")
    msg = parse_bytes(clean)
    print(f" Subject : {msg['Subject']!r}")
    print(f" From : {msg['From']!r}")
    print(f" body : {msg.get_payload()!r}")
    # ── parse_headers_only ─────────────────────────────────────────────────
    print("\n--- parse_headers_only ---")
    hdr = parse_headers_only(clean)
    print(f" Subject : {hdr['Subject']!r}")
    print(f" has_body: {bool(hdr.get_payload())}")
    # ── parse_stream ───────────────────────────────────────────────────────
    print("\n--- parse_stream ---")
    stream_msg = parse_stream(io.BytesIO(clean), chunk_size=50)
    print(f" Subject : {stream_msg['Subject']!r}")
    # ── parse_chunks ───────────────────────────────────────────────────────
    print("\n--- parse_chunks ---")
    chunks = [clean[i:i+40] for i in range(0, len(clean), 40)]
    chunked_msg = parse_chunks(iter(chunks))
    print(f" Subject : {chunked_msg['Subject']!r}")
    print(f" chunks : {len(chunks)}")
    # ── parse_with_report (clean) ──────────────────────────────────────────
    print("\n--- parse_with_report (clean) ---")
    report_clean = parse_with_report(clean)
    print(f" ok : {report_clean.ok}")
    print(f" defects : {report_clean.defects}")
    # ── parse_with_report (defective) ─────────────────────────────────────
    print("\n--- parse_with_report (defective) ---")
    report_bad = parse_with_report(defective)
    print(f" ok : {report_bad.ok}")
    for d in report_bad.defects:
        print(f" defect : {d}")
    # ── quick_subject / quick_from ─────────────────────────────────────────
    print("\n--- quick_subject / quick_from ---")
    print(f" subject : {quick_subject(clean)!r}")
    print(f" from : {quick_from(clean)!r}")
    print("\n=== done ===")
For the email.message.EmailMessage stdlib companion — when parsed with policy.default, BytesParser returns an EmailMessage whose headers are typed objects (msg["From"].addresses gives Address instances); always pass policy=email.policy.default to the parser, rather than relying on the legacy compat32 default, to get structured headers, defect detection, and modern get_content()/iter_attachments() access. For the mail-parser (PyPI) alternative — mailparser.parse_from_bytes(raw) provides a higher-level parsed object with .from_, .to, .subject, .date, .attachments, .body, .text_plain, and .text_html fields that map directly to common email fields without navigating the email.message MIME tree — use mail-parser for rapid extraction of common fields in data pipelines; use stdlib email.parser for full RFC 5322 control, policy customisation, and zero-dependency deployments. The Claude Skills 360 bundle includes email.parser skill sets covering parse_bytes()/parse_str()/parse_file() one-shot parsers, parse_headers_only()/quick_subject()/quick_from() fast header extractors, parse_stream()/parse_chunks() streaming parsers, ParseReport/parse_with_report() defect collector, and BatchResult/parse_directory() bulk .eml processor. Start with the free tier to try RFC 5322 parsing patterns and email.parser pipeline code generation.