FastAPI’s type-driven development model — where Pydantic schemas define both validation and documentation — makes it excellent for building self-documenting APIs. Claude Code generates FastAPI code with correct async patterns, proper dependency chains, and Pydantic v2 models that validate correctly.
This guide covers advanced FastAPI with Claude Code: dependency injection, background tasks, WebSocket endpoints, middleware, and async testing.
CLAUDE.md for FastAPI Projects
## FastAPI Architecture
- Python 3.12, FastAPI 0.110, Pydantic v2
- Async SQLAlchemy 2.0 with asyncpg driver
- Authentication: JWT (python-jose), refresh tokens in httpOnly cookies
- Background tasks: Celery + Redis (not FastAPI BackgroundTasks — those run inside the web process after the response is sent, with no retries or persistence if the worker dies)
- Testing: pytest + pytest-asyncio + httpx AsyncClient
## Patterns
- All DB operations: async Session — never use sync Session
- Pydantic models: Input models (request), Output models (response) — separate classes
- Dependencies: yield pattern for DB session, raise HTTPException in deps for auth errors
- Never expose internal exceptions to clients — catch in exception handlers
- Repository pattern: crud.py per resource, never raw SQL in routers
Dependency Injection
Create a dependency chain that:
1. Gets DB session
2. Gets current user from JWT token
3. Verifies user has a specific permission
# app/dependencies.py
from typing import Annotated, AsyncGenerator
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from jose import jwt, JWTError
from app.database import async_session_factory
from app.models import User
from app.crud import users as users_crud
from app.config import settings
# Bearer-token scheme; get_current_user depends on it to read the request's credentials.
security = HTTPBearer()
# Dependency 1: Database session (yield pattern — auto-closes on request end)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session to the dependent code, then finalize it.

    The session is committed once the consumer finishes without raising.
    If anything raises (including the commit itself), the session is rolled
    back and the exception is re-raised. Cleanup of the session object is
    left to the surrounding ``async with`` block.

    Yields:
        The ``AsyncSession`` produced by ``async_session_factory``.
    """
    async with async_session_factory() as db_session:
        try:
            yield db_session
            # Only reached when the consumer completed without an exception.
            await db_session.commit()
        except Exception:
            # Undo partial work, then let the original error propagate.
            await db_session.rollback()
            raise
# Dependency 2: Current authenticated user
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """Resolve the bearer token on the request to an active ``User``.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.
        db: Database session used to load the user.

    Returns:
        The user whose id is stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, has no ``sub`` claim, or the user
            is missing or inactive. The same error is used for every case.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(
            credentials.credentials,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
        )
    except JWTError:
        raise unauthorized

    subject: str = claims.get("sub")
    if subject is None:
        raise unauthorized

    account = await users_crud.get(db, id=subject)
    if account is None:
        raise unauthorized
    if not account.is_active:
        raise unauthorized
    return account
# Dependency 3: Permission check factory (returns a dependency)
def require_permission(permission: str):
    """Build a dependency that admits only users holding ``permission``.

    Args:
        permission: Name passed to ``User.has_permission``.

    Returns:
        An async dependency that resolves the current user and returns it,
        or raises ``HTTPException`` 403 when the permission is missing.
    """

    async def check_permission(
        current_user: Annotated[User, Depends(get_current_user)],
    ) -> User:
        # Happy path first: hand the authenticated user through unchanged.
        if current_user.has_permission(permission):
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Permission required: {permission}",
        )

    return check_permission
# Type aliases for cleaner route signatures: annotate a route parameter with
# one of these instead of repeating Annotated[..., Depends(...)].
# DBSession: the AsyncSession yielded by get_db.
DBSession = Annotated[AsyncSession, Depends(get_db)]
# CurrentUser: the User returned by get_current_user.
CurrentUser = Annotated[User, Depends(get_current_user)]
# AdminUser: the same User, returned only if the "admin" permission check passes.
AdminUser = Annotated[User, Depends(require_permission("admin"))]
# app/routers/orders.py
from fastapi import APIRouter, HTTPException, status, BackgroundTasks
from app.dependencies import DBSession, CurrentUser, AdminUser
from app.schemas.orders import OrderCreate, OrderResponse, OrderListResponse
from app.crud import orders as orders_crud
from app.tasks import send_order_confirmation
# Router for the order endpoints; every path below is mounted under /orders.
router = APIRouter(prefix="/orders", tags=["orders"])
@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
async def create_order(
    body: OrderCreate,
    db: DBSession,
    current_user: CurrentUser,
    background_tasks: BackgroundTasks,
):
    """Create an order for the authenticated user and schedule a confirmation.

    Args:
        body: Validated order payload.
        db: Database session for this request.
        current_user: Authenticated caller; becomes the order's owner.
        background_tasks: Task queue that FastAPI runs after the response.

    Returns:
        The newly created order, serialized as ``OrderResponse`` with 201.
    """
    new_order = await orders_crud.create(db, user_id=current_user.id, data=body)

    # The confirmation is sent after the response, so it adds no latency here.
    # NOTE(review): the project guidelines prescribe Celery for background
    # work; confirm whether send_order_confirmation should be enqueued there.
    confirmation_order_id = str(new_order.id)
    background_tasks.add_task(send_order_confirmation, order_id=confirmation_order_id)

    return new_order
