Claude Code for Apache Arrow: In-Memory Columnar Data Processing — Claude Skills 360 Blog

Claude Code for Apache Arrow: In-Memory Columnar Data Processing

Published: August 16, 2027
Read time: 5 min
By: Claude Skills 360

Apache Arrow is the universal in-memory columnar format, giving zero-copy interop between Python, Rust, Java, and JavaScript. PyArrow: import pyarrow as pa. pa.table({"id": [1,2,3], "name": ["a","b","c"]}) creates a Table; pa.schema([pa.field("id", pa.int64()), pa.field("name", pa.string())]) defines a schema. table.to_pandas() converts to a DataFrame; pa.Table.from_pandas(df) converts back. pa.RecordBatch.from_pydict({"col": values}, schema=schema) builds batches for streaming, and pa.RecordBatchReader.from_batches(schema, batches) wraps them in a reader. Parquet: pq.write_table(table, "out.parquet", compression="snappy"); pq.read_table("data.parquet", columns=["id","amount"]) with column pushdown; pq.ParquetDataset("dir/", filesystem=s3fs) reads a directory of Parquet files. Compute: pc.sum(table["amount"]), table.filter(pc.greater(table["amount"], 100)), pc.sort_indices(table, sort_keys=[("amount", "descending")]), pc.cast(array, pa.float64()). Arrow IPC: pa.ipc.new_stream(sink, schema) writes a stream; pa.ipc.open_stream(source).read_all() reads one back. Arrow Flight (pyarrow.flight): FlightClient(addr) connects, and client.do_get(ticket) streams Arrow batches over gRPC. pa.dataset.dataset(path, format="parquet", filesystem=fs) handles partitioned datasets with predicate pushdown. Node.js apache-arrow: tableFromArrays({ id: Int32Array.from([1,2]), name: ["a","b"] }), tableFromIPC(buffer), new Table(batches). Claude Code generates PyArrow data pipelines, Parquet I/O, Arrow IPC serialization, and Flight service implementations.

CLAUDE.md for Apache Arrow

## Apache Arrow Stack
- Python: pyarrow >= 15 — pa.Table, pa.RecordBatch, pyarrow.parquet, pyarrow.compute
- Node.js: apache-arrow >= 14 — tableFromArrays, tableFromIPC, Table.from()
- Columnar ops: import pyarrow.compute as pc — pc.sum/mean/filter/sort_indices/cast
- Parquet: pq.write_table(table, path, compression="zstd") / pq.read_table(path, columns=[...])
- IPC stream: pa.ipc.new_stream(sink, schema) → write_batch(batch) → close()
- Dataset API: pa.dataset.dataset(path, format="parquet") for partitioned data with pushdown

PyArrow Table Operations

# lib/arrow/tables.py — PyArrow table building and processing
from __future__ import annotations
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from collections.abc import Iterator
from typing import Any


# ── Schema definitions ─────────────────────────────────────────────────────

ORDERS_SCHEMA = pa.schema([
    pa.field("order_id",   pa.string(),      nullable=False),
    pa.field("user_id",    pa.string(),      nullable=False),
    pa.field("amount",     pa.float64(),     nullable=False),
    pa.field("currency",   pa.dictionary(pa.int8(), pa.string()), nullable=False),
    pa.field("status",     pa.dictionary(pa.int8(), pa.string()), nullable=False),
    pa.field("created_at", pa.timestamp("us", tz="UTC"), nullable=False),
    pa.field("tags",       pa.list_(pa.string()), nullable=True),
])

# ── Builders ───────────────────────────────────────────────────────────────

def dicts_to_table(records: list[dict[str, Any]], schema: pa.Schema) -> pa.Table:
    """Convert a list of dicts to a typed Arrow table."""
    arrays = {}
    for field in schema:
        values = [r.get(field.name) for r in records]
        try:
            arrays[field.name] = pa.array(values, type=field.type)
        except (pa.ArrowInvalid, pa.ArrowTypeError):
            # Fall back to inference, then cast to the declared type
            arrays[field.name] = pa.array(values).cast(field.type)

    return pa.table(arrays, schema=schema)


def batch_from_records(
    records: list[dict[str, Any]],
    schema:  pa.Schema,
    batch_size: int = 10_000,
) -> Iterator[pa.RecordBatch]:
    """Yield Arrow RecordBatches from a list of dicts (for streaming)."""
    for i in range(0, len(records), batch_size):
        chunk = records[i : i + batch_size]
        yield pa.RecordBatch.from_pydict(
            {f.name: [r.get(f.name) for r in chunk] for f in schema},
            schema=schema,
        )


# ── Compute ────────────────────────────────────────────────────────────────

def filter_table(table: pa.Table, **conditions) -> pa.Table:
    """Filter table with keyword equality conditions."""
    mask = None
    for col, val in conditions.items():
        col_mask = pc.equal(table[col], val)
        mask     = col_mask if mask is None else pc.and_(mask, col_mask)
    return table.filter(mask) if mask is not None else table


def aggregate_by(
    table:      pa.Table,
    group_col:  str,
    value_col:  str,
    agg:        str = "sum",  # "sum" | "mean" | "count" | "min" | "max"
) -> pa.Table:
    """Group-by aggregation using PyArrow's TableGroupBy API."""
    if agg not in ("sum", "mean", "count", "min", "max"):
        raise ValueError(f"unsupported aggregation: {agg!r}")
    # The aggregated column comes out named f"{value_col}_{agg}"
    return table.group_by(group_col).aggregate([(value_col, agg)])


