MLOps bridges the gap between model notebooks and production ML systems. The gap is large: a notebook that achieves 94% accuracy is very different from a model that serves predictions reliably at scale, degrades gracefully when data drifts, and can be rolled back when a new version underperforms. Claude Code builds the infrastructure that makes the journey from notebook to production repeatable.
Experiment Tracking with MLflow
CLAUDE.md for MLOps Projects
## MLOps Stack
- Experiment tracking: MLflow (self-hosted, PostgreSQL backend + S3 artifact store)
- Training: PyTorch with Hugging Face Accelerate for multi-GPU
- Feature store: Feast with Redis online store + Snowflake offline store
- Model registry: MLflow Model Registry (staging/production/archived stages)
- Serving: FastAPI + ONNX Runtime for inference; Ray Serve for high-throughput
- Monitoring: Evidently for data/model drift, Prometheus + Grafana for latency/throughput
## Model Lifecycle
- Experiment → register in MLflow → staging review → production promotion
- All model versions track: training data version, feature schema, hyperparameters, eval metrics
- Canary deployments: 5% traffic to new model, monitor for 24h before full rollout
# train.py — structured training with MLflow tracking
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from typing import Dict, Any
import os
# Point the client at the shared tracking server. Reading the env var with
# [] (not .get) fails fast with KeyError instead of silently logging to a
# local ./mlruns directory.
mlflow.set_tracking_uri(os.environ['MLFLOW_TRACKING_URI'])
# All runs from this script are grouped under one experiment.
mlflow.set_experiment("fraud-detection-v2")
def train(config: Dict[str, Any]) -> str:
    """Train a fraud-detection model with full MLflow experiment tracking.

    Logs hyperparameters, data/feature versions, per-epoch metrics, and the
    best checkpoint (by validation AUC-ROC) to the MLflow model registry.

    Args:
        config: Hyperparameters and data pointers. Must contain at least
            'model_type', 'lr', 'epochs', 'pos_weight', 'data_version',
            and 'feature_schema'.

    Returns:
        The MLflow run ID of the completed training run.
    """
    with mlflow.start_run(run_name=f"fraud-{config['model_type']}-{config['lr']}") as run:
        # Log all hyperparameters up front so the run is reproducible.
        mlflow.log_params(config)
        # Data/feature versions are crucial for lineage and reproducibility.
        mlflow.log_param("train_data_version", config['data_version'])
        mlflow.log_param("feature_schema_version", config['feature_schema'])

        model = build_model(config)
        optimizer = torch.optim.AdamW(model.parameters(), lr=config['lr'], weight_decay=1e-4)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=config['epochs'])
        # Hoisted out of the batch loop: the original constructed
        # BCEWithLogitsLoss (and its pos_weight tensor) on every iteration.
        criterion = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([config['pos_weight']]))

        train_loader = get_dataloader(split='train', version=config['data_version'])
        val_loader = get_dataloader(split='val', version=config['data_version'])

        best_val_auc = 0.0
        for epoch in range(config['epochs']):
            # --- Training ---
            model.train()
            train_loss = 0.0
            for features, labels in train_loader:
                optimizer.zero_grad()
                logits = model(features)
                # NOTE(review): assumes model output and labels share a shape
                # compatible with BCEWithLogitsLoss — confirm against model.
                loss = criterion(logits, labels.float())
                loss.backward()
                # Clip gradients to stabilize training.
                torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()
                train_loss += loss.item()

            # --- Validation ---
            model.eval()
            with torch.no_grad():
                val_metrics = evaluate(model, val_loader)

            # Per-epoch metrics, keyed by epoch for MLflow's step axis.
            mlflow.log_metrics({
                'train_loss': train_loss / len(train_loader),
                'val_auc_roc': val_metrics['auc_roc'],
                'val_precision': val_metrics['precision'],
                'val_recall': val_metrics['recall'],
                'val_f1': val_metrics['f1'],
                'learning_rate': scheduler.get_last_lr()[0],
            }, step=epoch)

            # Save the best checkpoint so far (by validation AUC-ROC).
            if val_metrics['auc_roc'] > best_val_auc:
                best_val_auc = val_metrics['auc_roc']
                mlflow.pytorch.log_model(
                    model,
                    artifact_path="model",
                    registered_model_name="fraud-detector",
                    # Input/output schema — enables automatic validation at
                    # serving time.
                    signature=mlflow.models.infer_signature(
                        features.numpy(),
                        logits.detach().numpy(),
                    ),
                )
                mlflow.log_metric('best_val_auc', best_val_auc)

            scheduler.step()

        # Final evaluation on the held-out test set.
        test_metrics = evaluate(model, get_dataloader(split='test', version=config['data_version']))
        mlflow.log_metrics({f'test_{k}': v for k, v in test_metrics.items()})
        print(f"Run {run.info.run_id} complete. Best val AUC: {best_val_auc:.4f}")
        return run.info.run_id
