tqdm wraps iterables to show progress bars. pip install tqdm. Basic: from tqdm import tqdm. for item in tqdm(items): process(item). Range: from tqdm import trange; for i in trange(100): .... Manual: with tqdm(total=1000) as pbar: pbar.update(10); pbar.set_postfix(loss=0.42). Description: tqdm(items, desc="Processing"). Unit: tqdm(items, unit="file"). Bytes: tqdm(items, unit="B", unit_scale=True, unit_divisor=1024). Nested: for epoch in trange(10, desc="Epochs"): for batch in tqdm(loader, desc="Batch", leave=False): .... leave=False — clear bar when done. Position: tqdm(items, position=0) for nested. Postfix: pbar.set_postfix({"loss": f"{loss:.4f}", "acc": f"{acc:.2%}"}). Prefix: pbar.set_description(f"Epoch {epoch}"). Write: tqdm.write(f"Error: {msg}") — prints above bar. Disable: tqdm(items, disable=not verbose). Performance: miniters=100, mininterval=0.5 — render at most once per 0.5 s and skip display until at least 100 iterations have passed. dynamic_miniters=True. Pandas: from tqdm import tqdm; tqdm.pandas(); df["col"].progress_apply(fn). df.groupby("key").progress_apply(fn). Notebook: from tqdm.notebook import tqdm — Jupyter widget. Auto: from tqdm.auto import tqdm — notebook if in Jupyter else CLI. Concurrent: wrap futures.as_completed(fs) with tqdm. asyncio: from tqdm.asyncio import tqdm as atqdm. async for item in atqdm(aiter): .... atqdm.gather(*coros). Bar format: bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]". Colour: tqdm(items, colour="green"). Initial: tqdm(items, initial=offset, total=total) — resume from checkpoint. Claude Code generates tqdm wrappers, nested progress patterns, and pandas progress_apply integration.
CLAUDE.md for tqdm
## tqdm Stack
- Version: tqdm >= 4.66 | pip install tqdm
- Basic: for item in tqdm(items, desc="Label", unit="file"): ...
- Range: trange(N, desc="Epochs") — shorthand for tqdm(range(N))
- Manual: with tqdm(total=N) as pbar: pbar.update(n); pbar.set_postfix(key=val)
- Pandas: tqdm.pandas(); df["col"].progress_apply(fn)
- Auto: from tqdm.auto import tqdm — CLI or Jupyter widget automatically
- Perf: miniters=100, mininterval=0.5 — limit render overhead in hot loops
tqdm Progress Bar Pipeline
# app/progress.py — tqdm patterns for CLI, data pipelines, and async
from __future__ import annotations
import asyncio
import math
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Callable, Iterable, Iterator, TypeVar
from tqdm import tqdm
from tqdm import trange
from tqdm.asyncio import tqdm as atqdm
T = TypeVar("T")  # NOTE(review): generic placeholder — not referenced by any function in this module; confirm before removing
# ─────────────────────────────────────────────────────────────────────────────
# 1. Basic iterable wrapping
# ─────────────────────────────────────────────────────────────────────────────
def process_files(paths: list[Path]) -> list[dict]:
    """Process each path under a progress bar showing count, rate, and ETA.

    Args:
        paths: Files to inspect; paths that cannot be stat'ed record size 0.

    Returns:
        One dict per input path: {"path": str, "size": int bytes}.
    """
    results: list[dict] = []
    for path in tqdm(paths, desc="Processing files", unit="file"):
        time.sleep(0.005)  # simulate per-file work
        # EAFP fix: the original checked path.exists() and then called
        # stat(), which races (TOCTOU) if the file disappears between the
        # two calls and would raise instead of recording size 0. stat()
        # directly and fall back on any OS-level failure.
        try:
            size = path.stat().st_size
        except OSError:
            size = 0
        results.append({"path": str(path), "size": size})
    return results
