ZenML builds portable ML pipelines from typed steps and swappable stacks. pip install zenml; zenml init sets up the project. @step decorates a function, and its type hints define the artifact schema: def load_data() -> tuple[np.ndarray, np.ndarray]:. Outputs can be named with Annotated, e.g. Annotated[pd.DataFrame, "train_df"]. @pipeline connects steps: def training_pipeline(): data = load_data(); model = train_model(data). python run.py runs the pipeline locally, and ZenML tracks all artifacts automatically — zenml artifact list shows versions. Stacks: zenml stack register my-stack -o default -a s3-store -e mlflow-tracker. Stack components: -o orchestrator (local/airflow/kubeflow/sagemaker/vertex), -a artifact store (local/S3/GCS/Azure), -e experiment tracker (mlflow/wandb), -d model deployer (bentoml/seldon/kserve). zenml stack set my-cloud-stack activates a stack; zenml stack export my-stack stack.yaml exports its config. Model Control Plane: Model(name="churn") groups versions of a model; @pipeline(model=Model(name="churn")) links pipeline runs to it, model.set_stage("production") promotes a version, and model.get_artifact("sklearn_classifier") retrieves a linked artifact. zenml up --docker starts the local server with dashboard. Client().get_pipeline_run("run-id") queries runs programmatically. zenml integration install mlflow bentoml installs integrations. Steps are cached by default; @step(enable_cache=False) disables caching. Claude Code generates ZenML steps, pipeline definitions, stack configurations, and integration setups.
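Client is a plain object rather than a context manager, so programmatic queries look like this — a minimal sketch, with "run-id" as a placeholder for a real run name, ID, or prefix:

# query_runs.py — programmatic run inspection (illustrative sketch)
from zenml.client import Client

client = Client()
run = client.get_pipeline_run("run-id")  # accepts name, ID, or prefix
print(run.status)        # e.g. ExecutionStatus.COMPLETED
print(list(run.steps))   # step names in this run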
## CLAUDE.md for ZenML
## ZenML Stack
- Version: zenml >= 0.55
- Steps: @step decorator — type hints are artifact types (np.ndarray → typed artifact)
- Pipelines: @pipeline connects steps — run with python script or zenml pipeline run
- Stacks: zenml stack register name -o orchestrator -a artifact-store -e experiment-tracker
- Artifacts: all step outputs auto-versioned — zenml artifact list
- Model CP: zenml.Model(name, version) + step outputs → model.set_stage("production")
- Server: zenml up --docker for dashboard + team collaboration
- Cache: steps cached by default — @step(enable_cache=False) to disable (see the sketch below)
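A minimal sketch of the cache bullet above; the step and pipeline names here are illustrative, not part of the training pipeline that follows:

# caching_sketch.py — per-step cache control (illustrative)
from zenml import pipeline, step

@step  # cached: skipped on re-run when code and inputs are unchanged
def load_config() -> dict:
    return {"min_auc": 0.75}

@step(enable_cache=False)  # always re-executes, e.g. for fresh data pulls
def fetch_latest_row_count() -> int:
    return 42

@pipeline
def nightly_refresh():
    load_config()
    fetch_latest_row_count()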
## Steps and Pipeline
# pipelines/training_pipeline.py — ZenML ML pipeline
from __future__ import annotations
from typing import Annotated, Tuple
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, classification_report
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from zenml import ArtifactConfig, Model, get_step_context, pipeline, step
from zenml.integrations.mlflow.flavors.mlflow_experiment_tracker_flavor import (
MLFlowExperimentTrackerSettings,
)
FEATURE_COLS = ["age", "tenure_days", "monthly_spend", "support_tickets", "last_login_days"]
TARGET_COL = "churned"
# ── Step 1: Load data ────────────────────────────────────────────────────────
@step
def load_data(
data_path: str = "data/train.csv",
test_size: float = 0.2,
) -> Tuple[
Annotated[pd.DataFrame, "train_df"],
Annotated[pd.DataFrame, "test_df"],
]:
"""Load and split the training dataset."""
df = pd.read_csv(data_path)
split = int(len(df) * (1 - test_size))
df_shuffled = df.sample(frac=1, random_state=42).reset_index(drop=True)
return df_shuffled.iloc[:split], df_shuffled.iloc[split:]
# ── Step 2: Preprocess ───────────────────────────────────────────────────────
@step
def preprocess(
train_df: pd.DataFrame,
test_df: pd.DataFrame,
) -> Tuple[
Annotated[np.ndarray, "X_train"],
Annotated[np.ndarray, "X_test"],
Annotated[np.ndarray, "y_train"],
Annotated[np.ndarray, "y_test"],
]:
"""Extract features and labels."""
X_train = train_df[FEATURE_COLS].values.astype(np.float32)
X_test = test_df[FEATURE_COLS].values.astype(np.float32)
y_train = train_df[TARGET_COL].values.astype(np.int32)
y_test = test_df[TARGET_COL].values.astype(np.int32)
return X_train, X_test, y_train, y_test
# ── Step 3: Train ────────────────────────────────────────────────────────────
@step(
experiment_tracker="mlflow_tracker",
settings={
"experiment_tracker.mlflow": MLFlowExperimentTrackerSettings(
experiment_name="churn-classification",
tags={"framework": "sklearn"},
)
},
)
def train_model(
X_train: np.ndarray,
y_train: np.ndarray,
n_estimators: int = 200,
learning_rate: float = 0.05,
max_depth: int = 4,
) -> Annotated[Pipeline, ArtifactConfig(name="sklearn_classifier", is_model_artifact=True)]:
"""Train GBM churn classifier with MLflow tracking."""
    import mlflow

    # ZenML's MLflow experiment tracker already opens an active run for this
    # step, so log directly instead of calling mlflow.start_run() again.
    mlflow.log_params({
        "n_estimators": n_estimators,
        "learning_rate": learning_rate,
        "max_depth": max_depth,
    })
    pipeline = Pipeline([
        ("scaler", StandardScaler()),
        ("clf", GradientBoostingClassifier(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            random_state=42,
        )),
    ])
    pipeline.fit(X_train, y_train)
    cv_auc = float(np.mean(
        cross_val_score(pipeline, X_train, y_train, cv=5, scoring="roc_auc", n_jobs=-1)
    ))
    mlflow.log_metric("cv_auc", cv_auc)
    mlflow.sklearn.log_model(pipeline, "model")
    print(f"CV AUC: {cv_auc:.4f}")
    return pipeline
# ── Step 4: Evaluate ─────────────────────────────────────────────────────────
@step
def evaluate_model(
model: Pipeline,
X_test: np.ndarray,
y_test: np.ndarray,
) -> Annotated[float, "test_auc"]:
"""Evaluate the trained model on the test set."""
y_proba = model.predict_proba(X_test)[:, 1]
y_pred = model.predict(X_test)
auc = float(roc_auc_score(y_test, y_proba))
print(f"\nTest AUC: {auc:.4f}")
print(classification_report(y_test, y_pred, target_names=["no_churn", "churn"]))
return auc
# ── Step 5: Register model ───────────────────────────────────────────────────
@step
def register_model(
model: Pipeline,
test_auc: float,
min_auc: float = 0.75,
) -> Annotated[bool, "is_promoted"]:
"""Register model in ZenML Model Control Plane if AUC meets threshold."""
context = get_step_context()
zenml_model = context.model
if test_auc >= min_auc:
zenml_model.set_stage("staging", force=True)
print(f"Model promoted to staging (AUC={test_auc:.4f})")
return True
else:
print(f"Model NOT promoted — AUC {test_auc:.4f} < threshold {min_auc}")
return False
# ── Pipeline definition ──────────────────────────────────────────────────────
@pipeline(
name="churn-training-pipeline",
model=Model(
name="churn-classifier",
description="GBM churn prediction model",
tags=["sklearn", "churn"],
),
enable_cache=True,
)
def training_pipeline(
data_path: str = "data/train.csv",
n_estimators: int = 200,
learning_rate: float = 0.05,
max_depth: int = 4,
min_auc: float = 0.75,
):
train_df, test_df = load_data(data_path=data_path)
X_train, X_test, y_train, y_test = preprocess(train_df=train_df, test_df=test_df)
model = train_model(
X_train=X_train,
y_train=y_train,
n_estimators=n_estimators,
learning_rate=learning_rate,
max_depth=max_depth,
)
auc = evaluate_model(model=model, X_test=X_test, y_test=y_test)
register_model(model=model, test_auc=auc, min_auc=min_auc)
if __name__ == "__main__":
training_pipeline()
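Running python pipelines/training_pipeline.py executes the pipeline on the active stack. Once a run has promoted a version to staging, downstream code can pull it back through the Model Control Plane — a sketch, assuming at least one promoted version exists:

# scripts/load_staged_model.py — fetch the staged classifier (illustrative)
from zenml import Model

model = Model(name="churn-classifier", version="staging")
clf = model.get_artifact("sklearn_classifier").load()  # the trained sklearn Pipeline
# feature order: age, tenure_days, monthly_spend, support_tickets, last_login_days
print(clf.predict_proba([[35, 420, 79.9, 2, 3]])[:, 1])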
## Stack Configuration
# stack.yaml — ZenML cloud stack with S3 + MLflow + SageMaker
zenml_version: "0.55.0"
stack_name: aws-production
components:
orchestrator:
name: sagemaker_orchestrator
type: orchestrator
flavor: sagemaker
configuration:
region: us-east-1
execution_role: arn:aws:iam::123456789012:role/ZenMLSageMakerRole
instance_type: ml.m5.xlarge
artifact_store:
name: s3_artifact_store
type: artifact_store
flavor: s3
configuration:
path: s3://my-ml-bucket/zenml-artifacts
experiment_tracker:
name: mlflow_tracker
type: experiment_tracker
flavor: mlflow
configuration:
tracking_uri: http://mlflow.internal:5000
model_deployer:
name: bentoml_deployer
type: model_deployer
flavor: bentoml
# scripts/setup_stack.py — programmatic stack management
from zenml.client import Client
def register_aws_stack():
"""Register AWS production stack via Python SDK."""
client = Client()
# Register components
s3_store = client.create_stack_component(
name="s3-artifact-store",
component_type="artifact_store",
flavor="s3",
configuration={"path": "s3://my-ml-bucket/zenml-artifacts"},
)
mlflow_tracker = client.create_stack_component(
name="mlflow-tracker",
component_type="experiment_tracker",
flavor="mlflow",
configuration={"tracking_uri": "http://mlflow.internal:5000"},
)
# Register and activate stack
stack = client.create_stack(
name="aws-prod",
components={
"orchestrator": "default", # local orchestrator
"artifact_store": s3_store.name,
"experiment_tracker": mlflow_tracker.name,
},
)
    client.activate_stack(stack.id)
    print(f"Stack '{stack.name}' activated")

if __name__ == "__main__":
    register_aws_stack()
Consider the Metaflow alternative when you run AWS-native infrastructure and want tight AWS Step Functions and Batch integration with a Python-first API that decorates individual steps for cloud execution (@batch(cpu=4, memory=16000)). Metaflow is simpler for data scientists, while ZenML provides stack portability across AWS/GCP/Azure and stronger typing with artifact lineage that keeps pipelines reproducible across teams. Consider the Kubeflow Pipelines alternative when you already run Kubernetes and want the standard CNCF pipeline solution with the full KFP SDK and Argo Workflows backend. KFP is more mature for K8s-native execution, while ZenML abstracts over multiple orchestrators, so the same Python pipeline code runs locally or on Kubeflow, SageMaker, or Vertex without modification. The Claude Skills 360 bundle includes ZenML skill sets covering typed steps, pipeline definitions, stack YAML configs, the Model Control Plane, and MLflow integration. Start with the free tier to try ML pipeline generation.