def sort_table(
    table:   pa.Table,
    by:      str,
    descend: bool = False,
) -> pa.Table:
    order = "descending" if descend else "ascending"
    indices = pc.sort_indices(table, sort_keys=[(by, order)])
    return table.take(indices)

Parquet I/O

# lib/arrow/parquet_io.py — efficient Parquet read/write with partitioning
import os
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import s3fs


def get_filesystem(path: str):
    """Return the appropriate filesystem for the given path."""
    if path.startswith("s3://"):
        return s3fs.S3FileSystem(
            key=os.environ.get("AWS_ACCESS_KEY_ID"),
            secret=os.environ.get("AWS_SECRET_ACCESS_KEY"),
            endpoint_url=os.environ.get("AWS_ENDPOINT_URL"),
        )
    return None  # Local filesystem


def write_parquet(
    table:     pa.Table,
    path:      str,
    partition_by: list[str] | None = None,
    compression: str = "zstd",
    row_group_size: int = 256 * 1024,  # rows per row group, not bytes
) -> None:
    """Write Arrow table to Parquet, optionally partitioned."""
    fs  = get_filesystem(path)
    uri = path.removeprefix("s3://")

    if partition_by:
        pq.write_to_dataset(
            table,
            root_path=uri,
            partition_cols=partition_by,
            existing_data_behavior="overwrite_or_ignore",
            compression=compression,
            filesystem=fs,
        )
    else:
        pq.write_table(
            table,
            uri,
            compression=compression,
            row_group_size=row_group_size,
            filesystem=fs,
        )


def read_parquet(
    path:     str,
    columns:  list[str] | None = None,
    filters:  list | None = None,  # [[("col", "op", val), ...], ...]
) -> pa.Table:
    """Read Parquet files with optional column and predicate pushdown."""
    fs  = get_filesystem(path)
    uri = path.removeprefix("s3://")

    dataset = ds.dataset(uri, format="parquet", filesystem=fs)

    expr = pq.filters_to_expression(filters) if filters else None
    return dataset.to_table(columns=columns, filter=expr)

Arrow IPC Streaming

# lib/arrow/ipc.py — Arrow IPC stream serialization
import pyarrow as pa
import pyarrow.ipc as ipc


def serialize_batches(
    batches: list[pa.RecordBatch],
    schema:  pa.Schema,
) -> bytes:
    """Serialize RecordBatches to Arrow IPC stream format."""
    sink   = pa.BufferOutputStream()
    writer = ipc.new_stream(sink, schema)
    for batch in batches:
        writer.write_batch(batch)
    writer.close()
    return sink.getvalue().to_pybytes()


def deserialize_batches(data: bytes) -> list[pa.RecordBatch]:
    """Deserialize an Arrow IPC stream to a list of RecordBatches."""
    reader = ipc.open_stream(pa.py_buffer(data))
    return list(reader)  # stream readers are iterable over their batches


def table_to_ipc(table: pa.Table) -> bytes:
    return serialize_batches(table.to_batches(), table.schema)


def table_from_ipc(data: bytes) -> pa.Table:
    return pa.Table.from_batches(deserialize_batches(data))

TypeScript Arrow (apache-arrow)

// lib/arrow/client.ts — Apache Arrow in TypeScript/Node.js
import {
  tableFromArrays,
  tableFromIPC,
  tableToIPC,
  Table,
} from "apache-arrow"

/** Build an Arrow table from typed arrays */
export function buildOrdersTable(orders: Array<{
  orderId: string
  userId:  string
  amount:  number
  status:  string
}>): Table {
  return tableFromArrays({
    order_id: orders.map((o) => o.orderId),
    user_id:  orders.map((o) => o.userId),
    amount:   new Float64Array(orders.map((o) => o.amount)),
    status:   orders.map((o) => o.status),
  })
}

/** Serialize Arrow table to IPC stream bytes for transport */
export function tableToBytes(table: Table): Uint8Array {
  return tableToIPC(table, "stream")
}

/** Deserialize Arrow IPC bytes to table */
export function tableFromBytes(bytes: Uint8Array): Table {
  return tableFromIPC(bytes)
}

/** Aggregate Arrow table in JS (no round-trip to Python) */
export function groupBySum(
  table: Table,
  groupCol: string,
  sumCol:   string,
): Record<string, number> {
  const result: Record<string, number> = {}

  for (let i = 0; i < table.numRows; i++) {
    const key   = String(table.getChild(groupCol)?.get(i))
    const value = Number(table.getChild(sumCol)?.get(i)) || 0
    result[key] = (result[key] ?? 0) + value
  }

  return result
}

Reach for Polars when you want a high-performance Python DataFrame library with a friendlier transformation API than raw PyArrow: Polars uses Arrow memory internally and exchanges data with PyArrow zero-copy via .to_arrow() / pl.from_arrow(), so a common split is PyArrow for I/O and serialization, Polars for transformations. Reach for Pandas when you need the most mature Python DataFrame ecosystem and maximum library compatibility (scikit-learn, statsmodels, matplotlib); Pandas stores data in NumPy format and pays a conversion cost to and from Arrow, so PyArrow remains the better fit for high-throughput Parquet I/O, streaming pipelines, and cross-language data exchange where zero-copy matters. The Claude Skills 360 bundle includes Apache Arrow skill sets covering PyArrow tables, Parquet I/O, IPC streaming, and TypeScript Arrow. Start with the free tier to try columnar data generation.

