Locust is a distributed load testing tool written in Python. pip install locust. from locust import HttpUser, task, between. User class: class WebUser(HttpUser): host="https://api.example.com"; wait_time=between(1, 5). Task: @task def get_items(self): self.client.get("/items"). Weight: @task(3) def heavy_task(self) — 3× more often. TaskSet: from locust import TaskSet, tag; class UserTasks(TaskSet): @task def browse(self). Test: class AppUser(HttpUser): tasks=[UserTasks]. Auth: self.client.headers.update({"Authorization": "Bearer token"}). On start: def on_start(self): self.token = self.client.post("/login", json={...}).json()["token"]. Post: self.client.post("/orders", json={"qty":1}). Catch: with self.client.get("/status", catch_response=True) as r: if r.json()["status"]!="ok": r.failure("bad status"). Name: self.client.get(f"/users/{uid}", name="/users/[id]") — group URLs. CSV: locust --headless -u 100 -r 10 --run-time 60s --csv results --host https://api. Web: locust -f locustfile.py then visit http://localhost:8089. Distributed: locust --master + locust --worker. Shape: from locust import LoadTestShape; class StepShape(LoadTestShape): def tick(self): run_time = self.get_run_time(). FastHttp: from locust.contrib.fasthttp import FastHttpUser — faster C-based client. Events: from locust import events; @events.test_start.add_listener. Stats: environment.stats.total.avg_response_time. Claude Code generates Locust load test suites, custom shape profiles, and CI regression performance checks.
CLAUDE.md for Locust
## Locust Stack
- Version: locust >= 2.20
- User: class MyUser(HttpUser): host=..., wait_time=between(min, max)
- Tasks: @task(weight=1) — heavier weight = called more often
- Groups: TaskSet class | tasks=[TaskSet] on User
- Auth: on_start() to login | self.client.headers for session-level auth
- URL grouping: self.client.get(path, name="/template/[id]")
- Custom assertions: catch_response=True context manager
- CI: locust --headless -u USERS -r SPAWN_RATE --run-time DURATION --csv out
Locust Load Testing Pipeline
# tests/locustfile.py — load testing with Locust
# Run: locust -f tests/locustfile.py --headless -u 50 -r 5 --run-time 60s --host http://localhost:8000
from __future__ import annotations
import json
import random
import time
from typing import Any
from locust import (
HttpUser, FastHttpUser, LoadTestShape,
TaskSet, between, constant, events,
tag, task,
)
from locust.contrib.fasthttp import FastHttpUser
# ── 0. Simple API load test ───────────────────────────────────────────────────
class SimpleApiUser(HttpUser):
    """
    Basic HttpUser simulating a REST API client.

    Each simulated user authenticates once at spawn time (on_start), then
    runs the weighted @task methods with 1-3 second think time between
    requests, and logs out when stopped (on_stop).
    """

    host = "http://localhost:8000"
    wait_time = between(1, 3)

    def on_start(self) -> None:
        """Called once per simulated user at spawn time — authenticate here.

        Fix: the original called resp.failure() on a plain Response object;
        success()/failure() only exist on the context manager returned when
        catch_response=True is passed, so the error path raised instead of
        recording a failure.
        """
        with self.client.post(
            "/auth/login",
            json={"username": "testuser", "password": "testpassword"},
            name="/auth/login",
            catch_response=True,
        ) as resp:
            if resp.status_code != 200:
                resp.failure(f"Login failed with status {resp.status_code}")
                return
            try:
                token = resp.json().get("access_token", "")
            except ValueError:  # robustness: non-JSON login body must not crash the user
                resp.failure("Login response is not valid JSON")
                return
            # Session-level header: applied to every subsequent request.
            self.client.headers.update({"Authorization": f"Bearer {token}"})
            resp.success()

    @task(5)
    def list_items(self) -> None:
        """Heavyweight read — call 5× more often than create."""
        page = random.randint(1, 10)
        # name= groups all pages under a single stats entry.
        self.client.get(f"/items?page={page}&limit=20", name="/items")

    @task(2)
    def get_item(self) -> None:
        """Single item fetch with URL grouping."""
        item_id = random.randint(1, 1000)
        self.client.get(f"/items/{item_id}", name="/items/[id]")

    @task(1)
    def create_item(self) -> None:
        """Write operation — call least often."""
        payload = {
            "name": f"item_{random.randint(1, 10000)}",
            "quantity": random.randint(1, 100),
            "price": round(random.uniform(0.99, 999.99), 2),
        }
        with self.client.post("/items", json=payload, catch_response=True,
                              name="/items [POST]") as resp:
            if resp.status_code == 201:
                resp.success()
            elif resp.status_code == 400:
                resp.failure(f"Validation error: {resp.text[:200]}")
            else:
                resp.failure(f"Unexpected status: {resp.status_code}")

    @task(1)
    def search_items(self) -> None:
        """Search with custom response validation."""
        q = random.choice(["widget", "gadget", "device", "tool"])
        with self.client.get(f"/items/search?q={q}", catch_response=True,
                             name="/items/search") as resp:
            if resp.status_code != 200:
                resp.failure(f"Search returned {resp.status_code}")
                return
            try:
                data = resp.json()
            except ValueError:  # robustness: resp.json() raises on invalid JSON
                resp.failure("Search response is not JSON list or dict")
                return
            if not isinstance(data, (list, dict)):
                resp.failure("Search response is not JSON list or dict")
            else:
                resp.success()

    def on_stop(self) -> None:
        """Cleanup: logout when user stops."""
        self.client.post("/auth/logout", name="/auth/logout")
