Claude Code for Delta Lake: ACID Transactions on Data Lakes — Claude Skills 360 Blog

Published: August 15, 2027
Read time: 5 min
By: Claude Skills 360

Delta Lake adds ACID transactions and time travel to Parquet files on object storage, and delta-rs is the Rust-backed Python library that brings it to Python with no JVM: from deltalake import DeltaTable, write_deltalake.

Writing: write_deltalake("s3://bucket/table", df, mode="append") writes Parquet files plus the Delta transaction log. mode="overwrite" replaces all data; mode="overwrite" combined with partition_filters replaces just the matching partitions.

Reading: DeltaTable("s3://bucket/table") opens an existing table; .to_pandas() reads all data, and .to_pyarrow_table() returns an Arrow table (.to_pyarrow_dataset() gives a lazy dataset for filtered scans). Time travel: DeltaTable("s3://bucket/table", version=5) reads version 5, and dt.load_as_version(datetime(2025, 7, 1)) reloads an open table as of a timestamp (it also accepts a version number or an ISO date string). .history(10) lists recent operations with timestamp and operation type.

Upserts and maintenance: MERGE (upsert) is dt.merge(source_df, "target.order_id = source.order_id").when_matched_update_all().when_not_matched_insert_all().execute(). dt.optimize.compact() rewrites small files into larger ones, and dt.optimize.z_order(["user_id", "created_at"]) co-locates data for common query patterns (note that optimize is a property in current delta-rs, not a method call). dt.vacuum(retention_hours=168) removes old Parquet files no longer referenced by the log.

Schema and change data: Delta Lake rejects writes that don't match the table schema by default; write_deltalake(..., schema_mode="merge") adds new columns instead. With Change Data Feed enabled on the table, recent delta-rs releases expose dt.load_cdf(starting_version=...), whose output includes a _change_type column. Storage config: storage_options = {"AWS_ACCESS_KEY_ID": ..., "AWS_SECRET_ACCESS_KEY": ...} is passed to every read/write call. Claude Code generates delta-rs table operations, merge patterns, optimization scripts, and DuckDB integration.

CLAUDE.md for Delta Lake

## Delta Lake Stack
- Library: deltalake >= 0.17 (delta-rs, Python/Rust — no JVM)
- Write: write_deltalake(path, data, mode="append"|"overwrite"|"error", storage_options={...})
- Read: DeltaTable(path, storage_options={...}).to_pandas() or .to_pyarrow_table()
- Time travel: DeltaTable(path, version=N) or load_as_version(datetime(...))
- Merge/upsert: dt.merge(source, predicate).when_matched_update_all().when_not_matched_insert_all().execute()
- Optimize: dt.optimize.compact() + dt.optimize.z_order(["col1", "col2"]) — optimize is a property, not a call
- Vacuum: dt.vacuum(retention_hours=168) — default 7 days retention

Delta Table Client

# lib/delta/table.py — delta-rs table operations
import os
from datetime import datetime
from typing import Optional, Union
import pyarrow as pa
import pandas as pd
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import TableNotFoundError


def _storage_options() -> dict:
    """Build S3/GCS/ADLS storage options from environment."""
    provider = os.environ.get("DELTA_STORAGE_PROVIDER", "s3")

    if provider == "s3":
        opts: dict = {}
        if os.environ.get("AWS_ACCESS_KEY_ID"):
            opts["AWS_ACCESS_KEY_ID"]     = os.environ["AWS_ACCESS_KEY_ID"]
            opts["AWS_SECRET_ACCESS_KEY"] = os.environ["AWS_SECRET_ACCESS_KEY"]
            opts["AWS_REGION"]            = os.environ.get("AWS_REGION", "us-east-1")
        if os.environ.get("AWS_ENDPOINT_URL"):
            opts["AWS_ENDPOINT_URL"]     = os.environ["AWS_ENDPOINT_URL"]
            opts["AWS_ALLOW_HTTP"]       = "true"
        return opts

    if provider == "gcs":
        return {"GOOGLE_SERVICE_ACCOUNT": os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON", "")}

    return {}  # Default / credential chain


STORAGE_OPTIONS = _storage_options()
WAREHOUSE_PATH  = os.environ.get("DELTA_WAREHOUSE", "s3://my-datalake/delta")


class DeltaTableOps:
    """Operations wrapper for a Delta table."""

    def __init__(self, table_path: str):
        self.path = table_path if table_path.startswith(("s3://", "gs://", "abfs://", "/")) \
                    else f"{WAREHOUSE_PATH}/{table_path}"

    def exists(self) -> bool:
        try:
            DeltaTable(self.path, storage_options=STORAGE_OPTIONS)
            return True
        except TableNotFoundError:
            return False

    def read(
        self,
        version: Optional[int]      = None,
        as_of:   Optional[datetime] = None,
        columns: Optional[list]     = None,
        row_filter=None,  # Optional pyarrow compute Expression, e.g. pyarrow.compute.field("status") == "done"
    ) -> pd.DataFrame:
        """Read the table, optionally at a specific version/timestamp or with a filter."""
        if version is not None:
            dt = DeltaTable(self.path, version=version, storage_options=STORAGE_OPTIONS)
        else:
            dt = DeltaTable(self.path, storage_options=STORAGE_OPTIONS)
            if as_of is not None:
                dt.load_as_version(as_of)  # Accepts a version int, ISO string, or datetime

        if row_filter is not None:
            # Push the filter (and projection) down into a pyarrow dataset scan
            scanner = dt.to_pyarrow_dataset().scanner(columns=columns, filter=row_filter)
            return scanner.to_table().to_pandas()

        return dt.to_pandas(columns=columns)

    def append(self, data: Union[pd.DataFrame, pa.Table]) -> None:
        """Append data; schema_mode="merge" evolves the schema instead of rejecting new columns."""
        write_deltalake(
            self.path,
            data,
            mode="append",
            storage_options=STORAGE_OPTIONS,
            schema_mode="merge",  # Add new columns if present (requires a recent deltalake)
        )

    def overwrite(
        self,
        data:              Union[pd.DataFrame, pa.Table],
        partition_filters: Optional[list] = None,
    ) -> None:
        """Overwrite the entire table or specific partitions."""
        write_deltalake(
            self.path,
            data,
            mode="overwrite",
            partition_filters=partition_filters,  # e.g. [("date", "=", "2025-07-15")]
            storage_options=STORAGE_OPTIONS,
        )

    def upsert(
        self,
        source:     Union[pd.DataFrame, pa.Table],
        merge_key:  str,
    ) -> None:
        """Merge source into target — upsert by merge_key."""
        dt = DeltaTable(self.path, storage_options=STORAGE_OPTIONS)

        (
            dt.merge(
                source=source if isinstance(source, pa.Table) else pa.Table.from_pandas(source),
                predicate=f"target.{merge_key} = source.{merge_key}",
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )

    def history(self, limit: int = 20) -> list[dict]:
        """Get operation history for auditing."""
        dt = DeltaTable(self.path, storage_options=STORAGE_OPTIONS)
        return dt.history(limit)

    def optimize(self, z_order_columns: Optional[list] = None) -> dict:
        """Compact small files and optionally Z-order. Returns operation metrics."""
        dt = DeltaTable(self.path, storage_options=STORAGE_OPTIONS)

        # dt.optimize is a property returning a TableOptimizer — not a method call
        if z_order_columns:
            return dt.optimize.z_order(z_order_columns)
        return dt.optimize.compact()

    def vacuum(self, retention_hours: int = 168) -> list[str]:
        """Remove old Parquet files beyond retention period."""
        dt = DeltaTable(self.path, storage_options=STORAGE_OPTIONS)
        return dt.vacuum(retention_hours=retention_hours, enforce_retention_duration=True, dry_run=False)
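The upsert() method above takes a single merge_key, but real tables often deduplicate on composite keys. A small helper — hypothetical, not part of delta-rs — can build the compound predicate string that dt.merge() expects:

```python
def merge_predicate(keys: list[str], target: str = "target", source: str = "source") -> str:
    """Build a Delta merge predicate equating one or more key columns."""
    if not keys:
        raise ValueError("at least one merge key is required")
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in keys)


# Single key — the same predicate upsert() builds today
print(merge_predicate(["order_id"]))
# → target.order_id = source.order_id

# Composite key
print(merge_predicate(["order_id", "region"]))
# → target.order_id = source.order_id AND target.region = source.region
```

Passing the result as the predicate argument to dt.merge() keeps the rest of the upsert() chain unchanged.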

Maintenance Script

# scripts/delta_maintenance.py — daily optimize + vacuum for all tables
import sys
from lib.delta.table import DeltaTableOps

TABLES = {
    "orders": {
        "path":          "analytics/orders",
        "z_order_cols":  ["user_id", "created_at"],
        "retention_hours": 168,
    },
    "events": {
        "path":          "analytics/events",
        "z_order_cols":  ["session_id", "event_type"],
        "retention_hours": 336,  # Events kept 14 days
    },
    "users": {
        "path":          "analytics/users",
        "z_order_cols":  None,  # Full table scan acceptable
        "retention_hours": 720,  # 30 days
    },
}

errors = []

for name, cfg in TABLES.items():
    print(f"\n── {name} ──────────────────────────")
    ops = DeltaTableOps(cfg["path"])

    try:
        print(f"  Optimizing (z_order={cfg['z_order_cols']})...")
        result = ops.optimize(z_order_columns=cfg["z_order_cols"])
        print(f"  Compacted {result.get('numFilesRemoved', 0)} files into {result.get('numFilesAdded', 0)}")

        print(f"  Vacuuming (retention={cfg['retention_hours']}h)...")
        removed = ops.vacuum(retention_hours=cfg["retention_hours"])
        print(f"  Removed {len(removed)} old files")

    except Exception as e:
        print(f"  ERROR: {e}")
        errors.append(f"{name}: {e}")

if errors:
    print(f"\nMaintenance completed with {len(errors)} error(s):")
    for err in errors: print(f"  - {err}")
    sys.exit(1)

print("\nMaintenance completed successfully.")
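One guard worth adding around vacuum: deleting files younger than the default 7-day window can break time travel and in-flight readers, which is why delta-rs refuses it unless enforce_retention_duration is disabled. A clamp helper (hypothetical, matching the 168-hour default used above) makes the per-table configs fail safe:

```python
MIN_RETENTION_HOURS = 168  # delta-rs default vacuum retention: 7 days


def safe_retention(hours: int, allow_unsafe: bool = False) -> int:
    """Clamp a vacuum retention window to the 7-day minimum unless explicitly overridden."""
    if hours < MIN_RETENTION_HOURS and not allow_unsafe:
        return MIN_RETENTION_HOURS
    return hours


print(safe_retention(24))                      # clamped to 168
print(safe_retention(24, allow_unsafe=True))   # 24 — caller opted in
print(safe_retention(720))                     # 720 — longer windows pass through
```

Calling ops.vacuum(retention_hours=safe_retention(cfg["retention_hours"])) in the loop above would prevent a typo in TABLES from silently shortening the window.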

DuckDB Integration

# lib/delta/duckdb_query.py — query Delta tables with DuckDB
import duckdb
import os


def query_delta(table_path: str, sql: str) -> list[dict]:
    """Query a Delta table at any path using DuckDB delta extension."""
    conn = duckdb.connect()
    conn.execute("INSTALL delta; LOAD delta;")
    conn.execute("INSTALL httpfs; LOAD httpfs;")

    # S3 credentials
    if os.environ.get("AWS_ACCESS_KEY_ID"):
        conn.execute(f"""
            CREATE SECRET (
                TYPE s3,
                KEY_ID '{os.environ["AWS_ACCESS_KEY_ID"]}',
                SECRET '{os.environ["AWS_SECRET_ACCESS_KEY"]}',
                REGION '{os.environ.get("AWS_REGION", "us-east-1")}'
            )
        """)

    full_sql = sql.replace("{delta_table}", f"delta_scan('{table_path}')")
    result   = conn.execute(full_sql).fetchdf()
    conn.close()
    return result.to_dict("records")


# Example:
# query_delta(
#     "s3://my-lake/delta/analytics/orders",
#     """
#     SELECT
#         date_trunc('week', created_at) AS week,
#         sum(amount)                    AS revenue
#     FROM {delta_table}
#     WHERE status = 'completed'
#       AND created_at >= current_date - INTERVAL 90 DAY
#     GROUP BY 1 ORDER BY 1 DESC
#     """
# )
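The bare string replace in query_delta silently returns the SQL unchanged when the placeholder is missing, handing DuckDB a query with no table reference. A defensive variant of just the templating step (hypothetical helper, stdlib only):

```python
def render_delta_sql(sql: str, table_path: str) -> str:
    """Substitute {delta_table} with a delta_scan() call, failing loudly if absent."""
    if "{delta_table}" not in sql:
        raise ValueError("SQL must reference the {delta_table} placeholder")
    return sql.replace("{delta_table}", f"delta_scan('{table_path}')")


print(render_delta_sql("SELECT count(*) FROM {delta_table}",
                       "s3://my-lake/delta/analytics/orders"))
# → SELECT count(*) FROM delta_scan('s3://my-lake/delta/analytics/orders')
```

Swapping this into query_delta turns a confusing DuckDB parse error into an immediate, descriptive ValueError.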

When you need broader multi-engine compatibility (Trino, Flink, Hive, DuckDB), hidden partitioning that evolves without rewriting data, and a catalog that decouples metadata from compute, Apache Iceberg is the stronger alternative — it is generally preferred for multi-cloud, multi-engine lakehouses, while Delta Lake offers deeper Databricks/Azure integration, native Delta Live Tables streaming, and a stronger Change Data Feed implementation. When you specifically need record-level incremental pulls with Merge-On-Read for near-real-time analytics (seconds of latency) on frequently updated tables, consider Apache Hudi — it optimizes for high-frequency small updates, while Delta Lake has a cleaner API for batch upserts and suits teams already on Databricks or Azure Synapse. The Claude Skills 360 bundle includes Delta Lake skill sets covering delta-rs operations, upserts, optimize/vacuum, and DuckDB integration. Start with the free tier to try lakehouse table generation.
