pandas is the Python data analysis library for tabular data. pip install pandas. DataFrame: import pandas as pd; df = pd.DataFrame({"col":[1,2,3]}). read_csv: df = pd.read_csv("data.csv"). read_json: df = pd.read_json("data.json"). read_excel: df = pd.read_excel("report.xlsx", sheet_name="Sheet1"). read_parquet: df = pd.read_parquet("data.parquet"). info: df.info(). describe: df.describe(). head/tail: df.head(10). shape: df.shape → (rows, cols). dtypes: df.dtypes. loc: df.loc[df["age"]>18, ["name","email"]]. iloc: df.iloc[0:5, 1:3]. query: df.query("age > 18 and country == 'US'"). assign: df.assign(full_name=df["first"]+df["last"]). apply: df["col"].apply(lambda x: x*2). map: df["status"].map({"active":1,"inactive":0}). groupby: df.groupby("country")["revenue"].sum(). agg: df.groupby("cat").agg({"val":["mean","std","count"]}). merge: df.merge(other, on="id", how="left"). concat: pd.concat([df1, df2], ignore_index=True). pivot_table: df.pivot_table(values="sales",index="month",columns="product",aggfunc="sum"). fillna: df["col"].fillna(0). dropna: df.dropna(subset=["email"]). to_datetime: pd.to_datetime(df["date"]). resample: df.set_index("date").resample("ME")["sales"].sum(). value_counts: df["country"].value_counts(). to_csv: df.to_csv("out.csv", index=False). to_parquet: df.to_parquet("out.parquet"). Claude Code generates pandas pipelines, ETL scripts, data quality checks, and aggregation reports.
CLAUDE.md for pandas
## pandas Stack
- Version: pandas >= 2.0 | pip install pandas pyarrow
- Load: pd.read_csv/read_json/read_excel/read_parquet | df.info() | df.describe()
- Select: df.loc[mask, cols] | df.query("col > val") | df.iloc[rows, cols]
- Transform: df.assign(new=...) | df["col"].apply(fn) | df.groupby(keys).agg(dict)
- Combine: df.merge(other, on="key", how="left") | pd.concat([dfs], ignore_index=True)
- Export: df.to_csv("out.csv", index=False) | df.to_parquet("out.parquet")
pandas Data Pipeline
# app/data_pipeline.py — pandas load, clean, transform, aggregate, merge, export
from __future__ import annotations
import logging
from io import StringIO
from pathlib import Path
from typing import Any
import pandas as pd
import numpy as np
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Loading helpers
# ─────────────────────────────────────────────────────────────────────────────
def load_csv(
path: str | Path,
parse_dates: list[str] | None = None,
dtype: dict | None = None,
low_memory: bool = False,
encoding: str = "utf-8",
) -> pd.DataFrame:
"""
Load a CSV with sensible defaults.
Example:
df = load_csv("orders.csv", parse_dates=["created_at"])
df = load_csv("large.csv", dtype={"id": "int32", "code": "category"})
"""
return pd.read_csv(
path,
parse_dates=parse_dates,
dtype=dtype,
low_memory=low_memory,
encoding=encoding,
)
def load_json_records(data: str | list[dict]) -> pd.DataFrame:
"""
Load JSON records (list of dicts or JSON string).
Example:
df = load_json_records([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
"""
if isinstance(data, str):
return pd.read_json(StringIO(data), orient="records")
return pd.DataFrame(data)
def load_multi_csv(paths: list[str | Path], **read_kwargs) -> pd.DataFrame:
"""
Load and concat multiple CSVs.
Example:
df = load_multi_csv(["jan.csv", "feb.csv", "mar.csv"], parse_dates=["date"])
"""
frames = []
for p in paths:
try:
frames.append(pd.read_csv(p, **read_kwargs))
except Exception as exc:
log.warning("Could not load %s: %s", p, exc)
if not frames:
return pd.DataFrame()
return pd.concat(frames, ignore_index=True)
# ─────────────────────────────────────────────────────────────────────────────
# 2. Cleaning helpers
# ─────────────────────────────────────────────────────────────────────────────
def clean_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy whose column names are normalized to snake_case.

    Steps: strip surrounding whitespace, lowercase, collapse runs of
    whitespace/hyphens into "_", then drop any remaining non-word chars.

    Example:
        df = clean_column_names(df)
        # " First Name " → "first_name", "Order-ID" → "order_id"
    """
    out = df.copy()
    cols = out.columns.str.strip().str.lower()
    cols = cols.str.replace(r"[\s\-]+", "_", regex=True)
    out.columns = cols.str.replace(r"[^\w]", "", regex=True)
    return out
def drop_duplicate_rows(df: pd.DataFrame, subset: list[str] | None = None, keep: str = "first") -> pd.DataFrame:
"""
Drop duplicate rows, logging the count removed.
Example:
df = drop_duplicate_rows(df, subset=["order_id"])
"""
before = len(df)
df = df.drop_duplicates(subset=subset, keep=keep)
removed = before - len(df)
if removed:
log.info("Dropped %d duplicate rows", removed)
return df
def fill_missing(
    df: pd.DataFrame,
    numeric_fill: float = 0.0,
    string_fill: str = "",
    bool_fill: bool = False,
) -> pd.DataFrame:
    """
    Fill missing values column-by-column based on dtype.

    Args:
        df: Input frame (not mutated; a filled copy is returned).
        numeric_fill: Replacement for NaN in numeric columns.
        string_fill: Replacement for NaN/None in object (string) columns.
        bool_fill: Replacement for NA in boolean columns.

    Bug fix: boolean dtypes also satisfy ``pd.api.types.is_numeric_dtype``,
    so the original numeric-first check made the bool branch unreachable
    and filled boolean columns with ``numeric_fill``. Booleans are now
    tested first so ``bool_fill`` actually applies.

    Example:
        df = fill_missing(df, numeric_fill=0, string_fill="unknown")
    """
    df = df.copy()
    for col in df.columns:
        # Order matters: check bool before numeric (bool counts as numeric).
        if pd.api.types.is_bool_dtype(df[col]):
            df[col] = df[col].fillna(bool_fill)
        elif pd.api.types.is_numeric_dtype(df[col]):
            df[col] = df[col].fillna(numeric_fill)
        elif pd.api.types.is_object_dtype(df[col]):
            df[col] = df[col].fillna(string_fill)
    return df
def parse_dates_auto(df: pd.DataFrame, date_cols: list[str]) -> pd.DataFrame:
    """
    Parse date columns to datetime, coercing unparseable values to NaT.

    Columns missing from the frame are silently skipped so the same
    column list can be reused across slightly different files.

    Fix: removed the ``infer_datetime_format=True`` argument — it is
    deprecated since pandas 2.0 (emits FutureWarning) because format
    inference is now the default behavior of ``pd.to_datetime``.

    Example:
        df = parse_dates_auto(df, ["created_at", "updated_at", "birth_date"])
    """
    df = df.copy()
    for col in date_cols:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    return df
def cast_columns(df: pd.DataFrame, schema: dict[str, str]) -> pd.DataFrame:
    """
    Apply a {column: dtype} schema to a copy of the frame.

    Columns absent from the frame are skipped; a failed cast is logged
    as a warning and the column keeps its original dtype.

    Example:
        df = cast_columns(df, {"user_id": "int32", "category": "category", "score": "float32"})
    """
    out = df.copy()
    applicable = {c: t for c, t in schema.items() if c in out.columns}
    for name, target in applicable.items():
        try:
            out[name] = out[name].astype(target)
        except Exception as exc:  # best-effort: keep original dtype on failure
            log.warning("Could not cast %s to %s: %s", name, target, exc)
    return out
def report_quality(df: pd.DataFrame) -> dict:
    """
    Build a per-column data quality report.

    Returns a dict with total row/column counts plus, for every column,
    its dtype, null count, null percentage, and number of unique values.

    Example:
        report = report_quality(df)
        for col, stats in report["columns"].items():
            print(f"{col}: {stats['null_pct']:.1f}% null, {stats['unique']} unique")
    """
    n_rows = len(df)

    def _col_stats(series: pd.Series) -> dict:
        nulls = int(series.isna().sum())
        return {
            "dtype": str(series.dtype),
            "null_count": nulls,
            # Guard against division by zero on an empty frame.
            "null_pct": round(nulls / n_rows * 100, 2) if n_rows else 0,
            "unique": int(series.nunique()),
        }

    return {
        "rows": n_rows,
        "cols": len(df.columns),
        "columns": {name: _col_stats(df[name]) for name in df.columns},
    }
# ─────────────────────────────────────────────────────────────────────────────
# 3. Transformation helpers
# ─────────────────────────────────────────────────────────────────────────────
def add_date_parts(df: pd.DataFrame, date_col: str, prefix: str | None = None) -> pd.DataFrame:
"""
Expand a datetime column into year/month/day/weekday/quarter columns.
Example:
df = add_date_parts(df, "created_at")
# Adds: created_at_year, created_at_month, created_at_day, ...
"""
df = df.copy()
pfx = prefix or date_col
col = pd.to_datetime(df[date_col], errors="coerce")
df[f"{pfx}_year"] = col.dt.year
df[f"{pfx}_month"] = col.dt.month
df[f"{pfx}_day"] = col.dt.day
df[f"{pfx}_weekday"] = col.dt.day_name()
df[f"{pfx}_quarter"] = col.dt.quarter
return df
def bin_numeric(
df: pd.DataFrame,
col: str,
bins: int | list,
labels: list | None = None,
out_col: str | None = None,
) -> pd.DataFrame:
"""
Bin a numeric column into categories.
Example:
df = bin_numeric(df, "age", bins=[0,18,35,60,100], labels=["child","adult","midlife","senior"])
"""
df = df.copy()
df[out_col or f"{col}_bin"] = pd.cut(df[col], bins=bins, labels=labels, include_lowest=True)
return df
def top_n_others(
    df: pd.DataFrame,
    col: str,
    n: int = 10,
    other_label: str = "Other",
) -> pd.DataFrame:
    """
    Collapse a high-cardinality column to its ``n`` most frequent values.

    Every value outside the top ``n`` (including NaN) is replaced with
    ``other_label``.

    Example:
        df = top_n_others(df, "country", n=10)  # keeps top 10 countries
    """
    out = df.copy()
    most_common = out[col].value_counts().nlargest(n).index
    keep_mask = out[col].isin(most_common)
    out[col] = out[col].where(keep_mask, other=other_label)
    return out
# ─────────────────────────────────────────────────────────────────────────────
# 4. Aggregation helpers
# ─────────────────────────────────────────────────────────────────────────────
def summarize_by(
df: pd.DataFrame,
group_cols: list[str] | str,
agg: dict[str, str | list[str]],
sort_col: str | None = None,
ascending: bool = False,
) -> pd.DataFrame:
"""
Group and aggregate, flattening multi-level column headers.
Example:
summary = summarize_by(
df, "country",
agg={"revenue": ["sum", "mean"], "orders": "count"},
sort_col="revenue_sum",
)
"""
g = df.groupby(group_cols).agg(agg).reset_index()
# Flatten multi-level columns: ("revenue","sum") → "revenue_sum"
if isinstance(g.columns, pd.MultiIndex):
g.columns = ["_".join(filter(None, c)).strip("_") for c in g.columns]
if sort_col and sort_col in g.columns:
g = g.sort_values(sort_col, ascending=ascending)
return g
def frequency_table(
df: pd.DataFrame,
col: str,
top_n: int | None = None,
pct: bool = True,
) -> pd.DataFrame:
"""
frequency table with count and optional percentage.
Example:
ft = frequency_table(df, "status", top_n=10)
"""
counts = df[col].value_counts(dropna=False)
if top_n:
counts = counts.head(top_n)
result = counts.rename("count").reset_index()
result.columns = [col, "count"]
if pct:
result["pct"] = (result["count"] / result["count"].sum() * 100).round(2)
return result
def rolling_agg(
    df: pd.DataFrame,
    date_col: str,
    value_col: str,
    window: int = 7,
    freq: str = "D",
    agg: str = "sum",
) -> pd.DataFrame:
    """
    Resample a value column by time and add a rolling aggregate column.

    The frame is bucketed to ``freq`` periods using ``agg``, then a
    rolling window of ``window`` periods is aggregated with the same
    function. ``min_periods=1`` means the leading rows use a partial
    window instead of producing NaN.

    Bug fix: the rolling step previously hard-coded ``.mean()`` even
    though the function accepts an ``agg`` argument and the docstring
    promised a "7-day rolling sum"; the rolling step now honors ``agg``.

    Example:
        daily = rolling_agg(df, "date", "revenue", window=7, agg="sum")
        # Returns daily series with 7-day rolling sum column
    """
    ts = (
        df.set_index(pd.to_datetime(df[date_col]))[value_col]
        .resample(freq)
        .agg(agg)
        .reset_index()
    )
    ts.columns = [date_col, value_col]
    ts[f"{value_col}_rolling_{window}"] = ts[value_col].rolling(window, min_periods=1).agg(agg)
    return ts
# ─────────────────────────────────────────────────────────────────────────────
# 5. Export helpers
# ─────────────────────────────────────────────────────────────────────────────
def to_csv(df: pd.DataFrame, path: str | Path, index: bool = False, **kwargs) -> Path:
    """
    Write a DataFrame to CSV, creating parent directories as needed.

    Returns:
        The output path as a ``Path``.

    Example:
        to_csv(summary_df, "reports/summary.csv")
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(target, index=index, **kwargs)
    log.info("Wrote %d rows to %s", len(df), target)
    return target