# ── 1. TaskSet for grouped workflows ─────────────────────────────────────────
class BrowseTaskSet(TaskSet):
    """
    Catalog-browsing workflow.

    A TaskSet groups related tasks; a User class can attach several
    TaskSets (possibly nested) with different weights.
    """

    def on_start(self):
        # Candidate category IDs 1..19, sampled uniformly by view_catalog.
        self.category_ids = list(range(1, 20))

    @task(3)
    @tag("catalog", "read")
    def view_catalog(self) -> None:
        chosen = random.choice(self.category_ids)
        self.client.get(f"/categories/{chosen}/items", name="/categories/[id]/items")

    @task(1)
    @tag("catalog", "write")
    def add_to_cart(self) -> None:
        picked = random.randint(1, 500)
        body = {"item_id": picked, "qty": 1}
        self.client.post("/cart/items", json=body, name="/cart/items [POST]")

    @task(1)
    @tag("catalog")
    def view_recommendations(self) -> None:
        self.client.get("/recommendations", name="/recommendations")
class CheckoutTaskSet(TaskSet):
    """Simulate the checkout flow: view cart, apply a coupon, place an order."""

    @task
    def view_cart(self) -> None:
        self.client.get("/cart", name="/cart")

    @task
    def apply_coupon(self) -> None:
        code = random.choice(["SAVE10", "FIRST20", "SUMMER"])
        with self.client.post("/cart/coupon", json={"code": code},
                              catch_response=True, name="/cart/coupon") as resp:
            if resp.status_code in (200, 400):
                resp.success()  # Both valid and invalid coupons are expected
            else:
                resp.failure(f"Unexpected: {resp.status_code}")

    @task
    def checkout(self) -> None:
        payload = {
            "payment_method": "card",
            "address_id": random.randint(1, 5),
        }
        with self.client.post("/orders", json=payload, catch_response=True,
                              name="/orders [POST]") as resp:
            if resp.status_code == 201:
                # Fix: the original parsed resp.json().get("id") into an unused
                # local, which could also raise on a non-JSON 201 body.
                resp.success()
            else:
                resp.failure(f"Checkout failed: {resp.status_code}")
class EcommerceUser(HttpUser):
    """Full e-commerce user that browses and then sometimes checks out."""
    host = "http://localhost:8000"
    # Short think time: 0.5-2.0 seconds between task executions.
    wait_time = between(0.5, 2.0)
    # Weighted TaskSet selection: browsing is chosen 4x as often as checkout.
    tasks = {BrowseTaskSet: 4, CheckoutTaskSet: 1}
