PyArrow is the Python interface to Apache Arrow, a columnar in-memory data format. Install it with pip install pyarrow, then import pyarrow as pa, import pyarrow.compute as pc, and import pyarrow.parquet as pq. Build a Table from a dict with table = pa.table({"col": [1, 2, 3], "name": ["a", "b", "c"]}), from Pandas with table = pa.Table.from_pandas(df), and convert back with df = table.to_pandas(). Declare types explicitly with pa.schema([("id", pa.int64()), ("name", pa.string()), ("val", pa.float32())]) and cast with pc.cast(table["col"], pa.float64()).
Filter with table.filter(pc.greater(table["val"], 0.5)), sort with table.sort_by([("col", "descending")]), project columns with table.select(["a", "b"]), and compute with pc.mean(table["score"]), pc.sum(col), pc.count(col, mode="only_valid"), or pc.utf8_lower(str_col). Write Parquet with pq.write_table(table, "data.parquet", compression="snappy", row_group_size=50000) and read it with pq.read_table("data.parquet", columns=["a", "b"]); predicate pushdown is pq.read_table("data.parquet", filters=[("year", "=", 2024), ("score", ">", 0.5)]), and partitioned data is handled by pq.ParquetDataset("s3://bucket/path", filters=...).
Read CSV with import pyarrow.csv and pa.csv.read_csv("data.csv", read_options=pa.csv.ReadOptions(block_size=2**24)). For IPC streaming: sink = pa.BufferOutputStream(), writer = pa.ipc.new_stream(sink, table.schema), writer.write_table(table), writer.close(). For dataset scans: import pyarrow.dataset as ds, dataset = ds.dataset("path/", format="parquet", partitioning="hive"), scanner = dataset.scanner(columns=["a"], filter=ds.field("score") > 0.5). Claude Code generates PyArrow ETL pipelines, Parquet converters, predicate-pushdown readers, and Arrow IPC streaming services.
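As a minimal quickstart sketch of those calls (the column names and file name below are illustrative, not from a real dataset):

# quickstart sketch: toy columns and a local file path, for illustration only
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

table = pa.table({"id": [1, 2, 3], "score": [0.2, 0.7, 0.9]})
high = table.filter(pc.greater(table["score"], 0.5))          # keep rows with score > 0.5
pq.write_table(high, "scores.parquet", compression="snappy")  # columnar file on disk
back = pq.read_table(
    "scores.parquet",
    columns=["id", "score"],        # column pruning
    filters=[("score", ">", 0.8)],  # predicate pushdown at read time
)
print(back.to_pandas())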
CLAUDE.md for PyArrow
## PyArrow Stack
- Version: pyarrow >= 15.0
- Table: pa.table(dict) | pa.Table.from_pandas(df) | table.to_pandas()
- Compute: import pyarrow.compute as pc | pc.filter/cast/sort/mean/sum/utf8_*
- Parquet: pq.read_table(path, columns, filters) | pq.write_table(table, path)
- Dataset: pyarrow.dataset.dataset(path, format="parquet", partitioning="hive")
- CSV: pa.csv.read_csv(path, ReadOptions, ConvertOptions, ParseOptions)
- IPC: pa.ipc.new_stream(sink, schema) | write_table | .close()
- Schema: pa.schema([("col", pa.int64()), ...]) for explicit type control
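The filters argument in the Parquet bullet above takes predicates in disjunctive normal form: a flat list of tuples is ANDed together, and a list of lists ORs the inner groups. A small sketch, assuming a hypothetical events.parquet with year and score columns:

# DNF filters sketch: events.parquet, year, and score are hypothetical
import pyarrow.parquet as pq

# year == 2024 AND score > 0.5
t_and = pq.read_table("events.parquet", filters=[("year", "=", 2024), ("score", ">", 0.5)])

# (year == 2023 AND score > 0.9) OR (year == 2024)
t_or = pq.read_table(
    "events.parquet",
    filters=[
        [("year", "=", 2023), ("score", ">", 0.9)],
        [("year", "=", 2024)],
    ],
)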
PyArrow Columnar Data Pipeline
# data/pyarrow_pipeline.py — columnar data processing with PyArrow
from __future__ import annotations
import numpy as np
import pandas as pd
from pathlib import Path
from collections.abc import Iterator
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pyarrow.csv as pa_csv
import pyarrow.ipc as pa_ipc
import pyarrow.dataset as pa_ds
# ── 1. Table construction and schema ─────────────────────────────────────────
def make_table(
    data: dict | pd.DataFrame,
    schema: pa.Schema | None = None,
) -> pa.Table:
"""
Create an Arrow Table from a dict or Pandas DataFrame.
Pass schema to enforce types (avoids implicit float64 for int columns).
"""
if isinstance(data, pd.DataFrame):
return pa.Table.from_pandas(data, schema=schema, preserve_index=False)
return pa.table(data, schema=schema)
def infer_schema(df: pd.DataFrame) -> pa.Schema:
"""Infer Arrow schema from a Pandas DataFrame."""
return pa.Schema.from_pandas(df, preserve_index=False)
def cast_columns(
table: pa.Table,
casts: dict, # {"col": pa.target_type}
) -> pa.Table:
"""
Cast multiple columns to target Arrow types.
Example: cast_columns(t, {"price": pa.float32(), "id": pa.int32()})
"""
for col_name, target_type in casts.items():
idx = table.schema.get_field_index(col_name)
col = pc.cast(table.column(col_name), target_type)
table = table.set_column(idx, col_name, col)
return table
# ── 2. Compute operations ─────────────────────────────────────────────────────
def filter_table(
table: pa.Table,
conditions: list[tuple],
) -> pa.Table:
"""
Filter a table with a list of (column, op, value) conditions ANDed together.
op: ">" | "<" | ">=" | "<=" | "==" | "!=" | "in" | "not_in"
Example: filter_table(t, [("score", ">", 0.5), ("region", "==", "west")])
"""
op_map = {
">": pc.greater,
"<": pc.less,
">=": pc.greater_equal,
"<=": pc.less_equal,
"==": pc.equal,
"!=": pc.not_equal,
}
mask = None
for col_name, op, value in conditions:
col = table.column(col_name)
if op == "in":
cond = pc.is_in(col, value_set=pa.array(value))
elif op == "not_in":
cond = pc.invert(pc.is_in(col, value_set=pa.array(value)))
else:
cond = op_map[op](col, value)
mask = cond if mask is None else pc.and_(mask, cond)
return table.filter(mask)
def aggregate_table(
table: pa.Table,
group_cols: list[str],
agg_spec: list[tuple],
) -> pd.DataFrame:
"""
GroupBy aggregation using Arrow compute.
agg_spec: [("col", "mean"), ("col2", "sum"), ("col", "count")]
Returns Pandas DataFrame for convenience.
"""
return (
table.group_by(group_cols)
.aggregate([(col, fn) for col, fn in agg_spec])
.to_pandas()
.sort_values(group_cols)
.reset_index(drop=True)
)
def sort_and_rank(
table: pa.Table,
sort_keys: list[tuple], # [("col", "ascending"), ("col2", "descending")]
) -> pa.Table:
"""Sort table by multiple columns."""
return table.sort_by(sort_keys)
def add_computed_column(
table: pa.Table,
name: str,
expr, # Result of a pc.* call
) -> pa.Table:
"""Append a computed column to the table."""
return table.append_column(name, expr)
def string_operations(
table: pa.Table,
col: str,
) -> pa.Table:
"""Example: lowercase, strip whitespace, extract substring."""
lowered = pc.utf8_lower(table.column(col))
stripped = pc.utf8_strip_whitespace(lowered)
return table.set_column(
table.schema.get_field_index(col), col, stripped
)
# ── 3. Parquet I/O ────────────────────────────────────────────────────────────
def write_parquet(
    table: pa.Table,
    path: str,
    compression: str = "snappy",  # "snappy" | "zstd" | "gzip" | None
    row_group_size: int = 100_000,
    partition_by: list[str] | None = None,
) -> str:
    """
    Write Arrow Table to Parquet.
    partition_by: write a Hive-partitioned dataset (e.g., ["year", "region"]);
    path is then treated as a directory root, and compression / row_group_size
    apply only to the single-file branch.
    """
Path(path).parent.mkdir(parents=True, exist_ok=True)
if partition_by:
pa_ds.write_dataset(
table, path, format="parquet",
partitioning=pa_ds.partitioning(
pa.schema([(col, table.schema.field(col).type) for col in partition_by]),
flavor="hive",
),
existing_data_behavior="overwrite_or_ignore",
)
else:
pq.write_table(
table, path,
compression=compression,
row_group_size=row_group_size,
)
return path
def read_parquet(
    path: str,
    columns: list[str] | None = None,
    filters: list[tuple] | None = None,
    batch_size: int | None = None,
) -> pa.Table | Iterator[pa.RecordBatch]:
    """
    Read Parquet with optional column pruning and predicate pushdown.
    filters use DNF form: [("col", "op", val), ...] or [[...], [...]] for OR.
    batch_size: if set, returns an iterator of RecordBatches instead
    (ParquetFile.iter_batches supports column pruning but not filters).
    """
    if batch_size:
        pf = pq.ParquetFile(path)
        return pf.iter_batches(batch_size=batch_size, columns=columns)
    kwargs = {}
    if columns:
        kwargs["columns"] = columns
    if filters:
        kwargs["filters"] = filters
    return pq.read_table(path, **kwargs)
def scan_parquet_dataset(
path: str,
filter_expr = None,
columns: list[str] = None,
) -> pa.Table:
"""
Scan a Hive-partitioned Parquet dataset efficiently.
Discovers partitions automatically from directory structure.
Example path: "data/year=2024/" or "s3://bucket/data/"
"""
dataset = pa_ds.dataset(path, format="parquet", partitioning="hive")
scanner = dataset.scanner(
columns=columns,
filter=filter_expr, # pa_ds.field("score") > 0.5
)
return scanner.to_table()
# ── 4. CSV fast I/O ───────────────────────────────────────────────────────────
def read_csv_fast(
path: str,
columns: list[str] = None,
column_types: dict = None,
null_values: list[str] = None,
block_size_mb: int = 32,
) -> pa.Table:
"""
Fast CSV reading with Arrow (significantly faster than pandas.read_csv).
block_size_mb: read in chunks of this size for large files.
"""
read_options = pa_csv.ReadOptions(block_size=block_size_mb * 1024 * 1024)
convert_options = pa_csv.ConvertOptions(
include_columns=columns,
        column_types=column_types or {},
null_values=null_values or ["", "NA", "null", "NaN", "None"],
true_values=["True", "true", "1"],
false_values=["False", "false", "0"],
)
return pa_csv.read_csv(
path,
read_options=read_options,
convert_options=convert_options,
)
# ── 5. IPC streaming ──────────────────────────────────────────────────────────
def table_to_ipc_bytes(table: pa.Table) -> bytes:
"""Serialize an Arrow Table to IPC stream bytes (for network/cache transfer)."""
sink = pa.BufferOutputStream()
writer = pa_ipc.new_stream(sink, table.schema)
writer.write_table(table)
writer.close()
return sink.getvalue().to_pybytes()
def ipc_bytes_to_table(data: bytes) -> pa.Table:
"""Deserialize IPC stream bytes back to an Arrow Table."""
buf = pa.py_buffer(data)
reader = pa_ipc.open_stream(buf)
return reader.read_all()
def stream_table_in_batches(
    table: pa.Table,
    batch_size: int,
):
    """Yield Arrow RecordBatches from a table for streaming processing."""
    # table.slice would yield Tables; to_batches yields true RecordBatches
    yield from table.to_batches(max_chunksize=batch_size)
# ── 6. Schema evolution and merging ──────────────────────────────────────────
def unify_schemas(tables: list[pa.Table]) -> pa.Table:
"""
Concatenate tables with potentially different schemas.
Missing columns are filled with null values.
"""
return pa.concat_tables(tables, promote_options="default")
def rename_columns(table: pa.Table, renames: dict) -> pa.Table:
"""Rename columns via schema replacement."""
new_names = [renames.get(name, name) for name in table.schema.names]
return table.rename_columns(new_names)
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import tempfile, os, time
print("PyArrow Columnar Data Demo")
print("=" * 50)
# Build a sample dataset
np.random.seed(42)
n = 1_000_000
df = pd.DataFrame({
"id": np.arange(n, dtype=np.int32),
"region": np.random.choice(["north", "south", "east", "west"], n),
"score": np.random.rand(n).astype(np.float32),
"value": np.random.exponential(100, n).astype(np.float32),
"year": np.random.choice([2022, 2023, 2024], n).astype(np.int16),
})
print(f"\nDataFrame: {n:,} rows")
# Pandas CSV baseline
with tempfile.TemporaryDirectory() as tmpdir:
csv_path = os.path.join(tmpdir, "data.csv")
df.to_csv(csv_path, index=False)
t0 = time.perf_counter()
df_pd = pd.read_csv(csv_path)
pd_ms = (time.perf_counter() - t0) * 1000
t0 = time.perf_counter()
table = read_csv_fast(csv_path)
arr_ms = (time.perf_counter() - t0) * 1000
print(f"\nCSV read — pandas: {pd_ms:.1f} ms, Arrow: {arr_ms:.1f} ms ({pd_ms/arr_ms:.1f}x speedup)")
# Write Parquet
pq_path = os.path.join(tmpdir, "data.parquet")
t0 = time.perf_counter()
write_parquet(table, pq_path)
write_ms = (time.perf_counter() - t0) * 1000
size_mb = os.path.getsize(pq_path) / 1e6
print(f"\nParquet write: {write_ms:.1f} ms, {size_mb:.1f} MB (vs {os.path.getsize(csv_path)/1e6:.1f} MB CSV)")
# Predicate pushdown
t0 = time.perf_counter()
filtered = read_parquet(pq_path, columns=["id", "score"],
filters=[("region", "=", "north"), ("score", ">", 0.9)])
filter_ms = (time.perf_counter() - t0) * 1000
print(f"\nPredicate pushdown: {len(filtered):,} rows, {filter_ms:.1f} ms")
# Compute (group by)
print("\nGroupBy aggregation (Arrow compute):")
agg = aggregate_table(table, ["region"], [("score", "mean"), ("value", "sum")])
print(agg.to_string(index=False))
# IPC round-trip
small = table.slice(0, 10_000)
ipc_bytes = table_to_ipc_bytes(small)
recovered = ipc_bytes_to_table(ipc_bytes)
print(f"\nIPC round-trip: {len(ipc_bytes):,} bytes → {len(recovered):,} rows recovered")
Compared with the Pandas alternative for DataFrames: pandas parses CSV with a single-threaded reader and holds columns in NumPy blocks, so CSV ingestion is often 5-10x slower than Arrow's multi-threaded parser, and filtering in plain pandas happens only after the whole file is in memory. PyArrow's columnar layout enables SIMD-vectorized aggregations, the filters parameter in pq.read_table skips entire row groups on disk before reading them, and pa.concat_tables with schema promotion merges files with different column sets without loading all of the data into Pandas at once (see the sketch at the end of this page).

Compared with the DuckDB alternative for SQL-based analytics: DuckDB provides SQL syntax, while PyArrow's dataset Scanner with Arrow expressions handles partition-aware scans over S3/GCS, pa.ipc.new_stream serializes tables into the Arrow IPC stream format used for inter-process and network transport (the same format Arrow Flight gRPC services build on), and because the Arrow memory layout is shared by Pandas, Polars, Dask, Spark, BigQuery, and Snowflake clients, pa.Table.from_pandas / to_pandas moves data between these runtimes with little or no copying.

The Claude Skills 360 bundle includes PyArrow skill sets covering table construction and schemas, compute operations, filtering with predicate pushdown, Parquet read/write with compression, Hive-partitioned dataset scanning, fast CSV parsing, IPC stream serialization, batch streaming, and schema merging with column promotion. Start with the free tier to try columnar data pipeline code generation.
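As a small sketch of the schema-promotion behavior mentioned above (the tables are toy examples; promote_options is available in the pyarrow >= 15.0 pinned in the stack):

# schema promotion sketch: columns missing from one table become nulls
import pyarrow as pa

t_old = pa.table({"id": [1, 2], "score": [0.1, 0.2]})
t_new = pa.table({"id": [3], "score": [0.9], "region": ["west"]})

merged = pa.concat_tables([t_old, t_new], promote_options="default")
print(merged.schema)             # id, score, region
print(merged.column("region"))   # null, null, "west"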