def process_batches(records: list[dict], batch_size: int = 64) -> list[dict]:
    """Process records in fixed-size batches with record-level progress.

    Args:
        records: Input rows; each is expected to carry an "id" key.
        batch_size: Maximum records per batch; the final batch may be smaller.

    Returns:
        One {"id": ..., "processed": True} dict per input record.
    """
    batches = [records[i:i + batch_size] for i in range(0, len(records), batch_size)]
    results: list[dict] = []
    with tqdm(total=len(records), desc="Records", unit="rec") as pbar:
        for batch_no, batch in enumerate(batches, start=1):
            # Simulate batch processing
            time.sleep(0.002 * len(batch))
            results.extend({"id": r.get("id"), "processed": True} for r in batch)
            pbar.update(len(batch))
            # Fix: the original reported len(results) // batch_size, which
            # undercounts by one once a partial final batch has been
            # processed (e.g. 100 records / 64 → shows 1 after batch 2).
            pbar.set_postfix(batches_done=batch_no)
    return results
# ─────────────────────────────────────────────────────────────────────────────
# 2. Training loop — epoch + batch nested bars
# ─────────────────────────────────────────────────────────────────────────────
def simulate_training(
    epochs: int = 5,
    batches_per_epoch: int = 20,
    verbose: bool = True,
) -> list[dict]:
    """Demonstrate two-level nested progress bars for a training loop.

    The outer bar tracks epochs; the inner bar uses leave=False so it is
    cleared from the terminal once its epoch completes. Per-epoch summaries
    go through tqdm.write so they print above the live bars.

    Returns:
        Per-epoch history dicts: {"epoch", "loss", "acc"} (metrics rounded).
    """
    history: list[dict] = []
    for epoch in trange(epochs, desc="Epochs", disable=not verbose):
        loss_sum = 0.0
        acc_sum = 0.0
        inner_opts = dict(
            total=batches_per_epoch,
            desc=f" Epoch {epoch + 1:02d}",
            leave=False,
            unit="batch",
            disable=not verbose,
        )
        with tqdm(**inner_opts) as inner:
            for step in range(batches_per_epoch):
                # Synthetic metrics: loss decays with global step count.
                loss = 1.0 / (epoch * batches_per_epoch + step + 1) + 0.01
                acc = 1.0 - loss
                loss_sum += loss
                acc_sum += acc
                inner.update(1)
                inner.set_postfix(loss=f"{loss:.4f}", acc=f"{acc:.2%}")
                time.sleep(0.002)
        mean_loss = loss_sum / batches_per_epoch
        mean_acc = acc_sum / batches_per_epoch
        history.append(
            {"epoch": epoch + 1, "loss": round(mean_loss, 4), "acc": round(mean_acc, 4)}
        )
        if verbose:
            tqdm.write(f"Epoch {epoch + 1}: loss={mean_loss:.4f} acc={mean_acc:.2%}")
    return history
# ─────────────────────────────────────────────────────────────────────────────
# 3. File download / byte-level progress
# ─────────────────────────────────────────────────────────────────────────────
def fake_download(url: str, chunk_size: int = 8192) -> bytes:
    """Simulate a chunked HTTP download with byte-level progress.

    unit="B" with unit_scale=True and unit_divisor=1024 makes tqdm render
    the running count in human-readable KB/MB/GB.

    Returns:
        The concatenated (zero-filled) payload bytes.
    """
    total_bytes = 1_024 * 512  # pretend the remote file is 512 KB
    filename = url.split("/")[-1]
    chunks: list[bytes] = []
    bar = tqdm(
        total=total_bytes,
        desc=f"Downloading {filename}",
        unit="B",
        unit_scale=True,
        unit_divisor=1024,
        miniters=1,
    )
    with bar:
        remaining = total_bytes
        while remaining > 0:
            chunk = bytes(min(chunk_size, remaining))  # zero-filled stand-in data
            chunks.append(chunk)
            remaining -= len(chunk)
            bar.update(len(chunk))
            time.sleep(0.001)
    return b"".join(chunks)
# ─────────────────────────────────────────────────────────────────────────────
# 4. tqdm.write — log messages without breaking the bar
# ─────────────────────────────────────────────────────────────────────────────
def process_with_logging(items: list[str]) -> None:
    """Validate items, reporting bad ones without disturbing the bar.

    tqdm.write() emits each message on its own line above the live bar;
    a plain print() would collide with the bar's in-place rendering.
    """
    for item in tqdm(items, desc="Validating"):
        time.sleep(0.01)
        if not item.startswith("bad_"):
            continue
        # Prints cleanly above the progress bar.
        tqdm.write(f"[WARN] Skipping invalid item: {item!r}")
