Python’s pickletools module disassembles and optimizes pickle byte streams without unpickling them, making it invaluable for debugging serialization issues and auditing pickle data. import pickletools. Disassemble: pickletools.dis(pickle_bytes, out=None, memo=None, indentlevel=4, annotate=0) — prints an annotated opcode listing to stdout or to the out file object; annotate sets the column where opcode annotations start (0 disables them). Generate opcodes: pickletools.genops(pickle_bytes) → iterator of (opcode, arg, pos) tuples — opcode is an OpcodeInfo instance; arg is the decoded argument; pos is the byte offset. Optimize: pickletools.optimize(p) → bytes — removes duplicate PUT/BINPUT/LONG_BINPUT memoization opcodes that store values never retrieved by a GET, reducing stream size. Opcode list: pickletools.opcodes — list of OpcodeInfo objects with .name, .code, .arg, .stack_before, .stack_after, .proto, .doc attributes. Protocol constants: pickle protocols 0–5 with increasing efficiency. Claude Code generates pickle debuggers, protocol analyzers, size optimizers, and serialization validators.
CLAUDE.md for pickletools
## pickletools Stack
- Stdlib: import pickletools, pickle
- Dis: pickletools.dis(pickle.dumps(obj)) # print opcode listing
- Ops: for op, arg, pos in pickletools.genops(data): ...
- Opt: smaller = pickletools.optimize(data) # remove unused PUTs
- Note: Never unpickles — safe for inspecting the structure of untrusted pickle data
pickletools Pickle Bytecode Analyzer Pipeline
# app/pickletoolsutil.py — disassemble, analyze, optimize, compare, validate
from __future__ import annotations
import io
import pickle
import pickletools
import struct
from dataclasses import dataclass, field
from pathlib import Path
# ─────────────────────────────────────────────────────────────────────────────
# 1. Disassembly helpers
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class OpcodeRecord:
    """One decoded opcode from a pickle stream (produced by disassemble())."""
    pos: int     # byte offset of the opcode within the stream
    code: str    # one-character opcode code, e.g. "."
    name: str    # symbolic opcode name, e.g. "BINUNICODE"
    arg: object  # decoded argument, or None when the opcode takes none
    proto: int   # minimum protocol that introduced this opcode

    def __str__(self) -> str:
        # Render the argument compactly; long reprs are truncated to 60 chars.
        shown = "" if self.arg is None else repr(self.arg)
        if len(shown) > 60:
            shown = shown[:57] + "..."
        return f"{self.pos:6d} {self.code!r:4s} {self.name:<24s} {shown}"
def disassemble(data: bytes) -> list[OpcodeRecord]:
    """
    Decode a pickle byte stream into a list of OpcodeRecord entries,
    one per opcode, without ever unpickling the data.

    Example:
        for rec in disassemble(pickle.dumps({"a": 1})):
            print(rec)
    """
    return [
        OpcodeRecord(pos=offset, code=info.code, name=info.name,
                     arg=argument, proto=info.proto)
        for info, argument, offset in pickletools.genops(data)
    ]
def dis_to_string(data: bytes, annotate: int = 40) -> str:
    """
    Return the pickletools.dis opcode listing for *data* as a string.

    Parameters:
        data: pickle byte stream to disassemble.
        annotate: column where opcode annotations start (0 disables them).

    Example:
        print(dis_to_string(pickle.dumps([1, 2, 3])))
    """
    buf = io.StringIO()
    # Bug fix: pickletools.dis takes the destination via the keyword `out`;
    # passing `output=` raised TypeError (unexpected keyword argument).
    pickletools.dis(data, out=buf, annotate=annotate)
    return buf.getvalue()
