Python’s lzma module provides LZMA and XZ compression, offering the best compression ratio among stdlib codecs. import lzma. compress: lzma.compress(data) → XZ-format bytes; preset=9 for maximum (slow) compression; preset=1 for fast; format=lzma.FORMAT_ALONE for standalone LZMA. decompress: lzma.decompress(data) — auto-detects XZ or LZMA format; memlimit=64*1024*1024 to cap decompression memory. lzma.open: with lzma.open("file.xz", "rb") as f: data = f.read() — reads XZ files; "wt" for compressed text write. LZMAFile: lzma.LZMAFile("out.xz", "w", preset=6) — file-like object. Incremental: c = lzma.LZMACompressor(preset=6); c.compress(chunk1) + c.compress(chunk2) + c.flush(). FORMAT_XZ: .xz container (default, with block headers and integrity check). FORMAT_ALONE: legacy .lzma container (single stream, no integrity check). FORMAT_RAW: no container, for embedding in another format; the same filters= chain must be supplied when compressing and when decompressing. Integrity checks: CHECK_CRC64 (default for XZ), CHECK_SHA256. Filter chains: filters=[{"id": lzma.FILTER_DELTA, "dist": 4}, {"id": lzma.FILTER_LZMA2, "preset": 9}] — pre-filter for structured binary data. lzma.is_check_supported(lzma.CHECK_SHA256) — check platform support. Claude Code generates XZ file readers, streaming compressors, preset benchmarkers, and multi-filter archive pipelines.
CLAUDE.md for lzma
## lzma Stack
- Stdlib: import lzma
- One-shot: data = lzma.compress(raw, preset=6); raw = lzma.decompress(data)
- File: with lzma.open("file.xz", "rb") as f: data = f.read()
- Write: with lzma.open("out.xz", "wb", preset=6) as f: f.write(data)
- Stream: comp = lzma.LZMACompressor(); out = b"".join(comp.compress(chunk) for chunk in src) + comp.flush()
lzma Compression Pipeline
# app/lzmautil.py — compress/decompress, file ops, streaming, benchmark, archive
from __future__ import annotations
import io
import lzma
import os
import time
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Generator, Callable, Iterator
# ─────────────────────────────────────────────────────────────────────────────
# 1. One-shot helpers
# ─────────────────────────────────────────────────────────────────────────────
def compress(data: bytes, preset: int = 6) -> bytes:
    """
    Encode *data* as one complete XZ stream and return it.

    ``preset`` is forwarded unchanged to the lzma compressor.

    Example:
        packed = compress(large_bytes)
        print(f"{len(large_bytes)} → {len(packed)} bytes")
    """
    encoder = lzma.LZMACompressor(format=lzma.FORMAT_XZ, preset=preset)
    body = encoder.compress(data)
    # flush() emits whatever the encoder still buffers plus the stream footer.
    return body + encoder.flush()
def decompress(data: bytes) -> bytes:
    """
    Restore the original bytes from XZ or legacy LZMA input.

    The container format is auto-detected and no memory limit is applied.

    Example:
        original = decompress(xz_bytes)
    """
    restored = lzma.decompress(
        data,
        format=lzma.FORMAT_AUTO,
        memlimit=None,
        filters=None,
    )
    return restored
def compress_ratio(data: bytes, preset: int = 6) -> float:
    """
    Report how many times smaller *data* becomes after compress().

    The value is ``len(data) / len(compressed)``; 0.0 is returned if the
    compressed result is empty.

    Example:
        ratio = compress_ratio(bytes(range(256)) * 1000)
        print(f"{ratio:.2f}x")
    """
    packed = compress(data, preset=preset)
    if not packed:
        return 0.0
    return len(data) / len(packed)
