Python’s io module provides in-memory stream objects compatible with file I/O. import io. StringIO: buf = io.StringIO(); buf.write("text"); buf.getvalue() — in-memory text. BytesIO: buf = io.BytesIO(initial_bytes) — in-memory bytes. getvalue: buf.getvalue() — full buffer contents without moving position. seek: buf.seek(0) — rewind; buf.seek(0, io.SEEK_END) — end. tell: buf.tell() — current position. truncate: buf.truncate(n) — resize to n bytes/chars. read: buf.read(n) — n chars/bytes; buf.read() — all. readline: buf.readline() — up to newline. readlines: buf.readlines() — list of lines. write: buf.write(s) — write at the current position (overwrites existing content; seek to the end first to append). writelines: buf.writelines(lines). TextIOWrapper: io.TextIOWrapper(BytesIO(data), encoding="utf-8") — decode bytes as text stream. BufferedReader: io.BufferedReader(raw_io, buffer_size=io.DEFAULT_BUFFER_SIZE). BufferedWriter: wraps raw writable with buffering. RawIOBase / IOBase: base classes for custom streams. SEEK_SET=0, SEEK_CUR=1, SEEK_END=2. closed: buf.closed — True after close(). context manager: with io.StringIO() as buf: — auto-closes. Claude Code generates in-memory CSV formatters, HTTP response buffers, ZIP-in-memory builders, and test stream fixtures.
CLAUDE.md for io
## io Stack
- Stdlib: import io
- Text buffer: buf = io.StringIO(); buf.write("..."); content = buf.getvalue()
- Binary buffer: buf = io.BytesIO(); buf.write(b"..."); buf.seek(0); buf.read()
- Wrap bytes as text: io.TextIOWrapper(BytesIO(data), encoding="utf-8")
- Rewind: buf.seek(0) — always seek(0) before re-reading with read(); getvalue() works regardless of position
- CSV in memory: buf = io.StringIO(); csv.writer(buf).writerows(rows); s = buf.getvalue()
io In-Memory Stream Pipeline
# app/streamutil.py — StringIO, BytesIO, TextIOWrapper, capture, pipe
from __future__ import annotations
import contextlib
import io
import sys
from typing import Any, BinaryIO, Generator, TextIO
# ─────────────────────────────────────────────────────────────────────────────
# 1. StringIO helpers
# ─────────────────────────────────────────────────────────────────────────────
def build_text(lines: list[str], sep: str = "\n") -> str:
    """
    Join *lines* with *sep* by streaming each piece into an in-memory buffer.

    Example:
        text = build_text(["line 1", "line 2", "line 3"])
    """
    out = io.StringIO()
    for idx, piece in enumerate(lines):
        if idx:  # separator goes between pieces, not before the first
            out.write(sep)
        out.write(piece)
    return out.getvalue()
def text_lines(text: str) -> list[str]:
    """
    Split *text* into lines (newlines kept) via a StringIO, as a file would.

    Example:
        lines = text_lines("a\nb\nc\n")  # ["a\n", "b\n", "c\n"]
    """
    return io.StringIO(text).readlines()
def iter_text_lines(text: str) -> list[str]:
    """
    Treat a string as a line-oriented file and return its lines,
    each stripped of the trailing newline.

    Example:
        for line in iter_text_lines(response_body):
            process(line)
    """
    reader = io.StringIO(text)
    return [row.rstrip("\n") for row in reader.readlines()]
@contextlib.contextmanager
def string_buffer() -> Generator[io.StringIO, None, None]:
    """
    Yield a fresh StringIO and guarantee it is closed on exit.

    Example:
        with string_buffer() as buf:
            csv.writer(buf).writerows(rows)
            result = buf.getvalue()
    """
    # contextlib.closing gives the same try/finally close() semantics.
    with contextlib.closing(io.StringIO()) as stream:
        yield stream
# ─────────────────────────────────────────────────────────────────────────────
# 2. BytesIO helpers
# ─────────────────────────────────────────────────────────────────────────────
def concat_bytes(*chunks: bytes) -> bytes:
    """
    Concatenate byte chunks through a single BytesIO buffer
    (one final copy instead of N intermediate ones).

    Example:
        data = concat_bytes(header, body, footer)
    """
    sink = io.BytesIO()
    # writelines() accepts any iterable of bytes-like objects.
    sink.writelines(chunks)
    return sink.getvalue()
def bytes_to_lines(data: bytes, encoding: str = "utf-8") -> list[str]:
    """
    Decode *data* and return its lines (trailing newlines removed)
    by layering a TextIOWrapper over a BytesIO.

    Note: universal-newline mode is in effect, so "\r\n" is seen as "\n".

    Example:
        lines = bytes_to_lines(response.content)
    """
    byte_stream = io.BytesIO(data)
    text_stream = io.TextIOWrapper(byte_stream, encoding=encoding)
    return [row.rstrip("\n") for row in text_stream]