Model Promotion Pipeline
After training, I need a pipeline that:
1. Runs validation checks on the new model
2. Compares against the current production model
3. Promotes to production if it shows no significant regression (within a small tolerance of the current production model's score)
# promote.py — automated model promotion with gates
import mlflow
from mlflow.tracking import MlflowClient
# Module-level registry client, shared by all promotion calls in this script.
client = MlflowClient()
def promote_model(run_id: str, min_auc: float = 0.85, max_regression: float = 0.02) -> bool:
    """Promote the model trained in ``run_id`` to Production if it passes gates.

    Gates:
      1. Test AUC >= min_auc (absolute quality threshold).
      2. Test AUC >= current production AUC - max_regression (no significant
         regression versus the live model).

    Args:
        run_id: MLflow run whose registered model version to promote.
        min_auc: Absolute minimum test AUC-ROC.
        max_regression: Maximum AUC drop tolerated vs current production.

    Returns:
        True if the model was promoted, False if any gate rejected it.
    """
    # Get the candidate model's metrics
    run = client.get_run(run_id)
    candidate_auc = run.data.metrics.get('test_auc_roc', 0)
    candidate_precision = run.data.metrics.get('test_precision', 0)
    print(f"Candidate: AUC={candidate_auc:.4f}, Precision={candidate_precision:.4f}")

    # Gate 1: Absolute quality threshold
    if candidate_auc < min_auc:
        print(f"REJECTED: AUC {candidate_auc:.4f} < minimum {min_auc}")
        return False

    # Resolve the candidate's registered version BEFORE mutating any stage.
    # The original archived production first, so a run with no registered
    # version would leave the registry with NO production model at all.
    versions = client.search_model_versions(f"run_id='{run_id}'")
    if not versions:
        print("REJECTED: No registered model version found for this run")
        return False

    # Gate 2: No regression vs current production
    prod_versions = client.get_latest_versions("fraud-detector", stages=["Production"])
    if prod_versions:
        prod_run = client.get_run(prod_versions[0].run_id)
        prod_auc = prod_run.data.metrics.get('test_auc_roc', 0)
        if candidate_auc < prod_auc - max_regression:
            print(f"REJECTED: Regression. Candidate {candidate_auc:.4f} vs prod {prod_auc:.4f}")
            return False
        print(f"Production baseline: AUC={prod_auc:.4f}")

    # Promote the candidate. archive_existing_versions=True moves the previous
    # Production version(s) to Archived in the same transition, so there is no
    # window in which zero production models exist.
    client.transition_model_version_stage(
        name="fraud-detector",
        version=versions[0].version,
        stage="Production",
        archive_existing_versions=True,
    )
    print(f"PROMOTED version {versions[0].version} to Production. AUC: {candidate_auc:.4f}")
    return True
Model Serving with FastAPI + ONNX
The PyTorch model is too slow for our 50ms latency SLA.
Export to ONNX and serve with FastAPI.
# export_onnx.py — export PyTorch model to ONNX
import torch
import mlflow.pytorch
import onnx
import onnxruntime as ort
def export_to_onnx(run_id: str, output_path: str = "fraud_model.onnx",
                   num_features: int = 128) -> str:
    """Export the PyTorch model logged under ``run_id`` to ONNX and verify it.

    Validates both the graph structure (onnx checker) and numerical parity
    between PyTorch and ONNX Runtime outputs on a random input — a structurally
    valid graph can still compute the wrong function.

    Args:
        run_id: MLflow run whose "model" artifact to export.
        output_path: Destination .onnx file.
        num_features: Input feature dimension (default matches the fraud model).

    Returns:
        ``output_path``, for convenient chaining.

    Raises:
        ValueError: if ONNX Runtime output diverges from the PyTorch output.
    """
    import numpy as np

    # Load from MLflow
    model = mlflow.pytorch.load_model(f"runs:/{run_id}/model")
    model.eval()

    # Dummy input matching the model's expected input shape (batch of 1).
    dummy_input = torch.randn(1, num_features)

    # Export with dynamic batch size so serving can batch requests.
    torch.onnx.export(
        model,
        dummy_input,
        output_path,
        input_names=['features'],
        output_names=['logits'],
        dynamic_axes={'features': {0: 'batch_size'}, 'logits': {0: 'batch_size'}},
        opset_version=17,
        do_constant_folding=True,
    )

    # Structural validation of the exported graph.
    onnx.checker.check_model(onnx.load(output_path))

    # Numerical parity check: run the same input through both runtimes.
    with torch.no_grad():
        torch_out = model(dummy_input).numpy()
    sess = ort.InferenceSession(output_path, providers=['CPUExecutionProvider'])
    onnx_out = sess.run(None, {'features': dummy_input.numpy()})[0]
    if not np.allclose(torch_out, onnx_out, rtol=1e-4, atol=1e-5):
        raise ValueError(
            f"ONNX output diverges from PyTorch: max diff "
            f"{np.abs(torch_out - onnx_out).max():.6f}"
        )

    print(f"ONNX model validated: {output_path}")
    return output_path
# serve.py — FastAPI inference service
import os

import numpy as np
import onnxruntime as ort
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI()

# Load ONNX model with optimization. This runs once at process start, so the
# first request pays no model-loading latency.
session_options = ort.SessionOptions()
session_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
# Cap intra-op parallelism so a single inference can't monopolize host cores.
session_options.intra_op_num_threads = 4
# Provider order is a preference list: CUDA is used when available, with CPU
# as the fallback.
ort_session = ort.InferenceSession(
    "fraud_model.onnx",
    session_options,
    providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],  # GPU if available
)
class PredictionRequest(BaseModel):
    """Request body for POST /predict."""
    features: list[float]  # 128-dim feature vector
