MLflow tracks machine learning experiments, packages models for reproducibility, and manages the model lifecycle from development to production. mlflow.autolog() automatically captures parameters, metrics, and model artifacts for supported frameworks, including scikit-learn, XGBoost, and PyTorch Lightning. The Model Registry stores versioned models with stage transitions between Staging and Production. MLflow Projects define reproducible training environments with MLproject files, and mlflow models serve exposes registered models as REST endpoints. Claude Code generates MLflow tracking code, custom metric logging, registry operations, and the CI/CD integration that gates production promotions on evaluation results.
CLAUDE.md for MLflow Projects
## MLflow Stack
- Version: mlflow >= 2.14
- Tracking: MLflow Tracking Server (PostgreSQL backend + S3/GCS artifact store)
- Registry: MLflow Model Registry (promotes: Staging → Production → Archived)
- Autolog: enabled for sklearn and xgboost; PyTorch autolog covers Lightning only, so custom torch loops log manually
- Artifacts: model, feature importance plots, confusion matrix, sample predictions
- Metrics: log every epoch for deep learning; log final for XGBoost/sklearn
- Tags: track git commit, PR, dataset version, author
- Remote: MLFLOW_TRACKING_URI env var (never hardcode server URL)
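The introduction mentions MLflow Projects; a minimal MLproject file matching these conventions might look like the sketch below. The entry-point parameters and the python_env.yaml name are illustrative assumptions, and train_classifier.py would need argument parsing to accept them. You could then run it with mlflow run . -P n_estimators=400 against the remote tracking server.
# MLproject: hypothetical project definition (sketch)
name: order-churn-prediction
python_env: python_env.yaml  # assumed environment spec pinning mlflow >= 2.14
entry_points:
  main:
    parameters:
      data_path: {type: string, default: "data/features_v3.parquet"}
      n_estimators: {type: float, default: 200}
      max_depth: {type: float, default: 5}
    command: "python training/train_classifier.py {data_path} {n_estimators} {max_depth}"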
Experiment Tracking with Autolog
# training/train_classifier.py — sklearn with autolog
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    roc_auc_score, f1_score, average_precision_score,
    ConfusionMatrixDisplay, classification_report,
)
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import subprocess
import os
# Configure tracking server
mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
mlflow.set_experiment("order-churn-prediction")
# Enable autolog — captures params, metrics, model, feature importance
mlflow.sklearn.autolog(
log_input_examples=True,
log_model_signatures=True,
max_tuning_runs=5,
)
def train(data_path: str, n_estimators: int = 200, max_depth: int = 5):
"""Train churn prediction model with full MLflow tracking."""
# Load and split data
df = pd.read_parquet(data_path)
X = df.drop(columns=["churned", "customer_id"])
y = df["churned"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, stratify=y, random_state=42
)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
with mlflow.start_run(run_name=f"gbt_n{n_estimators}_d{max_depth}"):
# Log metadata tags
mlflow.set_tags({
"git.commit": subprocess.getoutput("git rev-parse --short HEAD"),
"data.version": "v3.1",
"author": os.environ.get("USER", "ci"),
})
# Log dataset info
mlflow.log_params({
"train_size": len(X_train),
"test_size": len(X_test),
"feature_count": X.shape[1],
"positive_rate_train": float(y_train.mean()),
})
model = GradientBoostingClassifier(
n_estimators=n_estimators,
max_depth=max_depth,
learning_rate=0.1,
subsample=0.8,
random_state=42,
)
# Train — autolog captures all GBT params automatically
model.fit(X_train_scaled, y_train)
# Evaluate
y_pred = model.predict(X_test_scaled)
y_proba = model.predict_proba(X_test_scaled)[:, 1]
# Log custom metrics autolog doesn't capture
auc = roc_auc_score(y_test, y_proba)
ap = average_precision_score(y_test, y_proba)
mlflow.log_metrics({
"test_roc_auc": auc,
"test_avg_precision": ap,
"test_f1": f1_score(y_test, y_pred),
})
        # Log confusion matrix directly as a figure artifact
        fig, ax = plt.subplots()
        ConfusionMatrixDisplay.from_predictions(y_test, y_pred, ax=ax)
        mlflow.log_figure(fig, "confusion_matrix.png")
        plt.close(fig)
        # Log feature importance plot
        feat_imp = pd.Series(model.feature_importances_, index=X.columns)
        fig, ax = plt.subplots(figsize=(10, 6))
        feat_imp.nlargest(20).plot(kind="barh", ax=ax)
        ax.set_title("Top 20 Feature Importances")
        fig.tight_layout()
        mlflow.log_figure(fig, "feature_importance.png")
        plt.close(fig)
        # Log classification report as a text artifact
        report = classification_report(y_test, y_pred)
        mlflow.log_text(report, "classification_report.txt")
print(f"ROC-AUC: {auc:.4f}, Avg Precision: {ap:.4f}")
return model, auc
if __name__ == "__main__":
train("data/features_v3.parquet")
Custom Logging for Deep Learning
# training/train_pytorch.py — manual logging for PyTorch
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
mlflow.set_experiment("order-value-prediction")

class OrderValueNet(nn.Module):
    """Minimal MLP regressor (assumed architecture; the original is defined elsewhere)."""

    def __init__(self, input_dim: int, hidden_dim: int = 256):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)
def train_neural_net(
train_loader: DataLoader,
val_loader: DataLoader,
epochs: int = 50,
lr: float = 1e-3,
):
model = OrderValueNet(input_dim=128)
optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.OneCycleLR(
optimizer, max_lr=lr, total_steps=epochs * len(train_loader)
)
criterion = nn.MSELoss()
with mlflow.start_run():
mlflow.log_params({
"model_type": "OrderValueNet",
"epochs": epochs,
"lr": lr,
"batch_size": train_loader.batch_size,
"optimizer": "AdamW",
"scheduler": "OneCycleLR",
})
best_val_loss = float("inf")
patience = 0
for epoch in range(epochs):
# Training loop
model.train()
train_loss = 0.0
for batch_x, batch_y in train_loader:
optimizer.zero_grad()
pred = model(batch_x)
loss = criterion(pred.squeeze(), batch_y)
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
scheduler.step()
train_loss += loss.item()
# Validation loop
model.eval()
val_loss = 0.0
with torch.no_grad():
for batch_x, batch_y in val_loader:
pred = model(batch_x)
val_loss += criterion(pred.squeeze(), batch_y).item()
avg_train = train_loss / len(train_loader)
avg_val = val_loss / len(val_loader)
# Log per-epoch metrics — creates training curves in MLflow UI
mlflow.log_metrics(
{
"train_loss": avg_train,
"val_loss": avg_val,
"lr": scheduler.get_last_lr()[0],
},
step=epoch,
)
            # Save the best model as an MLflow artifact; each improvement also
            # registers a new version under the registered model name
            if avg_val < best_val_loss:
                best_val_loss = avg_val
                patience = 0
                mlflow.pytorch.log_model(
                    model,
                    artifact_path="model",
                    input_example=torch.randn(1, 128).numpy(),  # must be serializable, not a Tensor
                    registered_model_name="order-value-predictor",
                )
else:
patience += 1
if patience >= 5:
print(f"Early stopping at epoch {epoch}")
break
        mlflow.log_metric("best_val_loss", best_val_loss)  # numeric, so a metric rather than a tag
return model
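Because the loop above logs metrics with step=epoch, full training curves can be pulled back for offline comparison via MlflowClient.get_metric_history. A minimal sketch; the run ID is a placeholder to copy from the UI or a search_runs call:
# analysis/curves.py: retrieve a logged training curve (sketch)
from mlflow.tracking import MlflowClient

client = MlflowClient()
run_id = "<run-id>"  # placeholder: copy from the MLflow UI or mlflow.search_runs

# Each history point carries the step (epoch), value, and timestamp
for point in client.get_metric_history(run_id, "val_loss"):
    print(f"epoch {point.step}: val_loss={point.value:.4f}")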
Model Registry Operations
# registry/promote.py — model lifecycle management
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType
client = MlflowClient()
MODEL_NAME = "order-churn-predictor"
def find_best_run(experiment_name: str, metric: str = "test_roc_auc") -> str | None:
"""Find the best run by a given metric."""
experiment = client.get_experiment_by_name(experiment_name)
runs = client.search_runs(
experiment_ids=[experiment.experiment_id],
filter_string=f"metrics.{metric} > 0.85",
run_view_type=ViewType.ACTIVE_ONLY,
order_by=[f"metrics.{metric} DESC"],
max_results=1,
)
return runs[0].info.run_id if runs else None
def register_and_promote(run_id: str, stage: str = "Staging"):
"""Register a trained model and promote to a stage."""
result = mlflow.register_model(
model_uri=f"runs:/{run_id}/model",
name=MODEL_NAME,
tags={"run_id": run_id},
)
version = result.version
client.transition_model_version_stage(
name=MODEL_NAME,
version=version,
stage=stage,
archive_existing_versions=stage == "Production", # Archive old production models
)
client.update_model_version(
name=MODEL_NAME,
version=version,
description=f"Promoted to {stage} by CI pipeline. Run: {run_id}",
)
print(f"Model v{version} promoted to {stage}")
return version
def promote_staging_to_production(min_auc: float = 0.90):
"""Promote latest Staging model to Production if it meets threshold."""
# Get current Staging version
staging_versions = client.get_latest_versions(MODEL_NAME, stages=["Staging"])
if not staging_versions:
raise ValueError("No model in Staging")
staging_version = staging_versions[0]
run = client.get_run(staging_version.run_id)
auc = float(run.data.metrics.get("test_roc_auc", 0))
if auc < min_auc:
raise ValueError(f"Staging model AUC {auc:.3f} < threshold {min_auc}")
client.transition_model_version_stage(
name=MODEL_NAME,
version=staging_version.version,
stage="Production",
archive_existing_versions=True,
)
print(f"Promoted v{staging_version.version} to Production (AUC={auc:.3f})")
# CI usage
if __name__ == "__main__":
    run_id = find_best_run("order-churn-prediction")
    if run_id is None:
        raise SystemExit("No run cleared the metric threshold; nothing to promote")
    register_and_promote(run_id, stage="Staging")
    promote_staging_to_production(min_auc=0.88)
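One caveat on the scripts above: recent MLflow releases deprecate registry stages in favor of registered-model aliases. An alias-based equivalent of the production promotion is sketched below; the "champion" alias name is a team convention, not an MLflow built-in.
# registry/promote_alias.py: alias-based promotion (sketch, MLflow >= 2.3)
from mlflow.tracking import MlflowClient

client = MlflowClient()
MODEL_NAME = "order-churn-predictor"

def promote_with_alias(version: int, alias: str = "champion") -> None:
    """Point the alias at a version; any previous holder loses it automatically."""
    client.set_registered_model_alias(MODEL_NAME, alias, version)
    print(f"Alias '{alias}' now points at {MODEL_NAME} v{version}")

# Consumers then load via the alias instead of a stage:
#   mlflow.pyfunc.load_model("models:/order-churn-predictor@champion")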
Loading Models from Registry
# inference/predict.py — load production model for serving
import mlflow.pyfunc
import pandas as pd
import os
# Load the latest Production model (version-agnostic URI)
model = mlflow.pyfunc.load_model(
    model_uri="models:/order-churn-predictor/Production"
)
def predict_churn(features: pd.DataFrame) -> dict:
"""Score customers for churn risk."""
predictions = model.predict(features)
return {
"predictions": predictions.tolist(),
"model_version": model.metadata.run_id,
}
# FastAPI serving wrapper
from fastapi import FastAPI

app = FastAPI()
@app.post("/predict/churn")
async def churn_endpoint(data: dict):
df = pd.DataFrame(data["features"])
result = predict_churn(df)
return result
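Exercising the wrapper from a client is a single POST; the host, port, and feature names in this sketch are assumptions for illustration:
# clients/score.py: hypothetical client for the FastAPI wrapper
import requests

payload = {
    "features": [
        # Assumed feature names; match whatever the model was trained on
        {"orders_90d": 4, "avg_order_value": 52.3, "days_since_last_order": 12},
    ]
}
resp = requests.post("http://localhost:8000/predict/churn", json=payload, timeout=10)
resp.raise_for_status()
print(resp.json())  # {"predictions": [...], "model_version": "<run-id>"}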
For the PyTorch model training that MLflow tracks, see the PyTorch training guide for distributed training and optimization patterns. Weights & Biases offers an alternative tracker with richer built-in visualization, and the patterns here complement the MLOps guide for production deployment pipelines. The Claude Skills 360 bundle includes MLflow skill sets covering experiment tracking, model registry, and CI/CD integration. Start with the free tier to try ML experiment logging generation.