Apache Arrow is the universal in-memory columnar format — zero-copy interop between Python, Rust, Java, JavaScript. PyArrow: import pyarrow as pa. pa.table({"id": [1,2,3], "name": ["a","b","c"]}) creates a Table. pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())]) defines schema. table.to_pandas() converts to DataFrame; pa.Table.from_pandas(df) converts back. pa.RecordBatch.from_pydict({"col": values}, schema=schema) for streaming — pa.RecordBatchReader.from_batches(schema, batches). Parquet: pq.write_table(table, "out.parquet", compression="snappy"), pq.read_table("data.parquet", columns=["id","amount"]) with column pushdown. pq.ParquetDataset("dir/", filesystem=s3fs) reads a directory of Parquet files. Compute: pc.sum(table["amount"]), pc.filter(table, pc.greater(table["amount"], 100)), pc.sort_indices(table, sort_keys=[("amount", "descending")]), pc.cast(array, pa.float64()). Arrow IPC: pa.ipc.new_stream(sink, schema) writes a stream; pa.ipc.open_stream(source).read_all() reads. Arrow Flight pyarrow.flight: FlightClient(addr), client.do_get(ticket) streams Arrow batches over gRPC. pa.dataset.dataset(path, format="parquet", filesystem=fs) for partitioned datasets with predicate pushdown. Node.js apache-arrow: tableFromArrays({ id: Int32Array.from([1,2]), name: ["a","b"] }), tableFromIPC(buffer), Table.from(batches). Claude Code generates PyArrow data pipelines, Parquet I/O, Arrow IPC serialization, and Flight service implementations.
CLAUDE.md for Apache Arrow
## Apache Arrow Stack
- Python: pyarrow >= 15 — pa.Table, pa.RecordBatch, pyarrow.parquet, pyarrow.compute
- Node.js: apache-arrow >= 14 — tableFromArrays, tableFromIPC, Table.from()
- Columnar ops: import pyarrow.compute as pc — pc.sum/mean/filter/sort_indices/cast
- Parquet: pq.write_table(table, path, compression="zstd") / pq.read_table(path, columns=[...])
- IPC stream: pa.ipc.new_stream(sink, schema) → write_batch(batch) → close()
- Dataset API: pa.dataset.dataset(path, format="parquet") for partitioned data with pushdown
PyArrow Table Operations
# lib/arrow/tables.py — PyArrow table building and processing
from __future__ import annotations
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from typing import Any
# ── Schema definitions ─────────────────────────────────────────────────────
# Canonical order-record schema. Low-cardinality string columns are
# dictionary-encoded and the timestamp column is timezone-aware.
ORDERS_SCHEMA = pa.schema([
    pa.field("order_id", pa.string(), nullable=False),
    pa.field("user_id", pa.string(), nullable=False),
    pa.field("amount", pa.float64(), nullable=False),
    # Dictionary encoding: values stored once, rows hold int8 codes.
    pa.field("currency", pa.dictionary(pa.int8(), pa.string()), nullable=False),
    pa.field("status", pa.dictionary(pa.int8(), pa.string()), nullable=False),
    # Microsecond precision, UTC timezone.
    pa.field("created_at", pa.timestamp("us", tz="UTC"), nullable=False),
    pa.field("tags", pa.list_(pa.string()), nullable=True),  # optional labels
])
# ── Builders ───────────────────────────────────────────────────────────────
def dicts_to_table(records: list[dict[str, Any]], schema: pa.Schema) -> pa.Table:
    """Convert a list of dicts to a typed Arrow table.

    Missing keys become nulls in the corresponding column.

    Raises:
        ValueError: naming the offending field when a column's values
            cannot be coerced to its declared schema type.
    """
    arrays = {}
    for field in schema:
        values = [r.get(field.name) for r in records]
        try:
            arrays[field.name] = pa.array(values, type=field.type)
        except (pa.ArrowInvalid, pa.ArrowTypeError) as err:
            # The old fallback re-inferred the type, which then made
            # pa.table(..., schema=schema) fail later with a confusing
            # type-mismatch error. Fail here and name the bad column.
            # (ArrowInvalid subclasses ValueError, so callers catching
            # ValueError keep working.)
            raise ValueError(
                f"column {field.name!r} cannot be converted to {field.type}"
            ) from err
    return pa.table(arrays, schema=schema)
def batch_from_records(
    records: list[dict[str, Any]],
    schema: pa.Schema,
    batch_size: int = 10_000,
):
    """Yield typed RecordBatches over *records*, batch_size rows at a time."""
    names = [field.name for field in schema]
    start = 0
    while start < len(records):
        chunk = records[start : start + batch_size]
        columns = {name: [row.get(name) for row in chunk] for name in names}
        yield pa.RecordBatch.from_pydict(columns, schema=schema)
        start += batch_size