# ─────────────────────────────────────────────────────────────────────────────
# 2. File operations
# ─────────────────────────────────────────────────────────────────────────────
def compress_file(src: str | Path, dest: str | Path | None = None,
preset: int = 6, remove_src: bool = False) -> Path:
"""
Compress a file to .xz. Output defaults to src + ".xz".
Example:
path = compress_file("large.log") # → "large.log.xz"
"""
src_path = Path(src)
dst_path = Path(dest) if dest else Path(str(src_path) + ".xz")
with src_path.open("rb") as fin, lzma.open(dst_path, "wb", preset=preset) as fout:
while True:
chunk = fin.read(65536)
if not chunk:
break
fout.write(chunk)
if remove_src:
src_path.unlink()
return dst_path
def decompress_file(src: str | Path, dest: str | Path | None = None,
remove_src: bool = False) -> Path:
"""
Decompress an .xz or .lzma file.
Example:
path = decompress_file("archive.tar.xz") # → "archive.tar"
"""
src_path = Path(src)
dst_path = Path(dest) if dest else Path(str(src_path).removesuffix(".xz")
.removesuffix(".lzma"))
with lzma.open(src_path, "rb") as fin, dst_path.open("wb") as fout:
while True:
chunk = fin.read(65536)
if not chunk:
break
fout.write(chunk)
if remove_src:
src_path.unlink()
return dst_path
def read_xz_text(path: str | Path, encoding: str = "utf-8") -> str:
"""
Read a .xz-compressed text file.
Example:
text = read_xz_text("data.txt.xz")
"""
with lzma.open(str(path), "rt", encoding=encoding) as f:
return f.read()
def write_xz_text(path: str | Path, text: str, encoding: str = "utf-8",
preset: int = 6) -> None:
"""
Write a string to a .xz-compressed text file.
Example:
write_xz_text("output.txt.xz", "Hello, compressed world!")
"""
with lzma.open(str(path), "wt", encoding=encoding, preset=preset) as f:
f.write(text)
# ─────────────────────────────────────────────────────────────────────────────
# 3. Streaming compression
# ─────────────────────────────────────────────────────────────────────────────
def compress_stream(
    source: Iterator[bytes],
    preset: int = 6,
) -> Generator[bytes, None, None]:
    """
    Lazily turn an iterator of raw chunks into an iterator of XZ chunks.

    Input chunks are pulled one at a time; empty compressor outputs are
    skipped, and the stream is finalised once *source* is exhausted.

    Example:
        with open("large.bin", "rb") as f:
            chunks = iter(lambda: f.read(65536), b"")
            with open("large.bin.xz", "wb") as out:
                for c in compress_stream(chunks):
                    out.write(c)
    """
    encoder = lzma.LZMACompressor(format=lzma.FORMAT_XZ, preset=preset)
    # map() is lazy, so chunks are still consumed one at a time;
    # filter(None, ...) drops the empty outputs the encoder returns while
    # it is still buffering.
    yield from filter(None, map(encoder.compress, source))
    tail = encoder.flush()
    if tail:
        yield tail
def decompress_stream(
    source: Iterator[bytes],
) -> Generator[bytes, None, None]:
    """
    Decompress a stream of compressed byte chunks lazily.

    Behaves like ``lzma.decompress`` applied to the concatenation of all
    chunks, but yields output incrementally:

    * several XZ/LZMA streams concatenated back to back are all decoded,
      wherever the chunk boundaries fall;
    * data after a finished stream that is rejected immediately as not
      being a valid stream is ignored;
    * input that ends before the end-of-stream marker raises instead of
      silently yielding partial output.

    Raises:
        lzma.LZMAError: if the data is corrupt, or if *source* is exhausted
            (or empty) before a stream's end-of-stream marker is reached.

    Example:
        with open("data.xz", "rb") as f:
            chunks = iter(lambda: f.read(65536), b"")
            for chunk in decompress_stream(chunks):
                process(chunk)
    """
    dec = lzma.LZMADecompressor()
    for chunk in source:
        pending = chunk
        while pending:
            if dec.eof:
                # The previous stream is complete; remaining bytes belong to
                # a further concatenated stream (or are trailing junk).
                dec = lzma.LZMADecompressor()
                try:
                    result = dec.decompress(pending)
                except lzma.LZMAError:
                    # Leftover data is not a valid stream; ignore it, as
                    # lzma.decompress does.
                    return
            else:
                result = dec.decompress(pending)
            if result:
                yield result
            # Bytes past the end-of-stream marker are handed back through
            # unused_data; otherwise the whole chunk was consumed.
            pending = dec.unused_data if dec.eof else b""
    if not dec.eof:
        raise lzma.LZMAError(
            "Compressed data ended before the end-of-stream marker was reached"
        )