@router.get("/", response_model=OrderListResponse)
async def list_orders(
    db: DBSession,
    current_user: CurrentUser,
    page: int = 1,
    per_page: int = 20,
):
    """Return one page of the authenticated user's orders.

    Args:
        db: Database session for this request.
        current_user: Authenticated caller; only their orders are listed.
        page: Page number, starting at 1.
        per_page: Number of orders per page; must be at least 1.

    Returns:
        An ``OrderListResponse`` with the page's items, the total order
        count, the requested page number and the total number of pages.

    Raises:
        HTTPException: 422 if ``page`` or ``per_page`` is less than 1.
    """
    # Reject non-positive paging values up front: per_page=0 would otherwise
    # raise ZeroDivisionError in the page-count calculation below.
    if page < 1 or per_page < 1:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="page and per_page must be greater than or equal to 1",
        )

    orders, total = await orders_crud.get_by_user(
        db, user_id=current_user.id, page=page, per_page=per_page,
    )
    return OrderListResponse(
        items=orders,
        total=total,
        page=page,
        # Ceiling division: a partially filled last page still counts.
        pages=(total + per_page - 1) // per_page,
    )
# Admin-only endpoint
@router.patch("/{order_id}/status", response_model=OrderResponse)
async def update_order_status(
    order_id: str,
    status_update: dict,
    db: DBSession,
    _admin: AdminUser,  # _ signals "not used in body, just for auth"
):
    """Set a new status on an existing order (admin only).

    Args:
        order_id: Identifier of the order, taken from the URL path.
        status_update: JSON body; must contain a non-empty string ``status``.
        db: Database session for this request.
        _admin: Present only to enforce the admin permission check.

    Returns:
        The updated order, serialized as ``OrderResponse``.

    Raises:
        HTTPException: 422 if the body has no usable ``status`` value,
            404 if no order with ``order_id`` exists.
    """
    # The body is an untyped dict, so validate it by hand; indexing it
    # directly would surface a missing key as a 500 instead of a client error.
    new_status = status_update.get("status")
    if not isinstance(new_status, str) or not new_status:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Request body must include a non-empty string 'status'",
        )

    order = await orders_crud.get(db, id=order_id)
    if not order:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Order not found")

    return await orders_crud.update_status(db, order=order, status=new_status)
Pydantic v2 Models
Create Pydantic models for Order with proper validation,
computed fields, and different schemas for create/read.
# app/schemas/orders.py
from datetime import datetime
from decimal import Decimal
from typing import Optional
from pydantic import BaseModel, Field, computed_field, model_validator, UUID4
class OrderItemCreate(BaseModel):
    """Request payload for a single line item of a new order."""

    # Product being ordered.
    product_id: UUID4
    # Required; must be greater than 0 and at most 100.
    quantity: int = Field(..., gt=0, le=100)
class OrderCreate(BaseModel):
    """Request payload for creating an order.

    An order carries between 1 and 50 line items, and each product may
    appear only once; repeat purchases are expressed through ``quantity``.
    """

    items: list[OrderItemCreate] = Field(..., min_length=1, max_length=50)
    shipping_address_id: UUID4

    @model_validator(mode='after')
    def check_no_duplicate_products(self) -> 'OrderCreate':
        """Reject payloads that list the same product more than once."""
        distinct_products = {line.product_id for line in self.items}
        # Fewer distinct ids than items means at least one product repeats.
        if len(distinct_products) < len(self.items):
            raise ValueError("Duplicate products in order — use quantity instead")
        return self
class OrderItemResponse(BaseModel):
    """Response schema for one line item of an order."""

    # Allow building this schema directly from ORM objects (attribute access).
    model_config = {"from_attributes": True}

    id: UUID4
    product_id: UUID4
    product_name: str
    quantity: int
    price_cents: int

    @computed_field
    @property
    def line_total_cents(self) -> int:
        """Price of this line in cents: unit price times quantity."""
        return self.quantity * self.price_cents
