Dask parallelizes Python data workflows with lazy evaluation and a flexible task graph. Install with pip install "dask[dataframe,distributed]" (quote the extras so the shell leaves the brackets alone). Import: import dask.dataframe as dd, import dask.array as da, import dask.bag as db. DataFrame: ddf = dd.read_csv("data/*.csv") or ddf = dd.from_pandas(df, npartitions=4). Operations: ddf.groupby("col").agg({"val": "sum"}), ddf.merge(ddf2, on="key"), ddf.map_partitions(fn). Compute: result = ddf.compute() triggers execution. Array: x = da.from_array(np_array, chunks=(1000, 1000)); da.blockwise(fn, "ij", x, "ij", y, "ij"). Array ops mirror NumPy: da.dot(A, B), da.linalg.svd(x), da.fft.rfft(x). Bag: b = db.from_sequence(records, npartitions=8); b.map(fn).filter(pred).fold(combine); b.to_dataframe(). Delayed: decorate with @dask.delayed, call result = my_fn(a, b), then dask.compute(r1, r2, r3) runs the graph in parallel. Distributed: from dask.distributed import Client, LocalCluster; cluster = LocalCluster(n_workers=4, threads_per_worker=2); client = Client(cluster). Persist: ddf = client.persist(ddf) keeps partitions in worker memory. Futures: future = client.submit(fn, *args), result = future.result(), futures = client.map(fn, items). Progress: from dask.diagnostics import ProgressBar, then with ProgressBar(): ddf.compute(). Visualize: ddf.visualize("graph.svg"). Config: dask.config.set({"dataframe.shuffle.method": "tasks", "distributed.worker.memory.target": 0.6}). Partitions: ddf.npartitions, ddf.repartition(npartitions=20). Read Parquet: dd.read_parquet("s3://bucket/path/*.parquet", columns=["a", "b"]). Claude Code generates Dask ETL pipelines, distributed ML workflows, out-of-core data aggregations, and parallel feature engineering scripts.
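A minimal sketch of the lazy-build-then-compute workflow (the data/sales_*.csv glob and the date and sales columns are hypothetical):

import dask.dataframe as dd

ddf = dd.read_csv("data/sales_*.csv", parse_dates=["date"])  # lazy: builds a task graph
monthly = ddf.groupby(ddf["date"].dt.month)["sales"].sum()   # still lazy
result = monthly.compute()                                   # triggers parallel execution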
CLAUDE.md for Dask
## Dask Stack
- Version: dask >= 2024.1, distributed >= 2024.1
- DataFrame: dd.read_csv/read_parquet | dd.from_pandas(df, npartitions=N)
- Array: da.from_array(arr, chunks=(...)) | da.blockwise | da.linalg/fft
- Bag: db.from_sequence(items) | .map/.filter/.fold/.to_dataframe()
- Delayed: @dask.delayed decorator | dask.compute(*delayed_list)
- Distributed: Client(LocalCluster(n_workers=4)) | client.persist/submit/map
- Trigger: .compute() locally | client.compute(ddf) for distributed
- Memory: repartition() to balance | client.persist() to pin in memory
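A short sketch of the Trigger and Memory bullets above (paths and column names are hypothetical, sizes illustrative):

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)
ddf = dd.read_parquet("data/events.parquet")        # lazy read
ddf = ddf.repartition(npartitions=20)               # balance partition sizes
ddf = client.persist(ddf)                           # pin partitions in worker memory
future = client.compute(ddf.groupby("key").size())  # asynchronous distributed compute
counts = future.result()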
Dask Parallel Computing Pipeline
# data/dask_pipeline.py — parallel and out-of-core computing with Dask
from __future__ import annotations
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Callable, Any
import dask
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
from dask.diagnostics import ProgressBar
# ── 1. Dask DataFrame ─────────────────────────────────────────────────────────
def load_csv_glob(
pattern: str,
    dtype: dict | None = None,
    parse_dates: list[str] | None = None,
    npartitions: int | None = None,
) -> dd.DataFrame:
"""
Load many CSV files matching a glob pattern as a single Dask DataFrame.
pattern: e.g. "data/sales_*.csv" or "s3://bucket/path/*.csv"
"""
kwargs = {}
if dtype:
kwargs["dtype"] = dtype
if parse_dates:
kwargs["parse_dates"] = parse_dates
ddf = dd.read_csv(pattern, **kwargs)
if npartitions:
ddf = ddf.repartition(npartitions=npartitions)
return ddf
def load_parquet(
path: str,
    columns: list[str] | None = None,
    filters: list | None = None,
engine: str = "pyarrow",
) -> dd.DataFrame:
"""
Load Parquet files (local or S3) into a Dask DataFrame.
filters: pyarrow filter pushdown, e.g. [("year", "==", 2024)]
"""
return dd.read_parquet(
path, columns=columns, filters=filters, engine=engine
)
def aggregate_by_group(
ddf: dd.DataFrame,
group: str | list[str],
aggs: dict,
) -> pd.DataFrame:
"""
GroupBy aggregation on a Dask DataFrame.
aggs: {"col": "sum", "col2": ["mean", "count"]}
Computes and returns a Pandas DataFrame.
"""
result = ddf.groupby(group).agg(aggs)
with ProgressBar():
return result.compute()
def parallel_merge(
left: dd.DataFrame,
right: dd.DataFrame,
on: str | list[str],
how: str = "inner",
shuffle: str = "tasks",
) -> dd.DataFrame:
"""
Merge two Dask DataFrames.
    shuffle="tasks" avoids disk I/O; use "disk" for very large
    single-machine joins, or "p2p" on a distributed cluster.
"""
with dask.config.set({"dataframe.shuffle.method": shuffle}):
return left.merge(right, on=on, how=how)
def rolling_features(
    ddf: dd.DataFrame,
    value_col: str,
    windows: tuple[int, ...] = (7, 14, 30),
    sort_col: str | None = None,
) -> dd.DataFrame:
    """
    Add rolling mean and std features within each partition.
    NOTE: map_partitions computes each window independently per partition,
    so the first w-1 rows of every partition lack full context; use
    map_overlap when exact values across partition boundaries matter.
    Assumes data is sorted within partitions.
    """
    if sort_col:
        ddf = ddf.sort_values(sort_col)
    def _add_rolling(partition: pd.DataFrame) -> pd.DataFrame:
        partition = partition.copy()  # do not mutate the input partition
        for w in windows:
            partition[f"{value_col}_rma{w}"] = partition[value_col].rolling(w).mean()
            partition[f"{value_col}_rstd{w}"] = partition[value_col].rolling(w).std()
        return partition
    return ddf.map_partitions(_add_rolling)
def write_parquet_partitioned(
ddf: dd.DataFrame,
output_dir: str,
    partition_on: list[str] | None = None,
engine: str = "pyarrow",
compression: str = "snappy",
) -> None:
"""Write Dask DataFrame to Parquet with optional Hive partitioning."""
ddf.to_parquet(
output_dir,
engine=engine,
compression=compression,
partition_on=partition_on,
write_index=False,
overwrite=True,
)
print(f"Parquet written to {output_dir}")
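# Example usage for section 1 (paths and column names below are hypothetical):
#   ddf = load_csv_glob("data/sales_*.csv", parse_dates=["date"], npartitions=16)
#   px = load_parquet("s3://bucket/sales/", columns=["region", "price"],
#                     filters=[("year", "==", 2024)])
#   summary = aggregate_by_group(ddf, group="region", aggs={"sales": "sum"})
#   joined = parallel_merge(ddf, px, on="region", how="left")
#   feats = rolling_features(joined, value_col="sales", sort_col="date")
#   write_parquet_partitioned(feats, "out/sales_parquet", partition_on=["region"])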
# ── 2. Dask Array ─────────────────────────────────────────────────────────────
def chunked_svd(
X: np.ndarray,
n_components: int = 10,
chunk_size: int = 1000,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Truncated SVD on a large matrix using Dask arrays.
Useful when X doesn't fit in memory — processed chunk by chunk.
"""
X_da = da.from_array(X, chunks=(chunk_size, X.shape[1]))
U, s, Vt = da.linalg.svd_compressed(X_da, k=n_components, seed=42)
with ProgressBar():
U_np, s_np, Vt_np = dask.compute(U, s, Vt)
return U_np, s_np, Vt_np
def chunked_batch_norm(
X: np.ndarray,
chunk_size: int = 10_000,
) -> np.ndarray:
"""
Compute mean and std over a very large array in chunks, then normalize.
    Memory-efficient: the mean/std reductions stream chunk by chunk, though
    the final .compute() does materialize the full normalized array.
"""
X_da = da.from_array(X, chunks=(chunk_size, X.shape[1]))
mu = X_da.mean(axis=0)
sigma = X_da.std(axis=0)
X_norm = (X_da - mu) / (sigma + 1e-8)
with ProgressBar():
return X_norm.compute()
def parallel_fft(
signals: np.ndarray, # (N_signals, signal_length)
chunk_rows: int = 100,
) -> np.ndarray:
"""
Compute FFT for many signals in parallel using Dask array.
"""
signals_da = da.from_array(signals, chunks=(chunk_rows, signals.shape[1]))
spectra = da.fft.rfft(signals_da, axis=1)
    power = da.absolute(spectra) ** 2  # power spectrum |X(f)|^2
with ProgressBar():
return power.compute()
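# Example usage for section 2 (shapes and chunk sizes are illustrative):
#   X = np.random.randn(100_000, 300).astype(np.float32)
#   U, s, Vt = chunked_svd(X, n_components=20, chunk_size=5_000)
#   X_norm = chunked_batch_norm(X, chunk_size=10_000)
#   power = parallel_fft(np.random.randn(1_000, 4_096), chunk_rows=100)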
# ── 3. Dask Bag ───────────────────────────────────────────────────────────────
def parallel_text_pipeline(
documents: list[str],
process_fn: Callable[[str], dict],
npartitions: int = 8,
    filter_fn: Callable[[dict], bool] | None = None,
) -> list[dict]:
"""
Process a large list of documents in parallel via Dask Bag.
process_fn: maps a string → result dict
filter_fn: optional filter over results
Returns list of result dicts (calls .compute()).
"""
bag = db.from_sequence(documents, npartitions=npartitions)
bag = bag.map(process_fn)
if filter_fn:
bag = bag.filter(filter_fn)
with ProgressBar():
return bag.compute()
def bag_to_dataframe(
records: list[dict],
npartitions: int = 4,
) -> pd.DataFrame:
"""Convert list of dicts to Pandas DataFrame via Dask Bag."""
bag = db.from_sequence(records, npartitions=npartitions)
return bag.to_dataframe().compute()
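# Example usage for section 3 (the token-count process_fn is hypothetical):
#   results = parallel_text_pipeline(
#       documents=["first doc", "second doc"],
#       process_fn=lambda text: {"n_tokens": len(text.split())},
#       filter_fn=lambda rec: rec["n_tokens"] > 1,
#   )
#   df = bag_to_dataframe(results)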
# ── 4. dask.delayed for custom pipelines ──────────────────────────────────────
@dask.delayed
def _load_partition(path: str) -> pd.DataFrame:
"""Delayed single-file load."""
return pd.read_parquet(path)
@dask.delayed
def _transform_partition(df: pd.DataFrame, features: list[str]) -> pd.DataFrame:
"""Delayed feature selection and type cast."""
return df[features].astype(np.float32)
@dask.delayed
def _score_partition(df: pd.DataFrame, model) -> np.ndarray:
"""Delayed batch inference."""
return model.predict_proba(df.values)[:, 1]
def parallel_batch_inference(
file_paths: list[str],
features: list[str],
model,
) -> np.ndarray:
"""
Run model inference in parallel over many parquet files.
Uses dask.delayed to build a lazy task graph, then compute all at once.
"""
delayed_scores = []
for path in file_paths:
df = _load_partition(path)
df_feat = _transform_partition(df, features)
scores = _score_partition(df_feat, model)
delayed_scores.append(scores)
with ProgressBar():
results = dask.compute(*delayed_scores)
return np.concatenate(results)
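# Example usage (clf is any fitted estimator exposing predict_proba, e.g. a
# scikit-learn classifier; the parquet paths are hypothetical):
#   paths = ["data/part-000.parquet", "data/part-001.parquet"]
#   scores = parallel_batch_inference(paths, features=["a", "b"], model=clf)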
def parallel_feature_engineering(
    df: pd.DataFrame,
    transforms: list[Callable[[pd.DataFrame], pd.Series]],
) -> pd.DataFrame:
    """
    Apply many independent feature transforms in parallel using dask.delayed.
    Each transform receives the full DataFrame and returns a new column;
    the delayed calls execute concurrently and are concatenated at the end.
    """
    delayed_cols = [dask.delayed(fn)(df) for fn in transforms]
    with ProgressBar():
        cols = dask.compute(*delayed_cols)
    return pd.concat(cols, axis=1)
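# Example usage (column names are hypothetical; each transform returns a
# named Series so the concatenated result has readable columns):
#   out = parallel_feature_engineering(df, transforms=[
#       lambda d: np.log1p(d["amount"]).rename("log_amount"),
#       lambda d: ((d["x"] - d["x"].mean()) / d["x"].std()).rename("x_z"),
#   ])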
# ── 5. Distributed scheduler ──────────────────────────────────────────────────
def create_local_cluster(
n_workers: int = 4,
threads_per_worker: int = 2,
memory_limit: str = "4GB",
) -> tuple:
"""
Create a local Dask distributed cluster.
Returns (cluster, client) — call client.close() and cluster.close() when done.
dashboard: http://localhost:8787
"""
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
n_workers=n_workers,
threads_per_worker=threads_per_worker,
memory_limit=memory_limit,
)
client = Client(cluster)
print(f"Dashboard: {client.dashboard_link}")
return cluster, client
def distributed_groupby(
file_pattern: str,
group_col: str,
value_col: str,
client,
) -> pd.DataFrame:
"""
Read and aggregate a large CSV dataset on a distributed cluster.
client.persist() keeps partitions in worker RAM between operations.
"""
ddf = dd.read_csv(file_pattern)
ddf = client.persist(ddf) # Cache in cluster memory
result = ddf.groupby(group_col)[value_col].agg(["sum", "mean", "count"])
return client.compute(result).result()
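# Example usage for section 5 (close both handles when finished):
#   cluster, client = create_local_cluster(n_workers=4, memory_limit="4GB")
#   stats = distributed_groupby("data/sales_*.csv", "region", "sales", client)
#   client.close(); cluster.close()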
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("Dask Parallel Computing Demo")
print("=" * 50)
# Create a large sample CSV
n = 500_000
    df = pd.DataFrame({
        "date": pd.date_range("2022-01-01", periods=n, freq="min"),
"region": np.random.choice(["north", "south", "east", "west"], n),
"product": np.random.choice(["A", "B", "C"], n),
"sales": np.random.exponential(100, n),
"units": np.random.randint(1, 50, n),
})
# Dask DataFrame
import tempfile, os
with tempfile.TemporaryDirectory() as tmpdir:
csv_path = os.path.join(tmpdir, "sales.csv")
df.to_csv(csv_path, index=False)
ddf = dd.read_csv(csv_path, parse_dates=["date"])
print(f"\nDask DataFrame: {ddf.npartitions} partitions, {len(df):,} rows")
# GroupBy
print("\nRegion × Product sales aggregation:")
agg_df = aggregate_by_group(
ddf, group=["region", "product"],
aggs={"sales": ["sum", "mean"], "units": "sum"}
)
print(agg_df.head())
# Dask Array: chunked SVD
print("\nChunked SVD on 5000×200 array:")
X = np.random.randn(5000, 200).astype(np.float32)
U, s, Vt = chunked_svd(X, n_components=10, chunk_size=1000)
print(f" U: {U.shape}, s: {s.shape}, Vt: {Vt.shape}")
print(f" Top-5 singular values: {s[:5].round(2)}")
# Dask delayed: parallel feature engineering
print("\nParallel feature engineering (dask.delayed):")
df_small = pd.DataFrame({
"a": np.random.randn(10000),
"b": np.random.exponential(1, 10000),
"c": np.random.randint(0, 5, 10000),
})
@dask.delayed
def log_transform(df): return np.log1p(df["b"])
@dask.delayed
def zscore(df): return (df["a"] - df["a"].mean()) / df["a"].std()
with ProgressBar():
log_b, z_a = dask.compute(log_transform(df_small), zscore(df_small))
print(f" log1p(b): mean={log_b.mean():.3f}, z(a): std={z_a.std():.3f}")
Versus Pandas for in-memory datasets: Pandas is simpler and faster when the data fits in RAM. Dask's lazy evaluation and multi-partition design enable processing DataFrames with billions of rows on a single machine without loading everything at once, and dd.read_parquet with filter pushdown reads only the needed columns and row groups from S3 or GCS (a sketch follows at the end of this section). Dask is the right choice when data is roughly 10x larger than available RAM.

Versus Ray for distributed ML: Ray Data and Ray Core are stronger for heterogeneous tasks that mix CPU and GPU operators. Dask counters with human-readable task-graph visualizations via ddf.visualize(), a LocalCluster that launches in two lines without any infrastructure, and a DataFrame API that closely mirrors Pandas, which makes adoption nearly friction-free for data engineers already fluent in pandas.

The Claude Skills 360 bundle includes Dask skill sets covering parallel CSV and Parquet loading, groupby aggregation, distributed merges, rolling features via map_partitions, chunked SVD and batch normalization, Dask Bag text pipelines, dask.delayed custom task graphs, and LocalCluster distributed scheduler setup. Start with the free tier to try parallel data pipeline code generation.
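As a sketch of the filter-pushdown point above (the bucket layout, columns, and partition values are hypothetical):

import dask.dataframe as dd

ddf = dd.read_parquet(
    "s3://bucket/events/",           # Hive-partitioned dataset
    columns=["user_id", "amount"],   # column pruning
    filters=[("year", "==", 2024)],  # row-group / partition pruning
)
result = ddf.groupby("user_id")["amount"].sum().compute()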