Python’s pipes module (Unix only) constructs shell command pipelines programmatically. import pipes. Create template: t = pipes.Template(), then .append(cmd, kind) where cmd is a Bourne shell command string (using $IN and $OUT for file arguments) and kind is a two-character code — the first character describes the input, the second the output — one of '--' (filter: reads stdin, writes stdout), 'f-' (reads file $IN, writes stdout), '-f' (reads stdin, writes file $OUT), 'ff' (reads $IN, writes $OUT), '.-' (source: reads no input, must be the first step), or '-.' (sink: writes no output, must be the last step). Run: t.open(file, mode) returns a file-like object; mode 'r' reads the pipeline's output for the given input file, 'w' writes into the pipeline, whose output goes to the given file. Clone: t2 = t.clone(). Reset: t.reset(). Quote: pipes.quote(filename) — safely shell-escapes a filename (same as shlex.quote). pipes.STDIN_STDOUT, pipes.FILEIN_STDOUT, pipes.STDIN_FILEOUT, pipes.FILEIN_FILEOUT, pipes.SOURCE, pipes.SINK — named constants for the kind codes. Example: append 'gzip -c $IN > $OUT', 'ff' to compress. Note: pipes is deprecated in Python 3.11 and removed in 3.13 — always include a try/except ImportError guard; subprocess.Popen chains are the recommended replacement. Claude Code generates composable text pipelines, filter chains, multi-file transformers, and shell command constructors.
CLAUDE.md for pipes
## pipes Stack
- Stdlib: import pipes (deprecated 3.11, removed 3.13 — guard with try/except)
- Create: t = pipes.Template()
- Append: t.append("gzip -c $IN > $OUT", "ff") # file→file
- t.append("wc -l", "--") # filter stdin→stdout
- Run: f = t.open("input.txt", "r") # read output of pipeline
- f = t.open("output.txt", "w") # write into pipeline
- Quote: pipes.quote(filename) # same as shlex.quote()
- Note: kind flags — '--'=filter (stdin→stdout) 'ff'=file-to-file 'f-'=read-file (writes stdout) '-f'=write-file (reads stdin) '.-'=source '-.'=sink
pipes Shell Pipeline Constructor Pipeline
# app/pipesutil.py — template builder, subprocess fallback, composable chains
from __future__ import annotations
import io
import os
import platform
import shlex
import subprocess
import sys
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
# Shell pipelines are only attempted when the host does not report itself as Windows.
_PIPES_AVAILABLE = not (platform.system() == "Windows")

# The stdlib ``pipes`` module no longer exists on Python 3.13+, so the import
# is attempted defensively and the outcome is recorded in a flag.
try:
    import pipes as _pipes
except ImportError:
    _PIPES_MODULE_AVAILABLE = False
else:
    _PIPES_MODULE_AVAILABLE = True
# ─────────────────────────────────────────────────────────────────────────────
# 1. Shell-safe quoting (works without pipes module)
# ─────────────────────────────────────────────────────────────────────────────
def shell_quote(s: str) -> str:
    """
    Quote *s* so that a POSIX shell reads it back as a single literal word.

    Delegates to shlex.quote, so it does not depend on the pipes module.

    Example:
        cmd = f"cat {shell_quote(filename)} | wc -l"
    """
    quoted = shlex.quote(s)
    return quoted
def shell_join(args: list[str]) -> str:
    """
    Build one shell-safe command string from a list of arguments.

    Every argument is quoted individually with shlex.quote and the results
    are joined with single spaces, which is what shlex.join produces.

    Example:
        cmd = shell_join(["grep", "-r", "pattern", "/some path/with spaces"])
    """
    return " ".join(map(shlex.quote, args))
# ─────────────────────────────────────────────────────────────────────────────
# 2. Subprocess-based pipeline (Python 3.13 replacement for pipes.Template)
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class PipeStep:
    """One command of a pipeline together with a tag describing its kind."""

    # Shell command line for this step.
    # NOTE(review): the earlier comment suggested {input}/{output} placeholders
    # for file→file steps; no visible runner substitutes them — confirm.
    cmd: str
    # Kind tag: 'f' marks a stdin→stdout filter, 'ff' a file-to-file step.
    kind: str = "f"

    def __str__(self) -> str:
        """Return a compact ``PipeStep(<cmd>, kind=<kind>)`` description."""
        return "PipeStep({!r}, kind={!r})".format(self.cmd, self.kind)