def encode_text(text: str, encoding: str = "utf-8") -> bytes:
    """
    Encode *text* to bytes by streaming it through a TextIOWrapper
    into a BytesIO — useful when feeding text to a binary API.

    Note: the wrapper's default newline handling applies, so "\n" is
    translated to os.linesep on write (platform-dependent).

    Example:
        payload = encode_text("Hello, world!")
    """
    raw = io.BytesIO()
    # write_through pushes each write straight to the underlying buffer.
    writer = io.TextIOWrapper(raw, encoding=encoding, write_through=True)
    writer.write(text)
    writer.flush()
    return raw.getvalue()
@contextlib.contextmanager
def byte_buffer(initial: bytes = b"") -> Generator[io.BytesIO, None, None]:
    """
    Yield a BytesIO (optionally pre-filled with *initial*) and close it on exit.

    Example:
        with byte_buffer() as buf:
            PIL.Image.fromarray(arr).save(buf, format="PNG")
            png_bytes = buf.getvalue()
    """
    with contextlib.closing(io.BytesIO(initial)) as stream:
        yield stream
# ─────────────────────────────────────────────────────────────────────────────
# 3. Stdout / stderr capture
# ─────────────────────────────────────────────────────────────────────────────
@contextlib.contextmanager
def capture_stdout() -> Generator[io.StringIO, None, None]:
    """
    Redirect sys.stdout into a StringIO for the duration of the block.

    Example:
        with capture_stdout() as out:
            print("hello")
            print("world")
        assert out.getvalue() == "hello\nworld\n"
    """
    sink = io.StringIO()
    with contextlib.redirect_stdout(sink):
        yield sink
@contextlib.contextmanager
def capture_stderr() -> Generator[io.StringIO, None, None]:
    """
    Redirect sys.stderr into a StringIO for the duration of the block.

    Example:
        with capture_stderr() as err:
            print("oops", file=sys.stderr)
        assert "oops" in err.getvalue()
    """
    sink = io.StringIO()
    with contextlib.redirect_stderr(sink):
        yield sink
@contextlib.contextmanager
def capture_output() -> Generator[dict[str, io.StringIO], None, None]:
    """
    Redirect stdout and stderr at the same time.
    Yields {"stdout": StringIO, "stderr": StringIO}.

    Example:
        with capture_output() as streams:
            run_command()
        print(streams["stdout"].getvalue())
    """
    streams = {"stdout": io.StringIO(), "stderr": io.StringIO()}
    with contextlib.redirect_stdout(streams["stdout"]):
        with contextlib.redirect_stderr(streams["stderr"]):
            yield streams
# ─────────────────────────────────────────────────────────────────────────────
# 4. CSV and JSON in-memory
# ─────────────────────────────────────────────────────────────────────────────
def rows_to_csv_bytes(
rows: list[dict],
fieldnames: list[str] | None = None,
encoding: str = "utf-8",
) -> bytes:
"""
Serialize dicts to CSV bytes without touching the filesystem.
Example:
data = rows_to_csv_bytes([{"name": "Alice", "score": 95}])
response.headers["Content-Type"] = "text/csv"
response.body = data
"""
import csv
text_buf = io.StringIO()
fn = fieldnames or list(rows[0].keys()) if rows else []
writer = csv.DictWriter(text_buf, fieldnames=fn, extrasaction="ignore")
writer.writeheader()
writer.writerows(rows)
return text_buf.getvalue().encode(encoding)
def json_to_bytes(obj: Any, indent: int | None = None) -> bytes:
"""
Serialize to JSON bytes in-memory.
Example:
payload = json_to_bytes({"event": "charge.created", "amount": 4999})
"""
import json
buf = io.BytesIO()
wrapper = io.TextIOWrapper(buf, encoding="utf-8", write_through=True)
json.dump(obj, wrapper, indent=indent, default=str)
wrapper.flush()
return buf.getvalue()
def zip_in_memory(files: dict[str, bytes]) -> bytes:
    """
    Build a DEFLATE-compressed ZIP archive entirely in memory.

    Example:
        zipped = zip_in_memory({
            "report.csv": csv_bytes,
            "summary.json": json_bytes,
        })
    """
    import zipfile

    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as bundle:
        for member_name, payload in files.items():
            bundle.writestr(member_name, payload)
    return archive.getvalue()
# ─────────────────────────────────────────────────────────────────────────────
# 5. Stream position utilities
# ─────────────────────────────────────────────────────────────────────────────
def rewind(stream: io.IOBase) -> io.IOBase:
    """Reset *stream* to the start and return it, allowing call chaining."""
    stream.seek(0, io.SEEK_SET)
    return stream
def stream_size(stream: io.IOBase) -> int:
    """
    Return the total byte/char size of a seekable stream,
    restoring the current position afterwards.
    """
    saved = stream.tell()
    stream.seek(0, io.SEEK_END)
    end = stream.tell()
    stream.seek(saved)
    return end
