Claude Code for FastAPI: Dependency Injection, Background Tasks, and Production Patterns — Claude Skills 360 Blog

Published: August 13, 2026
Read time: 9 min read
By: Claude Skills 360

FastAPI’s type-driven development model, in which Pydantic schemas define both validation and documentation, makes it well suited to building self-documenting APIs. Given clear project conventions, Claude Code can generate FastAPI code that follows correct async patterns, builds sensible dependency chains, and uses Pydantic v2 models.

This guide covers advanced FastAPI with Claude Code: dependency injection, background tasks, Pydantic v2 models, WebSocket endpoints, and async testing.

CLAUDE.md for FastAPI Projects

## FastAPI Architecture
- Python 3.12, FastAPI 0.110, Pydantic v2
- Async SQLAlchemy 2.0 with asyncpg driver
- Authentication: JWT (python-jose), refresh tokens in httpOnly cookies
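- Background tasks: Celery + Redis for anything that must be retried or survive a restart (FastAPI BackgroundTasks run inside the web worker after the response is sent, with no retries or persistence; reserve them for small fire-and-forget work)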
- Testing: pytest + pytest-asyncio + httpx AsyncClient

## Patterns
- All DB operations: async Session — never use sync Session
- Pydantic models: Input models (request), Output models (response) — separate classes
- Dependencies: yield pattern for DB session, raise HTTPException in deps for auth errors  
- Never expose internal exceptions to clients — catch in exception handlers
- Repository pattern: crud.py per resource, never raw SQL in routers

Dependency Injection

Create a dependency chain that:
1. Gets DB session
2. Gets current user from JWT token
3. Verifies user has a specific permission
# app/dependencies.py
from typing import Annotated, AsyncGenerator
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from jose import jwt, JWTError

from app.database import async_session_factory
from app.models import User
from app.crud import users as users_crud
from app.config import settings

security = HTTPBearer()

# Dependency 1: Database session (yield pattern — auto-closes on request end)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Dependency 2: Current authenticated user
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
        )
        user_id: str | None = payload.get("sub")  # "sub" may be absent from the token
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = await users_crud.get(db, id=user_id)
    if user is None or not user.is_active:
        raise credentials_exception

    return user

# Dependency 3: Permission check factory (returns a dependency)
def require_permission(permission: str):
    async def check_permission(
        current_user: Annotated[User, Depends(get_current_user)],
    ) -> User:
        if not current_user.has_permission(permission):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission required: {permission}",
            )
        return current_user
    return check_permission

# Type aliases for cleaner route signatures
DBSession = Annotated[AsyncSession, Depends(get_db)]
CurrentUser = Annotated[User, Depends(get_current_user)]
AdminUser = Annotated[User, Depends(require_permission("admin"))]
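
The commit-or-rollback behaviour of `get_db` can be checked without a database. The sketch below is a framework-free model of the same yield pattern, assuming a hypothetical `FakeSession` that only records which method was called; `session_scope`, `run_handler`, and `demo_session_scope` are illustrative names, not part of the project.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


@asynccontextmanager
async def session_scope(session: FakeSession):
    # Same shape as get_db: yield, then commit on success or roll back on error.
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise


async def run_handler(session: FakeSession, fail: bool) -> None:
    # The body of the `async with` plays the role of the route handler.
    async with session_scope(session):
        if fail:
            raise RuntimeError("handler failed")


async def demo_session_scope() -> tuple[list[str], list[str]]:
    ok, bad = FakeSession(), FakeSession()
    await run_handler(ok, fail=False)
    try:
        await run_handler(bad, fail=True)
    except RuntimeError:
        pass  # the error still propagates after the rollback
    return ok.calls, bad.calls


ok_calls, bad_calls = asyncio.run(demo_session_scope())
print(ok_calls, bad_calls)  # ['commit'] ['rollback']
```

A handler that returns normally triggers exactly one commit, and a handler that raises triggers a rollback before the exception continues upward to the exception handlers.
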
# app/routers/orders.py
from typing import Annotated

from fastapi import APIRouter, BackgroundTasks, HTTPException, Query, status
from app.dependencies import DBSession, CurrentUser, AdminUser
from app.schemas.orders import OrderCreate, OrderResponse, OrderListResponse
from app.crud import orders as orders_crud
from app.tasks import send_order_confirmation

router = APIRouter(prefix="/orders", tags=["orders"])

@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
async def create_order(
    body: OrderCreate,
    db: DBSession,
    current_user: CurrentUser,
    background_tasks: BackgroundTasks,
):
    order = await orders_crud.create(db, user_id=current_user.id, data=body)

    # Runs in-process after the response is sent, with no retry if the worker dies.
    # If send_order_confirmation is a Celery task (as CLAUDE.md prefers), enqueue it
    # with send_order_confirmation.delay(order_id=str(order.id)) instead.
    background_tasks.add_task(send_order_confirmation, order_id=str(order.id))

    return order

@router.get("/", response_model=OrderListResponse)
async def list_orders(
    db: DBSession,
    current_user: CurrentUser,
    # Bounds stop per_page=0 (division by zero below); the cap of 100 is an arbitrary choice
    page: Annotated[int, Query(ge=1)] = 1,
    per_page: Annotated[int, Query(ge=1, le=100)] = 20,
):
    orders, total = await orders_crud.get_by_user(
        db, user_id=current_user.id, page=page, per_page=per_page,
    )
    return OrderListResponse(
        items=orders,
        total=total,
        page=page,
        pages=(total + per_page - 1) // per_page,
    )

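# Admin-only endpoint
@router.patch("/{order_id}/status", response_model=OrderResponse)
async def update_order_status(
    order_id: str,
    status_update: dict,  # a dedicated Pydantic input model would fit the project patterns better
    db: DBSession,
    _admin: AdminUser,  # _ signals "not used in body, just for auth"
):
    # Validate the body explicitly so a missing key yields 422 rather than a 500 from KeyError
    new_status = status_update.get("status")
    if not isinstance(new_status, str):
        raise HTTPException(status_code=422, detail="Body must include a string 'status' field")

    order = await orders_crud.get(db, id=order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return await orders_crud.update_status(db, order=order, status=new_status)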
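
The `pages` expression in `list_orders` is integer ceiling division. As a small worked sketch, the two helpers below (hypothetical names, not part of the router) isolate that arithmetic together with the row offset a repository function would typically derive from a 1-indexed page number.

```python
def page_count(total: int, per_page: int) -> int:
    """Ceiling division without floats: pages needed to hold `total` items."""
    if per_page < 1:
        raise ValueError("per_page must be at least 1")
    return (total + per_page - 1) // per_page


def offset_for(page: int, per_page: int) -> int:
    """Rows to skip for a 1-indexed page."""
    if page < 1:
        raise ValueError("page must be at least 1")
    return (page - 1) * per_page


print(page_count(0, 20))   # 0
print(page_count(40, 20))  # 2
print(page_count(41, 20))  # 3
print(offset_for(3, 20))   # 40
```

An empty result set gives zero pages, and a single extra item past a full page adds one more page.
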

Pydantic v2 Models

Create Pydantic models for Order with proper validation,
computed fields, and different schemas for create/read.
# app/schemas/orders.py
from datetime import datetime
from decimal import Decimal
from pydantic import BaseModel, Field, computed_field, model_validator, UUID4

class OrderItemCreate(BaseModel):
    product_id: UUID4
    quantity: int = Field(gt=0, le=100)

class OrderCreate(BaseModel):
    items: list[OrderItemCreate] = Field(min_length=1, max_length=50)
    shipping_address_id: UUID4

    @model_validator(mode='after')
    def check_no_duplicate_products(self) -> 'OrderCreate':
        product_ids = [item.product_id for item in self.items]
        if len(product_ids) != len(set(product_ids)):
            raise ValueError("Duplicate products in order — use quantity instead")
        return self

class OrderItemResponse(BaseModel):
    id: UUID4
    product_id: UUID4
    product_name: str
    quantity: int
    price_cents: int

    @computed_field
    @property
    def line_total_cents(self) -> int:
        return self.price_cents * self.quantity

    model_config = {"from_attributes": True}  # For SQLAlchemy model → schema conversion

class OrderResponse(BaseModel):
    id: UUID4
    user_id: UUID4
    status: str
    items: list[OrderItemResponse]
    total_cents: int
    created_at: datetime
    updated_at: datetime

    @computed_field
    @property
    def total_usd(self) -> Decimal:
        return Decimal(self.total_cents) / 100

    model_config = {"from_attributes": True}

class OrderListResponse(BaseModel):
    items: list[OrderResponse]
    total: int
    page: int
    pages: int
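
Two rules in these schemas are plain Python underneath: the duplicate-product check and the cents-to-dollars conversion. The sketch below reproduces both with the standard library only, so it runs without Pydantic; `find_duplicates` and `cents_to_usd` are hypothetical helper names used for illustration.

```python
from decimal import Decimal
from uuid import UUID, uuid4


def find_duplicates(product_ids: list[UUID]) -> set[UUID]:
    """Return the IDs that appear more than once."""
    seen: set[UUID] = set()
    dupes: set[UUID] = set()
    for pid in product_ids:
        if pid in seen:
            dupes.add(pid)
        seen.add(pid)
    return dupes


def cents_to_usd(total_cents: int) -> Decimal:
    """Same arithmetic as total_usd: exact decimal division, no float rounding."""
    return Decimal(total_cents) / 100


a, b = uuid4(), uuid4()
print(find_duplicates([a, b, a]) == {a})  # True
print(cents_to_usd(1999))                 # 19.99
```

Storing money as integer cents and converting with `Decimal` only at the edge avoids binary floating-point drift. If a fixed two-decimal rendering is wanted, `quantize(Decimal("0.01"))` is one option.
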

WebSocket Endpoints

Add a WebSocket endpoint for real-time order status updates.
Only authenticated users should connect, and only for their own orders.
# app/routers/ws.py
import json

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, status
from jose import jwt, JWTError

from app.config import settings

router = APIRouter()

class ConnectionManager:
    """Track WebSocket connections per user."""
    def __init__(self):
        # {user_id: set of WebSocket connections}
        self._connections: dict[str, set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        self._connections.setdefault(user_id, set()).add(websocket)

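    def disconnect(self, websocket: WebSocket, user_id: str):
        connections = self._connections.get(user_id)
        if connections is None:
            return
        connections.discard(websocket)
        if not connections:
            # Drop the empty entry so the dict does not grow with every user ever seen
            del self._connections[user_id]

    async def send_to_user(self, user_id: str, message: dict):
        """Broadcast to all of a user's connections (multiple tabs/devices)."""
        connections = self._connections.get(user_id, set())
        payload = json.dumps(message)
        # Iterate over a snapshot: connect/disconnect can mutate the set while we await
        for ws in list(connections):
            try:
                await ws.send_text(payload)
            except Exception:
                # Treat any send failure as a dead connection and drop it
                connections.discard(ws)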

manager = ConnectionManager()

@router.websocket("/ws/orders")
async def order_updates(websocket: WebSocket):
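    # Auth: token passed as a query param because the browser WebSocket API
    # cannot set an Authorization header. Query strings often end up in access
    # logs, so keep these tokens short-lived.
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    try:
        payload = jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm])
    except JWTError:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    user_id = payload.get("sub")
    if user_id is None:
        # A token without a subject cannot be tied to a user
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return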

    await manager.connect(websocket, user_id)

    try:
        while True:
            # Keep connection alive — client can send pings
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket, user_id)

# Called from order status update logic:
async def notify_order_update(user_id: str, order_id: str, new_status: str):
    await manager.send_to_user(user_id, {
        "type": "order_status_changed",
        "order_id": order_id,
        "status": new_status,
    })
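
The fan-out and dead-connection pruning can be exercised without a running server. The sketch below is a trimmed variant of the manager, typed against a hypothetical `FakeWebSocket` that exposes only `accept` and `send_text`; the `count` method and `demo_fanout` function are additions for the demonstration and are not in the router above.

```python
import asyncio
import json


class FakeWebSocket:
    """Hypothetical stand-in exposing only the methods the manager calls."""

    def __init__(self, broken: bool = False) -> None:
        self.broken = broken
        self.sent: list[str] = []

    async def accept(self) -> None:
        pass

    async def send_text(self, text: str) -> None:
        if self.broken:
            raise ConnectionError("socket closed")
        self.sent.append(text)


class ConnectionManager:
    """Trimmed variant of the manager, typed against the fake socket."""

    def __init__(self) -> None:
        self._connections: dict[str, set[FakeWebSocket]] = {}

    async def connect(self, ws: FakeWebSocket, user_id: str) -> None:
        await ws.accept()
        self._connections.setdefault(user_id, set()).add(ws)

    async def send_to_user(self, user_id: str, message: dict) -> None:
        connections = self._connections.get(user_id, set())
        payload = json.dumps(message)
        # Iterate over a snapshot so pruning cannot disturb the loop.
        for ws in list(connections):
            try:
                await ws.send_text(payload)
            except Exception:
                connections.discard(ws)

    def count(self, user_id: str) -> int:
        return len(self._connections.get(user_id, set()))


async def demo_fanout() -> tuple[list[str], int]:
    manager = ConnectionManager()
    tab, stale = FakeWebSocket(), FakeWebSocket(broken=True)
    await manager.connect(tab, "user-1")
    await manager.connect(stale, "user-1")
    await manager.send_to_user(
        "user-1", {"type": "order_status_changed", "status": "shipped"}
    )
    return tab.sent, manager.count("user-1")


sent, remaining = asyncio.run(demo_fanout())
print(len(sent), remaining)  # 1 1
```

The healthy connection receives the message once, and the broken one is removed on the first failed send. Because the registry lives in process memory, this approach only reaches clients connected to the same worker; a multi-worker deployment would need a shared channel such as Redis pub/sub, which is outside this sketch.
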

Async Testing with pytest-asyncio

Write async tests for the orders endpoints.
Use httpx AsyncClient — not the sync TestClient.
# tests/test_orders.py
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from app.main import app
from app.dependencies import get_db
from app.models import Base
from tests.factories import UserFactory, ProductFactory, ShippingAddressFactory

# Mark every test in this module as async (not needed if asyncio_mode = auto is configured)
pytestmark = pytest.mark.asyncio

TEST_DB_URL = "postgresql+asyncpg://test:test@localhost/test_db"

@pytest_asyncio.fixture
async def db():
    engine = create_async_engine(TEST_DB_URL)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    async with session_factory() as session:
        yield session

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()

@pytest_asyncio.fixture
async def client(db):
    app.dependency_overrides[get_db] = lambda: db

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as client:
        yield client

    app.dependency_overrides.clear()

@pytest_asyncio.fixture
async def auth_headers(db, client):
    user = await UserFactory.create(db)
    response = await client.post("/auth/login", json={
        "email": user.email,
        "password": "testpassword",
    })
    token = response.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}, user

class TestCreateOrder:
    async def test_creates_order_successfully(self, db, client, auth_headers):
        headers, user = auth_headers
        product = await ProductFactory.create(db, stock=10, price_cents=2000)
        address = await ShippingAddressFactory.create(db, user=user)

        response = await client.post(
            "/orders/",
            json={
                "items": [{"product_id": str(product.id), "quantity": 2}],
                "shipping_address_id": str(address.id),
            },
            headers=headers,
        )

        assert response.status_code == 201
        data = response.json()
        assert data["total_cents"] == 4000
        assert data["status"] == "pending"
        assert len(data["items"]) == 1

    async def test_rejects_insufficient_stock(self, db, client, auth_headers):
        headers, user = auth_headers
        product = await ProductFactory.create(db, stock=1)
        address = await ShippingAddressFactory.create(db, user=user)

        response = await client.post(
            "/orders/",
            json={"items": [{"product_id": str(product.id), "quantity": 5}], "shipping_address_id": str(address.id)},
            headers=headers,
        )

        assert response.status_code == 422

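    async def test_requires_authentication(self, client, db):
        # HTTPBearer rejects the missing Authorization header, so the invalid body is never reported.
        # FastAPI 0.110 answers 403 here; newer releases may answer 401, so adjust after upgrading.
        response = await client.post("/orders/", json={"items": [], "shipping_address_id": "test"})
        assert response.status_code == 403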

For Python microservices running on Kubernetes, see the Kubernetes guide. For integrating machine learning models and embeddings with FastAPI, see the machine learning guide. The Claude Skills 360 bundle includes Python FastAPI skill sets covering async patterns, streaming responses, and production deployment. Start with the free tier to try FastAPI code generation.
