Apache Iceberg is the open table format for massive analytic datasets — ACID transactions, schema evolution, and time travel on object storage. PyIceberg: from pyiceberg.catalog import load_catalog. REST catalog: catalog = load_catalog("rest", **{"uri": REST_CATALOG_URL, "token": TOKEN}). AWS Glue catalog: load_catalog("glue", **{"type": "glue", "warehouse": "s3://bucket/warehouse"}). catalog.create_table("db.orders", schema=Schema(...)) creates a table. table = catalog.load_table("db.orders") loads an existing table. Read: table.scan().to_pandas() or .to_arrow(). Filtered scan: table.scan(row_filter="amount > 100 AND status = 'completed'", selected_fields=["order_id", "amount"]). Write: table.append(pa.Table.from_pandas(df)) — atomic commit. Overwrite: table.overwrite(new_df, "date = '2025-07-15'") — replaces matching partition. Time travel: table.scan(snapshot_id=12345).to_pandas() or table.scan(as_of_timestamp=1718000000000).to_pandas(). Schema evolution: with table.update_schema() as update: update.add_column("new_col", IntegerType()) — backward compatible. Partition evolution: with table.update_spec() as update: update.add_field("months(created_at)") — no rewrite needed. table.history() lists snapshots. table.expire_snapshots(older_than=datetime(2025, 1, 1)) cleans old snapshots. DuckDB: SELECT * FROM iceberg_scan('s3://bucket/warehouse/db/orders') WHERE amount > 100 using LOAD iceberg. Claude Code generates PyIceberg table creation, catalog configuration, time travel queries, and schema evolution scripts.
CLAUDE.md for Apache Iceberg
## Apache Iceberg Stack
- Client: pyiceberg >= 0.7 (Python), or DuckDB iceberg extension, or Spark with iceberg jars
- Catalog: REST (Tabular/Polaris/Lake Formation), Glue, Hive, or JDBC
- Storage: S3 (s3://), GCS (gs://), ADLS (abfs://) — any object store
- Read: table.scan(row_filter="col > val", selected_fields=[...]).to_arrow()
- Write: table.append(pa.table) or table.overwrite(pa.table, overwrite_filter)
- Time travel: scan(snapshot_id=X) or scan(as_of_timestamp=epoch_ms)
- Schema evolution: with table.update_schema() as u: u.add_column / rename_column / delete_column
Catalog Setup and Table Management
# lib/iceberg/catalog.py — PyIceberg catalog configuration
import os
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
NestedField, StringType, LongType, FloatType,
BooleanType, TimestampType, DateType, ListType, StructType,
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform, BucketTransform, IdentityTransform
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.expressions import Reference
def get_catalog() -> Catalog:
    """Load an Iceberg catalog from environment configuration.

    Backend is selected by ICEBERG_CATALOG_TYPE ("rest", "glue", or anything
    else for a local SQLite-backed catalog used in dev/testing).

    Optional REST settings are forwarded only when the corresponding env var
    is actually set: the original passed empty strings for unset vars (e.g.
    ``"s3.endpoint": ""``), which can override valid client defaults and
    break connectivity.

    Returns:
        A configured pyiceberg Catalog.

    Raises:
        KeyError: if a required env var for the chosen backend is missing
            (ICEBERG_WAREHOUSE for glue, ICEBERG_REST_URI for rest).
    """
    catalog_type = os.environ.get("ICEBERG_CATALOG_TYPE", "rest")
    if catalog_type == "glue":
        return load_catalog("glue", **{
            "type": "glue",
            "warehouse": os.environ["ICEBERG_WAREHOUSE"],  # s3://my-datalake/warehouse
        })
    if catalog_type == "rest":
        props = {"uri": os.environ["ICEBERG_REST_URI"]}
        # Map catalog property -> env var; forward only non-empty values.
        optional = {
            "token": "ICEBERG_REST_TOKEN",
            "warehouse": "ICEBERG_WAREHOUSE",
            "s3.endpoint": "S3_ENDPOINT",  # for MinIO/R2
            "s3.access-key-id": "AWS_ACCESS_KEY_ID",
            "s3.secret-access-key": "AWS_SECRET_ACCESS_KEY",
        }
        for prop, env_var in optional.items():
            value = os.environ.get(env_var)
            if value:
                props[prop] = value
        return load_catalog("rest", **props)
    # Filesystem (local dev / testing)
    return load_catalog("local", **{
        "type": "sql",
        "uri": f"sqlite:///{os.environ.get('ICEBERG_LOCAL_DB', '/tmp/iceberg.db')}",
        "warehouse": os.environ.get("ICEBERG_WAREHOUSE", "/tmp/iceberg-warehouse"),
    })
# ── Table schemas ──────────────────────────────────────────────────────────
# Canonical schema for analytics.orders. The numeric field IDs (1-8) are
# Iceberg's stable column identities — never renumber or reuse them;
# schema evolution assigns fresh IDs for new columns.
ORDERS_SCHEMA = Schema(
    NestedField(1, "order_id", StringType(), required=True),
    NestedField(2, "user_id", StringType(), required=True),
    NestedField(3, "amount", FloatType(), required=True),
    NestedField(4, "currency", StringType(), required=True),
    NestedField(5, "status", StringType(), required=True),
    NestedField(6, "created_at", TimestampType(), required=True),
    NestedField(7, "updated_at", TimestampType(), required=False),
    NestedField(8, "metadata", StringType(), required=False),  # JSON blob
)
# Partition layout: daily partitions on created_at (source_id=6) plus a
# 16-way hash bucket on user_id (source_id=2). field_id values start at
# 1000, the conventional range for partition fields.
ORDERS_PARTITION = PartitionSpec(
    PartitionField(source_id=6, field_id=1000, transform=DayTransform(), name="created_date"),
    PartitionField(source_id=2, field_id=1001, transform=BucketTransform(16), name="user_bucket"),
)
# Write-time sort order; only source_id is given, so pyiceberg's defaults
# apply for transform/direction (presumably identity, ascending — confirm
# against the pinned pyiceberg version).
ORDERS_SORT_ORDER = SortOrder(
    SortField(source_id=6),  # sort by created_at
    SortField(source_id=2),  # then by user_id
)
def ensure_table(namespace: str = "analytics", table_name: str = "orders"):
    """Create the orders table if it doesn't exist, or return the existing one.

    Idempotent — safe to call on every startup. Catches only the expected
    "already exists" / "not found" exceptions; the original caught bare
    Exception, which silently swallowed real failures (auth, network, bad
    config) and then surfaced them as a confusing create_table error.

    Args:
        namespace: Target namespace (created if missing).
        table_name: Table name within the namespace.

    Returns:
        The loaded or newly created pyiceberg Table.
    """
    # Local import: the module-level imports don't include pyiceberg.exceptions.
    from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError

    catalog = get_catalog()
    try:
        catalog.create_namespace(namespace)
    except NamespaceAlreadyExistsError:
        pass  # Expected on every run after the first.
    full_name = f"{namespace}.{table_name}"
    try:
        return catalog.load_table(full_name)
    except NoSuchTableError:
        return catalog.create_table(
            full_name,
            schema=ORDERS_SCHEMA,
            partition_spec=ORDERS_PARTITION,
            sort_order=ORDERS_SORT_ORDER,
            properties={
                "write.format.default": "parquet",
                "write.parquet.compression-codec": "zstd",
                "write.metadata.compression-codec": "gzip",
                "history.expire.max-snapshot-age-ms": str(7 * 24 * 60 * 60 * 1000),  # 7 days
            },
        )
Read and Write Operations
# lib/iceberg/operations.py — read/write/time-travel helpers
from datetime import datetime
from typing import Optional
import pyarrow as pa
import pandas as pd
from pyiceberg.expressions import (
EqualTo, GreaterThan, GreaterThanOrEqual, And, IsNull, NotNull,
)
from .catalog import get_catalog
TABLE = "analytics.orders"
def read_orders(
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    status: Optional[str] = None,
    limit: Optional[int] = None,
) -> pd.DataFrame:
    """Read orders with optional filters.

    Args:
        start_date: Inclusive lower bound on created_at.
        end_date: Exclusive upper bound on created_at.
        status: Exact-match filter on the status column.
        limit: Maximum number of rows to return; pushed down into the scan
            so the full table isn't materialized first.

    Returns:
        Matching rows as a pandas DataFrame.
    """
    # Local import: LessThan is not in this module's top-level imports.
    from pyiceberg.expressions import LessThan

    catalog = get_catalog()
    table = catalog.load_table(TABLE)
    filters = []
    # created_at literals are epoch microseconds (Iceberg TimestampType precision).
    if start_date:
        filters.append(GreaterThanOrEqual("created_at", int(start_date.timestamp() * 1_000_000)))
    if end_date:
        # BUG FIX: the original used GreaterThan here, turning end_date into
        # a second *lower* bound and returning rows after the window.
        filters.append(LessThan("created_at", int(end_date.timestamp() * 1_000_000)))
    if status:
        filters.append(EqualTo("status", status))
    row_filter = None
    for f in filters:
        row_filter = f if row_filter is None else And(row_filter, f)
    # Only pass the kwargs that are set: scan() expects a BooleanExpression
    # (default AlwaysTrue) for row_filter, not None.
    scan_kwargs = {}
    if row_filter is not None:
        scan_kwargs["row_filter"] = row_filter
    if limit is not None:
        scan_kwargs["limit"] = limit
    df = table.scan(**scan_kwargs).to_pandas()
    return df.head(limit) if limit else df
def append_orders(df: pd.DataFrame) -> None:
    """Atomically append a batch of orders to the Iceberg table.

    The frame is converted to Arrow against the table's own schema so the
    commit matches the Iceberg column types, then appended as one new
    snapshot.
    """
    tbl = get_catalog().load_table(TABLE)
    target_schema = tbl.schema().as_arrow()
    batch = pa.Table.from_pandas(df, schema=target_schema)
    tbl.append(batch)
    print(f"Appended {len(df)} rows. New snapshot: {tbl.current_snapshot().snapshot_id}")
def time_travel_query(
    as_of_timestamp: Optional[datetime] = None,
    snapshot_id: Optional[int] = None,
) -> pd.DataFrame:
    """Query the orders table as of a snapshot or a point in time.

    Precedence: snapshot_id wins over as_of_timestamp; with neither set,
    the current table state is returned.
    """
    tbl = get_catalog().load_table(TABLE)
    if snapshot_id:
        scan = tbl.scan(snapshot_id=snapshot_id)
    elif as_of_timestamp:
        # Iceberg snapshot timestamps are epoch milliseconds.
        millis = int(as_of_timestamp.timestamp() * 1000)
        scan = tbl.scan(as_of_timestamp=millis)
    else:
        scan = tbl.scan()
    return scan.to_pandas()
def list_snapshots() -> list[dict]:
    """Return snapshot metadata, newest first, for auditing and debugging.

    Note: fromtimestamp() renders the timestamp in the local timezone.
    """
    tbl = get_catalog().load_table(TABLE)
    return [
        {
            "snapshot_id": entry.snapshot_id,
            "timestamp_ms": entry.timestamp_ms,
            "timestamp": datetime.fromtimestamp(entry.timestamp_ms / 1000).isoformat(),
            "operation": entry.summary.get("operation", "unknown") if entry.summary else "unknown",
        }
        for entry in reversed(tbl.history())
    ]
def evolve_schema() -> None:
    """Add columns without rewriting data — Iceberg schema evolution.

    BUG FIX: UpdateSchema.add_column() takes Iceberg types
    (pyiceberg.types.*), not pyarrow types — the original passed
    pa.float32()/pa.string(), which pyiceberg rejects.
    """
    # Local import: this module's top-level imports don't include pyiceberg.types.
    from pyiceberg.types import FloatType, StringType

    catalog = get_catalog()
    table = catalog.load_table(TABLE)
    with table.update_schema() as update:
        # Add new columns (backward compatible — existing files don't need rewriting)
        update.add_column("refund_amount", FloatType(), doc="Partial or full refund amount")
        update.add_column("payment_method", StringType(), doc="e.g. stripe, paypal, crypto")
        # Rename a column
        # update.rename_column("metadata", "extra")
        # Make optional column required (only if no nulls exist)
        # update.make_column_required("user_id")
    print("Schema evolved. No data rewrite required.")
DuckDB Integration
# lib/iceberg/duckdb_query.py — query Iceberg tables with DuckDB
import duckdb
import os
def get_duckdb_conn() -> duckdb.DuckDBPyConnection:
    """DuckDB connection with the Iceberg and httpfs extensions loaded.

    S3 credentials come from the environment and are embedded into SET
    statements. Single quotes in the values are escaped so a quote inside
    a secret cannot break out of the SQL string literal (the original
    interpolated them raw).
    """
    conn = duckdb.connect()
    conn.execute("INSTALL iceberg; LOAD iceberg;")
    conn.execute("INSTALL httpfs; LOAD httpfs;")

    def _quote(value: str) -> str:
        # Double embedded single quotes — standard SQL string escaping.
        return value.replace("'", "''")

    # S3 credentials
    conn.execute(f"""
        SET s3_region = '{_quote(os.environ.get("AWS_REGION", "us-east-1"))}';
        SET s3_access_key_id = '{_quote(os.environ.get("AWS_ACCESS_KEY_ID", ""))}';
        SET s3_secret_access_key = '{_quote(os.environ.get("AWS_SECRET_ACCESS_KEY", ""))}';
    """)
    return conn
def query_iceberg_sql(
    table_location: str,  # e.g. s3://bucket/warehouse/analytics/orders
    sql_template: str,  # Use {table} as placeholder
    params: "dict | None" = None,  # BUG FIX: was a mutable default ({})
) -> list[dict]:
    """Run SQL against an Iceberg table via DuckDB.

    The template's {table} placeholder is replaced with an iceberg_scan()
    over table_location; any extra placeholders are filled from params.

    Fixes over the original: the mutable default argument (``params={}``
    is shared across calls) and a connection leak when the query raised
    (close() now runs in a finally block).

    Returns:
        Result rows as a list of column->value dicts.
    """
    query = sql_template.format(
        table=f"iceberg_scan('{table_location}')",
        **(params or {}),
    )
    conn = get_duckdb_conn()
    try:
        result = conn.execute(query).fetchdf()
    finally:
        conn.close()
    return result.to_dict("records")
# Example usage:
# query_iceberg_sql(
# "s3://my-lake/warehouse/analytics/orders",
# """
# SELECT
# date_trunc('day', created_at) AS day,
# sum(amount) AS revenue,
# count(*) AS order_count
# FROM {table}
# WHERE created_at >= now() - INTERVAL 30 DAY
# GROUP BY 1 ORDER BY 1 DESC
# """,
# )
For the Delta Lake alternative when working primarily in the Databricks ecosystem or needing native Delta Live Tables for streaming ingestion, OPTIMIZE/ZORDER auto-tuning, and tight Spark integration with Change Data Feed for CDC pipelines — Delta Lake is the incumbent open table format in Databricks while Apache Iceberg has broader multi-engine support (Spark, Trino, Flink, DuckDB, Hive) and a more expressive partition evolution model. For the Apache Hudi alternative when needing record-level upserts and deletes with low-latency streaming ingestion, Merge-On-Read for near-realtime tables, and deep Kafka/Delta Streamer integration for CDC use cases — Hudi optimizes for frequent small updates while Iceberg optimizes for batch analytics with cleaner metadata management and better query performance on large tables. The Claude Skills 360 bundle includes Apache Iceberg skill sets covering PyIceberg catalog setup, time travel, schema evolution, and DuckDB integration. Start with the free tier to try lakehouse generation.