Python's csv module reads and writes comma-separated values (import csv).
- reader: for row in csv.reader(f): ... — each row is a list of strings.
- writer: w = csv.writer(f); w.writerow(row).
- DictReader: for d in csv.DictReader(f): d["col"] — each row is a dict keyed by the header.
- DictWriter: w = csv.DictWriter(f, fieldnames=[...]); w.writeheader(); w.writerow(d).
- Dialects: csv.excel (default), csv.excel_tab, csv.unix_dialect; register your own with csv.register_dialect("pipes", delimiter="|").
- Sniffer: dialect = csv.Sniffer().sniff(f.read(2048)); f.seek(0); csv.reader(f, dialect). Header detection: csv.Sniffer().has_header(sample).
- Formatting parameters: csv.reader(f, delimiter="\t"), quotechar="'", skipinitialspace=True, lineterminator="\r\n", and escapechar="\\" (with quoting=csv.QUOTE_NONE).
- Quoting modes: csv.QUOTE_MINIMAL (default), csv.QUOTE_ALL, csv.QUOTE_NONNUMERIC, csv.QUOTE_NONE.
- DictWriter extras: extrasaction="ignore" drops unknown keys; DictReader's restkey collects extra columns and restval fills missing ones.
- In-memory: buf = io.StringIO(); w = csv.writer(buf); data = buf.getvalue().
- Errors: malformed input raises csv.Error.
Claude Code generates CSV ETL loaders, typed row parsers, schema validators, and streaming processors.
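A minimal sketch of the core read path, assuming a hypothetical data.csv with a header row and a col column: sniff the dialect from a sample, rewind, then iterate dict rows.

import csv

with open("data.csv", encoding="utf-8", newline="") as f:  # newline="" lets csv manage line endings
    sample = f.read(2048)
    f.seek(0)
    dialect = csv.Sniffer().sniff(sample)  # raises csv.Error if the format is ambiguous
    for row in csv.DictReader(f, dialect=dialect):
        print(row["col"])  # each row is a dict keyed by the header names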
CLAUDE.md for csv
## csv Stack
- Stdlib: import csv, io
- Read dicts: [row for row in csv.DictReader(open(path, encoding="utf-8", newline=""))]
- Write dicts: csv.DictWriter(f, fieldnames=cols, extrasaction="ignore")
- Detect: dialect = csv.Sniffer().sniff(f.read(1024)); f.seek(0); csv.reader(f, dialect)
- Stream: for row in csv.reader(f): process(row) — no full load into memory
- Encoding: always open with encoding="utf-8" and newline="" — newline="" prevents blank lines on Windows and preserves quoted embedded newlines (sketch below)
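A minimal sketch of that open pattern on the write side; out.csv and the column set are hypothetical stand-ins.

import csv

cols = ["id", "name"]  # hypothetical column set
with open("out.csv", "w", encoding="utf-8", newline="") as f:
    w = csv.DictWriter(f, fieldnames=cols, extrasaction="ignore")
    w.writeheader()
    w.writerow({"id": 1, "name": "Alice", "team": "x"})  # "team" is dropped by extrasaction="ignore"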
csv Processing Pipeline
# app/csvutil.py — DictReader, DictWriter, Sniffer, streaming, type coercion
from __future__ import annotations
import csv
import io
import itertools
from dataclasses import asdict, dataclass, fields
from pathlib import Path
from typing import Any, Callable, Iterable, Iterator
# ─────────────────────────────────────────────────────────────────────────────
# 1. Reading helpers
# ─────────────────────────────────────────────────────────────────────────────
def read_csv(
path: str | Path,
encoding: str = "utf-8",
dialect: str | csv.Dialect | None = None,
**reader_kwargs,
) -> list[dict[str, str]]:
"""
Read a CSV file into a list of dicts.
Auto-detects dialect when dialect=None.
Example:
rows = read_csv("data.csv")
rows = read_csv("data.tsv", delimiter="\t")
"""
p = Path(path)
with p.open(encoding=encoding, newline="") as f:
if dialect is None:
sample = f.read(4096)
f.seek(0)
try:
dialect = csv.Sniffer().sniff(sample) if sample else "excel"
except csv.Error:
dialect = "excel"
reader = csv.DictReader(f, dialect=dialect, **reader_kwargs)
return [dict(row) for row in reader]
def iter_csv(
path: str | Path,
encoding: str = "utf-8",
skip_blank: bool = True,
**reader_kwargs,
) -> Iterator[dict[str, str]]:
"""
Lazily iterate CSV rows as dicts — memory-efficient for large files.
Example:
for row in iter_csv("large.csv"):
process(row)
"""
with open(path, encoding=encoding, newline="") as f:
reader = csv.DictReader(f, **reader_kwargs)
for row in reader:
if skip_blank and not any(row.values()):
continue
yield dict(row)
def read_csv_rows(
path: str | Path,
encoding: str = "utf-8",
skip_header: bool = True,
**reader_kwargs,
) -> list[list[str]]:
"""
Read CSV as list of raw rows (list of lists).
Example:
rows = read_csv_rows("data.csv")
"""
with open(path, encoding=encoding, newline="") as f:
reader = csv.reader(f, **reader_kwargs)
rows = list(reader)
return rows[1:] if skip_header and rows else rows
def csv_header(path: str | Path, encoding: str = "utf-8") -> list[str]:
"""
Read just the header row from a CSV file without loading the data.
Example:
cols = csv_header("wide_file.csv")
"""
with open(path, encoding=encoding, newline="") as f:
return next(csv.reader(f), [])
# ─────────────────────────────────────────────────────────────────────────────
# 2. Writing helpers
# ─────────────────────────────────────────────────────────────────────────────
def write_csv(
rows: Iterable[dict],
path: str | Path,
fieldnames: list[str] | None = None,
encoding: str = "utf-8",
extrasaction: str = "ignore",
) -> int:
"""
Write dicts to a CSV file; return number of rows written.
If fieldnames is None, uses keys of the first row.
Example:
write_csv([{"name": "Alice", "score": 95}], "out.csv")
"""
rows = list(rows)
if not rows:
return 0
if fieldnames is None:
fieldnames = list(rows[0].keys())
p = Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
with p.open("w", encoding=encoding, newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction=extrasaction)
writer.writeheader()
writer.writerows(rows)
return len(rows)
def to_csv_string(rows: Iterable[dict], fieldnames: list[str] | None = None) -> str:
"""
Serialize dicts to a CSV string (no file I/O).
Example:
s = to_csv_string([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
"""
rows = list(rows)
if not rows:
return ""
if fieldnames is None:
fieldnames = list(rows[0].keys())
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
return buf.getvalue()
def append_rows(
rows: Iterable[dict],
path: str | Path,
encoding: str = "utf-8",
) -> int:
"""
Append rows to an existing CSV without re-writing the header.
Example:
append_rows([{"ts": "2024-01-16", "event": "click"}], "events.csv")
"""
rows = list(rows)
if not rows:
return 0
p = Path(path)
fname = list(rows[0].keys())
with p.open("a", encoding=encoding, newline="") as f:
writer = csv.DictWriter(f, fieldnames=fname, extrasaction="ignore")
writer.writerows(rows)
return len(rows)
# ─────────────────────────────────────────────────────────────────────────────
# 3. Type coercion
# ─────────────────────────────────────────────────────────────────────────────
def coerce_row(
row: dict[str, str],
schema: dict[str, Callable[[str], Any]],
ignore_errors: bool = False,
) -> dict[str, Any]:
"""
Apply type converters to string-valued CSV row.
Example:
schema = {"id": int, "score": float, "active": lambda x: x == "true"}
cleaned = coerce_row({"id": "42", "score": "3.14", "active": "true"}, schema)
# {"id": 42, "score": 3.14, "active": True}
"""
result: dict[str, Any] = {}
for key, val in row.items():
conv = schema.get(key)
if conv is None:
result[key] = val
continue
try:
result[key] = conv(val.strip()) if val.strip() else None
except (ValueError, TypeError) as exc:
if ignore_errors:
result[key] = None
else:
raise ValueError(f"Column {key!r}: cannot convert {val!r}: {exc}") from exc
return result
def coerce_rows(
rows: Iterable[dict[str, str]],
schema: dict[str, Callable[[str], Any]],
ignore_errors: bool = False,
) -> list[dict[str, Any]]:
"""
Apply coerce_row to every row in an iterable.
Example:
schema = {"id": int, "amount": float, "name": str}
typed = coerce_rows(read_csv("orders.csv"), schema)
"""
return [coerce_row(row, schema, ignore_errors) for row in rows]
# ─────────────────────────────────────────────────────────────────────────────
# 4. Streaming / chunked processing
# ─────────────────────────────────────────────────────────────────────────────
def iter_chunks(
path: str | Path,
chunk_size: int = 1000,
encoding: str = "utf-8",
**kwargs,
) -> Iterator[list[dict[str, str]]]:
"""
Yield CSV rows in chunks of chunk_size for batch processing.
Example:
for chunk in iter_chunks("huge.csv", chunk_size=500):
db.bulk_insert(chunk)
"""
it = iter_csv(path, encoding=encoding, **kwargs)
while True:
chunk = list(itertools.islice(it, chunk_size))
if not chunk:
break
yield chunk
def filter_rows(
path: str | Path,
predicate: Callable[[dict[str, str]], bool],
encoding: str = "utf-8",
) -> list[dict[str, str]]:
"""
Read CSV and return only rows matching predicate.
Example:
active = filter_rows("users.csv", lambda r: r["status"] == "active")
"""
return [row for row in iter_csv(path, encoding=encoding) if predicate(row)]
def transform_csv(
src: str | Path,
dst: str | Path,
fn: Callable[[dict[str, str]], dict | None],
encoding: str = "utf-8",
) -> tuple[int, int]:
"""
Stream-transform src to dst: apply fn to each row.
If fn returns None the row is dropped.
Returns (rows_read, rows_written).
Example:
transform_csv(
"raw.csv", "clean.csv",
lambda r: {**r, "name": r["name"].strip().title()} if r["name"] else None,
)
"""
    read = written = 0
    writer: csv.DictWriter | None = None
    out_path = Path(dst)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding=encoding, newline="") as out:
        for row in iter_csv(src, encoding=encoding):
            read += 1
            result = fn(row)
            if result is None:
                continue
            if writer is None:
                # Create the writer lazily from the first surviving row's keys,
                # so dst is written row by row instead of buffered in memory.
                writer = csv.DictWriter(out, fieldnames=list(result.keys()), extrasaction="ignore")
                writer.writeheader()
            writer.writerow(result)
            written += 1
    return read, written
# ─────────────────────────────────────────────────────────────────────────────
# 5. Dataclass CSV integration
# ─────────────────────────────────────────────────────────────────────────────
def dataclass_to_rows(objects: Iterable[Any]) -> list[dict]:
    """
    Convert dataclass instances to dicts for CSV writing.
    Example:
        @dataclass
        class Product:
            id: int
            name: str
            price: float
        write_csv(dataclass_to_rows(products), "products.csv")
    """
    return [asdict(obj) for obj in objects]
def rows_to_dataclass(
    cls: type,
    rows: Iterable[dict],
    schema: dict[str, Callable[[str], Any]] | None = None,
) -> list:
    """
    Convert CSV dicts to typed dataclass instances, applying schema
    coercion first when a schema is provided.
    Example:
        schema = {"id": int, "price": float}
        products = rows_to_dataclass(Product, read_csv("products.csv"), schema)
    """
    valid = {f.name for f in fields(cls)}
    result = []
    for row in rows:
        if schema:
            row = coerce_row(row, schema, ignore_errors=True)
        filtered = {k: v for k, v in row.items() if k in valid}
        result.append(cls(**filtered))
    return result
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import tempfile
print("=== csv demo ===")
sample_data = [
{"id": "1", "name": "Alice", "score": "95.5", "active": "true"},
{"id": "2", "name": "Bob", "score": "82.0", "active": "false"},
{"id": "3", "name": " Carol ", "score": "78", "active": "true"},
]
with tempfile.TemporaryDirectory() as td:
path = f"{td}/users.csv"
# Write
print("\n--- write_csv ---")
n = write_csv(sample_data, path)
print(f" wrote {n} rows")
print(f" header: {csv_header(path)}")
# Read back
print("\n--- read_csv ---")
rows = read_csv(path)
for r in rows:
print(f" {r}")
# Coerce types
print("\n--- coerce_rows ---")
schema = {"id": int, "score": float, "active": lambda x: x == "true"}
typed = coerce_rows(rows, schema)
for r in typed:
print(f" {r}")
# Filter
print("\n--- filter_rows ---")
active = filter_rows(path, lambda r: r["active"] == "true")
print(f" active users: {[r['name'] for r in active]}")
# Transform
print("\n--- transform_csv ---")
clean = f"{td}/clean.csv"
r_read, r_written = transform_csv(
path, clean,
lambda r: {**r, "name": r["name"].strip().title()},
)
print(f" read={r_read}, written={r_written}")
print(f" cleaned: {[r['name'] for r in read_csv(clean)]}")
# Chunks
print("\n--- iter_chunks ---")
chunks = list(iter_chunks(path, chunk_size=2))
print(f" {len(chunks)} chunks: sizes={[len(c) for c in chunks]}")
# StringIO
print("\n--- to_csv_string ---")
s = to_csv_string(sample_data[:2])
print(f" {s.strip()!r}")
# Dataclass round-trip
print("\n--- dataclass round-trip ---")
@dataclass
class User:
id: str = ""
name: str = ""
score: str = ""
active: str = ""
users = rows_to_dataclass(User, rows)
print(f" users: {users}")
print("\n=== done ===")
For the pandas alternative: pandas.read_csv() auto-detects types, handles encoding variations, supports chunked reading (chunksize), provides vectorized column operations, and integrates directly with Parquet, Excel, and databases. Stdlib csv carries no third-party dependencies and gives fine-grained per-row control. Use pandas when you need DataFrame operations, statistical aggregation, or output to multiple formats from the same data; use stdlib csv when you are streaming large files row by row, building ETL pipelines that transform records individually, or embedding CSV handling in a library that should have no heavy dependencies.

For the polars alternative: polars reads CSV into an eager DataFrame (polars.read_csv()) or a lazy frame (polars.scan_csv()) with Rust-speed parallel parsing and a strict schema enforced at read time; stdlib csv is single-threaded and returns strings. Use polars when CSV files are GB-scale and you need columnar aggregations or joins; use stdlib csv for sub-100MB files, row-by-row processing, or when adding third-party dependencies is not acceptable.

The Claude Skills 360 bundle includes csv skill sets covering the read_csv()/iter_csv()/read_csv_rows()/csv_header() reading helpers, the write_csv()/to_csv_string()/append_rows() writing helpers, coerce_row()/coerce_rows() type coercion with schema dicts, iter_chunks()/filter_rows()/transform_csv() streaming processing, and dataclass_to_rows()/rows_to_dataclass() dataclass integration. Start with the free tier to try tabular data ingestion and csv pipeline code generation.
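To make the pandas trade-off concrete, a minimal sketch of chunked reading, assuming pandas is installed; events.csv, the name column, and process() are hypothetical stand-ins. It is the pandas counterpart of iter_chunks() above.

import pandas as pd

for chunk in pd.read_csv("events.csv", chunksize=500):  # yields DataFrames of up to 500 rows
    chunk["name"] = chunk["name"].str.strip()  # vectorized column cleanup
    process(chunk)  # hypothetical sink, e.g. a bulk database insert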