Metaflow is Netflix’s open-source Python framework for building scalable data science and ML workflows. Install with pip install metaflow. A flow is a Python class extending FlowSpec — class ChurnFlow(FlowSpec): — and steps are methods decorated with @step; self.next(self.next_step) defines the transition out of a step. Any attribute assigned as self.artifact = value is automatically persisted between steps as a versioned artifact. python flow.py run executes the flow locally, while python flow.py run --with batch runs every step on AWS Batch. Per-step decorators: @batch(cpu=4, memory=16000, image="python:3.11") pins a single step to Batch; @kubernetes(cpu=2, memory=8192) runs it on Kubernetes; @resources(cpu=4, memory=16000) sets resource hints honored by any orchestrator; @conda(libraries={"scikit-learn": ">=1.2", "pandas": ">=2.0"}) isolates the step’s dependencies; @retry(times=3, minutes_between_retries=1) adds fault tolerance; @timeout(minutes=30) sets a deadline; and @catch(var="error") captures an exception into an artifact so the flow can continue. Fan-out uses foreach rather than a decorator: self.next(self.train, foreach="param_combinations") launches one task per list item, and self.input accesses the current item inside each branch. @card(type="default") generates an HTML visualization for a step, and custom cards append components, e.g. current.card.append(Markdown("# Results")). Artifact access: from metaflow import Flow; run = Flow("ChurnFlow").latest_run; run["train"].task.data.model. namespace("production") scopes artifact access, and @project(name="churn") enables branch-based namespacing. Claude Code generates Metaflow FlowSpecs, @batch step configs, foreach sweeps, card visualizations, and artifact access patterns.
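The foreach fan-out described above is, conceptually, a map followed by a join. Outside Metaflow, the same shape looks like this plain-Python sketch (the scoring expression is a hypothetical stand-in, for intuition only):

```python
# Plain-Python analogue of Metaflow's foreach fan-out and join step.
param_grid = [{"lr": lr, "depth": d} for lr in (0.01, 0.1) for d in (3, 5)]

# "foreach": each branch processes one item (Metaflow exposes it as self.input)
branch_results = [
    {"params": p, "score": p["lr"] * p["depth"]}  # stand-in for a real CV score
    for p in param_grid
]

# "join": the next step receives every branch's artifacts and reduces them
best = max(branch_results, key=lambda r: r["score"])
print(best["params"])  # {'lr': 0.1, 'depth': 5}
```

In Metaflow the map is distributed (one task per item, potentially on Batch or Kubernetes) and the reduce happens in the join step, which receives all branch artifacts via its inputs argument.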
## CLAUDE.md for Metaflow
## Metaflow Stack
- Version: metaflow >= 2.11
- Flow: class MyFlow(FlowSpec): — steps are @step-decorated methods
- Transitions: self.next(self.step_name) or self.next(self.step, foreach="list_attr")
- Artifacts: self.attr = value — auto-persisted, typed, versioned
- Local: python flow.py run
- Cloud: python flow.py run --with batch (AWS) or --with kubernetes
- Per-step: @batch(cpu, memory, image) @conda(libraries) @retry @timeout
- Cards: @card → current.card.append(Image/Markdown/Table/Component)
- Access: Flow("MyFlow").latest_run["step"].task.data.artifact
## ChurnFlow
# flows/churn_flow.py — complete Metaflow ML pipeline
from __future__ import annotations
import json
from metaflow import (
FlowSpec,
Parameter,
batch,
card,
catch,
conda_base,
current,
project,
resources,
retry,
step,
timeout,
)
from metaflow.cards import Markdown, Table, Image
@project(name="churn")
@conda_base(python="3.11", libraries={
    "scikit-learn": ">=1.2",
    "pandas": ">=2.0",
    "numpy": ">=1.26",
    "matplotlib": ">=3.8",  # needed by evaluate's ROC plot
})
class ChurnFlow(FlowSpec):
"""
End-to-end churn prediction pipeline:
start → load_data → preprocess → train[foreach] → select_best → evaluate → end
"""
# ── Parameters ───────────────────────────────────────────────────────────
data_path = Parameter("data_path", default="data/customers.csv")
test_size = Parameter("test_size", default=0.2)
n_trials = Parameter("n_trials", default=5, help="Number of hyperparameter trials")
min_auc = Parameter("min_auc", default=0.75, help="Minimum AUC to promote model")
# ── Start ────────────────────────────────────────────────────────────────
@card(type="default")
@step
def start(self):
"""Initialize the flow and prepare parameter grid."""
import itertools
# Hyperparameter grid for foreach sweep
lrs = [0.01, 0.05, 0.1]
depths = [3, 4, 5]
all_params = [
{"lr": lr, "depth": d, "n_estimators": 200}
for lr, d in itertools.product(lrs, depths)
]
# Subset to n_trials
self.param_grid = all_params[: self.n_trials]
current.card.append(Markdown(f"# ChurnFlow\n\nRunning {len(self.param_grid)} trials."))
current.card.append(Table(
[["lr", "depth", "n_estimators"]] +
[[str(p["lr"]), str(p["depth"]), str(p["n_estimators"])] for p in self.param_grid]
))
self.next(self.load_data)
# ── Load data ────────────────────────────────────────────────────────────
@retry(times=3, minutes_between_retries=1)
@timeout(minutes=10)
@step
def load_data(self):
import pandas as pd
from sklearn.model_selection import train_test_split
df = pd.read_csv(self.data_path)
feature_cols = ["age", "tenure_days", "monthly_spend", "support_tickets", "last_login_days"]
target_col = "churned"
X = df[feature_cols].values
y = df[target_col].values
self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
X, y, test_size=self.test_size, stratify=y, random_state=42
)
self.feature_names = feature_cols
self.n_samples = len(df)
print(f"Loaded {self.n_samples} samples. Train: {len(self.X_train)}, Test: {len(self.X_test)}")
self.next(self.train, foreach="param_grid")
# ── Train (parallel foreach) ──────────────────────────────────────────────
@catch(var="train_error", print_exception=True)
@batch(cpu=2, memory=8192)
@card(type="default")
@step
def train(self):
"""Train one model config — runs in parallel per param combination."""
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import numpy as np
params = self.input # Current foreach item
self.params = params
        self.train_error = None  # default for the success path; @catch sets this only on failure
pipeline = Pipeline([
("scaler", StandardScaler()),
("clf", GradientBoostingClassifier(
n_estimators=params["n_estimators"],
learning_rate=params["lr"],
max_depth=params["depth"],
random_state=42,
)),
])
pipeline.fit(self.X_train, self.y_train)
cv_auc = float(np.mean(
cross_val_score(pipeline, self.X_train, self.y_train, cv=5, scoring="roc_auc", n_jobs=-1)
))
self.model = pipeline
self.cv_auc = cv_auc
current.card.append(Markdown(f"## Trial\n\n**Params:** {params}\n\n**CV AUC:** {cv_auc:.4f}"))
print(f"Trial {params} → CV AUC: {cv_auc:.4f}")
self.next(self.select_best)
# ── Select best model ─────────────────────────────────────────────────────
@card(type="default")
@step
def select_best(self, inputs):
"""Join foreach branches — pick best model by CV AUC."""
import numpy as np
# Filter out failed trials
valid = [inp for inp in inputs if inp.train_error is None]
if not valid:
raise ValueError("All training trials failed!")
best = max(valid, key=lambda inp: inp.cv_auc)
self.model = best.model
self.best_params = best.params
self.best_cv_auc = best.cv_auc
# Propagate shared data
self.merge_artifacts(inputs, include=["X_test", "y_test", "feature_names"])
rows = [[str(inp.params["lr"]), str(inp.params["depth"]), f"{inp.cv_auc:.4f}"]
for inp in valid]
        rows.sort(key=lambda r: float(r[-1]), reverse=True)  # numeric sort, not lexicographic
current.card.append(Markdown(f"# Best Model\n\n**CV AUC:** {self.best_cv_auc:.4f}"))
current.card.append(Table([["lr", "depth", "cv_auc"]] + rows))
self.next(self.evaluate)
# ── Evaluate ──────────────────────────────────────────────────────────────
@card(type="default")
@step
def evaluate(self):
"""Evaluate best model on hold-out test set."""
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import (
RocCurveDisplay,
classification_report,
roc_auc_score,
)
y_proba = self.model.predict_proba(self.X_test)[:, 1]
y_pred = self.model.predict(self.X_test)
self.test_auc = float(roc_auc_score(self.y_test, y_proba))
self.is_promoted = self.test_auc >= self.min_auc
print(f"\nTest AUC: {self.test_auc:.4f}")
print(classification_report(self.y_test, y_pred, target_names=["no_churn", "churn"]))
# Add ROC curve to card
fig, ax = plt.subplots(figsize=(6, 5))
RocCurveDisplay.from_predictions(self.y_test, y_proba, ax=ax, name="Best Model")
ax.set_title(f"ROC Curve (AUC={self.test_auc:.4f})")
current.card.append(Image.from_matplotlib(fig))
plt.close(fig)
self.next(self.end)
# ── End ───────────────────────────────────────────────────────────────────
@step
def end(self):
"""Summarize and optionally promote the model."""
status = "PROMOTED" if self.is_promoted else "NOT PROMOTED"
print(f"\n{'='*40}")
print(f"Flow complete. Model {status}.")
print(f" Best params: {self.best_params}")
print(f" CV AUC: {self.best_cv_auc:.4f}")
print(f" Test AUC: {self.test_auc:.4f}")
print(f" Threshold: {self.min_auc}")
if __name__ == "__main__":
ChurnFlow()
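Two pieces of the flow's logic are worth sanity-checking outside Metaflow: the start step's grid truncation and the select_best join. A standalone sketch, with plain objects standing in for the join's branch inputs (the mocked values here are illustrative, not real run results):

```python
import itertools
from types import SimpleNamespace

# start: build the full grid, then truncate to n_trials
lrs, depths, n_trials = [0.01, 0.05, 0.1], [3, 4, 5], 5
grid = [{"lr": lr, "depth": d, "n_estimators": 200}
        for lr, d in itertools.product(lrs, depths)]
param_grid = grid[:n_trials]
assert len(grid) == 9 and len(param_grid) == 5

# select_best: drop branches where @catch recorded an error, then take max CV AUC
inputs = [
    SimpleNamespace(train_error=None, cv_auc=0.81, params={"lr": 0.01, "depth": 3}),
    SimpleNamespace(train_error=RuntimeError("OOM"), cv_auc=None, params={"lr": 0.05, "depth": 4}),
    SimpleNamespace(train_error=None, cv_auc=0.86, params={"lr": 0.1, "depth": 5}),
]
valid = [inp for inp in inputs if inp.train_error is None]
if not valid:
    raise ValueError("All training trials failed!")
best = max(valid, key=lambda inp: inp.cv_auc)
print(best.params)  # {'lr': 0.1, 'depth': 5}
```

Filtering on the @catch variable before reducing is what keeps a single OOM-killed trial from failing the whole sweep.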
## Artifact Access Script
# scripts/load_model.py — access Metaflow artifacts from a past run
from metaflow import Flow, namespace
def load_latest_model(project: str = "churn"):
"""Load the most recent promoted model."""
namespace(f"project:{project}/main")
flow = Flow("ChurnFlow")
for run in flow.runs():
if run.successful and run.data.is_promoted:
print(f"Found promoted run: {run.id}")
print(f" Test AUC: {run.data.test_auc:.4f}")
print(f" Best params: {run.data.best_params}")
return run.data.model
raise ValueError("No promoted run found")
def compare_runs(n: int = 5):
"""Compare last N successful runs."""
flow = Flow("ChurnFlow")
runs = [r for r in flow.runs() if r.successful][:n]
print(f"{'Run ID':<20} {'Test AUC':<12} {'CV AUC':<12} {'Promoted'}")
print("-" * 60)
for run in runs:
print(
f"{run.id:<20} "
f"{run.data.test_auc:.4f} "
f"{run.data.best_cv_auc:.4f} "
f"{'YES' if run.data.is_promoted else 'no'}"
)
if __name__ == "__main__":
compare_runs()
model = load_latest_model()
print(f"\nLoaded model: {type(model).__name__}")
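load_latest_model above relies on flow.runs() yielding runs newest-first, so the first match is the most recent promoted run. The scan reduces to this plain-Python shape (run objects mocked here; the real Client API objects expose the same attributes used below):

```python
from types import SimpleNamespace

# Newest-first ordering, matching how Metaflow's flow.runs() iterates
runs = [
    SimpleNamespace(id="1712/3", successful=True,  data=SimpleNamespace(is_promoted=False)),
    SimpleNamespace(id="1712/2", successful=False, data=SimpleNamespace(is_promoted=True)),
    SimpleNamespace(id="1712/1", successful=True,  data=SimpleNamespace(is_promoted=True)),
]

def first_promoted(runs):
    """Return the newest run that both succeeded and passed the promotion gate."""
    for run in runs:
        if run.successful and run.data.is_promoted:
            return run
    raise ValueError("No promoted run found")

print(first_promoted(runs).id)  # 1712/1
```

Note that a successful-but-unpromoted run and a failed run are both skipped; only a run satisfying both predicates is returned.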
Consider the ZenML alternative when you need a stack-portable pipeline framework that runs the same Python pipeline on local, Kubeflow, SageMaker, or Vertex backends by swapping stack configuration rather than code, and that provides a Model Control Plane for model versioning. ZenML's stack abstraction gives more portability; Metaflow's @batch and @kubernetes decorators are more Pythonic for data scientists, with tighter local-to-cloud parity and Netflix-proven scaling patterns for large data science teams. Consider the Prefect alternative when you want a general-purpose workflow orchestrator with a rich UI, dynamic task mapping, and cloud-hosted execution without any AWS account setup. Prefect is infrastructure-agnostic, while Metaflow is designed around AWS primitives (S3 for artifacts, Batch for compute) and offers the best developer experience for teams committed to the AWS ecosystem. The Claude Skills 360 bundle includes Metaflow skill sets covering FlowSpec definitions, @batch and @kubernetes step configs, foreach sweeps, card visualizations, and artifact access. Start with the free tier to try AWS ML workflow generation.