# ── Compute ────────────────────────────────────────────────────────────────
def filter_table(table: pa.Table, **conditions) -> pa.Table:
    """Keep only rows where every keyword column equals its given value.

    With no conditions the table is returned unchanged.
    """
    if not conditions:
        return table
    masks = [pc.equal(table[name], expected) for name, expected in conditions.items()]
    combined = masks[0]
    for extra in masks[1:]:
        combined = pc.and_(combined, extra)
    return table.filter(combined)
def aggregate_by(
    table: pa.Table,
    group_col: str,
    value_col: str,
    agg: str = "sum",  # "sum" | "mean" | "count" | "min" | "max"
) -> pa.Table:
    """Group *table* by *group_col* and aggregate *value_col*.

    Uses the public Table.group_by API, which selects the grouped
    ("hash_*") aggregation kernels automatically. The previous raw Acero
    plan passed aggregations=[("sum", col)] — Acero's AggregateNodeOptions
    expects (target, function, ...) order and, with keys present, the
    "hash_"-prefixed kernel names, so the plan was rejected at runtime.

    Returns:
        A table with *group_col* plus an aggregate column named
        f"{value_col}_{agg}".

    Raises:
        KeyError: for an unsupported *agg* name (preserves the original
            dict-lookup contract).
    """
    if agg not in ("sum", "mean", "count", "min", "max"):
        raise KeyError(agg)
    return table.group_by(group_col).aggregate([(value_col, agg)])
def sort_table(
    table: pa.Table,
    by: str,
    descend: bool = False,
) -> pa.Table:
    """Return a copy of *table* ordered by column *by*.

    Ascending by default; pass descend=True to reverse the order.
    """
    direction = "descending" if descend else "ascending"
    permutation = pc.sort_indices(table, sort_keys=[(by, direction)])
    return table.take(permutation)
Parquet I/O
# lib/arrow/parquet_io.py — efficient Parquet read/write with partitioning
import os
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import s3fs
def get_filesystem(path: str):
    """Return an s3fs filesystem for s3:// paths, or None for local paths.

    Credentials and endpoint come from the standard AWS environment
    variables; None tells pyarrow to use the local filesystem.
    """
    if not path.startswith("s3://"):
        return None  # local path: let pyarrow pick the default filesystem
    env = os.environ.get
    return s3fs.S3FileSystem(
        key=env("AWS_ACCESS_KEY_ID"),
        secret=env("AWS_SECRET_ACCESS_KEY"),
        endpoint_url=env("AWS_ENDPOINT_URL"),
    )
def write_parquet(
    table: pa.Table,
    path: str,
    partition_by: list[str] | None = None,
    compression: str = "zstd",
    row_group_size: int = 256 * 1024,  # rows per row group, NOT bytes
) -> None:
    """Write an Arrow table to Parquet, optionally hive-partitioned.

    Args:
        table: Table to write.
        path: Local path or s3:// URI.
        partition_by: Columns to partition by; when given, writes a
            directory tree via write_to_dataset instead of a single file.
        compression: Parquet codec name (e.g. "zstd", "snappy", "gzip").
        row_group_size: Maximum number of ROWS per row group — pyarrow's
            row_group_size is a row count, not a byte size (the old
            "256 KB" comment was misleading).
    """
    fs = get_filesystem(path)
    # s3fs expects bucket/key paths without the scheme; removeprefix only
    # strips a leading "s3://" (str.replace would mangle it anywhere).
    uri = path.removeprefix("s3://")
    if partition_by:
        pq.write_to_dataset(
            table,
            root_path=uri,
            partition_cols=partition_by,
            existing_data_behavior="overwrite_or_ignore",
            compression=compression,
            filesystem=fs,
        )
    else:
        pq.write_table(
            table,
            uri,
            compression=compression,
            row_group_size=row_group_size,
            filesystem=fs,
        )
def read_parquet(
    path: str,
    columns: list[str] | None = None,
    filters: list | None = None,  # DNF: [[("col", "op", val), ...], ...]
) -> pa.Table:
    """Read Parquet files with optional column and predicate pushdown.

    Args:
        path: Local path or s3:// URI (single file or directory).
        columns: Subset of columns to materialize (projection pushdown).
        filters: Filters in DNF form, converted to a dataset expression
            for predicate pushdown; None reads all rows.
    """
    fs = get_filesystem(path)
    uri = path.removeprefix("s3://")
    dataset = ds.dataset(uri, format="parquet", filesystem=fs)
    # Dataset.to_table expects a pyarrow Expression or None. The previous
    # ds.Expression._call("and", []) was a private-API hack and an invalid
    # empty conjunction — the no-filter case must simply pass None.
    predicate = pq.filters_to_expression(filters) if filters else None
    return dataset.to_table(columns=columns, filter=predicate)