class OrderResponse(BaseModel):
    """Response schema for an order, including its line items."""

    # Allow building this schema directly from ORM objects (attribute access).
    model_config = {"from_attributes": True}

    id: UUID4
    user_id: UUID4
    status: str
    items: list[OrderItemResponse]
    total_cents: int
    created_at: datetime
    updated_at: datetime

    @computed_field
    @property
    def total_usd(self) -> Decimal:
        """Order total in dollars, derived from ``total_cents``."""
        cents = Decimal(self.total_cents)
        return cents / Decimal(100)
class OrderListResponse(BaseModel):
    """Response schema for one page of orders."""

    # Orders on the requested page.
    items: list[OrderResponse]
    # Total number of orders across all pages.
    total: int
    # Page number that was requested.
    page: int
    # Total number of pages.
    pages: int
WebSocket Endpoints
Add a WebSocket endpoint for real-time order status updates.
Only authenticated users should connect, and only for their own orders.
# app/routers/ws.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, status
from app.dependencies import get_db
from app.crud import users as users_crud, orders as orders_crud
from jose import jwt, JWTError
from app.config import settings
import asyncio
import json
# Router for the WebSocket endpoint; no prefix, so the path is used as declared.
router = APIRouter()
class ConnectionManager:
    """Track WebSocket connections per user.

    A user may hold several connections at once (multiple tabs or devices).
    Connections are kept in a plain dict on this object, so an instance only
    knows about sockets registered through it.
    """

    def __init__(self):
        # {user_id: set of WebSocket connections}
        self._connections: dict[str, set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        """Accept ``websocket`` and register it under ``user_id``."""
        await websocket.accept()
        self._connections.setdefault(user_id, set()).add(websocket)

    def disconnect(self, websocket: WebSocket, user_id: str):
        """Unregister ``websocket``; a no-op if it is not registered.

        The user's entry is removed once their last connection is gone, so
        the dict does not grow with every user who has ever connected.
        """
        sockets = self._connections.get(user_id)
        if sockets is None:
            return
        sockets.discard(websocket)
        if not sockets:
            del self._connections[user_id]

    async def send_to_user(self, user_id: str, message: dict):
        """Broadcast to all of a user's connections (multiple tabs/devices).

        Connections whose send fails are treated as dead and unregistered.

        Raises:
            TypeError: If ``message`` is not JSON-serializable. This is
                raised before any send, so a bad payload never causes
                healthy connections to be dropped.
        """
        sockets = self._connections.get(user_id)
        if not sockets:
            return

        # Serialize once; every connection receives the same text.
        payload = json.dumps(message)

        # Iterate over a snapshot: connect()/disconnect() may run while a
        # send is awaited, and mutating a set during iteration raises
        # RuntimeError.
        for ws in tuple(sockets):
            try:
                await ws.send_text(payload)
            except Exception:
                # Best effort: any send failure means the socket is unusable.
                self.disconnect(ws, user_id)
# Single module-level manager shared by the endpoint and notify_order_update below.
# It holds connections in memory, so it only knows sockets registered in this process.
manager = ConnectionManager()
@router.websocket("/ws/orders")
async def order_updates(websocket: WebSocket):
    """Hold a WebSocket open so the user can receive order status updates.

    The JWT is read from the ``token`` query parameter. The connection is
    closed with code 1008 (policy violation) when the token is missing,
    cannot be decoded, or carries no ``sub`` claim. Otherwise the socket is
    registered with ``manager`` until the client disconnects.

    NOTE(review): unlike get_current_user, this does not check that the user
    still exists and is active — confirm whether that check is wanted here.
    """
    # Auth: token passed as a query param rather than an Authorization header.
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    try:
        payload = jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm])
    except JWTError:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    # A validly signed token without a subject must not be registered:
    # it would otherwise be stored under the key None.
    user_id = payload.get("sub")
    if not user_id:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await manager.connect(websocket, user_id)
    try:
        while True:
            # Keep connection alive — client can send pings
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    finally:
        # Always unregister, including when an unexpected error ends the
        # loop, so dead sockets are not left in the manager.
        manager.disconnect(websocket, user_id)