def to_parquet(df: pd.DataFrame, path: str | Path, **kwargs) -> Path:
    """
    Write a DataFrame to Parquet without the index, creating parent
    directories as needed (requires pyarrow or fastparquet).

    Returns:
        The output path as a ``Path``.

    Example:
        to_parquet(cleaned_df, "data/cleaned.parquet")
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(target, index=False, **kwargs)
    log.info("Wrote %d rows to %s", len(df), target)
    return target
def to_json_records(df: pd.DataFrame, orient: str = "records") -> list[dict]:
    """
    Serialize a DataFrame to plain Python objects (JSON-ready).

    With the default ``orient="records"`` the result is a list of
    row dicts; other orients return the corresponding ``to_dict`` shape.

    Example:
        records = to_json_records(df)
        return jsonify(records)
    """
    records = df.to_dict(orient=orient)
    return records
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import random, string

    # Reproducible synthetic orders dataset with deliberately messy
    # column names (spaces, mixed case) to exercise the cleaners.
    rand = random.Random(42)
    country_pool = ["US", "UK", "DE", "FR", "JP", "AU", "CA", "BR", "IN", "MX"]
    status_pool = ["completed", "pending", "cancelled", "refunded"]
    orders = [
        {
            "Order ID": order_id,
            "Customer": "".join(rand.choices(string.ascii_uppercase, k=6)),
            "Country": rand.choice(country_pool),
            "Status": rand.choice(status_pool),
            "Revenue": round(rand.uniform(10, 1000), 2),
            "Items": rand.randint(1, 20),
            "Created At": f"2024-{rand.randint(1,12):02d}-{rand.randint(1,28):02d}",
        }
        for order_id in range(1, 201)
    ]
    df = pd.DataFrame(orders)

    print("=== clean_column_names ===")
    df = clean_column_names(df)
    print(f" columns: {list(df.columns)}")

    print("\n=== report_quality ===")
    quality = report_quality(df)
    print(f" rows={quality['rows']}, cols={quality['cols']}")
    for col, stats in list(quality["columns"].items())[:3]:
        print(f" {col}: dtype={stats['dtype']}, unique={stats['unique']}, null%={stats['null_pct']}")

    print("\n=== parse_dates_auto + add_date_parts ===")
    df = parse_dates_auto(df, ["created_at"])
    df = add_date_parts(df, "created_at")
    print(f" created_at dtype: {df['created_at'].dtype}")
    print(f" months: {sorted(df['created_at_month'].unique())}")

    print("\n=== summarize_by country ===")
    summary = summarize_by(df, "country", {"revenue": ["sum","mean"], "items": "count"}, "revenue_sum")
    print(f" top 3 countries by revenue:\n{summary.head(3).to_string(index=False)}")

    print("\n=== frequency_table status ===")
    ft = frequency_table(df, "status")
    print(ft.to_string(index=False))

    print("\n=== rolling_agg monthly revenue ===")
    monthly = rolling_agg(df, "created_at", "revenue", window=3, freq="ME")
    print(f" monthly rows: {len(monthly)}")
    print(f" {monthly.tail(3).to_string(index=False)}")
For the polars alternative — polars is a modern DataFrame library written in Rust with a lazy evaluation engine, true multi-threaded execution, and 5–50× faster performance on large datasets compared to pandas; pandas has a far larger ecosystem (integration with matplotlib, scikit-learn, statsmodels, and thousands of data science tools), more extensive documentation, and wider community support — use polars for large-scale data processing pipelines where performance is critical, pandas when you need ecosystem depth and the performance difference doesn’t matter at your data size. For the dask alternative — dask parallelizes pandas operations across multiple cores or a cluster, enabling DataFrames larger than RAM by processing data in chunks; pandas works entirely in memory and is single-threaded by default — use dask when your data exceeds available RAM or you need multi-core/distributed execution of pandas-like operations, pandas for in-memory datasets where simplicity and ecosystem compatibility matter more than parallelism. The Claude Skills 360 bundle includes pandas skill sets covering load_csv()/load_json_records()/load_multi_csv(), clean_column_names()/drop_duplicate_rows()/fill_missing()/parse_dates_auto()/cast_columns()/report_quality(), add_date_parts()/bin_numeric()/top_n_others() transforms, summarize_by()/frequency_table()/rolling_agg() aggregations, and to_csv()/to_parquet()/to_json_records() exports. Start with the free tier to try data analysis and ETL pipeline code generation.