# ─────────────────────────────────────────────────────────────────────────────
# 5. Resumable progress — initial offset
# ─────────────────────────────────────────────────────────────────────────────
def process_with_resume(
    items: list[Any],
    checkpoint: int = 0,
) -> list[Any]:
    """Process items[checkpoint:], with the bar pre-advanced to the checkpoint.

    initial=checkpoint makes the displayed count reflect work finished in a
    previous run — useful when resuming a long pipeline after a crash.

    Returns:
        The items processed in this run (i.e. those from `checkpoint` on).
    """
    pending = items[checkpoint:]
    done: list[Any] = []
    bar = tqdm(
        total=len(items),
        initial=checkpoint,
        desc="Resumable pipeline",
        unit="item",
    )
    with bar:
        for item in pending:
            time.sleep(0.005)
            done.append(item)
            bar.update(1)
    return done
# ─────────────────────────────────────────────────────────────────────────────
# 6. concurrent.futures — wrap as_completed
# ─────────────────────────────────────────────────────────────────────────────
def _fetch(url: str) -> dict:
    """Simulate an HTTP GET; always succeeds with status 200."""
    time.sleep(0.05)
    response = {"url": url, "status": 200}
    return response
def parallel_fetch(urls: list[str], max_workers: int = 8) -> list[dict]:
    """Fetch URLs on a thread pool, advancing the bar per completed future.

    as_completed yields futures in finish order, so the bar moves as soon
    as any request returns — independent of submission order. Failures are
    logged above the bar via tqdm.write and excluded from the result.
    """
    out: list[dict] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(_fetch, u): u for u in urls}
        progress = tqdm(
            as_completed(pending),
            total=len(pending),
            desc="Fetching URLs",
            unit="req",
        )
        for fut in progress:
            try:
                out.append(fut.result())
            except Exception as exc:
                tqdm.write(f"[ERROR] {pending[fut]}: {exc}")
    return out
# ─────────────────────────────────────────────────────────────────────────────
# 7. asyncio — tqdm.asyncio
# ─────────────────────────────────────────────────────────────────────────────
async def _async_fetch(url: str) -> dict:
    """Simulate an async HTTP GET; always succeeds with status 200."""
    await asyncio.sleep(0.02)
    response = {"url": url, "status": 200}
    return response
async def async_parallel_fetch(urls: list[str]) -> list[dict]:
    """Fan out one coroutine per URL and gather them under a progress bar.

    atqdm.gather is a drop-in replacement for asyncio.gather; the bar
    advances as each coroutine completes.
    """
    pending = (_async_fetch(url) for url in urls)
    gathered = await atqdm.gather(*pending, desc="Async fetch", unit="req")
    return list(gathered)
async def async_stream_process(items: list[str]) -> list[str]:
    """Uppercase items from an async generator, with tqdm.asyncio progress.

    tqdm.asyncio's tqdm supports `async for` directly over async iterators.
    """

    async def _emit():
        # Async generator standing in for a real event stream.
        for value in items:
            await asyncio.sleep(0.01)
            yield value

    stream = atqdm(_emit(), total=len(items), desc="Stream", unit="event")
    upper: list[str] = []
    async for value in stream:
        upper.append(value.upper())
    return upper
# ─────────────────────────────────────────────────────────────────────────────
# 8. Pandas progress_apply
# ─────────────────────────────────────────────────────────────────────────────
def pandas_progress_demo() -> None:
    """Demonstrate tqdm's pandas integration via .progress_apply().

    tqdm.pandas() must be called once up front; it patches DataFrame and
    Series with progress_apply(), a bar-wrapped drop-in for apply().
    Prints the first rows of the transformed frame, or a skip notice when
    pandas is unavailable.
    """
    # Fix: keep the try body minimal — only the import can legitimately
    # raise ImportError. The original wrapped the entire demo, so an
    # ImportError raised from inside the demo code itself would have been
    # misreported as "pandas not installed".
    try:
        import pandas as pd
    except ImportError:
        print("pandas not installed — skipping pandas_progress_demo")
        return
    tqdm.pandas(desc="Transforming rows")
    df = pd.DataFrame({
        "name": [f"user_{i}" for i in range(200)],
        "score": range(200),
    })
    # Drop-in replacement for .apply()
    df["name_upper"] = df["name"].progress_apply(str.upper)
    df["score_log"] = df["score"].progress_apply(lambda x: round(math.log1p(x), 4))
    print(df.head(3))
