Python’s xml.dom.pulldom module provides a hybrid SAX/DOM parser: it produces a lazy event stream like SAX but lets you optionally expand any element into a full DOM subtree (like minidom) on demand. from xml.dom import pulldom. Parse file: events = pulldom.parse("data.xml"). Parse string: events = pulldom.parseString("<root/>") — note parseString requires a str (it wraps the input in io.StringIO); to parse bytes, wrap them in io.BytesIO and pass that to pulldom.parse(). Iterate: for event, node in events: — event is one of the constants pulldom.START_ELEMENT, END_ELEMENT, CHARACTERS, PROCESSING_INSTRUCTION, COMMENT, IGNORABLE_WHITESPACE, START_DOCUMENT, END_DOCUMENT. Expand: events.expandNode(node) — reads ahead in the stream to build the node’s full DOM subtree in memory; call only on START_ELEMENT nodes you want to process deeply, skip the rest. The key advantage over SAX: you can ignore most of the document at SAX speed and call expandNode() only on the elements you care about, avoiding loading the full document into memory like minidom would. Claude Code generates memory-efficient XML stream processors, selective element extractors, large-file RSS readers, XML-to-JSON converters, and document-structure validators.
CLAUDE.md for xml.dom.pulldom
## xml.dom.pulldom Stack
- Stdlib: from xml.dom import pulldom
- Parse file: events = pulldom.parse("data.xml")
- Parse string: events = pulldom.parseString(xml_str)  # str only — for bytes use pulldom.parse(io.BytesIO(xml_bytes))
- Iterate: for event, node in events: ...
- Events: pulldom.START_ELEMENT / END_ELEMENT / CHARACTERS
- PROCESSING_INSTRUCTION / COMMENT / START_DOCUMENT
- Expand: events.expandNode(node) # build full DOM subtree
- # then use node as minidom Node: node.toxml()
- Pattern: check START_ELEMENT + node.tagName → expandNode() → process
xml.dom.pulldom Streaming Parser Pipeline
# app/pulldomutil.py — iterate, extract, count, convert, validate, stream-large
from __future__ import annotations
from xml.dom import pulldom, minidom
from typing import Any, Iterator
from dataclasses import dataclass, field
import io
# ─────────────────────────────────────────────────────────────────────────────
# 1. Event stream introspection helpers
# ─────────────────────────────────────────────────────────────────────────────
# Map pulldom event constants to human-readable names.  Note: in CPython the
# pulldom event "constants" are themselves strings (e.g. pulldom.START_ELEMENT
# == "START_ELEMENT"), so this mapping is effectively an identity lookup with
# a safe fallback for unrecognised values.
EVENT_NAMES = {
    pulldom.START_ELEMENT: "START_ELEMENT",
    pulldom.END_ELEMENT: "END_ELEMENT",
    pulldom.CHARACTERS: "CHARACTERS",
    pulldom.PROCESSING_INSTRUCTION: "PROCESSING_INSTRUCTION",
    pulldom.COMMENT: "COMMENT",
    pulldom.IGNORABLE_WHITESPACE: "IGNORABLE_WHITESPACE",
    pulldom.START_DOCUMENT: "START_DOCUMENT",
    pulldom.END_DOCUMENT: "END_DOCUMENT",
}


def event_name(evt: str) -> str:
    """
    Return the human-readable name of a pulldom event constant.

    The parameter is annotated as ``str`` (not ``int``) because pulldom's
    event constants are string objects; unknown values are reported as
    ``"UNKNOWN(<value>)"`` rather than raising.

    Example:
        event_name(pulldom.START_ELEMENT)  # "START_ELEMENT"
    """
    return EVENT_NAMES.get(evt, f"UNKNOWN({evt})")
def iter_start_elements(xml_source: "str | bytes | io.IOBase",
                        tag: str | None = None) -> Iterator[minidom.Element]:
    """
    Yield fully-expanded DOM elements from an XML source.

    If tag is given, only elements with that tag name are yielded; all
    other events are skipped without building DOM nodes.  xml_source may
    be an XML string, XML bytes, or a file-like object.

    Example:
        for el in iter_start_elements(xml_bytes, "item"):
            print(el.getAttribute("id"))
    """
    # pulldom.parseString() accepts only str (it wraps the input in
    # io.StringIO), so bytes must go through a binary stream and
    # pulldom.parse(); that also lets the SAX parser honour any encoding
    # declaration in the document.
    if isinstance(xml_source, str):
        events = pulldom.parseString(xml_source)
    elif isinstance(xml_source, bytes):
        events = pulldom.parse(io.BytesIO(xml_source))
    else:
        events = pulldom.parse(xml_source)
    for event, node in events:
        if event == pulldom.START_ELEMENT:
            if tag is None or node.tagName == tag:
                # Consumes events up to the matching END_ELEMENT and
                # attaches the full subtree to `node`.
                events.expandNode(node)
                yield node