class Pipeline:
    """
    Composable shell pipeline built on subprocess.

    Steps are stored in order as PipeStep objects and executed one after
    another through _run_filter_chain, each step receiving the previous
    step's stdout on its stdin. Conceptual replacement for pipes.Template.

    Example:
        pl = Pipeline()
        pl.append("tr a-z A-Z")       # uppercase filter
        pl.append("grep -v '^$'")     # remove blank lines
        result = pl.run_text("hello world\n\ngoodbye")
    """

    def __init__(self) -> None:
        self._steps: list[PipeStep] = []

    def append(self, cmd: str, kind: str = "f") -> "Pipeline":
        """Add a step to the end of the pipeline. Returns self for chaining."""
        self._steps.append(PipeStep(cmd=cmd, kind=kind))
        return self

    def prepend(self, cmd: str, kind: str = "f") -> "Pipeline":
        """Insert a step at the beginning. Returns self for chaining."""
        self._steps.insert(0, PipeStep(cmd=cmd, kind=kind))
        return self

    def clone(self) -> "Pipeline":
        """
        Return a shallow copy of this pipeline.

        The step list is copied, so appending to the clone does not affect
        the original; the PipeStep objects themselves are shared.
        """
        new = Pipeline()
        new._steps = list(self._steps)
        return new

    def reset(self) -> "Pipeline":
        """Remove all steps. Returns self for chaining."""
        self._steps.clear()
        return self

    def run_bytes(self, input_data: bytes = b"") -> bytes:
        """
        Run the pipeline, feeding input_data to the first step's stdin.

        Only steps whose kind is 'f' are executed; other kinds are skipped.
        With no runnable steps the input is returned unchanged.

        Returns the last step's stdout as bytes.
        Raises OSError on Windows and subprocess.CalledProcessError when a
        step exits with a non-zero status.

        Example:
            out = Pipeline().append("gzip").run_bytes(b"hello world")
        """
        if not _PIPES_AVAILABLE:
            raise OSError("Pipelines require Unix (subprocess PIPE chain)")
        steps = [s for s in self._steps if s.kind == "f"]
        if not steps:
            return input_data
        # Run the chain exactly once. Waiting on a hand-built Popen chain
        # before its stdin has been written and closed can never finish, and
        # running the commands a second time would repeat their side effects.
        return _run_filter_chain(steps, input_data)

    def run_text(self, text: str, encoding: str = "utf-8") -> str:
        """
        Run the pipeline with text input. Returns text output.

        The text is encoded with *encoding* before the run and the output
        is decoded with the same encoding.

        Example:
            result = Pipeline().append("tr a-z A-Z").run_text("hello")
            # → "HELLO"
        """
        raw = self.run_bytes(text.encode(encoding))
        return raw.decode(encoding)

    def run_file_to_file(self, src: str | Path, dst: str | Path) -> None:
        """
        Run the pipeline reading from src and writing to dst.

        Steps of kind 'f' and 'ff' are executed, all of them as
        stdin→stdout filters over the bytes of src. With no runnable steps
        src is copied to dst with shutil.copy2.

        Raises subprocess.CalledProcessError when a step exits with a
        non-zero status; dst is left untouched in that case.

        Example:
            Pipeline().append("gzip").run_file_to_file("data.txt", "data.txt.gz")
        """
        src = Path(src)
        dst = Path(dst)
        steps = [s for s in self._steps if s.kind in ("f", "ff")]
        if not steps:
            import shutil
            shutil.copy2(src, dst)
            return
        # Read the whole input and run the chain before touching dst: opening
        # dst for writing first would truncate it, destroying the input when
        # src and dst are the same file and leaving an empty file behind when
        # a step fails.
        input_data = src.read_bytes()
        result = _run_filter_chain(steps, input_data)
        dst.write_bytes(result)

    def __repr__(self) -> str:
        return f"Pipeline({' | '.join(s.cmd for s in self._steps)})"