# ─────────────────────────────────────────────────────────────────────────────
# 2. Stream analysis
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class PickleStats:
    """Aggregate metrics for one pickle byte stream (built by analyze())."""
    total_bytes: int             # length of the raw stream in bytes
    protocol: int                # protocol from the PROTO opcode (0 if absent)
    opcode_count: int            # total number of opcodes in the stream
    opcode_freq: dict[str, int]  # opcode name -> occurrence count
    memo_puts: int               # number of memoize (PUT) opcodes
    memo_gets: int               # number of memo fetch (GET) opcodes
    string_bytes: int            # bytes consumed by string/bytes opcodes
    nested_depth: int            # maximum stack depth reached

    def efficiency(self) -> float:
        """GET/PUT ratio — higher means the memo table is well used."""
        return 1.0 if self.memo_puts == 0 else self.memo_gets / self.memo_puts

    def __str__(self) -> str:
        # Show the five most frequent opcodes, busiest first.
        ranked = sorted(self.opcode_freq.items(),
                        key=lambda kv: kv[1], reverse=True)
        summary = ", ".join(f"{name}×{count}" for name, count in ranked[:5])
        return (f"PickleStats(proto={self.protocol}, "
                f"{self.total_bytes}B, {self.opcode_count} ops, "
                f"depth={self.nested_depth}, "
                f"memo efficiency={self.efficiency():.2f}, "
                f"top=[{summary}])")
# Opcode families used by analyze() below.
# Memoization stores: PUT/BINPUT/LONG_BINPUT (protocols 0-2) plus MEMOIZE,
# the implicit-index store introduced in protocol 4.
_PUT_OPCODES = {"PUT", "BINPUT", "LONG_BINPUT", "MEMOIZE"}
# Memo fetches that reuse a previously stored object.
_GET_OPCODES = {"GET", "BINGET", "LONG_BINGET"}
# Opcodes counted as pushing one stack entry for the rough depth estimate.
# NOTE(review): this is a heuristic — MARK/FRAME/PROTO do not push data
# values, and FROZENSET actually consumes the items gathered since MARK,
# so nested_depth is an approximation, not an exact stack trace.
_PUSH_OPCODES = {
    "MARK", "FRAME", "PROTO",
    "INT", "LONG", "LONG1", "LONG4",
    "STRING", "BINSTRING", "SHORT_BINSTRING",
    "BINBYTES", "SHORT_BINBYTES", "BINBYTES8",
    "BYTEARRAY8", "NEXT_BUFFER", "READONLY_BUFFER",
    "UNICODE", "SHORT_BINUNICODE", "BINUNICODE", "BINUNICODE8",
    "FLOAT", "BINFLOAT",
    "TRUE", "FALSE", "NONE",
    "NEWFALSE", "NEWTRUE",
    "EMPTY_LIST", "EMPTY_TUPLE", "EMPTY_DICT", "EMPTY_SET", "FROZENSET",
}
# Opcodes counted as popping one stack entry for the depth estimate.
_POP_OPCODES = {
    "APPEND", "APPENDS", "SETITEM", "SETITEMS",
    "ADDITEMS", "BUILD", "STOP",
}
def analyze(data: bytes) -> PickleStats:
    """
    Analyze a pickle byte stream for size, protocol, opcode frequency,
    memo usage, and approximate nesting depth — without unpickling.

    Protocol detection: protocols 2+ declare themselves with a PROTO opcode,
    which is used verbatim when present. Protocol 0/1 streams carry no PROTO
    opcode, so for those the protocol is inferred as the highest protocol any
    opcode in the stream requires (previously such streams were always
    reported as protocol 0, which misclassified protocol-1 pickles).

    Example:
        stats = analyze(pickle.dumps(my_obj, protocol=5))
        print(stats)
    """
    records = disassemble(data)
    freq: dict[str, int] = {}
    depth = 0
    max_depth = 0
    memo_puts = 0
    memo_gets = 0
    string_bytes = 0
    declared: int | None = None  # explicit PROTO argument, if any
    inferred = 0                 # max protocol required by opcodes seen
    for r in records:
        freq[r.name] = freq.get(r.name, 0) + 1
        inferred = max(inferred, r.proto)
        if r.name == "PROTO" and isinstance(r.arg, int):
            declared = r.arg
        if r.name in _PUT_OPCODES:
            memo_puts += 1
        if r.name in _GET_OPCODES:
            memo_gets += 1
        if r.name in {"STRING", "BINSTRING", "SHORT_BINSTRING",
                      "BINBYTES", "SHORT_BINBYTES", "BINUNICODE",
                      "SHORT_BINUNICODE", "BINUNICODE8", "BINBYTES8"}:
            if isinstance(r.arg, (str, bytes)):
                string_bytes += len(r.arg)
        # Rough depth heuristic: count known push/pop opcodes (see the
        # NOTE on _PUSH_OPCODES — this is an approximation).
        if r.name in _PUSH_OPCODES:
            depth += 1
            max_depth = max(max_depth, depth)
        elif r.name in _POP_OPCODES:
            depth = max(0, depth - 1)
    return PickleStats(
        total_bytes=len(data),
        protocol=declared if declared is not None else inferred,
        opcode_count=len(records),
        opcode_freq=freq,
        memo_puts=memo_puts,
        memo_gets=memo_gets,
        string_bytes=string_bytes,
        nested_depth=max_depth,
    )