# ─────────────────────────────────────────────────────────────────────────────
# 2. Element counter (SAX-speed, no DOM expansion)
# ─────────────────────────────────────────────────────────────────────────────
def count_elements(xml_source: "str | bytes",
                   tag: str | None = None) -> dict[str, int]:
    """
    Count element occurrences without expanding any nodes (SAX speed).

    If tag is given, count only that tag; otherwise count all tags.
    Returns {tag: count}.

    Example:
        counts = count_elements(xml_bytes)
        counts = count_elements(xml_bytes, "item")
    """
    # pulldom.parseString() accepts only str; bytes must be routed
    # through BytesIO + pulldom.parse() (passing bytes to parseString
    # raises TypeError inside StringIO).
    if isinstance(xml_source, bytes):
        events = pulldom.parse(io.BytesIO(xml_source))
    else:
        events = pulldom.parseString(xml_source)
    counts: dict[str, int] = {}
    for event, node in events:
        if event == pulldom.START_ELEMENT:
            name = node.tagName
            if tag is None or name == tag:
                counts[name] = counts.get(name, 0) + 1
    return counts
# ─────────────────────────────────────────────────────────────────────────────
# 3. Selective element extractor
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ExtractedElement:
    """One element pulled out of the stream by extract_elements()."""
    tag: str                    # element tag name
    attributes: dict[str, str]  # attribute name -> value
    text: str                   # direct + one-level-nested text, stripped
    xml: str                    # serialised to XML string


def extract_elements(xml_source: "str | bytes",
                     tag: str,
                     max_items: int = 1000) -> list[ExtractedElement]:
    """
    Extract all elements with the given tag name into ExtractedElement
    records.  expandNode() is called only for matching elements, so
    non-matching parts of the document are skipped at SAX speed.

    Example:
        items = extract_elements(rss_bytes, "item", max_items=50)
        for item in items:
            print(item.attributes, item.text[:80])
    """
    # pulldom.parseString() accepts only str; bytes go through BytesIO +
    # pulldom.parse() (passing bytes to parseString raises TypeError).
    if isinstance(xml_source, bytes):
        events = pulldom.parse(io.BytesIO(xml_source))
    else:
        events = pulldom.parseString(xml_source)
    results: list[ExtractedElement] = []
    for event, node in events:
        if len(results) >= max_items:
            break
        if event == pulldom.START_ELEMENT and node.tagName == tag:
            events.expandNode(node)
            # NamedNodeMap.items() yields (name, value) pairs directly.
            attrs = dict(node.attributes.items())
            # Collect text from direct children and one level of nested
            # elements (grandchild text nodes).
            texts: list[str] = []
            for child in node.childNodes:
                if child.nodeType == child.TEXT_NODE:
                    texts.append(child.data)
                elif child.nodeType == child.ELEMENT_NODE:
                    texts.extend(sub.data for sub in child.childNodes
                                 if sub.nodeType == sub.TEXT_NODE)
            results.append(ExtractedElement(
                tag=node.tagName,
                attributes=attrs,
                text="".join(texts).strip(),
                xml=node.toxml(),
            ))
    return results
# ─────────────────────────────────────────────────────────────────────────────
# 4. XML-to-dict with selective expand
# ─────────────────────────────────────────────────────────────────────────────
def node_to_dict(node: Any) -> Any:
    """
    Recursively convert a minidom Node subtree to Python data.

    Text nodes become stripped strings, elements become dicts with
    attributes keyed as "@name", repeated child tags collected into
    lists, and an element whose only content is text collapses to that
    string.  Anything else (and empty elements) yields "".

    Example:
        for el in iter_start_elements(xml, "record"):
            print(node_to_dict(el))
    """
    if node.nodeType == node.TEXT_NODE:
        return node.data.strip()
    if node.nodeType != node.ELEMENT_NODE:
        return ""

    out: dict[str, Any] = {}
    attrs = node.attributes
    for idx in range(attrs.length):
        attr = attrs.item(idx)
        out[f"@{attr.name}"] = attr.value

    for child in node.childNodes:
        if child.nodeType not in (child.ELEMENT_NODE, child.TEXT_NODE):
            continue
        key = "#text" if child.nodeType == child.TEXT_NODE else child.nodeName
        converted = node_to_dict(child)
        if not converted:
            continue  # drop whitespace-only text and empty subtrees
        if key not in out:
            out[key] = converted
        elif isinstance(out[key], list):
            out[key].append(converted)
        else:
            out[key] = [out[key], converted]

    if not out:
        return ""
    if list(out) == ["#text"]:
        return out["#text"]
    return out
def xml_to_dicts(xml_source: "str | bytes",
                 root_tag: str) -> list[dict]:
    """
    Parse XML and return one dict per element whose tag equals root_tag.

    Example:
        records = xml_to_dicts(xml_bytes, "person")
        for r in records:
            print(r.get("name"), r.get("email"))
    """
    matches = iter_start_elements(xml_source, root_tag)
    return list(map(node_to_dict, matches))