# ── 2. Custom load shape ──────────────────────────────────────────────────────
class StepLoadShape(LoadTestShape):
    """
    Step load profile: add step_users every step_time seconds up to
    peak_users, hold the peak for hold_seconds, then step back down.
    While a LoadTestShape is active it replaces the --users / --spawn-rate
    CLI flags.
    """

    step_time = 30     # seconds per step
    step_users = 10    # users added each step
    spawn_rate = 5     # users per second per step
    peak_users = 100   # max users
    hold_seconds = 60  # hold at peak

    def tick(self):
        elapsed = self.get_run_time()

        # Phase 1: ramp up — one step of users per completed step_time interval.
        target = int(elapsed // self.step_time) * self.step_users
        if target < self.peak_users:
            return target, self.spawn_rate

        # Phase 2: hold at peak until ramp duration + hold window elapses.
        ramp_end = (self.peak_users / self.step_users) * self.step_time
        hold_end = ramp_end + self.hold_seconds
        if elapsed < hold_end:
            return self.peak_users, self.spawn_rate

        # Phase 3: ramp down symmetrically; stop once no users remain.
        steps_down = int((elapsed - hold_end) // self.step_time)
        remaining = max(0, self.peak_users - steps_down * self.step_users)
        if remaining > 0:
            return remaining, self.spawn_rate
        return None  # None signals Locust to stop the test
class SpikeShape(LoadTestShape):
    """
    Traffic-spike profile: stable low load → sudden spike → recovery.

    Each stage's "duration" is the *cumulative* end time in seconds, so the
    active stage is the first whose end time has not yet passed. Useful for
    testing auto-scaling and circuit breakers.
    """

    stages = [
        {"duration": 30, "users": 10, "spawn_rate": 5},    # warm-up
        {"duration": 60, "users": 10, "spawn_rate": 5},    # stable
        {"duration": 75, "users": 200, "spawn_rate": 50},  # spike
        {"duration": 90, "users": 10, "spawn_rate": 5},    # recover
        {"duration": 120, "users": 10, "spawn_rate": 5},   # stable
    ]

    def tick(self):
        elapsed = self.get_run_time()
        for phase in self.stages:
            if elapsed <= phase["duration"]:
                return phase["users"], phase["spawn_rate"]
        return None  # all stages complete — stop the test
# ── 3. High-throughput with FastHttpUser ──────────────────────────────────────
class HighThroughputUser(FastHttpUser):
    """
    High-throughput user built on FastHttpUser for >10k RPS scenarios.

    The API surface matches HttpUser; only the underlying HTTP client is a
    faster implementation.
    """

    host = "http://localhost:8000"
    wait_time = constant(0)  # zero think time — maximum throughput

    @task
    def ping(self) -> None:
        self.client.get("/health")

    @task(2)
    def read_heavy(self) -> None:
        target = random.randint(1, 50000)
        self.client.get(f"/users/{target}", name="/users/[id]")
# ── 4. Event hooks for custom reporting ──────────────────────────────────────
@events.test_start.add_listener
def on_test_start(environment, **kwargs) -> None:
    """Called once when the load test starts — announces the target host."""
    print(f"\n[Locust] Test starting against {environment.host}")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs) -> None:
    """Called once when the test ends — print a summary.

    Reads aggregate numbers from environment.stats.total (the StatsEntry
    covering all requests).
    """
    total = environment.stats.total
    # Fix: header was an f-string with no placeholders (ruff F541); the
    # intermediate `stats` local was also redundant.
    print("\n[Locust] Test complete:")
    print(f" Total requests: {total.num_requests}")
    print(f" Failures: {total.num_failures} ({total.fail_ratio:.1%})")
    print(f" Avg response time: {total.avg_response_time:.0f}ms")
    print(f" p95 response time: {total.get_response_time_percentile(0.95):.0f}ms")
    # NOTE(review): current_rps is a moving window — at test stop it may read
    # low; total.total_rps is the whole-run average. Confirm which is wanted.
    print(f" RPS: {total.current_rps:.1f}")
@events.request.add_listener
def on_request(
    request_type, name, response_time, response_length,
    exception, context, **kwargs
) -> None:
    """Called after every request — useful for sending to a time-series DB.

    `exception` is None for successful requests; a non-None value means the
    request failed (the branch below is a stub for exporting failures).
    """
    if exception:
        # Example: send to Prometheus/Datadog
        pass
# ── 5. CI helper — run and check thresholds ───────────────────────────────────
# CI gate limits; keys are referenced by check_thresholds below.
PERFORMANCE_THRESHOLDS = {
    "avg_response_time_ms": 200,
    "p95_response_time_ms": 500,
    "error_rate_pct": 1.0,
    "min_rps": 10.0,
}


def check_thresholds(stats) -> list[str]:
    """
    Compare aggregate performance stats against PERFORMANCE_THRESHOLDS.

    Returns a list of human-readable failure messages; an empty list means
    every threshold passed.
    """
    limits = PERFORMANCE_THRESHOLDS
    total = stats.total
    problems: list[str] = []

    avg = total.avg_response_time
    if avg > limits["avg_response_time_ms"]:
        problems.append(f"Avg response {avg:.0f}ms > {limits['avg_response_time_ms']}ms")

    p95 = total.get_response_time_percentile(0.95)
    if p95 > limits["p95_response_time_ms"]:
        problems.append(f"p95 response {p95:.0f}ms > {limits['p95_response_time_ms']}ms")

    err_pct = total.fail_ratio * 100
    if err_pct > limits["error_rate_pct"]:
        problems.append(f"Error rate {err_pct:.1f}% > {limits['error_rate_pct']}%")

    # NOTE(review): current_rps is a recent-window rate — at test end it can
    # under-read sustained throughput; confirm vs. total.total_rps.
    rps = total.current_rps
    if rps < limits["min_rps"]:
        problems.append(f"RPS {rps:.1f} < {limits['min_rps']}")

    return problems
For the Apache JMeter alternative — JMeter requires XML test plan files and a Java runtime while Locust test scripts are plain Python where @task methods call self.client.get/post directly, the entire user behavior including authentication flow, random data generation, and conditional assertions lives in one readable .py file, and LoadTestShape.tick() returns (users, spawn_rate) tuples to define any ramp profile in pure Python without clicking through a GUI. For the wrk/k6 alternative — wrk is a single-URL benchmark tool and k6 requires JavaScript while Locust simulates realistic user sessions across multiple endpoints with weighted task selection, TaskSet flows like browse-then-checkout with per-user session state, events.test_stop hooks for threshold checks as CI gates, and --processes N distributes workers across CPU cores without a separate coordinator process. The Claude Skills 360 bundle includes Locust skill sets covering HttpUser with on_start login, @task with weights, URL grouping with name=, catch_response custom assertions, TaskSet grouped flows, LoadTestShape step and spike profiles, FastHttpUser for high throughput, events.test_start/stop hooks, CI threshold enforcement, and --headless CSV export. Start with the free tier to try load testing code generation.