Claude Code for bz2: Python BZip2 Compression — Claude Skills 360 Blog

Claude Code for bz2: Python BZip2 Compression

Published: September 20, 2028
Read time: 5 min
By: Claude Skills 360

Python's bz2 module provides bzip2 compression: better ratios than zlib/gzip, faster than lzma, and a simpler API. Start with import bz2. One-shot: bz2.compress(data, compresslevel=9) returns bzip2 bytes (levels 1–9, default 9), and bz2.decompress(data) restores them in a single call. Files: with bz2.open("file.bz2", "rb") as f: yields a file-like object; use "wt" for text writes, with compresslevel and encoding parameters. BZ2File wraps a file directly: f = bz2.BZ2File("out.bz2", "w", compresslevel=6). Incremental compression is stateful: c = bz2.BZ2Compressor(compresslevel=6), repeated c.compress(chunk) calls, then c.flush(). BZ2Decompressor mirrors it: d = bz2.BZ2Decompressor(); d.decompress(chunk) is stateful; d.eof becomes True when the stream ends, d.needs_input is True when more input is needed, and d.unused_data holds any bytes after the end of the stream. Multi-stream: bz2 files may contain concatenated bzip2 streams; BZ2File handles them transparently, but BZ2Decompressor handles only one stream, so for multi-stream data you reset with a fresh decompressor after each d.eof. Both file interfaces work as context managers (with bz2.BZ2File("f.bz2", "w") as f: auto-closes), and bz2.open("file.bz2", "rt", encoding="utf-8") reads compressed text directly. Claude Code generates bz2 log rotators, tbz2 archive writers, streaming pipeline components, and multi-stream decoders.
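The API surface above round-trips in a few lines; this quick sketch exercises the one-shot and incremental interfaces:

```python
import bz2

data = b"bzip2 compresses repetitive data well. " * 100

# One-shot: compress at the default level 9, then restore.
packed = bz2.compress(data, compresslevel=9)
assert bz2.decompress(packed) == data
assert len(packed) < len(data)

# Incremental: feed chunks to a stateful compressor, flush at the end.
comp = bz2.BZ2Compressor(6)
parts = [comp.compress(data[i:i + 1024]) for i in range(0, len(data), 1024)]
parts.append(comp.flush())
streamed = b"".join(parts)

# A stateful decompressor reports end-of-stream via .eof.
dec = bz2.BZ2Decompressor()
assert dec.decompress(streamed) == data
assert dec.eof
```

Note that individual comp.compress() calls may return empty bytes while the compressor buffers input; only after flush() is the stream complete.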

CLAUDE.md for bz2

## bz2 Stack
- Stdlib: import bz2
- One-shot: data = bz2.compress(raw, 6); raw = bz2.decompress(data)
- File:    with bz2.open("file.bz2", "rb") as f: data = f.read()
- Write:   with bz2.open("out.bz2", "wb", compresslevel=6) as f: f.write(data)
- Stream:  comp = bz2.BZ2Compressor(6); out = b"".join(comp.compress(ch) for ch in src) + comp.flush()
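The cheat-sheet patterns above can be sanity-checked in a few lines (a minimal sketch, using a distinct name for the compressor and the loop variable in the streaming one-liner):

```python
import bz2

raw = b"pattern check " * 500

# One-shot round trip.
data = bz2.compress(raw, 6)
assert bz2.decompress(data) == raw

# Incremental round trip over pre-split chunks.
src = [raw[i:i + 4096] for i in range(0, len(raw), 4096)]
comp = bz2.BZ2Compressor(6)
out = b"".join(comp.compress(ch) for ch in src) + comp.flush()
assert bz2.decompress(out) == raw
```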

bz2 Compression Pipeline

# app/bz2util.py — compress/decompress, file ops, streaming, multi-stream, bench
from __future__ import annotations

import bz2
import io
import time
from dataclasses import dataclass
from pathlib import Path
from collections.abc import Generator, Iterator


# ─────────────────────────────────────────────────────────────────────────────
# 1. One-shot helpers
# ─────────────────────────────────────────────────────────────────────────────

def compress(data: bytes, level: int = 9) -> bytes:
    """
    Compress bytes with bzip2 at the given level (1=fast, 9=best ratio).

    Example:
        compressed = compress(large_bytes, level=6)
    """
    return bz2.compress(data, compresslevel=level)


def decompress(data: bytes) -> bytes:
    """
    Decompress bzip2 bytes.

    Example:
        original = decompress(bz2_bytes)
    """
    return bz2.decompress(data)


def compress_ratio(data: bytes, level: int = 9) -> float:
    """
    Return original / compressed size ratio.

    Example:
        print(f"{compress_ratio(data):.2f}x")
    """
    c = compress(data, level)
    return len(data) / len(c) if c else 0.0


# ─────────────────────────────────────────────────────────────────────────────
# 2. File operations
# ─────────────────────────────────────────────────────────────────────────────

def compress_file(
    src: str | Path,
    dest: str | Path | None = None,
    level: int = 9,
    remove_src: bool = False,
    chunk_size: int = 65536,
) -> Path:
    """
    Compress a file to .bz2.

    Example:
        path = compress_file("server.log")   # → "server.log.bz2"
    """
    src_path = Path(src)
    dst_path = Path(dest) if dest else Path(str(src_path) + ".bz2")
    with src_path.open("rb") as fin, bz2.open(dst_path, "wb", compresslevel=level) as fout:
        while True:
            chunk = fin.read(chunk_size)
            if not chunk:
                break
            fout.write(chunk)
    if remove_src:
        src_path.unlink()
    return dst_path


def decompress_file(
    src: str | Path,
    dest: str | Path | None = None,
    remove_src: bool = False,
    chunk_size: int = 65536,
) -> Path:
    """
    Decompress a .bz2 file.

    Example:
        path = decompress_file("archive.tar.bz2")   # → "archive.tar"
    """
    src_path = Path(src)
    dst_str = str(src_path)
    if dst_str.endswith(".bz2"):
        dst_str = dst_str[:-4]
    dst_path = Path(dest) if dest else Path(dst_str)
    with bz2.open(src_path, "rb") as fin, dst_path.open("wb") as fout:
        while True:
            chunk = fin.read(chunk_size)
            if not chunk:
                break
            fout.write(chunk)
    if remove_src:
        src_path.unlink()
    return dst_path


def read_bz2_text(path: str | Path, encoding: str = "utf-8") -> str:
    """Read a .bz2-compressed text file."""
    with bz2.open(str(path), "rt", encoding=encoding) as f:
        return f.read()


def write_bz2_text(path: str | Path, text: str,
                   encoding: str = "utf-8", level: int = 9) -> None:
    """Write a string to a .bz2-compressed text file."""
    with bz2.open(str(path), "wt", encoding=encoding, compresslevel=level) as f:
        f.write(text)


def read_bz2_lines(path: str | Path, encoding: str = "utf-8") -> list[str]:
    """
    Return all lines from a .bz2-compressed text file.

    Example:
        lines = read_bz2_lines("access.log.bz2")
    """
    with bz2.open(str(path), "rt", encoding=encoding) as f:
        return f.readlines()


# ─────────────────────────────────────────────────────────────────────────────
# 3. Incremental streaming
# ─────────────────────────────────────────────────────────────────────────────

def compress_stream(
    source: Iterator[bytes],
    level: int = 9,
) -> Generator[bytes, None, None]:
    """
    Compress a stream of byte chunks lazily.

    Example:
        with open("big.log", "rb") as fin, open("big.log.bz2", "wb") as fout:
            chunks = iter(lambda: fin.read(65536), b"")
            for c in compress_stream(chunks):
                fout.write(c)
    """
    comp = bz2.BZ2Compressor(compresslevel=level)
    for chunk in source:
        out = comp.compress(chunk)
        if out:
            yield out
    final = comp.flush()
    if final:
        yield final


def decompress_stream(source: Iterator[bytes]) -> Generator[bytes, None, None]:
    """
    Decompress a single bzip2 stream lazily.

    Example:
        with open("data.bz2", "rb") as f:
            chunks = iter(lambda: f.read(65536), b"")
            for chunk in decompress_stream(chunks):
                process(chunk)
    """
    dec = bz2.BZ2Decompressor()
    for chunk in source:
        if dec.eof:
            break
        out = dec.decompress(chunk)
        if out:
            yield out


def decompress_multi_stream(data: bytes) -> bytes:
    """
    Decompress bzip2 data that may contain multiple concatenated streams.

    Example:
        full = decompress_multi_stream(concatenated_bz2_data)
    """
    result = bytearray()
    pos = 0
    while pos < len(data):
        dec = bz2.BZ2Decompressor()
        result.extend(dec.decompress(data[pos:]))
        if dec.eof:
            pos += len(data[pos:]) - len(dec.unused_data)
        else:
            break
    return bytes(result)


# ─────────────────────────────────────────────────────────────────────────────
# 4. Benchmarking
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class BZ2BenchResult:
    level:           int
    original_size:   int
    compressed_size: int
    compress_ms:     float
    decompress_ms:   float

    @property
    def ratio(self) -> float:
        return self.original_size / self.compressed_size if self.compressed_size else 0.0

    def __str__(self) -> str:
        return (f"level={self.level}  "
                f"{self.compressed_size:8,d} B  "
                f"ratio={self.ratio:.2f}x  "
                f"cmp={self.compress_ms:.1f}ms  "
                f"dcmp={self.decompress_ms:.1f}ms")


def benchmark(data: bytes, levels: list[int] | None = None) -> list[BZ2BenchResult]:
    """
    Benchmark bz2 at multiple compresslevel values.

    Example:
        for r in benchmark(sample_data, levels=[1, 5, 9]):
            print(r)
    """
    targets = levels or [1, 4, 7, 9]
    results: list[BZ2BenchResult] = []
    for lvl in targets:
        t0 = time.monotonic()
        compressed = compress(data, lvl)
        cmp_ms = (time.monotonic() - t0) * 1000

        t0 = time.monotonic()
        decompress(compressed)
        dcmp_ms = (time.monotonic() - t0) * 1000

        results.append(BZ2BenchResult(
            level=lvl,
            original_size=len(data),
            compressed_size=len(compressed),
            compress_ms=cmp_ms,
            decompress_ms=dcmp_ms,
        ))
    return results


# ─────────────────────────────────────────────────────────────────────────────
# 5. In-memory compressed buffer
# ─────────────────────────────────────────────────────────────────────────────

class BZ2Buffer:
    """
    In-memory bz2 reader/writer wrapping a BytesIO stream.
    Use as a file-like interface to a compressed in-memory buffer.

    Example:
        buf = BZ2Buffer()
        buf.write(b"line 1\\n")
        buf.write(b"line 2\\n")
        data = buf.getvalue()   # compressed bytes
        original = decompress(data)
    """

    def __init__(self, level: int = 9) -> None:
        self._raw = io.BytesIO()
        self._comp = bz2.BZ2Compressor(compresslevel=level)
        self._closed = False

    def write(self, data: bytes) -> int:
        chunk = self._comp.compress(data)
        if chunk:
            self._raw.write(chunk)
        return len(data)

    def flush(self) -> bytes:
        """Flush remaining compressed data and return all compressed bytes."""
        if not self._closed:
            final = self._comp.flush()
            if final:
                self._raw.write(final)
            self._closed = True
        return self._raw.getvalue()

    def getvalue(self) -> bytes:
        return self.flush()


# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    import tempfile

    print("=== bz2 demo ===")

    sample = (b"Python bz2 compression demo. " * 300 +
              bytes(range(256)) * 30)

    # ── one-shot ──────────────────────────────────────────────────────────────
    print("\n--- compress / decompress ---")
    compressed = compress(sample)
    print(f"  original:   {len(sample):8,d} bytes")
    print(f"  compressed: {len(compressed):8,d} bytes  ratio={compress_ratio(sample):.2f}x")
    print(f"  roundtrip:  {decompress(compressed) == sample}")

    # ── file ops ──────────────────────────────────────────────────────────────
    print("\n--- compress_file / decompress_file ---")
    with tempfile.TemporaryDirectory() as tmpdir:
        src = Path(tmpdir) / "data.bin"
        src.write_bytes(sample)
        bz2_path = compress_file(src, level=6)
        print(f"  {src.name} ({src.stat().st_size:,d}) → {bz2_path.name} ({bz2_path.stat().st_size:,d})")
        out = decompress_file(bz2_path, Path(tmpdir) / "data_out.bin")
        print(f"  decompressed ok: {out.read_bytes() == sample}")

    # ── streaming ─────────────────────────────────────────────────────────────
    print("\n--- compress_stream / decompress_stream ---")
    chunks = [sample[i:i+8192] for i in range(0, len(sample), 8192)]
    stream_out = b"".join(compress_stream(iter(chunks), level=1))
    stream_in  = b"".join(decompress_stream(iter([stream_out])))
    print(f"  streamed: {len(sample):,d} → {len(stream_out):,d} bytes")
    print(f"  roundtrip ok: {stream_in == sample}")

    # ── multi-stream ──────────────────────────────────────────────────────────
    print("\n--- decompress_multi_stream ---")
    part1 = compress(b"stream one; ")
    part2 = compress(b"stream two!")
    multi = decompress_multi_stream(part1 + part2)
    print(f"  multi-stream result: {multi!r}")

    # ── benchmark ─────────────────────────────────────────────────────────────
    print("\n--- benchmark ---")
    for r in benchmark(sample, levels=[1, 5, 9]):
        print(f"  {r}")

    # ── BZ2Buffer ─────────────────────────────────────────────────────────────
    print("\n--- BZ2Buffer ---")
    buf = BZ2Buffer(level=6)
    for i in range(10):
        buf.write(f"line {i}: ".encode() + b"data " * 20 + b"\n")
    compressed_buf = buf.getvalue()
    print(f"  buffer compressed to {len(compressed_buf):,d} bytes")
    print(f"  roundtrip ok: {decompress(compressed_buf).startswith(b'line 0:')}")

    print("\n=== done ===")

For the gzip alternative: gzip implements DEFLATE and offers the same gzip.compress() / gzip.open() / gzip.GzipFile API pattern as bz2. It compresses faster but produces larger files, and .gz files are universally supported. Use gzip when decompression speed or broad tool compatibility matters most; use bz2 when you need better-than-gzip ratios and .bz2 / .tar.bz2 is the expected format.

For the lzma alternative: lzma (stdlib) provides LZMA/XZ compression with the best ratio of the three but the slowest compression speed; .xz is the format used by modern Linux distribution packages. Use lzma for archival and distribution packaging where the archive will be downloaded many times (each byte saved multiplies); use bz2 when decompression latency matters more than compression ratio, such as for frequently accessed log archives.

The Claude Skills 360 bundle includes bz2 skill sets covering the compress() / decompress() / compress_ratio() one-shot helpers, the compress_file() / decompress_file() / read_bz2_text() / write_bz2_text() / read_bz2_lines() file operations, the compress_stream() / decompress_stream() / decompress_multi_stream() lazy streaming helpers, the BZ2BenchResult dataclass with benchmark(), and the BZ2Buffer in-memory compressed writer. Start with the free tier to try BZip2 compression patterns and bz2 pipeline code generation.
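To ground the gzip/bz2/lzma trade-off in numbers for your own data, a quick comparison across the three stdlib codecs (absolute figures vary with input and machine; this only sketches the methodology):

```python
import bz2
import gzip
import lzma
import time

sample = b"log line: GET /api/v1/items 200 0.012s\n" * 5000

for name, mod in (("gzip", gzip), ("bz2", bz2), ("lzma", lzma)):
    t0 = time.monotonic()
    packed = mod.compress(sample)          # each module exposes the same one-shot API
    ms = (time.monotonic() - t0) * 1000
    ratio = len(sample) / len(packed)
    print(f"{name:5s} {len(packed):8,d} B  ratio={ratio:.1f}x  {ms:.1f} ms")
```

On repetitive text like this, expect gzip to be fastest, lzma to produce the smallest output, and bz2 to sit between them on ratio.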

