DeepSpeed trains large models across multiple GPUs with ZeRO memory optimization (pip install deepspeed). The native API wraps the model in an engine: model_engine, optimizer, _, _ = deepspeed.initialize(model=model, optimizer=optimizer, config=ds_config), where ds_config is a dict or a path to ds_config.json. The training step is loss = model_engine(inputs), then model_engine.backward(loss) and model_engine.step().

ZeRO-2 partitions optimizer states and gradients: {"zero_optimization": {"stage": 2, "allgather_partitions": true, "allgather_bucket_size": 2e8, "overlap_comm": true, "reduce_scatter": true}}. ZeRO-3 partitions optimizer states, gradients, AND parameters across GPUs, with optional offload: {"stage": 3, "offload_optimizer": {"device": "cpu"}, "offload_param": {"device": "cpu"}}. Mixed precision is configured with {"fp16": {"enabled": true, "loss_scale": 0, "loss_scale_window": 1000}} or {"bf16": {"enabled": true}}. Call model.gradient_checkpointing_enable() before deepspeed.initialize to add gradient checkpointing.

With HuggingFace Trainer, pass TrainingArguments(deepspeed="ds_config.json", bf16=True, per_device_train_batch_size=4); the Trainer handles deepspeed.initialize internally. With Accelerate: from accelerate import Accelerator, DeepSpeedPlugin; plugin = DeepSpeedPlugin(zero_stage=2, gradient_accumulation_steps=4); accelerator = Accelerator(deepspeed_plugin=plugin, mixed_precision="bf16"); model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader); accelerator.backward(loss).

Launch with deepspeed --num_gpus 8 train.py --deepspeed ds_config.json, or multi-node with deepspeed --hostfile hostfile --num_nodes 2 --num_gpus 8 train.py. For pipeline parallelism: from deepspeed.pipe import PipelineModule; layers = [LayerSpec(EmbedLayer), LayerSpec(TransformerLayer, ...), LayerSpec(HeadLayer)]; model = PipelineModule(layers=layers, num_stages=4); engine.train_batch() then handles micro-batching across pipeline stages. Checkpoint with engine.save_checkpoint(save_dir, tag) and engine.load_checkpoint(load_dir, tag). Claude Code generates DeepSpeed configs, ZeRO training loops, HuggingFace Trainer integration, Accelerate pipelines, and multi-node launch scripts.
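The native loop in full, as a minimal sketch (model, optimizer, train_loader, and ds_config are assumed to be defined elsewhere):

import deepspeed

engine, optimizer, _, _ = deepspeed.initialize(
    model=model, optimizer=optimizer, config=ds_config
)
for batch in train_loader:
    loss = engine(**batch).loss  # assumes batches carry input_ids and labels
    engine.backward(loss)        # loss scaling + ZeRO gradient reduction
    engine.step()                # optimizer step, LR schedule, zero_grad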
CLAUDE.md for DeepSpeed
## DeepSpeed Stack
- Version: deepspeed >= 0.14, transformers >= 4.40, accelerate >= 0.30
- Native: deepspeed.initialize(model, optimizer, config=ds_config) → engine, optimizer, _, _
- HF Trainer: TrainingArguments(deepspeed="ds_config.json", bf16=True) — no manual init needed
- Accelerate: DeepSpeedPlugin(zero_stage=2) → Accelerator(deepspeed_plugin) → accelerator.prepare()
- ZeRO stages: 0=none, 1=optimizer states, 2=+gradients, 3=+parameters
- Offload: zero_optimization.offload_optimizer.device = "cpu" | "nvme"
- Launch: deepspeed --num_gpus N train.py --deepspeed ds_config.json
ZeRO Training Script
# train/deepspeed_train.py — DeepSpeed ZeRO distributed training
from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
get_cosine_schedule_with_warmup,
)
import deepspeed
# ── ZeRO config builders ──────────────────────────────────────────────────────
def zero2_config(
batch_size: int = 16,
micro_batch: int = 2,
bf16: bool = True,
) -> dict:
"""ZeRO-2: partition optimizer states + gradients across GPUs."""
return {
"train_batch_size": batch_size,
"train_micro_batch_size_per_gpu": micro_batch,
"gradient_accumulation_steps": batch_size // micro_batch,
"bf16": {"enabled": bf16},
"fp16": {"enabled": not bf16},
"zero_optimization": {
"stage": 2,
"allgather_partitions": True,
"allgather_bucket_size": 200_000_000,
"overlap_comm": True,
"reduce_scatter": True,
"reduce_bucket_size": 200_000_000,
"contiguous_gradients": True,
},
"gradient_clipping": 1.0,
"steps_per_print": 50,
"wall_clock_breakdown": False,
}
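# Sanity check (this helper is ours, not a DeepSpeed API): DeepSpeed asserts
# train_batch_size == micro_batch * grad_accum * world_size, so verify the
# arithmetic before launching.
def derived_grad_accum(train_batch: int, micro_batch: int, world_size: int) -> int:
    """Gradient accumulation steps DeepSpeed derives from the batch settings."""
    assert train_batch % (micro_batch * world_size) == 0, (
        "train_batch_size must be divisible by micro_batch * world_size"
    )
    return train_batch // (micro_batch * world_size)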
def zero3_offload_config(
batch_size: int = 16,
micro_batch: int = 1,
offload_device: str = "cpu", # "cpu" or "nvme"
) -> dict:
"""ZeRO-3 + CPU offload: partition everything including model params."""
return {
"train_batch_size": batch_size,
"train_micro_batch_size_per_gpu": micro_batch,
"gradient_accumulation_steps": batch_size // micro_batch,
"bf16": {"enabled": True},
"zero_optimization": {
"stage": 3,
"overlap_comm": True,
"contiguous_gradients": True,
"sub_group_size": 1e9,
"reduce_bucket_size": "auto",
"stage3_prefetch_bucket_size": "auto",
"stage3_param_persistence_threshold": "auto",
"stage3_max_live_parameters": 1e9,
"stage3_max_reuse_distance": 1e9,
"stage3_gather_16bit_weights_on_model_save": True,
"offload_optimizer": {
"device": offload_device,
"pin_memory": True,
},
"offload_param": {
"device": offload_device,
"pin_memory": True,
},
},
"gradient_clipping": 1.0,
}
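# NVMe offload: offload_device="nvme" additionally requires an nvme_path on a
# fast local SSD. A sketch patching the builder output (the path and helper
# name are assumptions):
def with_nvme_path(ds_config: dict, nvme_path: str = "/local_nvme") -> dict:
    for key in ("offload_optimizer", "offload_param"):
        ds_config["zero_optimization"][key]["nvme_path"] = nvme_path
    return ds_config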
# ── Training loop ─────────────────────────────────────────────────────────────
class TextDataset(Dataset):
"""Simple tokenized text dataset."""
def __init__(self, texts: list[str], tokenizer, max_length: int = 512):
self.encodings = tokenizer(
texts,
truncation=True,
max_length=max_length,
padding="max_length",
return_tensors="pt",
)
def __len__(self) -> int:
return len(self.encodings["input_ids"])
def __getitem__(self, idx: int) -> dict[str, torch.Tensor]:
return {k: v[idx] for k, v in self.encodings.items()}
def get_dataloader(tokenizer, texts: list[str], batch_size: int = 4) -> DataLoader:
dataset = TextDataset(texts, tokenizer)
return DataLoader(dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
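# Multi-GPU note: a plain DataLoader feeds every rank the same batches. A
# distributed-aware sketch (assumes torch.distributed is initialized, which
# deepspeed.initialize handles):
from torch.utils.data.distributed import DistributedSampler

def get_distributed_dataloader(tokenizer, texts: list[str], batch_size: int = 4) -> DataLoader:
    dataset = TextDataset(texts, tokenizer)
    sampler = DistributedSampler(dataset, shuffle=True)
    return DataLoader(dataset, batch_size=batch_size, sampler=sampler, pin_memory=True)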
def train_with_deepspeed(
model_id: str = "meta-llama/Llama-3.2-3B-Instruct",
output_dir: str = "outputs/ds-finetune",
epochs: int = 3,
zero_stage: int = 2,
use_offload: bool = False,
) -> None:
"""Full DeepSpeed ZeRO training loop."""
# Load model + tokenizer
model = AutoModelForCausalLM.from_pretrained(
model_id,
torch_dtype=torch.bfloat16,
use_cache=False,
)
model.gradient_checkpointing_enable()
tokenizer = AutoTokenizer.from_pretrained(model_id)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# Build ZeRO config
MICRO_BATCH = 2
GLOBAL_BATCH = 16
if zero_stage == 3 or use_offload:
ds_config = zero3_offload_config(
batch_size=GLOBAL_BATCH,
micro_batch=MICRO_BATCH,
offload_device="cpu" if use_offload else "none",
)
else:
ds_config = zero2_config(batch_size=GLOBAL_BATCH, micro_batch=MICRO_BATCH)
# Optimizer (AdamW — DeepSpeed will partition states with ZeRO)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5, weight_decay=0.01)
    # Scheduler: defined once in the DeepSpeed config below. Passing both a
    # client lr_scheduler and a config "scheduler" to deepspeed.initialize
    # raises an error, so let DeepSpeed build it.
    total_steps = 1000  # Adjust to dataset size * epochs / grad_accum
    ds_config["scheduler"] = {
        "type": "WarmupDecayLR",
        "params": {
            "warmup_min_lr": 0,
            "warmup_max_lr": 2e-5,
            "warmup_num_steps": int(0.05 * total_steps),
            "total_num_steps": total_steps,
        },
    }
    # Initialize DeepSpeed engine; the returned scheduler comes from the config
    engine, optimizer, _, scheduler = deepspeed.initialize(
        model=model,
        optimizer=optimizer,
        config=ds_config,
    )
# Dummy training data (replace with real dataset)
sample_texts = ["The quick brown fox jumps over the lazy dog."] * 100
dataloader = get_dataloader(tokenizer, sample_texts, batch_size=MICRO_BATCH)
# Training loop
engine.train()
global_step = 0
for epoch in range(epochs):
for batch in dataloader:
input_ids = batch["input_ids"].to(engine.local_rank)
attention_mask = batch["attention_mask"].to(engine.local_rank)
            # Causal LM: labels = input_ids (the model shifts them internally)
labels = input_ids.clone()
labels[labels == tokenizer.pad_token_id] = -100
outputs = engine(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels,
)
loss = outputs.loss
engine.backward(loss)
engine.step()
global_step += 1
            if global_step % 50 == 0 and engine.global_rank == 0:
print(f"Epoch {epoch+1} | Step {global_step} | loss={loss.item():.4f}")
# Save checkpoint
Path(output_dir).mkdir(parents=True, exist_ok=True)
engine.save_checkpoint(output_dir, tag=f"epoch-{epochs}")
    if engine.global_rank == 0:
tokenizer.save_pretrained(output_dir)
print(f"Checkpoint saved: {output_dir}")
# ── HuggingFace Trainer + DeepSpeed ──────────────────────────────────────────
def save_ds_config(config: dict, path: str = "ds_config.json") -> str:
"""Save DeepSpeed config to JSON file for TrainingArguments."""
with open(path, "w") as f:
json.dump(config, f, indent=2)
print(f"DeepSpeed config saved: {path}")
return path
def train_with_hf_trainer(
model_id: str = "meta-llama/Llama-3.2-3B-Instruct",
output_dir: str = "outputs/hf-ds-finetune",
zero_stage: int = 2,
) -> None:
"""HuggingFace Trainer with DeepSpeed — simplest integration path."""
from datasets import load_dataset
from transformers import Trainer, TrainingArguments, DataCollatorForLanguageModeling
# Save config to disk (Trainer expects a file path)
ds_config = zero2_config() if zero_stage == 2 else zero3_offload_config()
    # Set batch-size keys to "auto" so Trainer fills them from TrainingArguments
    ds_config["train_batch_size"] = "auto"
    ds_config["train_micro_batch_size_per_gpu"] = "auto"
    ds_config["gradient_accumulation_steps"] = "auto"
config_path = save_ds_config(ds_config, "ds_config.json")
model = AutoModelForCausalLM.from_pretrained(model_id, use_cache=False)
tokenizer = AutoTokenizer.from_pretrained(model_id)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
# Load and tokenize dataset
raw = load_dataset("iamtarun/python_code_instructions_18k_alpaca", split="train[:2000]")
tokenized = raw.map(
lambda ex: tokenizer(
ex["output"], truncation=True, max_length=512, padding="max_length"
),
batched=True,
remove_columns=raw.column_names,
)
training_args = TrainingArguments(
output_dir=output_dir,
deepspeed=config_path, # ← DeepSpeed integration point
num_train_epochs=3,
per_device_train_batch_size=2,
gradient_accumulation_steps=4,
gradient_checkpointing=True,
bf16=True,
learning_rate=2e-5,
lr_scheduler_type="cosine",
warmup_ratio=0.05,
logging_steps=25,
save_steps=200,
save_total_limit=2,
report_to=["tensorboard"],
dataloader_pin_memory=False,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized,
data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
)
print(f"Training with DeepSpeed ZeRO-{zero_stage}...")
trainer.train()
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"Model saved: {output_dir}")
# ── Accelerate + DeepSpeed ────────────────────────────────────────────────────
def train_with_accelerate(
model_id: str = "meta-llama/Llama-3.2-3B-Instruct",
output_dir: str = "outputs/accel-ds-finetune",
zero_stage: int = 2,
) -> None:
"""Accelerate-based training loop with DeepSpeed plugin."""
from accelerate import Accelerator
from accelerate.utils import DeepSpeedPlugin
plugin = DeepSpeedPlugin(
zero_stage=zero_stage,
gradient_accumulation_steps=4,
gradient_clipping=1.0,
offload_optimizer_device="cpu" if zero_stage == 3 else "none",
offload_param_device="cpu" if zero_stage == 3 else "none",
)
accelerator = Accelerator(
mixed_precision="bf16",
deepspeed_plugin=plugin,
)
model = AutoModelForCausalLM.from_pretrained(model_id, use_cache=False)
tokenizer = AutoTokenizer.from_pretrained(model_id)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
model.gradient_checkpointing_enable()
    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
    sample_texts = ["Accelerate + DeepSpeed training example."] * 50
    dataloader = get_dataloader(tokenizer, sample_texts, batch_size=2)
    scheduler = get_cosine_schedule_with_warmup(
        optimizer,
        num_warmup_steps=50,
        num_training_steps=len(dataloader) * 3,
    )
    # Prepare everything together so Accelerate wires the scheduler to DeepSpeed
    model, optimizer, dataloader, scheduler = accelerator.prepare(
        model, optimizer, dataloader, scheduler
    )
for epoch in range(3):
model.train()
for batch in dataloader:
input_ids = batch["input_ids"]
labels = input_ids.clone()
labels[labels == tokenizer.pad_token_id] = -100
            outputs = model(
                input_ids=input_ids,
                attention_mask=batch["attention_mask"],
                labels=labels,
            )
loss = outputs.loss
accelerator.backward(loss)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
    accelerator.wait_for_everyone()
    unwrapped = accelerator.unwrap_model(model)
    # get_state_dict consolidates ZeRO-3 shards; call it on every rank
    state_dict = accelerator.get_state_dict(model)
    unwrapped.save_pretrained(
        output_dir,
        is_main_process=accelerator.is_main_process,
        save_function=accelerator.save,
        state_dict=state_dict,
        safe_serialization=True,
    )
    if accelerator.is_main_process:
        tokenizer.save_pretrained(output_dir)
        print(f"Accelerate+DeepSpeed model saved: {output_dir}")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--mode", choices=["native", "trainer", "accelerate"], default="trainer")
parser.add_argument("--zero-stage", type=int, choices=[0, 1, 2, 3], default=2)
parser.add_argument("--offload", action="store_true")
args = parser.parse_args()
if args.mode == "native":
train_with_deepspeed(zero_stage=args.zero_stage, use_offload=args.offload)
elif args.mode == "trainer":
train_with_hf_trainer(zero_stage=args.zero_stage)
else:
train_with_accelerate(zero_stage=args.zero_stage)
Launch commands:
# Single node, 8 GPUs — ZeRO-2
deepspeed --num_gpus 8 train/deepspeed_train.py --mode trainer --zero-stage 2
# Multi-node (hostfile lists node IPs and GPU counts)
# hostfile: worker1 slots=8
# worker2 slots=8
deepspeed --hostfile hostfile --num_nodes 2 --num_gpus 8 train/deepspeed_train.py --mode native --zero-stage 3 --offload
# With the Accelerate launcher (the DeepSpeedPlugin in code supplies the DeepSpeed config)
accelerate launch --num_processes 8 train/deepspeed_train.py --mode accelerate
When to choose an alternative: PyTorch FSDP offers ZeRO-3-style parameter sharding built into PyTorch itself, so it is the better fit when you want to stay in the native ecosystem with tight torch.compile integration and no external extension; DeepSpeed's ZeRO-3 with CPU/NVMe offload remains the option for training models larger than total GPU memory, with battle-tested support for 70B+ parameter LLMs on commodity hardware. Megatron-LM targets truly massive models (175B+) that need tensor and pipeline parallelism fused into the transformer architecture, but it requires rewriting the model against its layer implementations, whereas DeepSpeed integrates with any HuggingFace model through a config file, making it the practical choice for most fine-tuning workflows. The Claude Skills 360 bundle includes DeepSpeed skill sets covering ZeRO config generation, HuggingFace Trainer integration, Accelerate plugin setup, and multi-node launch scripts. Start with the free tier to try distributed LLM training generation.