# ─────────────────────────────────────────────────────────────────────────────
# 5. Structure validator
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class ValidationReport:
    """Result of validate_xml_structure()."""
    valid: bool                        # True when no parse errors occurred
    element_count: int                 # total START_ELEMENT events seen
    depth_max: int                     # deepest element nesting reached
    required_tags_found: list[str]     # required tags present in the document
    required_tags_missing: list[str]   # required tags absent from the document
    errors: list[str] = field(default_factory=list)  # parse error messages


def validate_xml_structure(xml_source: "str | bytes",
                           required_tags: list[str] | None = None) -> ValidationReport:
    """
    Validate that an XML document is well-formed and contains required tags.

    Streams with pulldom and never expands nodes, so it works on
    arbitrarily large inputs.  A malformed document yields valid=False
    with the parser error recorded in errors (counts reflect the portion
    parsed before the failure).

    Example:
        report = validate_xml_structure(xml_bytes, required_tags=["title", "item"])
        print(report.valid, report.required_tags_missing)
    """
    required_tags = required_tags or []
    found_tags: set[str] = set()
    element_count = 0
    depth = 0
    depth_max = 0
    errors: list[str] = []
    try:
        # pulldom.parseString() accepts only str; bytes must be fed
        # through BytesIO + pulldom.parse() (bytes would raise TypeError
        # inside parseString's StringIO).
        if isinstance(xml_source, bytes):
            events = pulldom.parse(io.BytesIO(xml_source))
        else:
            events = pulldom.parseString(xml_source)
        for event, node in events:
            if event == pulldom.START_ELEMENT:
                element_count += 1
                depth += 1
                depth_max = max(depth_max, depth)
                found_tags.add(node.tagName)
            elif event == pulldom.END_ELEMENT:
                depth -= 1
    except Exception as e:  # boundary: report any parse failure, don't raise
        errors.append(f"parse error: {e}")
    found = [t for t in required_tags if t in found_tags]
    missing = [t for t in required_tags if t not in found_tags]
    return ValidationReport(
        valid=not errors,
        element_count=element_count,
        depth_max=depth_max,
        required_tags_found=found,
        required_tags_missing=missing,
        errors=errors,
    )
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("=== xml.dom.pulldom demo ===")
    sample_xml = b"""<?xml version="1.0"?>
<catalog>
<book id="b1" lang="en">
<title>Python Cookbook</title>
<author>David Beazley</author>
<price>39.99</price>
</book>
<book id="b2" lang="fr">
<title>Apprendre Python</title>
<author>Mark Lutz</author>
<price>29.99</price>
</book>
<magazine id="m1">
<title>Python Weekly</title>
</magazine>
</catalog>"""

    # Tag histogram at SAX speed — no DOM nodes are built.
    print("\n--- count_elements ---")
    tag_counts = count_elements(sample_xml)
    for tag_name, total in sorted(tag_counts.items()):
        print(f" {tag_name:15s}: {total}")

    # Pull out only <book> elements, expanding just those subtrees.
    print("\n--- extract_elements (book) ---")
    for book in extract_elements(sample_xml, "book"):
        print(f" id={book.attributes.get('id')} "
              f"lang={book.attributes.get('lang')} "
              f"text={book.text[:30]!r}")

    # Convert each <book> subtree into a plain Python dict.
    print("\n--- xml_to_dicts (book) ---")
    for rec in xml_to_dicts(sample_xml, "book"):
        print(f" {rec}")

    # Structural validation: well-formedness plus required-tag presence.
    print("\n--- validate_xml_structure ---")
    report = validate_xml_structure(
        sample_xml, required_tags=["title", "author", "isbn"])
    print(f" valid = {report.valid}")
    print(f" element_count = {report.element_count}")
    print(f" depth_max = {report.depth_max}")
    print(f" found = {report.required_tags_found}")
    print(f" missing = {report.required_tags_missing}")

    # Peek at the first few event-constant names.
    print("\n--- event names ---")
    for const, label in list(EVENT_NAMES.items())[:5]:
        print(f" {const}: {label}")
    print("\n=== done ===")
For the xml.etree.ElementTree stdlib alternative — ET.iterparse(source, events=("start","end")) provides a similar pull-parsing API with lower overhead per event and a simpler elem.clear() pattern for streaming large files — use iterparse with elem.clear() for most large-XML streaming tasks as it is faster and has a cleaner API; use pulldom when you need selective DOM expansion of matched elements to exploit minidom’s Node.toxml(), XPath-like traversal, or namespace resolution. For the lxml.etree (PyPI) alternative — lxml.etree.iterparse(file, events=("start","end","start-ns")) is 3–10× faster than stdlib XML parsing and adds DTD/schema validation, XPath/XSLT, and better namespace handling — use lxml for production XML processing of large or complex documents. The Claude Skills 360 bundle includes xml.dom.pulldom skill sets covering event_name()/iter_start_elements() event helpers, count_elements() SAX-speed counter, ExtractedElement/extract_elements() selective extractor, node_to_dict()/xml_to_dicts() converter, and ValidationReport/validate_xml_structure() structure validator. Start with the free tier to try pull-mode XML patterns and pulldom pipeline code generation.