def compress_to_bytes(source: Iterator[bytes], preset: int = 6) -> bytes:
    """
    Drain *source* through compress_stream() and return one bytes object.

    Example:
        raw = compress_to_bytes(iter([b"hello ", b"world"]))
    """
    buffer = bytearray()
    for piece in compress_stream(source, preset=preset):
        buffer += piece
    return bytes(buffer)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Preset benchmarking
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class PresetResult:
    """Measurement of one compression run at a single preset level."""

    # Preset level the payload was compressed with.
    preset: int
    # Size of the compressed output, in bytes.
    compressed_size: int
    # Size of the uncompressed input, in bytes.
    original_size: int
    # Time spent compressing, in milliseconds.
    elapsed_ms: float

    @property
    def ratio(self) -> float:
        """Return original/compressed size, or 0.0 when compressed_size is 0."""
        if not self.compressed_size:
            return 0.0
        return self.original_size / self.compressed_size

    def __str__(self) -> str:
        """Render the result as a single space-separated report line."""
        columns = (
            f"preset={self.preset}",
            f"{self.compressed_size:8,d} bytes",
            f"ratio={self.ratio:.2f}x",
            f"{self.elapsed_ms:.1f}ms",
        )
        return " ".join(columns)
def benchmark_presets(data: bytes, presets: list[int] | None = None) -> list[PresetResult]:
    """
    Benchmark LZMA compression at multiple preset levels.

    Each preset is timed over a single compress() call. When *presets* is
    None or empty, the levels 1, 3, 6 and 9 are used. Results are returned
    in the order the presets were given.

    Example:
        results = benchmark_presets(sample_data, presets=[1, 3, 6, 9])
        for r in results:
            print(r)
    """
    targets = presets or [1, 3, 6, 9]
    original_size = len(data)
    results: list[PresetResult] = []
    for p in targets:
        # perf_counter() is the highest-resolution clock for timing short
        # durations; fast presets on small inputs can finish quicker than a
        # coarser clock ticks.
        t0 = time.perf_counter()
        compressed = compress(data, preset=p)
        elapsed = (time.perf_counter() - t0) * 1000
        results.append(PresetResult(
            preset=p,
            compressed_size=len(compressed),
            original_size=original_size,
            elapsed_ms=elapsed,
        ))
    return results
# ─────────────────────────────────────────────────────────────────────────────
# 5. Filter chain helpers
# ─────────────────────────────────────────────────────────────────────────────
def compress_with_delta(data: bytes, dist: int = 4, preset: int = 6) -> bytes:
    """
    Compress *data* as a raw (container-less) DELTA → LZMA2 filter chain.

    The delta stage uses byte distance *dist* and the LZMA2 stage uses
    *preset*. Intended for structured binary data such as PCM audio or
    float arrays.

    Example:
        compressed = compress_with_delta(pcm_bytes, dist=2)  # 16-bit samples
    """
    delta_stage = {"id": lzma.FILTER_DELTA, "dist": dist}
    lzma2_stage = {"id": lzma.FILTER_LZMA2, "preset": preset}
    encoder = lzma.LZMACompressor(
        format=lzma.FORMAT_RAW,
        filters=[delta_stage, lzma2_stage],
    )
    return encoder.compress(data) + encoder.flush()
