Ray Serve scales ML inference across a cluster with actor-based deployments. pip install "ray[serve]". @serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 1}) decorates a class. serve.run(MyDeployment.bind()) starts locally. Class __init__ loads the model once per replica; async def __call__(self, request: Request) handles inference. DeploymentHandle for composition: pass Preprocessor.bind() into the parent deployment's constructor and store it as self.preprocessor, then await self.preprocessor.remote(text) inside the parent deployment. @serve.ingress(app) integrates FastAPI — app = FastAPI(), @app.post("/predict") routes work natively. GPU: ray_actor_options={"num_gpus": 0.5} gives fractional allocation — two replicas share one GPU. Autoscaling: @serve.deployment(autoscaling_config={"min_replicas": 1, "max_replicas": 10, "target_num_ongoing_requests_per_replica": 5}). Traffic splitting: serve.run(Router.bind(handle_a=ModelA.bind(), handle_b=ModelB.bind())) with weighted random routing in __call__. serve.status() shows deployment health. serve.delete("app_name") tears down. Kubernetes: RayService CRD with serveConfigV2 block — ray.io/RayService deploys via the KubeRay operator. serve build MyApp:app -o serve_config.yaml generates config. serve deploy serve_config.yaml hot-reloads without downtime. Multi-model pipeline: Preprocessor → Classifier → Postprocessor chained with DeploymentHandle.remote(). Claude Code generates Ray Serve deployments, FastAPI integrations, multi-model pipelines, autoscaling configs, and TypeScript API clients.
CLAUDE.md for Ray Serve
## Ray Serve Stack
- Version: ray[serve] >= 2.10
- Deployment: @serve.deployment(num_replicas, ray_actor_options, autoscaling_config) class with async __call__
- Composition: DeploymentHandle via .bind() — await handle.remote(input) for chained inference
- FastAPI: @serve.ingress(app) — full FastAPI routes work natively
- Run: serve.run(App.bind()) locally; serve deploy config.yaml for production
- GPU: ray_actor_options={"num_gpus": 0.5} for fractional GPU
- K8s: RayService CRD via KubeRay operator with serveConfigV2
Ray Serve Deployments
# serve_app.py — Ray Serve multi-model pipeline with FastAPI
from __future__ import annotations

import asyncio
from typing import List, Optional