# ─────────────────────────────────────────────────────────────────────────────
# 3. Optimization and size comparison
# ─────────────────────────────────────────────────────────────────────────────
def optimize(data: bytes) -> bytes:
    """
    Strip memoization PUT opcodes whose values are never fetched by a GET.

    Thin wrapper around pickletools.optimize(): the returned stream
    unpickles to the same object, usually in fewer bytes.

    Example:
        original = pickle.dumps(obj)
        smaller = optimize(original)
        print(f"saved {len(original) - len(smaller)} bytes")
    """
    slimmed = pickletools.optimize(data)
    return slimmed
@dataclass
class SizeComparison:
    """Raw vs optimized pickle size at one protocol (see compare_protocols())."""
    protocol: int   # pickle protocol the object was dumped with
    original: int   # raw pickle size in bytes
    optimized: int  # size after pickletools.optimize()
    saving: int     # original - optimized

    @property
    def saving_pct(self) -> float:
        """Saving as a percentage of the original size (0.0 when empty)."""
        return 0.0 if self.original == 0 else 100 * self.saving / self.original

    def __str__(self) -> str:
        return (f"proto={self.protocol} "
                f"{self.original}B → {self.optimized}B "
                f"(-{self.saving}B / {self.saving_pct:.1f}%)")
def compare_protocols(obj: object) -> list[SizeComparison]:
    """
    Dump *obj* at every protocol (0 through pickle.HIGHEST_PROTOCOL) and
    report raw vs optimized sizes for each.

    Example:
        for row in compare_protocols(my_data):
            print(row)
    """
    rows: list[SizeComparison] = []
    for proto in range(pickle.HIGHEST_PROTOCOL + 1):
        raw = pickle.dumps(obj, protocol=proto)
        slim = optimize(raw)
        rows.append(SizeComparison(protocol=proto,
                                   original=len(raw),
                                   optimized=len(slim),
                                   saving=len(raw) - len(slim)))
    return rows
# ─────────────────────────────────────────────────────────────────────────────
# 4. Safe structure probe (no unpickling)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class PickleProbe:
    """Lightweight structural summary extracted without unpickling."""
    protocol: int            # declared protocol (0 if no PROTO opcode)
    has_global: bool         # True if stream contains GLOBAL/STACK_GLOBAL opcodes
    global_names: list[str]  # module.name strings referenced
    has_reduce: bool         # REDUCE/NEWOBJ/NEWOBJ_EX seen
    has_build: bool          # BUILD seen (object state restoration)
    probable_type: str       # heuristic guess at top-level type

    def is_safe_scalars_only(self) -> bool:
        """No GLOBAL or REDUCE opcodes → only primitive values."""
        if self.has_global:
            return False
        return not self.has_reduce