class PredictionResponse(BaseModel):
    """Response body for POST /predict."""
    fraud_probability: float  # sigmoid(logit), rounded to 4 decimal places
    is_fraud: bool            # True when fraud_probability > 0.5
    model_version: str        # MODEL_VERSION env var; 'unknown' if unset
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Score a single transaction's feature vector for fraud.

    Runs the ONNX model on one 128-dim feature vector and returns the
    sigmoid-transformed probability plus a 0.5-threshold decision.

    Raises:
        HTTPException(400): if the feature vector is not exactly 128-dim.
    """
    if len(request.features) != 128:
        raise HTTPException(400, f"Expected 128 features, got {len(request.features)}")

    input_array = np.array([request.features], dtype=np.float32)
    outputs = ort_session.run(
        output_names=['logits'],
        input_feed={'features': input_array},
    )
    logit = float(outputs[0][0][0])

    # Numerically stable sigmoid: the naive 1/(1+exp(-x)) overflows exp()
    # (with a RuntimeWarning) for large negative logits.
    if logit >= 0:
        probability = 1.0 / (1.0 + np.exp(-logit))
    else:
        e = np.exp(logit)
        probability = e / (1.0 + e)

    return PredictionResponse(
        fraud_probability=round(float(probability), 4),
        is_fraud=probability > 0.5,
        model_version=os.environ.get('MODEL_VERSION', 'unknown'),
    )
Data Drift Detection
Our model's precision dropped 8% last week.
Set up monitoring to detect data drift before it causes this.
# drift_detection.py — detect when production data drifts from training data
import pandas as pd
import numpy as np
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.metrics import *
import json
def run_drift_report(
    reference_df: pd.DataFrame,  # Training/validation data
    current_df: pd.DataFrame,    # Recent production data
    output_path: str = "drift_report.html",
) -> dict:
    """Generate an Evidently drift report and return summary metrics.

    Compares recent production data against the training reference, saves the
    full report as HTML, and sends a Slack alert (it does not raise) when more
    than half of the features have drifted.

    Returns:
        dict with keys 'dataset_drift', 'drift_share',
        'drifted_feature_count', and 'target_drift'.
    """
    column_mapping = ColumnMapping(
        target='label',
        prediction='prediction_probability',
        numerical_features=[f'feature_{i}' for i in range(128)],
    )
    report = Report(metrics=[
        DataDriftPreset(
            drift_share=0.3,  # Flag dataset drift if > 30% of features drifted
            stattest='psi',   # PSI detects distribution shift well
            stattest_threshold=0.2,
        ),
        TargetDriftPreset(stattest='chi2'),
        # This model is a classifier, so use the classification preset.
        # (The original `RegressionPreset() if False else ...` was dead code
        # that always evaluated to ClassificationPreset.)
        ClassificationPreset(probas_threshold=0.5),
    ])
    report.run(
        reference_data=reference_df,
        current_data=current_df,
        column_mapping=column_mapping,
    )
    report.save_html(output_path)

    # Extract key metrics. NOTE: relies on the metric order matching the
    # list above (index 0 = data drift, index 1 = target drift).
    metrics = report.as_dict()['metrics']
    drift_summary = {
        'dataset_drift': metrics[0]['result']['dataset_drift'],
        'drift_share': metrics[0]['result']['drift_share'],
        'drifted_feature_count': metrics[0]['result']['number_of_drifted_columns'],
        'target_drift': metrics[1]['result'].get('drift_detected', False),
    }

    # Alert only when a majority of features drifted — deliberately stricter
    # than the preset's 30% dataset-drift threshold, to limit Slack noise.
    if drift_summary['drift_share'] > 0.5:
        print(f"⚠️ HIGH DRIFT: {drift_summary['drifted_feature_count']} features drifted")
        send_slack_alert(drift_summary, report_path=output_path)
    return drift_summary
def send_slack_alert(summary: dict, report_path: str):
    """Post a drift alert to Slack via an incoming webhook.

    Requires the SLACK_WEBHOOK_URL and REPORT_BASE_URL environment variables;
    raises KeyError if either is missing.
    """
    # Local imports keep this optional alert path dependency-light; `os` was
    # never imported at module level in this script, so it must be here.
    import os
    import httpx
    httpx.post(os.environ['SLACK_WEBHOOK_URL'], json={
        "text": (
            f"🚨 *Model Drift Alert* — fraud-detector\n"
            f"• Drift share: {summary['drift_share']:.1%} of features\n"
            f"• Drifted features: {summary['drifted_feature_count']}\n"
            f"• Target drift detected: {summary['target_drift']}\n"
            f"Full report: {os.environ['REPORT_BASE_URL']}/{os.path.basename(report_path)}"
        )
    })
For the machine learning foundations including data engineering pipelines that feed ML models, see the data engineering guide. For deploying ML services on Kubernetes with GPU nodegroups and autoscaling, the Kubernetes guide covers GPU workload deployment. The Claude Skills 360 bundle includes MLOps skill sets covering experiment tracking, model serving, and monitoring patterns. Start with the free tier to try training pipeline scaffolding.