import numpy as np
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle
# ── Schema ─────────────────────────────────────────────────────────────────
class ClassifyRequest(BaseModel):
text: str
max_length: Optional[int] = 512
class ClassifyResponse(BaseModel):
label: str
score: float
latency_ms: float
class BatchRequest(BaseModel):
texts: List[str]
class BatchResponse(BaseModel):
results: List[ClassifyResponse]
# ── Preprocessor deployment ────────────────────────────────────────────────
@serve.deployment(
name="preprocessor",
num_replicas=2,
ray_actor_options={"num_cpus": 0.5},
)
class Preprocessor:
def __init__(self):
from transformers import AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment-latest")
async def __call__(self, text: str, max_length: int = 512) -> dict:
tokens = self.tokenizer(
text,
max_length=max_length,
truncation=True,
padding=True,
return_tensors="pt",
)
return {k: v.tolist() for k, v in tokens.items()}
# ── Classifier deployment (GPU) ────────────────────────────────────────────
@serve.deployment(
name="classifier",
num_replicas=1,
ray_actor_options={
"num_cpus": 2,
"num_gpus": 0.5, # Fractional GPU — share with another replica
},
autoscaling_config={
"min_replicas": 1,
"max_replicas": 4,
"target_num_ongoing_requests_per_replica": 8,
"upscale_delay_s": 10,
"downscale_delay_s": 60,
},
)
class Classifier:
def __init__(self):
import torch
from transformers import AutoModelForSequenceClassification
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model = AutoModelForSequenceClassification.from_pretrained(
"cardiffnlp/twitter-roberta-base-sentiment-latest"
).to(self.device)
self.model.eval()
self.labels = ["negative", "neutral", "positive"]
async def predict(self, tokens: dict) -> dict:
import torch
with torch.no_grad():
input_tensors = {
k: torch.tensor(v).to(self.device)
for k, v in tokens.items()
}
outputs = self.model(**input_tensors)
probs = torch.softmax(outputs.logits, dim=-1)[0]
label_idx = probs.argmax().item()
return {
"label": self.labels[label_idx],
"score": float(probs[label_idx]),
}
# ── FastAPI-integrated serving deployment ─────────────────────────────────
app = FastAPI(title="Sentiment API", version="1.0.0")
@serve.deployment(
name="sentiment_app",
num_replicas=2,
ray_actor_options={"num_cpus": 1},
)
@serve.ingress(app)
class SentimentApp:
def __init__(
self,
preprocessor: serve.handle_type(Preprocessor),
classifier: serve.handle_type(Classifier),
):
self.preprocessor = preprocessor
self.classifier = classifier
@app.post("/classify", response_model=ClassifyResponse)
async def classify(self, req: ClassifyRequest) -> ClassifyResponse:
import time
t0 = time.perf_counter()
tokens = await self.preprocessor.remote(req.text, req.max_length)
result = await self.classifier.predict.remote(tokens)
return ClassifyResponse(
label=result["label"],
score=round(result["score"], 4),
latency_ms=round((time.perf_counter() - t0) * 1000, 2),
)
@app.post("/classify/batch", response_model=BatchResponse)
async def classify_batch(self, req: BatchRequest) -> BatchResponse:
if len(req.texts) > 64:
raise HTTPException(400, "Max 64 texts per batch")
tasks = [
asyncio.gather(
self.preprocessor.remote(text),
)
for text in req.texts
]
token_batches = await asyncio.gather(*tasks)
predict_tasks = [
self.classifier.predict.remote(tokens[0])
for tokens in token_batches
]
results = await asyncio.gather(*predict_tasks)
return BatchResponse(
results=[
ClassifyResponse(label=r["label"], score=round(r["score"], 4), latency_ms=0)
for r in results
]
)
@app.get("/health")
async def health(self) -> dict:
return {"status": "ok", "replicas": "active"}
# ── App entrypoint ─────────────────────────────────────────────────────────
sentiment_app = SentimentApp.bind(
preprocessor=Preprocessor.bind(),
classifier=Classifier.bind(),
)
Traffic Splitting (A/B Testing)
# ab_router.py — A/B traffic splitting between model versions
import random
from ray import serve
from fastapi import FastAPI, Request
@serve.deployment(name="model_v1", num_replicas=2, ray_actor_options={"num_cpus": 1})
class ModelV1:
def __init__(self):
# Load v1 model
self.version = "v1"
async def predict(self, text: str) -> dict:
# Simulate inference
return {"label": "positive", "score": 0.85, "version": self.version}
@serve.deployment(name="model_v2", num_replicas=2, ray_actor_options={"num_cpus": 1})
class ModelV2:
def __init__(self):
# Load v2 model (new architecture)
self.version = "v2"
async def predict(self, text: str) -> dict:
return {"label": "positive", "score": 0.92, "version": self.version}
ab_app = FastAPI()
@serve.deployment(name="ab_router", num_replicas=1)
@serve.ingress(ab_app)
class ABRouter:
"""Routes 20% traffic to v2, 80% to v1 for canary testing."""
def __init__(
self,
model_v1: serve.handle_type(ModelV1),
model_v2: serve.handle_type(ModelV2),
v2_fraction: float = 0.2,
):
self.model_v1 = model_v1
self.model_v2 = model_v2
self.v2_fraction = v2_fraction
@ab_app.post("/predict")
async def predict(self, request: Request) -> dict:
body = await request.json()
text = body.get("text", "")
handle = self.model_v2 if random.random() < self.v2_fraction else self.model_v1
return await handle.predict.remote(text)
ab_router = ABRouter.bind(
model_v1=ModelV1.bind(),
model_v2=ModelV2.bind(),
v2_fraction=0.2,
)
Kubernetes RayService
# k8s/ray-service.yaml — KubeRay RayService for production
apiVersion: ray.io/v1
kind: RayService
metadata:
name: sentiment-service
namespace: ml-serving
spec:
serviceUnhealthySecondThreshold: 300
deploymentUnhealthySecondThreshold: 300
serveConfigV2: |
applications:
- name: sentiment_app
import_path: serve_app:sentiment_app
route_prefix: /
runtime_env:
pip:
- transformers>=4.40.0
- torch>=2.2.0
deployments:
- name: preprocessor
num_replicas: 2
ray_actor_options:
num_cpus: 0.5
- name: classifier
autoscaling_config:
min_replicas: 1
max_replicas: 4
target_num_ongoing_requests_per_replica: 8
ray_actor_options:
num_cpus: 2
num_gpus: 0.5
- name: sentiment_app
num_replicas: 2
ray_actor_options:
num_cpus: 1
rayClusterConfig:
headGroupSpec:
rayStartParams:
dashboard-host: "0.0.0.0"
template:
spec:
containers:
- name: ray-head
image: rayproject/ray-ml:2.10.0-py311-gpu
resources:
requests:
cpu: "2"
memory: "8Gi"
limits:
cpu: "4"
memory: "16Gi"
workerGroupSpecs:
- replicas: 2
minReplicas: 1
maxReplicas: 8
groupName: gpu-workers
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray-ml:2.10.0-py311-gpu
resources:
requests:
cpu: "4"
memory: "16Gi"
nvidia.com/gpu: "1"
limits:
cpu: "8"
memory: "32Gi"
nvidia.com/gpu: "1"
TypeScript Client
// lib/ray-serve/client.ts — TypeScript client for Ray Serve endpoint
const RAY_SERVE_URL = process.env.RAY_SERVE_URL ?? "http://localhost:8000"
export type ClassifyRequest = { text: string; max_length?: number }
export type ClassifyResponse = { label: string; score: number; latency_ms: number }
export type BatchResponse = { results: ClassifyResponse[] }
async function rayFetch<T>(path: string, body: unknown): Promise<T> {
const res = await fetch(`${RAY_SERVE_URL}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
})
if (!res.ok) throw new Error(`Ray Serve ${res.status}: ${await res.text()}`)
return res.json()
}
export async function classifyText(text: string, max_length = 512): Promise<ClassifyResponse> {
return rayFetch<ClassifyResponse>("/classify", { text, max_length })
}
export async function classifyBatch(texts: string[]): Promise<ClassifyResponse[]> {
const result = await rayFetch<BatchResponse>("/classify/batch", { texts })
return result.results
}
export async function healthCheck(): Promise<{ status: string }> {
const res = await fetch(`${RAY_SERVE_URL}/health`)
return res.json()
}
Consider the BentoML alternative when packaging individual models into self-contained Docker images with clear versioning and a managed model store — BentoML’s bentofile.yaml approach is friendlier for single-model deployments and CI/CD without needing a Ray cluster, while Ray Serve excels at large multi-model pipelines, fractional GPU sharing, and dynamic horizontal scaling across a Ray cluster with actor-based concurrency. Consider the Triton Inference Server alternative when you need NVIDIA’s high-performance inference engine with TensorRT optimization, dynamic batching, and model repository management for multiple frameworks (ONNX, TensorFlow, PyTorch) — Triton maximizes GPU utilization for high-throughput serving, while Ray Serve provides a Pythonic API for complex ML pipelines with arbitrary preprocessing logic. The Claude Skills 360 bundle includes Ray Serve skill sets covering deployment composition, autoscaling, traffic splitting, and Kubernetes RayService configs. Start with the free tier to try distributed inference generation.