def probe(data: bytes) -> PickleProbe:
    """
    Probe pickle bytes for structure without unpickling.
    Useful for auditing untrusted data.

    Example:
        p = probe(data)
        if not p.is_safe_scalars_only():
            raise ValueError(f"Untrusted: uses {p.global_names}")
    """
    records = disassemble(data)
    protocol = 0
    globals_: list[str] = []
    has_reduce = False
    has_build = False
    probable_type = "unknown"
    for r in records:
        if r.name == "PROTO" and isinstance(r.arg, int):
            protocol = r.arg
        if r.name == "GLOBAL" and isinstance(r.arg, str):
            # GLOBAL's argument is "module name"; normalize to dotted form.
            globals_.append(r.arg.replace(" ", "."))
        if r.name == "STACK_GLOBAL":
            # Name is assembled on the stack; exact target unknown statically.
            globals_.append("<stack_global>")
        if r.name in {"REDUCE", "NEWOBJ", "NEWOBJ_EX"}:
            has_reduce = True
        if r.name == "BUILD":
            has_build = True
        # Heuristic top-level type guess. The outermost object is built from
        # the FIRST data-pushing opcode in the stream, so only the first match
        # counts. (Bug fix: previously every match overwrote the guess, so
        # e.g. pickle.dumps({"a": 1}) was reported as "int" — the type of its
        # last scalar — instead of "dict".)
        if probable_type == "unknown":
            if r.name == "EMPTY_LIST":
                probable_type = "list"
            elif r.name == "EMPTY_DICT":
                probable_type = "dict"
            elif r.name == "EMPTY_TUPLE":
                probable_type = "tuple"
            elif r.name == "EMPTY_SET":
                probable_type = "set"
            elif r.name in {"INT", "LONG", "LONG1", "LONG4",
                            "BININT", "BININT1", "BININT2"}:
                # BININT* added: binary protocols encode small ints with
                # these, which the original heuristic missed entirely.
                probable_type = "int"
            elif r.name in {"FLOAT", "BINFLOAT"}:
                probable_type = "float"
            elif r.name in {"STRING", "BINSTRING", "SHORT_BINSTRING",
                            "BINUNICODE", "SHORT_BINUNICODE", "BINUNICODE8"}:
                probable_type = "str"
    return PickleProbe(
        protocol=protocol,
        has_global=len(globals_) > 0,
        global_names=globals_,
        has_reduce=has_reduce,
        has_build=has_build,
        probable_type=probable_type,
    )
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Self-contained demo exercising every helper in this module.
    import datetime
    print("=== pickletools demo ===")
    # ── disassemble simple object ──────────────────────────────────────────────
    print("\n--- dis_to_string({'key': [1, 2]}) ---")
    data = pickle.dumps({"key": [1, 2]}, protocol=2)
    print(dis_to_string(data))
    # ── disassemble as records ────────────────────────────────────────────────
    print("--- disassemble records ---")
    for r in disassemble(data):
        print(f" {r}")
    # ── analyze ───────────────────────────────────────────────────────────────
    print("\n--- analyze ---")
    complex_obj = {
        "x": list(range(20)),
        "y": {"nested": "data", "flag": True},
        "z": (1, 2, 3),
        "shared": [1, 2],
    }
    # Aliasing the same list under two keys forces the pickler to memoize it,
    # which exercises the PUT/GET counters in analyze().
    complex_obj["also_shared"] = complex_obj["shared"]  # creates memo entry
    stats = analyze(pickle.dumps(complex_obj, protocol=4))
    print(f" {stats}")
    # ── compare_protocols ─────────────────────────────────────────────────────
    print("\n--- compare_protocols ---")
    for row in compare_protocols(complex_obj):
        print(f" {row}")
    # ── probe (safety audit) ──────────────────────────────────────────────────
    print("\n--- probe ---")
    scalars = pickle.dumps({"a": 1, "b": [2, 3]})
    # datetime instances pickle via GLOBAL/REDUCE, so probe() flags them unsafe.
    class_data = pickle.dumps(datetime.datetime.now())
    for label, d in [("scalars", scalars), ("datetime", class_data)]:
        p = probe(d)
        safe = "SAFE" if p.is_safe_scalars_only() else f"UNSAFE ({p.global_names})"
        print(f" {label:12s}: proto={p.protocol}, "
              f"type={p.probable_type}, {safe}")
    print("\n=== done ===")
For the pickle alternative — pickle.loads(data) and pickle.load(f) fully deserialize data into live Python objects, and pickle.dumps(obj, protocol=N) controls protocol version and wire format — use pickle when you need to actually reconstruct objects; use pickletools when you want to inspect or audit pickle streams without executing them (security reviews, debugging truncated streams, understanding why serialization is large). For the marshal alternative — marshal.dumps(obj) / marshal.loads(data) serialize basic Python types (used internally for .pyc files) with a simpler byte format — use marshal only for code objects and .pyc-adjacent tooling; use pickle + pickletools for general-purpose object serialization with full class support and the ability to inspect the resulting byte stream. The Claude Skills 360 bundle includes pickletools skill sets covering OpcodeRecord with disassemble()/dis_to_string() disassemblers, PickleStats with analyze() stream analyzer, SizeComparison with optimize()/compare_protocols() size tools, and PickleProbe with probe() safety auditor. Start with the free tier to try pickle inspection patterns and pickletools pipeline code generation.