def _run_filter_chain(steps: list[PipeStep], input_data: bytes) -> bytes:
    """
    Execute the steps one after another, feeding each one the previous output.

    Every step's cmd is run through the shell with subprocess.run; its stdin
    is the bytes produced so far and its stdout becomes the input of the
    next step. Returns the stdout of the last step, or input_data unchanged
    when steps is empty.

    Raises subprocess.CalledProcessError (carrying the captured stdout and
    stderr) as soon as a step exits with a non-zero status.
    """
    payload = input_data
    for stage in steps:
        completed = subprocess.run(
            stage.cmd,
            shell=True,
            input=payload,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,  # non-zero exit → CalledProcessError with output/stderr
        )
        payload = completed.stdout
    return payload
# ─────────────────────────────────────────────────────────────────────────────
# 3. Wrapped pipes.Template (when available)
# ─────────────────────────────────────────────────────────────────────────────
def make_template():
    """
    Build a fresh pipes.Template when the pipes module could be imported.

    Intended for legacy compatibility only; prefer Pipeline() for new code.
    Raises ImportError when the pipes module is not importable.

    Example:
        t = make_template()
        t.append("gzip -c $IN > $OUT", "ff")
    """
    if _PIPES_MODULE_AVAILABLE:
        return _pipes.Template()
    raise ImportError(
        "pipes.Template not available (Python 3.13+ removed it). "
        "Use Pipeline() instead."
    )
# ─────────────────────────────────────────────────────────────────────────────
# 4. Predefined useful pipelines
# ─────────────────────────────────────────────────────────────────────────────
def compress_pipeline(level: int = 6) -> Pipeline:
    """Build a single-step Pipeline running ``gzip -<level>`` over its input."""
    gzip_cmd = f"gzip -{level}"
    pipeline = Pipeline()
    pipeline.append(gzip_cmd)
    return pipeline
def decompress_pipeline() -> Pipeline:
    """Build a single-step Pipeline running ``gunzip -c`` over its input."""
    pipeline = Pipeline()
    pipeline.append("gunzip -c")
    return pipeline
def word_count_pipeline() -> Pipeline:
    """Build a single-step Pipeline running ``wc`` (line, word and byte counts)."""
    pipeline = Pipeline()
    pipeline.append("wc")
    return pipeline
def uppercase_pipeline() -> Pipeline:
    """Build a single-step Pipeline running ``tr a-z A-Z`` to uppercase text."""
    pipeline = Pipeline()
    pipeline.append("tr a-z A-Z")
    return pipeline
def line_count(text: str) -> int:
    """
    Return the number of lines in text as reported by ``wc -l``.

    The text is piped through a one-step Pipeline and the command's output
    is stripped of surrounding whitespace before being parsed as an int.

    Example:
        n = line_count("hello\\nworld\\n")
    """
    counter = Pipeline()
    counter.append("wc -l")
    output = counter.run_text(text)
    return int(output.strip())
def grep_filter(pattern: str, invert: bool = False) -> Pipeline:
    """
    Return a Pipeline that keeps only the lines matching pattern.

    Args:
        pattern: grep pattern; it is shell-quoted before being embedded in
            the command.
        invert: when True, pass ``-v`` so non-matching lines are kept instead.

    The pattern is passed through ``-e`` so that a pattern beginning with
    ``-`` (for example "-v" or "--help") is treated as a pattern rather than
    being parsed by grep as an option.

    Note: grep exits with status 1 when it selects no lines, and the
    pipeline runner raises subprocess.CalledProcessError for any non-zero
    exit status, so running this pipeline on input without a match raises.

    Example:
        out = grep_filter("ERROR").run_text(log_text)
    """
    flag = "-v " if invert else ""
    return Pipeline().append(f"grep {flag}-e {shell_quote(pattern)}")
