AWS SageMaker manages the full ML lifecycle: training, registering, and deploying models. Install the SDK with pip install sagemaker. An Estimator runs training jobs, for example sklearn = SKLearn(entry_point="train.py", role=role, instance_type="ml.m5.xlarge", framework_version="1.2-1") followed by sklearn.fit({"train": "s3://bucket/train/", "test": "s3://bucket/test/"}). Custom containers use Estimator(image_uri="account.dkr.ecr.region.amazonaws.com/my-image:latest", ...), and Hugging Face models use HuggingFace(task="text-classification", transformers_version="4.36", pytorch_version="2.1", ...).

Deploying a real-time endpoint is predictor = sklearn.deploy(initial_instance_count=1, instance_type="ml.m5.xlarge"), then predictor.predict([[1.0, 2.0, 3.0]]). The separate Model class wraps an existing artifact: model = Model(image_uri=..., model_data="s3://bucket/model.tar.gz", role=role), then model.deploy(endpoint_name="prod-endpoint", ...). Multi-variant A/B testing uses ProductionVariant entries with InitialVariantWeight, and Batch Transform scores S3 data offline: transformer = model.transformer(instance_count=2, instance_type="ml.m5.xlarge"), then transformer.transform("s3://input/", content_type="text/csv").

SageMaker Pipelines chain steps together: Pipeline(name="churn-pipeline", steps=[processing_step, training_step, eval_step, register_step, deploy_step]), with step types ProcessingStep, TrainingStep, TransformStep, ConditionStep, and RegisterModel. The Model Registry versions models under a model_package_group_name with approval_status="PendingManualApproval". Feature Store defines a FeatureGroup with record_identifier_name, feature_definitions, online_store_config, and offline_store_config; write with feature_group.ingest(df) and read online with feature_store_runtime.get_record(FeatureGroupName, RecordIdentifierValueAsString). Claude Code generates SageMaker training scripts, pipeline definitions, endpoint configs, and feature store integrations.
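As an example of the multi-variant A/B setup mentioned above, traffic is split between model versions by weight on a single endpoint. A minimal boto3 sketch, assuming two models (churn-model-v1 and churn-model-v2) already exist in SageMaker and hypothetical config/endpoint names:

# A/B endpoint config: two ProductionVariants sharing one endpoint by weight
import boto3

sm = boto3.client("sagemaker", region_name="us-east-1")
sm.create_endpoint_config(
    EndpointConfigName="churn-ab-config",
    ProductionVariants=[
        {
            "VariantName": "champion",
            "ModelName": "churn-model-v1",   # hypothetical existing model
            "InstanceType": "ml.m5.large",
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 0.9,     # 90% of traffic
        },
        {
            "VariantName": "challenger",
            "ModelName": "churn-model-v2",   # hypothetical existing model
            "InstanceType": "ml.m5.large",
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 0.1,     # 10% of traffic
        },
    ],
)
sm.create_endpoint(EndpointName="churn-ab", EndpointConfigName="churn-ab-config")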
CLAUDE.md for SageMaker
## SageMaker Stack
- SDK: sagemaker >= 2.200 + boto3
- Training: Estimator(entry_point, role, instance_type, framework_version).fit({"train": s3://})
- Deploy: predictor = estimator.deploy(initial_instance_count, instance_type, endpoint_name)
- Model class: Model(image_uri, model_data, role).deploy(...)
- Batch: model.transformer(...).transform(s3_input)
- Pipelines: Pipeline(steps=[ProcessingStep, TrainingStep, ConditionStep, RegisterModel])
- Registry: register model with ModelPackageGroupName + approval workflow
- Features: FeatureGroup with online + offline store — ingest(df) and get_record() (see the sketch below)
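The Feature Store bullet maps to the FeatureGroup API. A minimal sketch, reusing the bucket and role placeholders from the workflow file below, with a hypothetical churn-features group and two example records:

# feature_store.py: hypothetical FeatureGroup setup for the churn features
import time
import boto3
import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

session = sagemaker.Session()
feature_group = FeatureGroup(name="churn-features", sagemaker_session=session)

df = pd.DataFrame({
    "customer_id": ["c-1001", "c-1002"],
    "age": [34, 52],
    "tenure_days": [420, 1180],
    "monthly_spend": [59.0, 112.5],
    "support_tickets": [2, 0],
    "last_login_days": [7, 31],
})
df["customer_id"] = df["customer_id"].astype("string")  # object dtype cannot be inferred
df["event_time"] = time.time()  # required event-time feature (fractional epoch seconds)

feature_group.load_feature_definitions(data_frame=df)
feature_group.create(
    s3_uri="s3://my-ml-bucket/feature-store",  # offline store location
    record_identifier_name="customer_id",
    event_time_feature_name="event_time",
    role_arn="arn:aws:iam::123456789012:role/SageMakerExecutionRole",
    enable_online_store=True,
)
while feature_group.describe()["FeatureGroupStatus"] == "Creating":
    time.sleep(5)  # creation is asynchronous; wait before ingesting

feature_group.ingest(data_frame=df, max_workers=2, wait=True)

# Low-latency online read at inference time
runtime = boto3.client("sagemaker-featurestore-runtime")
record = runtime.get_record(
    FeatureGroupName="churn-features",
    RecordIdentifierValueAsString="c-1001",
)
print(record["Record"])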
Training Script
# train.py — SageMaker training entry point (runs inside SageMaker container)
from __future__ import annotations
import argparse
import json
import os
import pickle
import numpy as np
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
# SageMaker passes hyperparameters as CLI args
parser.add_argument("--n-estimators", type=int, default=200)
parser.add_argument("--learning-rate", type=float, default=0.05)
parser.add_argument("--max-depth", type=int, default=4)
parser.add_argument("--min-samples-leaf", type=int, default=10)
# SageMaker environment variables for data and model paths
parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))
parser.add_argument("--output-data-dir", type=str, default=os.environ.get("SM_OUTPUT_DATA_DIR"))
return parser.parse_args()
def load_data(data_dir: str) -> pd.DataFrame:
files = [f for f in os.listdir(data_dir) if f.endswith(".csv")]
return pd.concat([pd.read_csv(os.path.join(data_dir, f)) for f in files], ignore_index=True)
def main():
args = parse_args()
print(f"Hyperparameters: {vars(args)}")
# Load data
    train_df = load_data(args.train)
    # Fall back to a held-out split when no test channel is provided (avoids evaluating on training rows)
    if args.test:
        test_df = load_data(args.test)
    else:
        train_df, test_df = train_test_split(train_df, test_size=0.2, random_state=42)
feature_cols = ["age", "tenure_days", "monthly_spend", "support_tickets", "last_login_days"]
target_col = "churned"
X_train = train_df[feature_cols].values
y_train = train_df[target_col].values
X_test = test_df[feature_cols].values
y_test = test_df[target_col].values
# Train pipeline: scaler + GBM
pipeline = Pipeline([
("scaler", StandardScaler()),
("classifier", GradientBoostingClassifier(
n_estimators=args.n_estimators,
learning_rate=args.learning_rate,
max_depth=args.max_depth,
min_samples_leaf=args.min_samples_leaf,
random_state=42,
)),
])
pipeline.fit(X_train, y_train)
# Evaluate
y_pred = pipeline.predict(X_test)
y_proba = pipeline.predict_proba(X_test)[:, 1]
auc = roc_auc_score(y_test, y_proba)
print(f"\nAUC-ROC: {auc:.4f}")
print(classification_report(y_test, y_pred))
# Save metrics for SageMaker Model Registry evaluation
if args.output_data_dir:
os.makedirs(args.output_data_dir, exist_ok=True)
metrics = {"auc": auc, "n_test": len(y_test)}
with open(os.path.join(args.output_data_dir, "metrics.json"), "w") as f:
json.dump(metrics, f)
    # Save model: SageMaker packages everything written to SM_MODEL_DIR into model.tar.gz
os.makedirs(args.model_dir, exist_ok=True)
with open(os.path.join(args.model_dir, "model.pkl"), "wb") as f:
pickle.dump(pipeline, f)
print(f"Model saved to {args.model_dir}")
if __name__ == "__main__":
main()
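For real-time inference, the SageMaker scikit-learn serving container needs a model_fn to load the artifact, and predict_fn can optionally be overridden to return probabilities instead of labels. A minimal sketch, assuming the model.pkl written above and the container's default CSV/JSON input and output handlers (the file name inference.py is an assumption):

# inference.py: serving hooks loaded by the SageMaker scikit-learn container
import os
import pickle

def model_fn(model_dir):
    """Load the pipeline saved by train.py from the unpacked model.tar.gz."""
    with open(os.path.join(model_dir, "model.pkl"), "rb") as f:
        return pickle.load(f)

def predict_fn(input_data, model):
    """Return churn probabilities instead of hard class labels."""
    return model.predict_proba(input_data)[:, 1]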
SageMaker Workflow (SDK)
# ml/sagemaker_workflow.py — training, registration, and deployment
from __future__ import annotations
import boto3
import sagemaker
from sagemaker.sklearn import SKLearn
from sagemaker.model import Model
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep, ProcessingStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.pipeline_context import PipelineSession
ROLE = "arn:aws:iam::123456789012:role/SageMakerExecutionRole"
BUCKET = "my-ml-bucket"
PREFIX = "churn-model"
REGION = "us-east-1"
def get_session() -> tuple[sagemaker.Session, PipelineSession]:
boto_session = boto3.Session(region_name=REGION)
sm_session = sagemaker.Session(boto_session=boto_session, default_bucket=BUCKET)
pipeline_session = PipelineSession(boto_session=boto_session, default_bucket=BUCKET)
return sm_session, pipeline_session
def create_training_step(pipeline_session: PipelineSession) -> TrainingStep:
"""Define a SageMaker training step for Pipelines."""
estimator = SKLearn(
entry_point="train.py",
source_dir=".",
role=ROLE,
instance_type="ml.m5.xlarge",
instance_count=1,
framework_version="1.2-1",
py_version="py3",
sagemaker_session=pipeline_session,
hyperparameters={
"n-estimators": 200,
"learning-rate": 0.05,
"max-depth": 4,
"min-samples-leaf": 10,
},
output_path=f"s3://{BUCKET}/{PREFIX}/output",
metric_definitions=[
{"Name": "auc", "Regex": "AUC-ROC: ([0-9\\.]+)"},
],
)
return TrainingStep(
name="TrainChurnModel",
estimator=estimator,
inputs={
"train": sagemaker.inputs.TrainingInput(
s3_data=f"s3://{BUCKET}/{PREFIX}/data/train/",
content_type="text/csv",
),
"test": sagemaker.inputs.TrainingInput(
s3_data=f"s3://{BUCKET}/{PREFIX}/data/test/",
content_type="text/csv",
),
},
)
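# Sketch: the pipeline imports above (Pipeline, ModelStep, ConditionStep,
# ConditionGreaterThanOrEqualTo) come together in an assembly function. The 0.80 AUC
# threshold and the "churn-model" package group name are assumptions; the condition
# reads the single "auc" metric defined in metric_definitions on the estimator.
def create_pipeline(pipeline_session: PipelineSession) -> Pipeline:
    """Train, then register the model only if AUC clears a threshold."""
    training_step = create_training_step(pipeline_session)
    model = Model(
        image_uri=sagemaker.image_uris.retrieve("sklearn", REGION, "1.2-1"),
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        role=ROLE,
        sagemaker_session=pipeline_session,
    )
    register_step = ModelStep(
        name="RegisterChurnModel",
        step_args=model.register(
            content_types=["text/csv"],
            response_types=["application/json"],
            inference_instances=["ml.m5.large"],
            transform_instances=["ml.m5.xlarge"],
            model_package_group_name="churn-model",
            approval_status="PendingManualApproval",
        ),
    )
    gate = ConditionStep(
        name="CheckAUC",
        conditions=[
            ConditionGreaterThanOrEqualTo(
                # FinalMetricDataList[0] is the single "auc" metric definition
                left=training_step.properties.FinalMetricDataList[0].Value,
                right=0.80,
            ),
        ],
        if_steps=[register_step],
        else_steps=[],
    )
    return Pipeline(
        name="churn-pipeline",
        steps=[training_step, gate],
        sagemaker_session=pipeline_session,
    )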
def deploy_endpoint(
model_data: str,
endpoint_name: str = "churn-prod",
instance_type: str = "ml.m5.large",
) -> Predictor:
"""Deploy a registered model to a real-time endpoint."""
sm_session, _ = get_session()
model = Model(
model_data=model_data,
image_uri=sagemaker.image_uris.retrieve(
framework="sklearn",
region=REGION,
version="1.2-1",
),
role=ROLE,
sagemaker_session=sm_session,
)
predictor = model.deploy(
endpoint_name=endpoint_name,
initial_instance_count=1,
instance_type=instance_type,
serializer=CSVSerializer(),
deserializer=JSONDeserializer(),
)
# Configure autoscaling
client = boto3.client("application-autoscaling", region_name=REGION)
resource_id = f"endpoint/{endpoint_name}/variant/AllTraffic"
client.register_scalable_target(
ServiceNamespace="sagemaker",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
MinCapacity=1,
MaxCapacity=10,
)
client.put_scaling_policy(
PolicyName="InvocationsPerInstance",
ServiceNamespace="sagemaker",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
PolicyType="TargetTrackingScaling",
TargetTrackingScalingPolicyConfiguration={
"TargetValue": 500,
"PredefinedMetricSpecification": {
"PredefinedMetricType": "SageMakerVariantInvocationsPerInstance",
},
"ScaleInCooldown": 300,
"ScaleOutCooldown": 60,
},
)
print(f"Endpoint deployed with autoscaling: {endpoint_name}")
return predictor
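# Example usage (hypothetical artifact URI and feature values):
#   predictor = deploy_endpoint(model_data="s3://my-ml-bucket/churn-model/output/.../model.tar.gz")
#   probs = predictor.predict([[34, 420, 59.0, 2, 7]])  # age, tenure_days, monthly_spend, support_tickets, last_login_days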
def batch_transform(
model_data: str,
s3_input: str,
s3_output: str,
instance_type: str = "ml.m5.xlarge",
) -> None:
"""Run batch scoring on S3 input."""
sm_session, _ = get_session()
model = Model(
model_data=model_data,
image_uri=sagemaker.image_uris.retrieve("sklearn", REGION, "1.2-1"),
role=ROLE,
sagemaker_session=sm_session,
)
transformer = model.transformer(
instance_count=1,
instance_type=instance_type,
output_path=s3_output,
strategy="MultiRecord",
accept="text/csv",
assemble_with="Line",
)
transformer.transform(
data=s3_input,
content_type="text/csv",
split_type="Line",
wait=True,
)
print(f"Batch transform complete. Output at: {s3_output}")
TypeScript Client
// lib/sagemaker/client.ts — invoke SageMaker endpoints from TypeScript
import {
SageMakerRuntimeClient,
InvokeEndpointCommand,
} from "@aws-sdk/client-sagemaker-runtime"
const client = new SageMakerRuntimeClient({ region: process.env.AWS_REGION ?? "us-east-1" })
export async function invokeEndpoint(
endpointName: string,
features: number[][],
): Promise<number[]> {
const body = features.map(row => row.join(",")).join("\n")
const cmd = new InvokeEndpointCommand({
EndpointName: endpointName,
Body: Buffer.from(body),
ContentType: "text/csv",
Accept: "application/json",
})
const response = await client.send(cmd)
const text = new TextDecoder().decode(response.Body as Uint8Array)
return JSON.parse(text)
}
export async function predictChurn(
endpointName: string,
records: Array<{
age: number
tenure_days: number
monthly_spend: number
support_tickets: number
last_login_days: number
}>,
): Promise<Array<{ churn_probability: number; risk_tier: string }>> {
const features = records.map(r => [
r.age, r.tenure_days, r.monthly_spend, r.support_tickets, r.last_login_days,
])
const predictions = await invokeEndpoint(endpointName, features)
return predictions.map((prob: number) => ({
churn_probability: Math.round(prob * 10000) / 10000,
risk_tier: prob > 0.7 ? "HIGH" : prob > 0.3 ? "MEDIUM" : "LOW",
}))
}
Consider the Vertex AI alternative when you are already on Google Cloud and want managed training with AutoML, pre-built containers, Vertex Pipelines, and Vertex Feature Store: Vertex AI mirrors SageMaker's feature set on GCP with tighter BigQuery integration, while SageMaker is the natural choice for AWS-native teams with data already in S3 and existing IAM/VPC infrastructure. Consider the KServe alternative when you need a Kubernetes-native, open-source serving layer that runs on any cloud or on-prem without vendor lock-in: KServe's InferenceService CRD gives you full infrastructure control, whereas SageMaker manages the underlying infrastructure automatically with built-in monitoring, A/B testing, and AWS-native autoscaling. The Claude Skills 360 bundle includes SageMaker skill sets covering training scripts, pipeline definitions, endpoint deployment, batch transform, and TypeScript SDK clients. Start with the free tier to try AWS ML workflow generation.