def read_chunks(
    stream: BinaryIO,
    chunk_size: int = 65536,
) -> list[bytes]:
    """
    Drain a binary stream into a list of fixed-size chunks
    (the final chunk may be shorter).

    Example:
        for chunk in read_chunks(response.raw):
            hasher.update(chunk)
    """
    collected: list[bytes] = []
    # read() returns b"" at EOF, which ends the walrus loop.
    while piece := stream.read(chunk_size):
        collected.append(piece)
    return collected
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke-test harness: exercises each helper above and prints the results.
    # Run with: python streamutil.py
    print("=== io demo ===")

    # StringIO: join lines, then iterate them back file-style.
    print("\n--- build_text / iter_text_lines ---")
    text = build_text(["alpha", "beta", "gamma"])
    print(f" built: {text!r}")
    print(f" lines: {iter_text_lines(text)}")

    # Auto-closing StringIO used as a csv.writer sink.
    print("\n--- string_buffer (CSV in memory) ---")
    import csv
    with string_buffer() as buf:
        w = csv.writer(buf)
        w.writerows([["name", "score"], ["Alice", 95], ["Bob", 82]])
        csv_text = buf.getvalue()  # must read before the buffer is closed
    print(f" csv:\n{csv_text}", end="")

    # TextIOWrapper over BytesIO: decode bytes and split into lines.
    print("\n--- bytes_to_lines ---")
    raw = b"line one\nline two\nline three"
    print(f" lines: {bytes_to_lines(raw)}")

    # BytesIO as an accumulator for byte chunks.
    print("\n--- concat_bytes ---")
    merged = concat_bytes(b"hello", b" ", b"world")
    print(f" merged: {merged!r}")

    # Text-to-bytes round-trip; the literal is "Hello, 世界!" (non-ASCII).
    print("\n--- encode_text ---")
    encoded = encode_text("Hello, \u4e16\u754c!")
    print(f" encoded: {encoded!r}")

    # redirect_stdout capture; the summary print runs outside the block
    # so it is not itself captured.
    print("\n--- capture_stdout ---")
    with capture_stdout() as out:
        print("captured line 1")
        print("captured line 2")
    print(f" output: {out.getvalue()!r}")

    # Simultaneous stdout + stderr capture.
    print("\n--- capture_output ---")
    with capture_output() as streams:
        print("stdout message")
        print("stderr message", file=sys.stderr)
    print(f" stdout: {streams['stdout'].getvalue()!r}")
    print(f" stderr: {streams['stderr'].getvalue()!r}")

    # Dicts -> CSV bytes without touching disk.
    print("\n--- rows_to_csv_bytes ---")
    rows = [{"name": "Alice", "score": 95}, {"name": "Bob", "score": 82}]
    b = rows_to_csv_bytes(rows)
    print(f" {len(b)} bytes: {b[:40]!r}...")

    # JSON bytes; the datetime value is serialized via default=str.
    print("\n--- json_to_bytes ---")
    from datetime import datetime, timezone
    payload = {"event": "charge.created", "ts": datetime.now(timezone.utc)}
    jb = json_to_bytes(payload, indent=None)
    print(f" {jb!r}")

    # In-memory ZIP archive (size varies with compression/timestamps).
    print("\n--- zip_in_memory ---")
    zipped = zip_in_memory({"data.csv": b"a,b\n1,2", "meta.txt": b"version=1"})
    print(f" zip: {len(zipped)} bytes")

    # stream_size probes the end and restores the saved position (3).
    print("\n--- stream_size ---")
    buf = io.BytesIO(b"hello world")
    buf.seek(3)
    print(f" size={stream_size(buf)} pos restored to {buf.tell()}")

    print("\n=== done ===")
For the tempfile alternative — tempfile.SpooledTemporaryFile(max_size=N) uses in-memory storage (like BytesIO) for data under max_size, then spills to disk automatically when the limit is exceeded; io.BytesIO stays in memory always — use SpooledTemporaryFile for large or unknown-size streams that may exceed available RAM, io.BytesIO/io.StringIO when the data is bounded and fits comfortably in memory. For the pyarrow / fsspec alternative — pyarrow.BufferOutputStream and fsspec.implementations.memory.MemoryFileSystem provide high-performance in-memory I/O integrated with Arrow IPC, Parquet, and cloud filesystem abstractions; io.BytesIO has no Arrow integration but zero overhead — use PyArrow or fsspec streams when building data pipelines that produce Parquet or Arrow IPC in memory for S3/GCS upload, io.BytesIO for general-purpose HTTP response bodies, in-memory archives, and test fixtures. The Claude Skills 360 bundle includes io skill sets covering build_text()/text_lines()/iter_text_lines()/string_buffer() StringIO helpers, concat_bytes()/bytes_to_lines()/encode_text()/byte_buffer() BytesIO helpers, capture_stdout()/capture_stderr()/capture_output() stream capture, rows_to_csv_bytes()/json_to_bytes()/zip_in_memory() serialization helpers, and rewind()/stream_size()/read_chunks() stream position utilities. Start with the free tier to try in-memory streaming and io pipeline code generation.