Python’s zlib module wraps the zlib deflate/inflate library for byte-level compression. import zlib. compress: zlib.compress(data, level=zlib.Z_DEFAULT_COMPRESSION) → bytes; level 0 = no compression, 1 = fastest, 9 = best, -1 = default (~6). decompress: zlib.decompress(data, wbits=15, bufsize=16384) → bytes. compressobj: zlib.compressobj(level, method=DEFLATED, wbits=15) → streaming Compress object; .compress(data) + .flush(zlib.Z_FINISH) → bytes. decompressobj: zlib.decompressobj(wbits=15) → streaming Decompress; .decompress(chunk) + .flush(). wbits: 15 = zlib format (with header+adler32); -15 = raw deflate (no header); 31 (MAX_WBITS+16) = gzip format. crc32: zlib.crc32(data, value=0) → unsigned 32-bit int (Python 3; mask with & 0xFFFFFFFF for portability across versions); chain: crc32(chunk2, crc32(chunk1)). adler32: zlib.adler32(data, value=1) → same chaining. Z_SYNC_FLUSH: flush compressor to byte boundary for real-time streaming. Z_FULL_FLUSH: reset LZ77 history for random access. Z_FINISH: finalize stream. Z_DEFAULT_COMPRESSION = -1. Z_BEST_SPEED = 1. Z_BEST_COMPRESSION = 9. Error: zlib.error. DEF_BUF_SIZE = 16384. Claude Code generates HTTP body compressors, frame-level stream compressors, chunk checksummers, and binary protocol encoders.
CLAUDE.md for zlib
## zlib Stack
- Stdlib: import zlib
- One-shot compress: zlib.compress(data, level=6)
- One-shot decompress: zlib.decompress(compressed)
- Streaming compress: c = zlib.compressobj(6); out = c.compress(d) + c.flush()
- CRC-32: crc = zlib.crc32(data) & 0xFFFFFFFF (unsigned)
- Raw deflate: wbits=-15 (no header; for HTTP Transfer-Encoding: deflate)
- gzip format: wbits=31 (add gzip header/trailer in zlib)
zlib Compression Pipeline
# app/zlibutil.py — one-shot, streaming, chunked, checksum, framed protocol
from __future__ import annotations
import io
import struct
import zlib
from dataclasses import dataclass
from typing import Iterable
# ─────────────────────────────────────────────────────────────────────────────
# 1. One-shot helpers
# ─────────────────────────────────────────────────────────────────────────────
def compress_bytes(data: bytes, level: int = 6) -> bytes:
    """Compress *data* to zlib format (2-byte header + Adler-32 trailer).

    Args:
        data: payload to compress.
        level: 0 (store) .. 9 (best); 6 is the zlib default trade-off.

    Example:
        compressed = compress_bytes(json_bytes, level=6)
        ratio = len(compressed) / len(json_bytes)
    """
    return zlib.compress(data, level)
def decompress_bytes(data: bytes) -> bytes:
    """Inflate zlib-format *data* (as produced by :func:`compress_bytes`).

    Example:
        original = decompress_bytes(compressed)
    """
    return zlib.decompress(data)
def compress_raw(data: bytes, level: int = 6) -> bytes:
    """Compress *data* to a raw deflate stream (no zlib header or trailer).

    Raw deflate is what HTTP ``Transfer-Encoding: deflate`` and many custom
    framing schemes expect on the wire.

    Example:
        payload = compress_raw(body, level=1)  # fast, for HTTP
    """
    # Negative wbits selects raw deflate output.
    compressor = zlib.compressobj(level, zlib.DEFLATED, -zlib.MAX_WBITS)
    body = compressor.compress(data)
    tail = compressor.flush(zlib.Z_FINISH)
    return body + tail
def decompress_raw(data: bytes) -> bytes:
    """Inflate a raw deflate stream (one produced without a zlib header)."""
    inflater = zlib.decompressobj(-zlib.MAX_WBITS)
    out = inflater.decompress(data)
    return out + inflater.flush()