def sort_unique_pipeline(reverse: bool = False) -> Pipeline:
    """
    Build a two-step Pipeline: ``sort`` (with ``-r`` when reverse is True)
    followed by ``uniq`` to drop adjacent duplicate lines.
    """
    sort_flags = "" if not reverse else "-r"
    return Pipeline().append(f"sort {sort_flags}").append("uniq")
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
# Demo script: exercises the helpers above and prints the results.
if __name__ == "__main__":
    print("=== pipes demo ===")
    if not _PIPES_AVAILABLE:
        # On Windows only the quoting helper is demonstrated, then the demo exits.
        print(" pipes not available on Windows; "
              "demonstrating shell_quote only:")
        for s in ["/normal/path", "/path with spaces/file.txt",
                  "file; rm -rf /"]:
            print(f" {s!r} → {shell_quote(s)!r}")
        raise SystemExit(0)

    # ── shell_quote ──────────────────────────────────────────────────────────
    print("\n--- shell_quote ---")
    for s in ["/normal/path", "/path with spaces/file.txt",
              "file; rm -rf /", "it's a file"]:
        print(f" {s!r} → {shell_quote(s)!r}")

    # ── basic text pipeline ──────────────────────────────────────────────────
    print("\n--- Pipeline: uppercase + remove blank lines ---")
    text = "hello world\n\nfoo bar\n\nbaz\n"
    pl = Pipeline().append("tr a-z A-Z").append("grep -v '^$'")
    print(f" repr: {pl}")
    result = pl.run_text(text)
    print(f" input: {text!r}")
    print(f" output: {result!r}")

    # ── line count ───────────────────────────────────────────────────────────
    print("\n--- line_count ---")
    sample = "line one\nline two\nline three\n"
    print(f" '{sample.strip()[:20]}...' → {line_count(sample)} lines")

    # ── grep filter ──────────────────────────────────────────────────────────
    print("\n--- grep_filter ---")
    log = ("INFO server started\nERROR disk full\n"
           "INFO request ok\nERROR config missing\nINFO shutdown\n")
    errors = grep_filter("ERROR").run_text(log)
    print(f" ERROR lines:\n{errors.rstrip()}")

    # ── sort_unique ──────────────────────────────────────────────────────────
    print("\n--- sort_unique_pipeline ---")
    words = "banana\napple\ncherry\napple\nbanana\norange\n"
    unique = sort_unique_pipeline().run_text(words)
    print(f" unique sorted: {unique.rstrip()!r}")

    # ── clone and extend ─────────────────────────────────────────────────────
    print("\n--- clone + prepend ---")
    base = grep_filter("ERROR")
    extended = base.clone().prepend("cat -n")  # add line numbers
    result2 = extended.run_text(log)
    print(f" numbered errors:\n{result2.rstrip()}")

    # ── gzip round-trip ──────────────────────────────────────────────────────
    print("\n--- gzip round-trip ---")
    original = b"hello from the pipes demo " * 20
    compressed = compress_pipeline().run_bytes(original)
    expanded = decompress_pipeline().run_bytes(compressed)
    print(f" original: {len(original)}B "
          f"compressed: {len(compressed)}B "
          f"expanded: {len(expanded)}B "
          f"match: {original == expanded}")

    # ── pipes.Template (if available) ────────────────────────────────────────
    print("\n--- pipes.Template (legacy) ---")
    if _PIPES_MODULE_AVAILABLE:
        t = make_template()
        # pipes.Template only accepts two-character kind codes; '--' is the
        # stdin→stdout filter. A bare 'f' is rejected with ValueError.
        t.append("tr a-z A-Z", "--")
        with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
            tmp.write(b"hello pipes template\n")
            tmp_path = tmp.name
        try:
            with t.open(tmp_path, "r") as f:
                out = f.read()
            print(f" pipes.Template output: {out!r}")
        finally:
            # The temp file was created with delete=False, so remove it here.
            os.unlink(tmp_path)
    else:
        print(" pipes.Template not available (Python 3.13+) — using Pipeline()")
    print("\n=== done ===")
For the subprocess alternative — subprocess.Popen(cmd, shell=True, stdin=prev.stdout, stdout=subprocess.PIPE) chains give you full control over process lifecycle, stdin/stdout/stderr routing, timeout, and error handling without the pipes.Template abstraction — use subprocess.Popen chains (or subprocess.run() for single-step commands) for all new code; the Pipeline class above wraps this pattern into the same fluent API that pipes.Template provided. For the shlex alternative — shlex.quote(s) (same implementation as pipes.quote()) and shlex.join(args) safely escape filenames and argument lists for shell commands — shlex.quote is the portable replacement for pipes.quote that remains available in Python 3.13+; use it whenever constructing shell command strings with user-supplied paths or values to prevent shell injection. The Claude Skills 360 bundle includes pipes skill sets covering shell_quote()/shell_join() safe quoting, PipeStep / Pipeline subprocess-based template replacement with run_bytes()/run_text()/run_file_to_file(), predefined compress_pipeline()/grep_filter()/sort_unique_pipeline() factories, line_count(), and make_template() legacy bridge. Start with the free tier to try shell pipeline patterns and pipes pipeline code generation.