joblib parallelizes loops and caches expensive function calls. Install: pip install joblib. Parallel: from joblib import Parallel, delayed; results = Parallel(n_jobs=-1)(delayed(fn)(x) for x in items). n_jobs=-1 uses all CPUs; n_jobs=4 uses 4 workers; n_jobs=-2 uses all CPUs minus one. Backends: Parallel(n_jobs=4, backend="loky") — default, process-based; backend="threading" — for GIL-releasing code (numpy, IO); backend="multiprocessing" — legacy process backend. Progress: Parallel(n_jobs=4, verbose=10). Batching: Parallel(n_jobs=4, batch_size=10). Memory: from joblib import Memory; mem = Memory("/tmp/cache", verbose=0). @mem.cache def fn(x): return x**2 — the first fn(5) runs the function; the second fn(5) returns the cached result. Ignore params: @mem.cache(ignore=["logger"]). fn.clear() clears one function's cache; mem.clear() clears all; mem.reduce_size() evicts old results. Dump/load: from joblib import dump, load; dump(model, "model.pkl"); model = load("model.pkl"). Arrays: dump stores numpy arrays as raw buffers — much faster than plain pickle for large numeric data — and load(..., mmap_mode="r") can memory-map them. dump(array, "arr.pkl", compress=3). Compression: 0 = none, 1-9 = zlib levels. Claude Code generates joblib Parallel loops, Memory caching decorators, and parallel pipeline patterns.
CLAUDE.md for joblib
## joblib Stack
- Version: joblib >= 1.4 | pip install joblib
- Parallel: Parallel(n_jobs=-1)(delayed(fn)(x) for x in items)
- Backend: loky (default, processes) | threading (IO/numpy) | multiprocessing
- Memory: Memory("/tmp/cache") | @mem.cache | @mem.cache(ignore=["verbose"])
- Clear: fn.clear() (one function) | mem.clear() (all) | mem.reduce_size()
- Dump: dump(obj, "file.pkl") | load("file.pkl") — faster than pickle for arrays
- Progress: Parallel(n_jobs=4, verbose=10) — prints per-job status
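A minimal end-to-end sketch of the two core primitives together — Parallel and Memory — before the full pipeline below. The cache directory and the toy `square` function are placeholders, and the threading backend is chosen only to keep the toy example cheap:

```python
import tempfile
from joblib import Memory, Parallel, delayed

memory = Memory(tempfile.mkdtemp(), verbose=0)  # placeholder cache dir

@memory.cache
def square(x: int) -> int:  # toy stand-in for an expensive function
    return x * x

# threading backend avoids process-spawn overhead for this trivial workload
results = Parallel(n_jobs=2, backend="threading")(
    delayed(square)(x) for x in range(5)
)
print(results)  # [0, 1, 4, 9, 16]
```

A cached function wrapped in delayed() gains parallelism and disk memoization at once; a second run of the same loop reads every result from the cache.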
joblib Parallel Computing Pipeline
# app/parallel.py — joblib Parallel, Memory caching, and numpy serialization
from __future__ import annotations
import logging
import math
import os
import time
from pathlib import Path
from typing import Any, Callable
from joblib import Memory, Parallel, delayed, dump, load
log = logging.getLogger(__name__)
CACHE_DIR = Path(os.environ.get("JOBLIB_CACHE", "/tmp/joblib_cache"))
CACHE_DIR.mkdir(parents=True, exist_ok=True)
memory = Memory(str(CACHE_DIR), verbose=0)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Parallel — CPU-bound work
# ─────────────────────────────────────────────────────────────────────────────
def _expensive_computation(x: float, power: float = 2.5) -> float:
    """Simulates a CPU-bound task."""
    time.sleep(0.01)
    return sum(math.pow(x, i) for i in range(1, int(power * 10)))


def parallel_map(items: list[float], n_jobs: int = -1) -> list[float]:
    """
    Parallel(n_jobs=-1) uses all available CPUs via the loky backend.
    delayed() wraps the function call — Parallel dispatches the calls across workers.
    Returns results in the same order as the input.
    """
    return Parallel(n_jobs=n_jobs)(
        delayed(_expensive_computation)(x) for x in items
    )


def parallel_with_args(records: list[dict], n_jobs: int = -1) -> list[dict]:
    """Multiple arguments — pass them through delayed()."""
    def process(record: dict, multiplier: float = 1.0) -> dict:
        time.sleep(0.005)
        return {**record, "processed": True, "value": record.get("value", 0) * multiplier}

    return Parallel(n_jobs=n_jobs)(
        delayed(process)(record, multiplier=1.5) for record in records
    )
# ─────────────────────────────────────────────────────────────────────────────
# 2. Threading backend — IO-bound or numpy operations
# ─────────────────────────────────────────────────────────────────────────────
def _fetch_url(url: str) -> dict:
    """Simulated IO-bound fetch — releases the GIL while sleeping."""
    time.sleep(0.05)
    return {"url": url, "status": 200, "size": 1024}


def parallel_fetch(urls: list[str], n_jobs: int = -1) -> list[dict]:
    """
    backend="threading" avoids process-spawn overhead for IO-bound tasks.
    Also appropriate for functions that spend their time in GIL-releasing
    C extensions (numpy, scipy).
    """
    return Parallel(n_jobs=n_jobs, backend="threading")(
        delayed(_fetch_url)(url) for url in urls
    )
# ─────────────────────────────────────────────────────────────────────────────
# 3. Verbose progress reporting
# ─────────────────────────────────────────────────────────────────────────────
def parallel_with_progress(items: list[Any], fn: Callable, n_jobs: int = 4) -> list:
    """
    verbose=10 prints progress messages as jobs complete — useful for
    long-running batches. Above 50, output goes to stdout.
    verbose=0 (the default) is silent.
    """
    return Parallel(n_jobs=n_jobs, verbose=10)(
        delayed(fn)(item) for item in items
    )
# ─────────────────────────────────────────────────────────────────────────────
# 4. @memory.cache — transparent disk memoization
# ─────────────────────────────────────────────────────────────────────────────
@memory.cache
def load_and_parse_dataset(path: str) -> list[dict]:
    """
    First call: runs the function and writes the result to disk.
    Later calls with the same `path` argument: load the result from disk.
    The cache persists across process restarts.
    """
    log.info("loading_dataset", extra={"path": path})
    time.sleep(0.2)  # simulate slow file parsing
    return [{"row": i, "value": i * 1.5, "path": path} for i in range(100)]


@memory.cache
def train_model(dataset_path: str, n_estimators: int = 100, max_depth: int = 5) -> dict:
    """
    The cache key is derived from all arguments — different hyperparameters
    produce different cache entries, enabling cheap grid-search replay.
    """
    log.info("training_model", extra={"n_estimators": n_estimators, "max_depth": max_depth})
    time.sleep(0.5)  # simulate training
    return {
        "model_type": "random_forest",
        "n_estimators": n_estimators,
        "max_depth": max_depth,
        "accuracy": 0.923,
        "trained_at": time.time(),
    }
# ─────────────────────────────────────────────────────────────────────────────
# 5. ignore= — exclude volatile parameters from cache key
# ─────────────────────────────────────────────────────────────────────────────
@memory.cache(ignore=["logger", "verbose"])
def feature_extraction(
    records: list[dict],
    feature_names: list[str],
    logger=None,  # excluded from the cache key
    verbose: bool = False,  # excluded from the cache key
) -> list[dict]:
    """
    logger and verbose change between calls but do not affect the result.
    ignore=["logger", "verbose"] limits the cache key to records + feature_names.
    """
    if verbose:
        print(f"Extracting {len(feature_names)} features from {len(records)} records")
    time.sleep(0.1)
    return [
        {feat: record.get(feat, 0) for feat in feature_names}
        for record in records
    ]
# ─────────────────────────────────────────────────────────────────────────────
# 6. Cache management
# ─────────────────────────────────────────────────────────────────────────────
def cache_management_demo() -> None:
    # Call once to populate the cache
    load_and_parse_dataset("/data/sample.csv")
    train_model("/data/sample.csv", n_estimators=50)

    # Clear a single function's cache
    load_and_parse_dataset.clear()
    print("Cleared load_and_parse_dataset cache")

    # call_and_shelve returns a reference; fetch the value later with .get()
    result_ref = train_model.call_and_shelve("/data/sample.csv", n_estimators=100)
    result = result_ref.get()
    print(f"Shelved result accuracy: {result['accuracy']}")

    # Shrink the cache on disk — evicts least-recently-used entries over the limit
    memory.reduce_size(bytes_limit="100M")

    # Clear everything
    # memory.clear(warn=False)
# ─────────────────────────────────────────────────────────────────────────────
# 7. dump / load — fast numpy/array serialization
# ─────────────────────────────────────────────────────────────────────────────
def fast_array_io_demo(output_dir: Path) -> None:
    """
    joblib.dump stores numpy arrays as raw buffers — much faster than plain
    pickle for large numeric data — and load(..., mmap_mode="r") can
    memory-map them. Use it for model weights, feature matrices, etc.
    """
    try:
        import numpy as np
    except ImportError:
        print("numpy not installed — skipping array demo")
        return

    array = np.random.rand(1000, 100)  # 800 KB float64 array
    path_no_compress = output_dir / "array.pkl"
    path_compressed = output_dir / "array_z3.pkl"
    dump(array, path_no_compress)
    dump(array, path_compressed, compress=3)  # compress: 0 (none) to 9 (max zlib)
    loaded = load(path_no_compress)
    assert loaded.shape == array.shape
    sizes = {
        "uncompressed": path_no_compress.stat().st_size,
        "compressed": path_compressed.stat().st_size,
    }
    print(f"Array serialization: {sizes}")
# ─────────────────────────────────────────────────────────────────────────────
# 8. Parallel + Memory — parallel training with result caching
# ─────────────────────────────────────────────────────────────────────────────
def grid_search_parallel(
    dataset_path: str,
    param_grid: list[dict],
    n_jobs: int = -1,
) -> list[dict]:
    """
    Run a hyperparameter grid search in parallel.
    Each train_model call is cached — re-running the grid search after adding
    new parameter combinations only trains the new ones.
    """
    return Parallel(n_jobs=n_jobs, backend="loky")(
        delayed(train_model)(dataset_path, **params)
        for params in param_grid
    )
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("=== Parallel CPU-bound ===")
    items = list(range(20))
    start = time.perf_counter()
    results = parallel_map(items, n_jobs=4)
    elapsed = time.perf_counter() - start
    print(f"  {len(results)} results in {elapsed:.2f}s (4 workers)")

    print("\n=== Parallel IO-bound (threading) ===")
    urls = [f"https://api.example.com/data/{i}" for i in range(12)]
    fetched = parallel_fetch(urls, n_jobs=6)
    print(f"  fetched {len(fetched)} URLs")

    print("\n=== Memory cache (first call) ===")
    t0 = time.perf_counter()
    data = load_and_parse_dataset("/data/demo.csv")
    first = time.perf_counter() - t0

    print("\n=== Memory cache (second call — from disk) ===")
    t0 = time.perf_counter()
    data2 = load_and_parse_dataset("/data/demo.csv")
    second = time.perf_counter() - t0
    print(f"  First: {first:.3f}s  Cached: {second:.4f}s ({first / max(second, 1e-9):.0f}× speedup)")

    print("\n=== Grid search (parallel + cached) ===")
    grid = [
        {"n_estimators": 50, "max_depth": 3},
        {"n_estimators": 100, "max_depth": 5},
        {"n_estimators": 200, "max_depth": 7},
    ]
    models = grid_search_parallel("/data/demo.csv", grid, n_jobs=3)
    best = max(models, key=lambda m: m["accuracy"])
    print(f"  Best: n_estimators={best['n_estimators']}  acc={best['accuracy']:.3f}")

    print("\n=== Array serialization ===")
    fast_array_io_demo(CACHE_DIR)
For the multiprocessing.Pool.map alternative: Pool.map(fn, items) requires the pool to be created and closed manually, does not cache results between runs, and can lose exception context across the process boundary, while Parallel(n_jobs=-1)(delayed(fn)(x) for x in items) manages the pool lifecycle automatically, propagates worker exceptions with their tracebacks, supports backend="threading" for IO-bound work without changing the call site, and composes directly with @memory.cache, so the same function gains both parallelism and result caching with one decorator.

For the functools.lru_cache + pickle alternative: @lru_cache is in-process and lost on restart, requiring manual serialization with pickle.dump/load and manual cache invalidation, while @memory.cache hashes the function's arguments (and tracks its source code to detect changes) to build a cache key, writes the result to a directory of pickled files on disk, and on the next run looks up that key before calling the function. It also integrates with scikit-learn, which uses joblib internally — Pipeline accepts a joblib Memory via its memory parameter to cache transformer steps.

The Claude Skills 360 bundle includes joblib skill sets covering Parallel with n_jobs=-1, loky/threading/multiprocessing backends, delayed wrapping for multi-argument functions, verbose progress reporting, Memory persistent disk caching, @memory.cache with an ignore list, fn.clear and memory.reduce_size cache management, call_and_shelve for deferred result retrieval, dump/load with numpy compression levels, and parallel grid search with memory caching. Start with the free tier to try parallel computing and caching code generation.
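To make the lru_cache comparison concrete, here is the manual persistence dance that @memory.cache removes — compute, pickle to disk, reload on the next run, invalidate by hand. The file name and the toy `slow_square` function are illustrative only:

```python
import pickle
import tempfile
from functools import lru_cache
from pathlib import Path

@lru_cache(maxsize=None)       # in-process only: gone when the process exits
def slow_square(x: int) -> int:
    return x * x

# Manual persistence that joblib's Memory handles automatically:
cache_file = Path(tempfile.mkdtemp()) / "squares.pkl"  # illustrative path
computed = {x: slow_square(x) for x in range(5)}
cache_file.write_bytes(pickle.dumps(computed))         # "dump" step by hand
restored = pickle.loads(cache_file.read_bytes())       # "load" step by hand
print(restored[4])  # 16
```

Every concern in this snippet — where the file lives, when it is stale, which arguments map to which entry — is exactly what Memory's argument-hashed, source-tracked cache key automates.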