def decompress_with_delta(data: bytes, dist: int = 4, preset: int = 6) -> bytes:
    """
    Decompress data compressed with compress_with_delta.

    A raw stream has no header, so the decoder is configured entirely from
    the filter chain built here. Pass the same *dist* and *preset* that were
    given to compress_with_delta; the defaults match that function's
    defaults.

    Example:
        raw = decompress_with_delta(blob, dist=2, preset=9)
    """
    filters = [
        {"id": lzma.FILTER_DELTA, "dist": dist},
        # Mirror the encoder's preset so the LZMA2 decoder options (notably
        # the dictionary size) match those used for compression.
        {"id": lzma.FILTER_LZMA2, "preset": preset},
    ]
    return lzma.decompress(data, format=lzma.FORMAT_RAW, filters=filters)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import tempfile

    print("=== lzma demo ===")

    # Repetitive text followed by a byte ramp: compressible but not trivial.
    text_part = b"Python lzma compression demo. " * 200
    ramp_part = bytes(range(256)) * 20
    sample = text_part + ramp_part

    # ── one-shot ─────────────────────────────────────────────────────────────
    print("\n--- compress / decompress ---")
    packed = compress(sample)
    print(f" original: {len(sample):8,d} bytes")
    print(f" compressed: {len(packed):8,d} bytes ratio={compress_ratio(sample):.2f}x")
    unpacked = decompress(packed)
    print(f" roundtrip ok: {unpacked == sample}")

    # ── file ops ─────────────────────────────────────────────────────────────
    print("\n--- compress_file / decompress_file ---")
    with tempfile.TemporaryDirectory() as workdir:
        workdir_path = Path(workdir)
        raw_file = workdir_path / "data.bin"
        raw_file.write_bytes(sample)
        xz_file = compress_file(raw_file, preset=1)
        print(f" {raw_file.name} ({raw_file.stat().st_size:,d} B) → {xz_file.name} ({xz_file.stat().st_size:,d} B)")
        restored_file = decompress_file(xz_file, workdir_path / "data_out.bin")
        print(f" decompressed ok: {restored_file.read_bytes() == sample}")

    # ── streaming ────────────────────────────────────────────────────────────
    print("\n--- compress_stream ---")
    step = 4096
    pieces = [sample[start:start + step] for start in range(0, len(sample), step)]
    streamed = b"".join(compress_stream(iter(pieces), preset=1))
    streamed_back = decompress(streamed)
    print(f" stream compressed {len(sample):,d} → {len(streamed):,d} bytes")
    print(f" roundtrip ok: {streamed_back == sample}")

    # ── presets ──────────────────────────────────────────────────────────────
    print("\n--- benchmark_presets ---")
    for result in benchmark_presets(sample, presets=[1, 3, 6]):
        print(f" {result}")

    # ── delta filter ─────────────────────────────────────────────────────────
    print("\n--- compress_with_delta ---")
    # Repeating 0..255 ramp standing in for sample data: with dist=2 every
    # delta is the same value, which LZMA2 then compresses very well.
    pcm = bytes(range(256)) * 100
    size_plain = len(compress(pcm, preset=6))
    size_delta = len(compress_with_delta(pcm, dist=2, preset=6))
    print(f" pcm without delta: {size_plain:,d} bytes")
    print(f" pcm with delta: {size_delta:,d} bytes")
    delta_blob = compress_with_delta(pcm, dist=2)
    pcm_back = decompress_with_delta(delta_blob, dist=2)
    print(f" delta roundtrip ok: {pcm_back == pcm}")

    print("\n=== done ===")
For the zlib alternative — zlib (stdlib via zlib module) implements DEFLATE compression; it achieves lower compression ratios than LZMA but compresses and decompresses much faster; gzip and zipfile also use DEFLATE — use zlib / gzip when speed matters more than size (web assets, log rotation, in-memory caching); use lzma when you need the best possible compression ratio for archival, distribution packages, or bandwidth- or storage-constrained environments. For the bz2 alternative — bz2 (stdlib) uses the Burrows-Wheeler algorithm; it typically compresses better than DEFLATE but worse than LZMA, and it compresses faster than LZMA at high presets, although it usually decompresses more slowly than LZMA — use bz2 as a middle ground when you need better-than-gzip ratio but LZMA’s compression-time CPU and memory overhead is prohibitive; .tar.bz2 (tbz2) archives are a common use case; use lzma / XZ for .tar.xz, which is widely used for Linux source tarballs and distribution packages. The Claude Skills 360 bundle includes lzma skill sets covering compress()/decompress()/compress_ratio() one-shot helpers, compress_file()/decompress_file()/read_xz_text()/write_xz_text() file operations, compress_stream()/decompress_stream()/compress_to_bytes() lazy streaming, PresetResult dataclass with benchmark_presets(), and compress_with_delta()/decompress_with_delta() filter-chain helpers. Start with the free tier to try XZ compression patterns and lzma pipeline code generation.