# Called from order status update logic:
async def notify_order_update(user_id: str, order_id: str, new_status: str):
    """Push an ``order_status_changed`` event to all of a user's connections.

    Args:
        user_id: User whose connections should receive the event.
        order_id: Identifier of the order that changed.
        new_status: The order's new status value.
    """
    event = {
        "type": "order_status_changed",
        "order_id": order_id,
        "status": new_status,
    }
    await manager.send_to_user(user_id, event)
Async Testing with pytest-asyncio
Write async tests for the orders endpoints.
Use httpx AsyncClient — not the sync TestClient.
# tests/test_orders.py
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

from app.main import app
from app.dependencies import get_db
from app.models import Base
from tests.factories import UserFactory, ProductFactory, ShippingAddressFactory
# Connection URL for the test database (asyncpg driver); the db fixture builds its engine from it.
TEST_DB_URL = "postgresql+asyncpg://test:test@localhost/test_db"
@pytest_asyncio.fixture
async def db():
    """Provide a session on a freshly created schema, dropped afterwards.

    Setup creates every table in ``Base.metadata``. Teardown drops them all
    and disposes of the engine.
    """
    test_engine = create_async_engine(TEST_DB_URL)

    # Setup: build the schema.
    async with test_engine.begin() as setup_conn:
        await setup_conn.run_sync(Base.metadata.create_all)

    make_session = async_sessionmaker(test_engine, expire_on_commit=False)
    async with make_session() as test_session:
        yield test_session

    # Teardown: drop the schema and release the engine's connections.
    async with test_engine.begin() as teardown_conn:
        await teardown_conn.run_sync(Base.metadata.drop_all)
    await test_engine.dispose()
@pytest_asyncio.fixture
async def client(db):
    """Provide an httpx ``AsyncClient`` wired to the app in-process.

    ``get_db`` is overridden so the app uses the same session as the test.
    All dependency overrides are cleared once the test is done.
    """
    app.dependency_overrides[get_db] = lambda: db

    in_process_transport = ASGITransport(app=app)
    async with AsyncClient(
        transport=in_process_transport,
        base_url="http://test",
    ) as http_client:
        yield http_client

    app.dependency_overrides.clear()
@pytest_asyncio.fixture
async def auth_headers(db, client):
    """Create a user, log them in, and return ``(headers, user)``.

    ``headers`` is a dict carrying the bearer token from ``/auth/login``.
    """
    test_user = await UserFactory.create(db)

    # presumably "testpassword" is the password UserFactory assigns — verify
    login_payload = {
        "email": test_user.email,
        "password": "testpassword",
    }
    login_response = await client.post("/auth/login", json=login_payload)

    access_token = login_response.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}, test_user
# The class-level marker makes every async test below run as a coroutine,
# including under pytest-asyncio's strict mode.
@pytest.mark.asyncio
class TestCreateOrder:
    """Tests for ``POST /orders/``."""

    async def test_creates_order_successfully(self, db, client, auth_headers):
        """A valid order returns 201 with the computed total and one item."""
        headers, user = auth_headers
        product = await ProductFactory.create(db, stock=10, price_cents=2000)
        address = await ShippingAddressFactory.create(db, user=user)

        response = await client.post(
            "/orders/",
            json={
                "items": [{"product_id": str(product.id), "quantity": 2}],
                "shipping_address_id": str(address.id),
            },
            headers=headers,
        )

        assert response.status_code == 201
        data = response.json()
        # 2 units at 2000 cents each.
        assert data["total_cents"] == 4000
        assert data["status"] == "pending"
        assert len(data["items"]) == 1

    async def test_rejects_insufficient_stock(self, db, client, auth_headers):
        """Ordering more units than are in stock is rejected with 422."""
        headers, user = auth_headers
        product = await ProductFactory.create(db, stock=1)
        address = await ShippingAddressFactory.create(db, user=user)

        response = await client.post(
            "/orders/",
            json={
                "items": [{"product_id": str(product.id), "quantity": 5}],
                "shipping_address_id": str(address.id),
            },
            headers=headers,
        )

        assert response.status_code == 422

    async def test_requires_authentication(self, client, db):
        """A request without an Authorization header is rejected."""
        # NOTE(review): expects 403 because the bearer scheme rejects a
        # missing header before the (invalid) body is validated — confirm
        # against the installed FastAPI version.
        response = await client.post("/orders/", json={"items": [], "shipping_address_id": "test"})
        assert response.status_code == 403
For Python microservices running on Kubernetes, see the Kubernetes guide. For integrating machine learning models and embeddings with FastAPI, see the machine learning guide. The Claude Skills 360 bundle includes Python FastAPI skill sets covering async patterns, streaming responses, and production deployment. Start with the free tier to try FastAPI code generation.