FastAPI combines Python type hints, Pydantic validation, and async support into a high-productivity API framework. The advanced patterns — dependency injection for auth and database access, background tasks for async work, WebSocket handlers, and lifespan events for startup/shutdown — require deliberate design. Claude Code implements FastAPI applications that are testable, deploy well, and leverage Pydantic v2’s performance improvements.
CLAUDE.md for FastAPI Projects
## FastAPI Stack
- Python 3.12+, FastAPI 0.111+, Pydantic v2
- Database: async SQLAlchemy 2.0 with asyncpg driver
- Auth: JWT tokens (python-jose), OAuth2PasswordBearer
- Background tasks: FastAPI BackgroundTasks for quick tasks, Celery for heavy work
- Testing: pytest-asyncio, httpx AsyncClient
- Migrations: Alembic
- All endpoints are async — no sync DB calls in route handlers
- Exception handling: custom HTTPException subclasses, not raw HTTP status codes
- Settings: pydantic-settings with .env file for local dev
Application Setup with Lifespan
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import logging
from .database import engine, Base
from .routers import orders, auth, users
from .settings import settings
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
logger.info("Starting application...")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) # Dev only; use Alembic in prod
# Initialize connection pools, caches, etc.
yield
# Shutdown
logger.info("Shutting down...")
await engine.dispose()
app = FastAPI(
title="Order API",
version="1.0.0",
lifespan=lifespan,
docs_url="/api/docs",
redoc_url=None,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(orders.router, prefix="/api/orders", tags=["orders"])
app.include_router(users.router, prefix="/api/users", tags=["users"])
Dependency Injection
# dependencies.py — reusable dependencies for auth and database
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from jose import jwt, JWTError
from .database import async_session_factory
from .settings import settings
# Database session dependency
async def get_db() -> AsyncSession:
async with async_session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# Auth dependency
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = await db.get(User, user_id)
if user is None or not user.is_active:
raise credentials_exception
return user
# Role-based auth: factory function returns a dependency
def require_role(*roles: str):
async def dependency(current_user: User = Depends(get_current_user)) -> User:
if current_user.role not in roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions",
)
return current_user
return dependency
Route Handlers
# routers/orders.py
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks, Query
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from ..dependencies import get_db, get_current_user, require_role
from ..models import Order, OrderItem
from ..schemas import OrderCreate, OrderResponse, OrderListResponse
from ..tasks import send_order_confirmation
router = APIRouter()
@router.post("/", response_model=OrderResponse, status_code=201)
async def create_order(
order_in: OrderCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> OrderResponse:
# Validate items exist
for item in order_in.items:
product = await db.get(Product, item.product_id)
if not product:
raise HTTPException(
status_code=404,
detail=f"Product {item.product_id} not found",
)
order = Order(
customer_id=current_user.id,
total_cents=sum(i.quantity * i.unit_price_cents for i in order_in.items),
status="PENDING",
)
db.add(order)
await db.flush() # Get the order ID without committing
for item in order_in.items:
db.add(OrderItem(
order_id=order.id,
product_id=item.product_id,
quantity=item.quantity,
unit_price_cents=item.unit_price_cents,
))
# Commit happens in get_db dependency
await db.commit()
await db.refresh(order)
# Non-blocking background task
background_tasks.add_task(send_order_confirmation, order.id, current_user.email)
return OrderResponse.model_validate(order)
@router.get("/", response_model=OrderListResponse)
async def list_orders(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
status: str | None = Query(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> OrderListResponse:
query = (
select(Order)
.where(Order.customer_id == current_user.id)
.options(selectinload(Order.items).selectinload(OrderItem.product))
.order_by(Order.created_at.desc())
)
if status:
query = query.where(Order.status == status)
total = await db.scalar(select(func.count()).select_from(query.subquery()))
orders = (await db.execute(
query.offset((page - 1) * page_size).limit(page_size)
)).scalars().all()
return OrderListResponse(
items=[OrderResponse.model_validate(o) for o in orders],
total=total,
page=page,
page_size=page_size,
)
Pydantic v2 Schemas
# schemas.py
from pydantic import BaseModel, Field, field_validator, model_validator
from datetime import datetime
class OrderItemCreate(BaseModel):
product_id: str
quantity: int = Field(gt=0, le=100)
unit_price_cents: int = Field(gt=0)
class OrderCreate(BaseModel):
items: list[OrderItemCreate] = Field(min_length=1)
payment_method_id: str
@field_validator('items')
@classmethod
def items_must_be_unique(cls, items: list[OrderItemCreate]) -> list[OrderItemCreate]:
product_ids = [i.product_id for i in items]
if len(product_ids) != len(set(product_ids)):
raise ValueError('Duplicate product IDs in order')
return items
class OrderResponse(BaseModel):
id: str
customer_id: str
status: str
total_cents: int
created_at: datetime
items: list['OrderItemResponse'] = []
model_config = {'from_attributes': True} # Enable ORM mode
@property
def total_dollars(self) -> float:
return self.total_cents / 100
WebSocket Endpoint
# WebSocket for real-time order status updates
from fastapi import WebSocket, WebSocketDisconnect
class OrderStatusManager:
def __init__(self):
self._connections: dict[str, set[WebSocket]] = {} # customer_id -> connections
async def connect(self, customer_id: str, websocket: WebSocket):
await websocket.accept()
self._connections.setdefault(customer_id, set()).add(websocket)
def disconnect(self, customer_id: str, websocket: WebSocket):
if customer_id in self._connections:
self._connections[customer_id].discard(websocket)
async def notify(self, customer_id: str, message: dict):
dead = set()
for ws in self._connections.get(customer_id, set()):
try:
await ws.send_json(message)
except:
dead.add(ws)
for ws in dead:
self.disconnect(customer_id, ws)
status_manager = OrderStatusManager()
@router.websocket("/ws/{customer_id}")
async def order_status_ws(
websocket: WebSocket,
customer_id: str,
token: str = Query(...),
):
# Validate token before accepting
try:
payload = jwt.decode(token, settings.jwt_secret, algorithms=["HS256"])
if payload.get("sub") != customer_id:
await websocket.close(code=4403)
return
except JWTError:
await websocket.close(code=4401)
return
await status_manager.connect(customer_id, websocket)
try:
while True:
await websocket.receive_text() # Keep connection open, handle pings
except WebSocketDisconnect:
status_manager.disconnect(customer_id, websocket)
Testing
# tests/test_orders.py
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
@pytest.fixture
async def db_session():
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
factory = async_sessionmaker(engine, expire_on_commit=False)
async with factory() as session:
yield session
await engine.dispose()
@pytest.fixture
async def client(db_session):
app.dependency_overrides[get_db] = lambda: db_session
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
yield ac
app.dependency_overrides.clear()
@pytest.mark.asyncio
async def test_create_order(client, auth_headers):
response = await client.post(
"/api/orders/",
json={"items": [{"product_id": "prod-1", "quantity": 2, "unit_price_cents": 1000}],
"payment_method_id": "pm_test"},
headers=auth_headers,
)
assert response.status_code == 201
assert response.json()["status"] == "PENDING"
For the async background task patterns that handle long-running work from FastAPI endpoints, see the MLOps guide for Celery/async processing. For the authentication patterns paired with FastAPI, the OAuth2 guide covers JWT and session management. The Claude Skills 360 bundle includes FastAPI skill sets covering dependency injection, Pydantic v2, async SQLAlchemy, and WebSocket patterns. Start with the free tier to try FastAPI application generation.