# ─────────────────────────────────────────────────────────────────────────────
# 9. High-throughput loop — miniters / mininterval
# ─────────────────────────────────────────────────────────────────────────────
def high_throughput_loop(n: int = 1_000_000) -> int:
    """Sum 0..n-1 under a bar throttled for tight-loop render overhead.

    Rendering every iteration of a million-step loop costs more than the
    loop body itself: miniters=10_000 renders at most once per 10k
    iterations, and mininterval=0.5 adds a wall-clock gate on top.

    Returns:
        The sum of the first n non-negative integers.
    """
    bar = tqdm(
        range(n),
        desc="Summing",
        unit="iter",
        miniters=10_000,
        mininterval=0.5,
        dynamic_miniters=True,
    )
    return sum(bar)
# ─────────────────────────────────────────────────────────────────────────────
# 10. Custom bar format
# ─────────────────────────────────────────────────────────────────────────────
_BAR_FMT = "{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"


def styled_bar(items: list[Any], desc: str = "Working") -> list[Any]:
    """Echo items through a cyan bar rendered with a compact custom format.

    The custom bar_format drops the default prefix clutter, keeping only
    percentage, bar, counts, elapsed/remaining time, and rate.
    """
    collected: list[Any] = []
    for element in tqdm(items, desc=desc, bar_format=_BAR_FMT, colour="cyan"):
        time.sleep(0.01)
        collected.append(element)
    return collected
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
def _demo() -> None:
    """Run every progress-bar pattern in this module in sequence."""
    print("=== Basic file processing ===")
    process_files([Path(f"/tmp/file_{i}.txt") for i in range(20)])

    print("\n=== Training loop ===")
    for entry in simulate_training(epochs=3, batches_per_epoch=10):
        print(f" epoch={entry['epoch']} loss={entry['loss']} acc={entry['acc']}")

    print("\n=== Byte download ===")
    fake_download("https://example.com/dataset.csv")

    print("\n=== Logging mid-loop ===")
    process_with_logging(["item_1", "bad_item", "item_2", "bad_two", "item_3"])

    print("\n=== concurrent.futures ===")
    urls = [f"https://api.example.com/users/{i}" for i in range(12)]
    fetched = parallel_fetch(urls, max_workers=4)
    print(f" fetched {len(fetched)} URLs")

    print("\n=== asyncio ===")
    async_fetched = asyncio.run(async_parallel_fetch(urls[:8]))
    print(f" async fetched {len(async_fetched)} URLs")

    print("\n=== High-throughput ===")
    print(f" sum={high_throughput_loop(200_000)}")

    print("\n=== Pandas ===")
    pandas_progress_demo()


if __name__ == "__main__":
    _demo()
For the print(f"{i}/{total}") alternative — manual print-based progress clutters stdout, wraps at terminal width, and requires manual ETA calculation, while tqdm(items) computes elapsed time, ETA, and iteration rate automatically, updates in-place on a single terminal line, and exposes set_postfix() for live metrics without any print statements interfering with the output. For the alive-progress alternative — alive-progress offers animated spinners and richer visual styles than tqdm, while tqdm is the de-facto standard with native integration into pandas (.progress_apply()), asyncio (atqdm.gather), and Jupyter notebooks (from tqdm.notebook import tqdm), a disable flag for zero-overhead production builds, and miniters/mininterval tuning for hot loops where render overhead matters. The Claude Skills 360 bundle includes tqdm skill sets covering basic iterable wrapping with desc/unit/colour, trange for range loops, manual update with set_postfix for live metrics, nested bars with leave=False, tqdm.write for mid-loop logging, byte-level progress with unit_scale and unit_divisor, resumable pipelines with initial offset, concurrent.futures as_completed wrapping, tqdm.asyncio gather and async for-loop, pandas progress_apply, miniters/mininterval for high-throughput loops, and custom bar_format. Start with the free tier to try progress bar code generation.