Delta Lake adds ACID transactions and time travel to Parquet files on object storage — delta-rs is the Rust-backed Python library with no JVM. from deltalake import DeltaTable, write_deltalake. write_deltalake("s3://bucket/table", df, mode="append") writes Parquet with Delta transaction log. mode="overwrite" replaces all data; mode="overwrite" with partition_filters replaces a partition. DeltaTable("s3://bucket/table") opens an existing table. .to_pandas() reads all data; .to_pyarrow_table() returns an Arrow table. Time travel: DeltaTable("s3://bucket/table", version=5).to_pandas() reads version 5; .load_as_version(datetime(2025, 7, 1)) reads as of a timestamp. .history(10) lists recent operations with timestamp and operation type. MERGE (upsert): dt.merge(source_df, "target.order_id = source.order_id").when_matched_update_all().when_not_matched_insert_all().execute(). dt.optimize.compact() rewrites small files into larger ones. dt.optimize.z_order(["user_id", "created_at"]) co-locates data for query patterns. dt.vacuum(retention_hours=168) removes old Parquet files no longer referenced. Schema enforcement: Delta Lake rejects writes that don’t match the schema by default. Schema evolution: write_deltalake(..., schema_mode="merge") adds new columns. Change Data Feed: dt.load_cdf(starting_version=N) returns change rows with a _change_type column. Storage config: storage_options = {"AWS_ACCESS_KEY_ID": ..., "AWS_SECRET_ACCESS_KEY": ...} passed to all read/write calls. Claude Code generates delta-rs table operations, merge patterns, optimization scripts, and DuckDB integration.
CLAUDE.md for Delta Lake
## Delta Lake Stack
- Library: deltalake >= 0.17 (delta-rs, Python/Rust — no JVM)
- Write: write_deltalake(path, data, mode="append"|"overwrite"|"error", storage_options={...})
- Read: DeltaTable(path, storage_options={...}).to_pandas() or .to_pyarrow_table()
- Time travel: DeltaTable(path, version=N) or load_as_version(datetime(...))
- Merge/upsert: dt.merge(source, predicate).when_matched_update_all().when_not_matched_insert_all().execute()
- Optimize: dt.optimize.compact() + dt.optimize.z_order(["col1", "col2"])
- Vacuum: dt.vacuum(retention_hours=168) — default 7 days retention
Delta Table Client
# lib/delta/table.py — delta-rs table operations
import os
from datetime import datetime
from typing import Optional, Union
import pyarrow as pa
import pandas as pd
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import TableNotFoundError
def _storage_options() -> dict:
"""Build S3/GCS/ADLS storage options from environment."""
provider = os.environ.get("DELTA_STORAGE_PROVIDER", "s3")
if provider == "s3":
opts: dict = {}
if os.environ.get("AWS_ACCESS_KEY_ID"):
opts["AWS_ACCESS_KEY_ID"] = os.environ["AWS_ACCESS_KEY_ID"]
opts["AWS_SECRET_ACCESS_KEY"] = os.environ["AWS_SECRET_ACCESS_KEY"]
opts["AWS_REGION"] = os.environ.get("AWS_REGION", "us-east-1")
if os.environ.get("AWS_ENDPOINT_URL"):
opts["AWS_ENDPOINT_URL"] = os.environ["AWS_ENDPOINT_URL"]
opts["AWS_ALLOW_HTTP"] = "true"
return opts
if provider == "gcs":
return {"GOOGLE_SERVICE_ACCOUNT": os.environ.get("GOOGLE_SERVICE_ACCOUNT_JSON", "")}
return {} # Default / credential chain
# Resolved once at import time; every table operation below shares these options.
STORAGE_OPTIONS = _storage_options()
# Root URI under which relative table names (e.g. "analytics/orders") are resolved.
WAREHOUSE_PATH = os.environ.get("DELTA_WAREHOUSE", "s3://my-datalake/delta")
class DeltaTableOps:
    """Operations wrapper for a single Delta table.

    Relative table names (e.g. "analytics/orders") are resolved under
    WAREHOUSE_PATH; absolute URIs (s3://, gs://, abfs://) and local absolute
    paths are used as-is. All operations pass the module-level STORAGE_OPTIONS
    so credentials are configured in exactly one place.
    """

    def __init__(self, table_path: str):
        self.path = table_path if table_path.startswith(("s3://", "gs://", "abfs://", "/")) \
            else f"{WAREHOUSE_PATH}/{table_path}"

    def _open(self, version: Optional[int] = None) -> DeltaTable:
        """Open the table, optionally pinned to a historical version."""
        if version is not None:
            return DeltaTable(self.path, version=version, storage_options=STORAGE_OPTIONS)
        return DeltaTable(self.path, storage_options=STORAGE_OPTIONS)

    def exists(self) -> bool:
        """Return True if a Delta transaction log exists at self.path."""
        try:
            self._open()
            return True
        except TableNotFoundError:
            return False

    def read(
        self,
        version: Optional[int] = None,
        as_of: Optional[datetime] = None,
        columns: Optional[list] = None,
        row_filter=None,  # Optional pyarrow.dataset filter Expression
    ) -> pd.DataFrame:
        """Read the table into a DataFrame, optionally at a version/time.

        Args:
            version: Exact table version to read (takes precedence over as_of).
            as_of: Timestamp for time travel via load_as_version.
            columns: Subset of columns to project; None reads all columns.
            row_filter: A pyarrow.dataset filter Expression, e.g.
                ``pyarrow.dataset.field("status") == "completed"``. NOTE: the
                previous implementation advertised an expression *string* but
                its filter construction was a non-functional placeholder
                (pyarrow has no string-expression parser), so a provided
                filter was silently ignored; it now must be a real Expression
                and is pushed down into the Arrow dataset scan.

        Returns:
            The selected rows as a pandas DataFrame.
        """
        if version is not None:
            dt = self._open(version=version)
        elif as_of is not None:
            dt = self._open()
            dt.load_as_version(as_of)  # delta-rs accepts a datetime here
        else:
            dt = self._open()
        if row_filter is not None:
            # Push projection + predicate into the dataset scan so only
            # matching row groups are materialized.
            dataset = dt.to_pyarrow_dataset()
            return dataset.to_table(columns=columns, filter=row_filter).to_pandas()
        return dt.to_pandas(columns=columns)

    def append(self, data: Union[pd.DataFrame, pa.Table]) -> None:
        """Append rows; schema_mode="merge" lets new columns be added."""
        write_deltalake(
            self.path,
            data,
            mode="append",
            storage_options=STORAGE_OPTIONS,
            schema_mode="merge",  # Add new columns if present
        )

    def overwrite(
        self,
        data: Union[pd.DataFrame, pa.Table],
        partition_filters: Optional[list] = None,
    ) -> None:
        """Overwrite the entire table, or only the partitions matching filters.

        Args:
            data: Replacement rows.
            partition_filters: DNF tuples, e.g. [("date", "=", "2025-07-15")];
                None replaces all data.
        """
        write_deltalake(
            self.path,
            data,
            mode="overwrite",
            partition_filters=partition_filters,
            storage_options=STORAGE_OPTIONS,
        )

    def upsert(
        self,
        source: Union[pd.DataFrame, pa.Table],
        merge_key: str,
    ) -> None:
        """Merge source into the table: update matched rows, insert the rest.

        Args:
            source: New/changed rows (pandas is converted to Arrow first).
            merge_key: Column name equality-joined between target and source.
        """
        dt = self._open()
        source_table = source if isinstance(source, pa.Table) else pa.Table.from_pandas(source)
        (
            dt.merge(
                source=source_table,
                predicate=f"target.{merge_key} = source.{merge_key}",
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )

    def history(self, limit: int = 20) -> list[dict]:
        """Return up to `limit` most recent commit records for auditing."""
        return self._open().history(limit)

    def optimize(self, z_order_columns: Optional[list] = None) -> dict:
        """Compact small files; Z-order by the given columns when provided.

        In deltalake >= 0.17, DeltaTable.optimize is a *property* returning a
        TableOptimizer — calling it as a method (``dt.optimize()``) raises
        TypeError, so we access .compact()/.z_order() directly on it.

        Returns:
            Metrics dict from delta-rs (numFilesAdded, numFilesRemoved, ...).
        """
        dt = self._open()
        if z_order_columns:
            return dt.optimize.z_order(z_order_columns)
        return dt.optimize.compact()

    def vacuum(self, retention_hours: int = 168) -> list[str]:
        """Delete unreferenced Parquet files older than retention_hours.

        Returns:
            Paths of the files that were removed.
        """
        dt = self._open()
        return dt.vacuum(
            retention_hours=retention_hours,
            enforce_retention_duration=True,  # refuse retention below table config
            dry_run=False,
        )
Maintenance Script
# scripts/delta_maintenance.py — daily optimize + vacuum for all tables
import os
import sys
from lib.delta.table import DeltaTableOps
# Per-table maintenance configuration: warehouse-relative path, optional
# Z-order columns, and how long unreferenced files are retained.
TABLES = {
    "orders": {
        "path": "analytics/orders",
        "z_order_cols": ["user_id", "created_at"],
        "retention_hours": 168,
    },
    "events": {
        "path": "analytics/events",
        "z_order_cols": ["session_id", "event_type"],
        "retention_hours": 336,  # Events kept 14 days
    },
    "users": {
        "path": "analytics/users",
        "z_order_cols": None,  # Full table scan acceptable
        "retention_hours": 720,  # 30 days
    },
}

failures: list = []
for table_name, spec in TABLES.items():
    print(f"\n── {table_name} ──────────────────────────")
    table_ops = DeltaTableOps(spec["path"])
    try:
        # Compact (and optionally Z-order), then reclaim old files.
        print(f" Optimizing (z_order={spec['z_order_cols']})...")
        metrics = table_ops.optimize(z_order_columns=spec["z_order_cols"])
        print(f" Compacted {metrics.get('numFilesRemoved', 0)} → {metrics.get('numFilesAdded', 0)} files")
        print(f" Vacuuming (retention={spec['retention_hours']}h)...")
        deleted = table_ops.vacuum(retention_hours=spec["retention_hours"])
        print(f" Removed {len(deleted)} old files")
    except Exception as exc:
        # Keep going: one broken table must not block maintenance of the rest.
        print(f" ERROR: {exc}")
        failures.append(f"{table_name}: {exc}")

if failures:
    print(f"\nMaintenance completed with {len(failures)} error(s):")
    for failure in failures:
        print(f" - {failure}")
    sys.exit(1)
print("\nMaintenance completed successfully.")
DuckDB Integration
# lib/delta/duckdb_query.py — query Delta tables with DuckDB
import duckdb
import os
def query_delta(table_path: str, sql: str) -> list[dict]:
"""Query a Delta table at any path using DuckDB delta extension."""
conn = duckdb.connect()
conn.execute("INSTALL delta; LOAD delta;")
conn.execute("INSTALL httpfs; LOAD httpfs;")
# S3 credentials
if os.environ.get("AWS_ACCESS_KEY_ID"):
conn.execute(f"""
CREATE SECRET (
TYPE s3,
KEY_ID '{os.environ["AWS_ACCESS_KEY_ID"]}',
SECRET '{os.environ["AWS_SECRET_ACCESS_KEY"]}',
REGION '{os.environ.get("AWS_REGION", "us-east-1")}'
)
""")
full_sql = sql.replace("{delta_table}", f"delta_scan('{table_path}')")
result = conn.execute(full_sql).fetchdf()
conn.close()
return result.to_dict("records")
# Example:
# query_delta(
# "s3://my-lake/delta/analytics/orders",
# """
# SELECT
# date_trunc('week', created_at) AS week,
# sum(amount) AS revenue
# FROM {delta_table}
# WHERE status = 'completed'
# AND created_at >= current_date - INTERVAL 90 DAY
# GROUP BY 1 ORDER BY 1 DESC
# """
# )
For the Apache Iceberg alternative when needing broader multi-engine compatibility (Trino, Flink, Hive, DuckDB), more expressive hidden partitioning that auto-evolves without rewriting data, and a catalog that decouples metadata from compute — Iceberg is preferred for multi-cloud and multi-engine lakehouses while Delta Lake has deeper Databricks/Azure integration, native Delta Live Tables streaming, and a stronger Change Data Feed implementation. For the Apache Hudi alternative when specifically needing record-level incremental pulls with Merge-On-Read for near-realtime analytics (seconds latency) on frequently updated tables — Hudi optimizes for high-frequency small updates while Delta Lake has a cleaner API for batch upserts and fits teams already using Databricks or Azure Synapse. The Claude Skills 360 bundle includes Delta Lake skill sets covering delta-rs operations, upserts, optimize/vacuum, and DuckDB integration. Start with the free tier to try lakehouse table generation.