def compress_gzip(data: bytes, level: int = 6) -> bytes:
    """Compress *data* to gzip format via zlib (gzip header + CRC-32 trailer).

    Equivalent output to ``gzip.compress()`` without the Python-level
    overhead (no filename/mtime metadata is written).

    Example:
        gz_bytes = compress_gzip(html_bytes)
        send_as_gzip_response(gz_bytes)
    """
    # MAX_WBITS | 16 selects gzip wrapping instead of the zlib wrapper.
    compressor = zlib.compressobj(level, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    body = compressor.compress(data)
    tail = compressor.flush(zlib.Z_FINISH)
    return body + tail
def decompress_gzip(data: bytes) -> bytes:
    """Inflate gzip-format *data* (header and CRC-32 trailer are verified)."""
    # wbits = MAX_WBITS | 16 accepts only gzip-wrapped streams.
    return zlib.decompress(data, zlib.MAX_WBITS | 16)
def ratio(original: bytes, compressed: bytes) -> float:
    """Return the compression ratio compressed/original (lower = better).

    An empty *original* is reported as 1.0 to avoid division by zero.
    """
    return len(compressed) / len(original) if original else 1.0
# ─────────────────────────────────────────────────────────────────────────────
# 2. Streaming compression
# ─────────────────────────────────────────────────────────────────────────────
def compress_stream(
    chunks: Iterable[bytes],
    level: int = 6,
    raw: bool = False,
) -> Iterable[bytes]:
    """Lazily compress a stream of byte chunks, yielding compressed pieces.

    The final yield flushes the compressor with ``Z_FINISH``, terminating
    the stream.  Pass ``raw=True`` for raw deflate (no zlib header).

    Example:
        for chunk in compress_stream(read_file_chunks("big.log")):
            socket.send(chunk)
    """
    compressor = zlib.compressobj(
        level, zlib.DEFLATED, -zlib.MAX_WBITS if raw else zlib.MAX_WBITS
    )
    for piece in chunks:
        if not piece:
            continue  # nothing to feed; skip empty chunks entirely
        produced = compressor.compress(piece)
        if produced:
            yield produced
    tail = compressor.flush(zlib.Z_FINISH)
    if tail:
        yield tail
def decompress_stream(
    chunks: Iterable[bytes],
    raw: bool = False,
    bufsize: int = 65536,
) -> Iterable[bytes]:
    """Lazily decompress a stream of compressed chunks, yielding plaintext.

    Args:
        chunks: iterable of compressed byte chunks.
        raw: True for raw deflate input (no zlib header).
        bufsize: upper bound on the size of each yielded chunk.

    Example:
        for plaintext in decompress_stream(recv_chunks(socket)):
            process(plaintext)

    Bug fix: the original called ``zlib.decompressobj(wbits, bufsize)`` —
    but the second positional parameter of decompressobj() is ``zdict``
    (a bytes-like preset dictionary), so passing an int raised TypeError
    on every call.  The decompressor is now built with ``wbits`` only,
    and ``bufsize`` is applied as ``max_length`` on each decompress()
    call to bound per-chunk output, draining ``unconsumed_tail``.
    """
    wbits = -zlib.MAX_WBITS if raw else zlib.MAX_WBITS
    d = zlib.decompressobj(wbits)
    for chunk in chunks:
        if not chunk:
            continue
        pending = chunk
        while pending:
            out = d.decompress(pending, bufsize)
            if out:
                yield out
            # Input left over because the output cap was hit; keep draining.
            pending = d.unconsumed_tail
    final = d.flush()
    if final:
        yield final
def compress_to_buffer(data: bytes, level: int = 6) -> io.BytesIO:
    """Compress *data* into an in-memory BytesIO positioned at offset 0.

    Example:
        buf = compress_to_buffer(large_json)
        upload_to_s3(buf.read())
    """
    # BytesIO(initial_bytes) starts at position 0, ready for read().
    return io.BytesIO(zlib.compress(data, level))
# ─────────────────────────────────────────────────────────────────────────────
# 3. Checksum helpers
# ─────────────────────────────────────────────────────────────────────────────
def crc32(data: bytes, seed: int = 0) -> int:
    """Return the CRC-32 of *data* as an unsigned 32-bit integer.

    For streaming, feed the previous result back in as *seed*:
    ``crc32(chunk2, crc32(chunk1))``.

    Example:
        checksum = crc32(payload)
        # verify: crc32(received) == expected_crc
    """
    return zlib.crc32(data, seed) & 0xFFFFFFFF
def crc32_stream(chunks: Iterable[bytes]) -> int:
    """Fold a CRC-32 over every chunk; returns the unsigned 32-bit result."""
    running = 0
    for piece in chunks:
        running = zlib.crc32(piece, running)
    return running & 0xFFFFFFFF
def adler32(data: bytes, seed: int = 1) -> int:
    """Return the Adler-32 of *data* as an unsigned 32-bit integer.

    Faster than CRC-32 but a weaker check; it is the checksum embedded
    in zlib-format trailers.  Chain via *seed* like :func:`crc32`.

    Example:
        chk = adler32(data)
    """
    return zlib.adler32(data, seed) & 0xFFFFFFFF
def adler32_stream(chunks: Iterable[bytes]) -> int:
    """Fold an Adler-32 over every chunk; returns the unsigned result."""
    running = 1  # Adler-32 chains start at 1, not 0
    for piece in chunks:
        running = zlib.adler32(piece, running)
    return running & 0xFFFFFFFF
# ─────────────────────────────────────────────────────────────────────────────
# 4. Framed compressed message protocol
# ─────────────────────────────────────────────────────────────────────────────
_FRAME_MAGIC = b"ZF"
_FRAME_VERSION = 1
# Wire format: magic(2) + version(1) + level(1) + orig_len(4) + comp_len(4) + crc32(4) + payload
_HEADER = struct.Struct(">2sBBIII")  # 16 bytes, big-endian


@dataclass
class CompressedFrame:
    """
    A self-describing compressed frame with magic bytes, checksum, and sizes.
    Suitable for binary protocols, file formats, and IPC payloads.

    Improvements over the previous revision: decode() now rejects frames
    with an unknown version byte (the field was parsed but never checked)
    and frames whose buffer is shorter than the declared payload length
    (truncation previously fell through to an opaque decompress error).

    Example:
        frame = CompressedFrame.encode(json_bytes, level=6)
        data = frame.to_bytes()
        # ... send/store data ...
        original = CompressedFrame.decode(data)
    """

    original_len: int   # byte length of the uncompressed payload
    compressed: bytes   # zlib-format compressed payload
    crc: int            # unsigned CRC-32 of the *uncompressed* data
    level: int = 6      # compression level recorded in the header

    @classmethod
    def encode(cls, data: bytes, level: int = 6) -> "CompressedFrame":
        """Compress *data* and capture the metadata needed for the header."""
        return cls(
            original_len=len(data),
            compressed=zlib.compress(data, level),
            crc=zlib.crc32(data) & 0xFFFFFFFF,
            level=level,
        )

    def to_bytes(self) -> bytes:
        """Serialize header + compressed payload into one bytes object."""
        header = _HEADER.pack(
            _FRAME_MAGIC,
            _FRAME_VERSION,
            self.level,
            self.original_len,
            len(self.compressed),
            self.crc,
        )
        return header + self.compressed

    @classmethod
    def decode(cls, buf: bytes) -> bytes:
        """
        Validate and decompress a frame, returning the original payload.

        Raises:
            zlib.error: for a short buffer, bad magic, unsupported version,
                truncated payload, corrupt compressed data, length mismatch,
                or CRC mismatch.
        """
        if len(buf) < _HEADER.size:
            raise zlib.error("Frame too short")
        magic, version, level, orig_len, comp_len, stored_crc = _HEADER.unpack(buf[:_HEADER.size])
        if magic != _FRAME_MAGIC:
            raise zlib.error(f"Bad magic: {magic!r}")
        if version != _FRAME_VERSION:
            raise zlib.error(f"Unsupported frame version: {version}")
        if len(buf) < _HEADER.size + comp_len:
            raise zlib.error(
                f"Truncated payload: header declares {comp_len} bytes, "
                f"only {len(buf) - _HEADER.size} present"
            )
        payload = buf[_HEADER.size:_HEADER.size + comp_len]
        data = zlib.decompress(payload)
        if len(data) != orig_len:
            raise zlib.error(f"Length mismatch: expected {orig_len} got {len(data)}")
        actual_crc = zlib.crc32(data) & 0xFFFFFFFF
        if actual_crc != stored_crc:
            raise zlib.error(f"CRC mismatch: expected {stored_crc:#010x} got {actual_crc:#010x}")
        return data
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
# Demo / smoke test: exercises every public helper in this module and prints
# sizes, ratios, and roundtrip results.  Run directly: python zlibutil.py
if __name__ == "__main__":
    import json
    print("=== zlib demo ===")
    # Highly repetitive JSON (50 near-identical records) — compresses well,
    # which makes the level comparison below visible.
    data = json.dumps({"users": [{"id": i, "name": f"User{i}", "bio": "A" * 200} for i in range(50)]}).encode()
    print(f"\nOriginal size: {len(data):,} bytes")
    print("\n--- one-shot compress/decompress ---")
    # Compare fastest / default / best levels on the same input.
    for level in [1, 6, 9]:
        c = compress_bytes(data, level=level)
        rt = decompress_bytes(c)
        r = ratio(data, c)
        print(f" level={level}: {len(c):,} bytes ratio={r:.3f} roundtrip={rt==data}")
    print("\n--- raw deflate ---")
    raw_c = compress_raw(data, level=6)
    raw_d = decompress_raw(raw_c)
    print(f" raw deflate: {len(raw_c):,} bytes roundtrip={raw_d==data}")
    print("\n--- gzip format ---")
    gz = compress_gzip(data, level=6)
    gz_d = decompress_gzip(gz)
    print(f" gzip: {len(gz):,} bytes roundtrip={gz_d==data}")
    print("\n--- streaming ---")
    # Split the input into fixed-size chunks, then stream-compress and
    # stream-decompress, checking the roundtrip byte-for-byte.
    chunk_size = 1024
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
    compressed_chunks = list(compress_stream(iter(chunks), level=6))
    total_compressed = sum(len(c) for c in compressed_chunks)
    decompressed = b"".join(decompress_stream(iter(compressed_chunks)))
    print(f" {len(chunks)} chunks compressed to {len(compressed_chunks)} chunks")
    print(f" total compressed: {total_compressed:,} bytes")
    print(f" decompressed matches: {decompressed == data}")
    print("\n--- checksums ---")
    # One-shot and chunked checksums over the same data must agree.
    crc = crc32(data)
    adr = adler32(data)
    stream_crc = crc32_stream(iter(chunks))
    print(f" crc32: {crc:#010x}")
    print(f" adler32: {adr:#010x}")
    print(f" stream_crc32: {stream_crc:#010x} matches: {stream_crc == crc}")
    print("\n--- CompressedFrame ---")
    frame = CompressedFrame.encode(data, level=6)
    frame_bytes = frame.to_bytes()
    decoded = CompressedFrame.decode(frame_bytes)
    # Header overhead = total frame size minus compressed payload size.
    overhead = len(frame_bytes) - len(frame.compressed)
    print(f" frame: {len(frame_bytes):,} bytes (header={overhead}B + payload={len(frame.compressed):,}B)")
    print(f" decoded matches: {decoded == data}")
    # Tamper test
    try:
        # Flip one byte inside the compressed payload; decode must raise.
        bad = bytearray(frame_bytes)
        bad[100] ^= 0xFF
        CompressedFrame.decode(bytes(bad))
    except zlib.error as e:
        print(f" tamper detected: {e}")
    print("\n=== done ===")
For the gzip alternative — gzip.compress/gzip.decompress/gzip.open produce standard gzip files with proper headers, OS field, and mtime; zlib at wbits=31 also emits gzip format but skips the Python-level filename and mtime fields that gzip module sets — use gzip when producing files that will be decompressed by standard tools (gunzip, tar -z) or stored on disk; use zlib directly when you need raw deflate for HTTP Transfer-Encoding: deflate, custom frame protocols, or when gzip file metadata fields are irrelevant. For the lzma/bz2 alternative — bz2 (block-oriented Burrows-Wheeler) achieves better compression ratios than zlib at the cost of 5–10× slower speed; lzma (LZMA2/XZ) achieves the best compression ratios of the three but is 20–50× slower and uses significantly more memory — use lzma for distributing archives where download size dominates (Python packages, deployment bundles); use bz2 for archival storage with moderate speed requirements; use zlib/gzip for network transport, in-process caching, and any workload where throughput matters more than maximum ratio. The Claude Skills 360 bundle includes zlib skill sets covering compress_bytes()/decompress_bytes()/compress_raw()/decompress_raw()/compress_gzip()/decompress_gzip() one-shot helpers, compress_stream()/decompress_stream() chunked streaming, crc32()/crc32_stream()/adler32()/adler32_stream() checksum helpers, and CompressedFrame self-describing framed protocol with CRC verification. Start with the free tier to try data compression patterns and zlib pipeline code generation.