Arrow IPC Streaming
# lib/arrow/ipc.py — Arrow IPC stream serialization
import io
import pyarrow as pa
import pyarrow.ipc as ipc
def serialize_batches(
    batches: list[pa.RecordBatch],
    schema: pa.Schema,
) -> bytes:
    """Serialize RecordBatches to Arrow IPC stream-format bytes.

    The writer is used as a context manager so it is closed (writing the
    end-of-stream marker and releasing resources) even if a batch write
    raises — the old code leaked the writer on error.
    """
    sink = pa.BufferOutputStream()
    with ipc.new_stream(sink, schema) as writer:
        for batch in batches:
            writer.write_batch(batch)
    return sink.getvalue().to_pybytes()
def deserialize_batches(data: bytes) -> list[pa.RecordBatch]:
    """Deserialize Arrow IPC stream-format bytes into RecordBatches.

    The stream reader implements the iterator protocol, so exhausting it
    with list() replaces the manual read_next_batch/StopIteration loop.
    """
    reader = ipc.open_stream(pa.py_buffer(data))
    return list(reader)
def table_to_ipc(table: pa.Table) -> bytes:
    """Encode a whole Table as Arrow IPC stream bytes."""
    batches = table.to_batches()
    return serialize_batches(batches, table.schema)
def table_from_ipc(data: bytes) -> pa.Table:
    """Decode Arrow IPC stream bytes back into a single Table."""
    batches = deserialize_batches(data)
    return pa.Table.from_batches(batches)
TypeScript Arrow (apache-arrow)
// lib/arrow/client.ts — Apache Arrow in TypeScript/Node.js
import {
  Float64,
  Int32,
  makeVector,
  RecordBatchReader,
  Table,
  tableFromArrays,
  tableFromIPC,
  tableToIPC,
  Uint8,
  Utf8,
} from "apache-arrow"
/** Build an Arrow table from plain order objects. */
export function buildOrdersTable(orders: Array<{
  orderId: string
  userId: string
  amount: number
  status: string
}>): Table {
  // amount gets a typed Float64Array; string columns are inferred.
  const columns = {
    order_id: orders.map((order) => order.orderId),
    user_id: orders.map((order) => order.userId),
    amount: Float64Array.from(orders, (order) => order.amount),
    status: orders.map((order) => order.status),
  }
  return tableFromArrays(columns)
}
/** Serialize Arrow table to IPC bytes for transport */
export function tableToBytes(table: Table): Uint8Array {
const batches: Uint8Array[] = []
let size = 0
for (const batch of table.batches) {
const writer = RecordBatchReader.from([batch])
// Collect IPC bytes using the streaming format
}
// Use ipc.serialize from apache-arrow
const { tableToIPC } = require("apache-arrow")
return tableToIPC(table)
}
/** Deserialize Arrow IPC stream bytes back into a Table. */
export function tableFromBytes(bytes: Uint8Array): Table {
  const decoded = tableFromIPC(bytes)
  return decoded
}
/** Sum `sumCol` per distinct `groupCol` value, entirely in JS. */
export function groupBySum(
  table: Table,
  groupCol: string,
  sumCol: string,
): Record<string, number> {
  // Hoist the column vectors out of the row loop — getChild is invariant.
  const keys = table.getChild(groupCol)
  const values = table.getChild(sumCol)
  const totals: Record<string, number> = {}
  const rowCount = table.numRows
  for (let row = 0; row < rowCount; row++) {
    const key = String(keys?.get(row))
    const increment = Number(values?.get(row)) || 0
    totals[key] = (totals[key] ?? 0) + increment
  }
  return totals
}
Choose the Polars alternative when you want a high-performance DataFrame library in Python that uses Arrow memory internally but provides a friendlier API than PyArrow for data transformations — Polars is built on Arrow and can zero-copy exchange data with PyArrow via .to_arrow() / pl.from_arrow(); use PyArrow for I/O and serialization, Polars for transformations. Choose the Pandas alternative when you need the most mature Python DataFrame ecosystem with maximum library compatibility (scikit-learn, statsmodels, matplotlib) — Pandas stores data in NumPy format and has overhead converting to/from Arrow, while PyArrow is preferred for high-throughput Parquet I/O, streaming pipelines, and cross-language data exchange where zero-copy matters. The Claude Skills 360 bundle includes Apache Arrow skill sets covering PyArrow tables, Parquet I/O, IPC streaming, and TypeScript Arrow. Start with the free tier to try columnar data generation.