Beanie is an async MongoDB ODM based on Pydantic. pip install beanie motor. Define document: from beanie import Document, Indexed; class User(Document): email: Indexed(str, unique=True); name: str. Init: await init_beanie(database=db, document_models=[User, Product]). Insert: user = await User.insert(User(email="[email protected]", name="Alice")). Insert many: await User.insert_many([...]). Find: users = await User.find(User.is_active == True).to_list(). Find one: user = await User.find_one(User.email == "[email protected]"). Get by ID: user = await User.get(id). Filter: User.find(User.age >= 18, User.role == "admin"). Sort: .sort(-User.created_at). Limit/skip: .limit(20).skip(offset). Count: await User.find(...).count(). Update: await user.set({User.name: "Bob"}). await User.find(...).update(Set({User.is_active: False})). Delete: await user.delete(). await User.find(...).delete(). Aggregate: await User.aggregate([{"$group":{"_id":"$role","count":{"$sum":1}}}]).to_list(). Link: class Order(Document): user: Link[User]. await order.fetch_link(Order.user). BackLink: class User(Document): orders: list[BackLink[Order]] = []. BulkWriter: async with BulkWriter() as bw: await User.insert(user, bulk_writer=bw). Settings override: class Settings: name="users_collection"; indexes=[...]. PydanticObjectId — Beanie’s _id type. Optional[PydanticObjectId]. Document.model_config = ConfigDict(populate_by_name=True). Indexed(str) — single field index. Indexed(str, index_type=pymongo.TEXT) — text search. @before_event(Insert, Replace) async def update_timestamp(self): self.updated_at = datetime.utcnow(). Claude Code generates Beanie Document models, aggregation pipelines, and FastAPI MongoDB CRUD services.
CLAUDE.md for Beanie
## Beanie Stack
- Version: beanie >= 1.26 | pip install beanie motor
- Init: await init_beanie(database=AsyncIOMotorDatabase, document_models=[...])
- Model: class Doc(Document): field: type = ... | Indexed(type, unique=True)
- Insert: await Doc.insert(Doc(...)) | await Doc.insert_many([...])
- Find: Doc.find(*conditions).sort().limit().skip().to_list()
- Update: await doc.set({Doc.field: val}) | Doc.find(...).update(Set({...}))
- Aggregate: Doc.aggregate([pipeline stages]).to_list()
- Links: Link[OtherDoc] + fetch_link() | BackLink for reverse references
Beanie Async MongoDB Pipeline
# app/documents.py — Beanie document model definitions
from __future__ import annotations
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum
from typing import Annotated, Optional
import pymongo
from beanie import (
    BackLink,
    BulkWriter,
    Document,
    Indexed,
    Link,
    PydanticObjectId,
    Replace,
    Update,
    WriteRules,
    before_event,
)
from beanie.operators import In, Set
from pydantic import ConfigDict, EmailStr, Field
class UserRole(str, Enum):
    """Closed set of user authorization roles, stored as plain strings."""

    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"
class OrderStatus(str, Enum):
    """Order lifecycle states persisted in ``Order.status``."""

    PENDING = "pending"
    PAID = "paid"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"
# ── Embedded documents (not collections, just nested Pydantic models) ──────────
from pydantic import BaseModel
class Address(BaseModel):
    """Postal address embedded inside User documents (no own collection)."""

    street: str
    city: str
    state: str
    postal_code: str
    country: str = "US"  # defaults to US; presumably ISO 3166-1 alpha-2 — TODO confirm
# ── Tag document ───────────────────────────────────────────────────────────────
class Tag(Document):
    """Product tag stored in the ``tags`` collection."""

    # Indexed(...) declares a unique index that Beanie creates on init_beanie.
    name: Indexed(str, unique=True)  # type: ignore[valid-type]
    slug: Indexed(str, unique=True)  # type: ignore[valid-type]

    class Settings:
        name = "tags"  # explicit collection name
# ── User document ──────────────────────────────────────────────────────────────
class User(Document):
    """Application user stored in the ``users`` collection."""

    email: Indexed(EmailStr, unique=True)  # type: ignore[valid-type]
    first_name: str
    last_name: str
    role: UserRole = UserRole.USER
    is_active: bool = True
    address: Optional[Address] = None  # embedded document, not a reference
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    # orders is populated via BackLink from Order.user
    # Access: await user.fetch_link(User.orders) after fetching with links=True

    model_config = ConfigDict(populate_by_name=True)

    class Settings:
        name = "users"
        indexes = [
            pymongo.IndexModel(
                [("role", pymongo.ASCENDING), ("is_active", pymongo.ASCENDING)]
            ),
            pymongo.IndexModel(
                [("first_name", pymongo.TEXT), ("last_name", pymongo.TEXT)],
                name="user_text_idx",
            ),
        ]

    # BUG FIX: before_event takes Beanie event *classes*, not strings.
    # The original passed the string "update", so the hook never ran as
    # intended. Replace + Update cover doc.replace()/save() and
    # .update()/.set() operations.
    @before_event(Replace, Update)
    async def update_timestamp(self) -> None:
        """Refresh ``updated_at`` before replace/update operations."""
        self.updated_at = datetime.now(timezone.utc)

    @property
    def full_name(self) -> str:
        """Convenience display name: ``"<first> <last>"``."""
        return f"{self.first_name} {self.last_name}"
# ── Product document ───────────────────────────────────────────────────────────
class Product(Document):
    """Sellable product stored in the ``products`` collection."""

    name: str
    sku: Indexed(str, unique=True)  # type: ignore[valid-type]
    # NOTE(review): float for money is prone to rounding drift — consider
    # Decimal with an appropriate codec; confirm before changing storage.
    price: float
    stock: int = 0
    is_active: bool = True
    tags: list[Link[Tag]] = []  # references to Tag documents
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Settings:
        name = "products"
# ── Order document with Link to User ──────────────────────────────────────────
class OrderLine(BaseModel):
    """Embedded order line holding a snapshot of product data at order time."""

    product_id: PydanticObjectId
    product_sku: str  # denormalized copy; survives later product edits
    quantity: int
    unit_price: float  # price captured when the order was created

    @property
    def subtotal(self) -> float:
        """Line total: quantity x unit price."""
        return self.quantity * self.unit_price
class Order(Document):
    """Customer order stored in the ``orders`` collection."""

    user: Link[User]  # reference to the ordering user
    lines: list[OrderLine] = []
    status: OrderStatus = OrderStatus.PENDING
    total: float = 0.0  # sum of line subtotals, computed at creation
    notes: Optional[str] = None
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Settings:
        name = "orders"
        indexes = [
            pymongo.IndexModel([("status", pymongo.ASCENDING)]),
            pymongo.IndexModel([("created_at", pymongo.DESCENDING)]),
        ]

    # BUG FIX: before_event expects Beanie event classes, not the string
    # "update"; with the string the hook never fired. Replace + Update
    # cover doc.replace()/save() and .update()/.set() operations.
    @before_event(Replace, Update)
    async def touch(self) -> None:
        """Refresh ``updated_at`` before replace/update operations."""
        self.updated_at = datetime.now(timezone.utc)
# app/services.py — async CRUD services using Beanie
from __future__ import annotations
from beanie.operators import In, Set, Inc
from app.documents import Order, OrderLine, OrderStatus, Product, Tag, User, UserRole
class UserService:
    """Async CRUD helpers for the ``User`` collection."""

    async def create(self, email: str, first_name: str, last_name: str) -> User:
        """Insert a new user and return the persisted document."""
        user = User(email=email, first_name=first_name, last_name=last_name)
        return await user.insert()

    async def get_by_id(self, user_id: str) -> User | None:
        """Fetch a user by _id, or None if absent."""
        return await User.get(user_id)

    async def get_by_email(self, email: str) -> User | None:
        """Fetch a user by unique email, or None if absent."""
        return await User.find_one(User.email == email)

    async def list_active(self, page: int = 1, page_size: int = 20) -> list[User]:
        """Page through active users ordered by last name (1-based page)."""
        return await (
            User.find(User.is_active == True)  # noqa: E712 — Beanie query expression
            .sort(User.last_name)
            .skip((page - 1) * page_size)
            .limit(page_size)
            .to_list()
        )

    async def search(self, query: str) -> list[User]:
        """MongoDB text search using the text index."""
        return await User.find({"$text": {"$search": query}}).to_list()

    async def count_by_role(self) -> list[dict]:
        """Return ``[{"_id": role, "count": n}, ...]`` sorted by count desc."""
        return await User.aggregate([
            {"$group": {"_id": "$role", "count": {"$sum": 1}}},
            {"$sort": {"count": -1}},
        ]).to_list()

    async def deactivate(self, user_id: str) -> bool:
        """Mark a user inactive. Returns False when the user does not exist.

        BUG FIX: the original compared ``User.id == user_id`` with the raw
        string, which never matches an ObjectId ``_id``. Use ``User.get()``,
        which parses the id — consistent with ``get_by_id``.
        """
        user = await User.get(user_id)
        if user is None:
            return False
        await user.set({User.is_active: False})
        return True
class OrderService:
    """Async order workflows built on Beanie documents."""

    async def create_order(self, user_id: str, lines_data: list[dict]) -> Order:
        """Create an order and decrement product stock.

        ``lines_data`` items look like ``{"product_id": ..., "quantity": int}``.
        Raises ValueError for an unknown user/product or insufficient stock.

        NOTE(review): check-then-decrement is not atomic — a concurrent
        order can still oversell. A ``$gte`` filter on the Inc update or a
        transaction would close the race; confirm requirements first.
        """
        user = await User.get(user_id)
        if user is None:
            raise ValueError(f"User {user_id} not found")

        product_ids = [line["product_id"] for line in lines_data]
        products = {
            str(p.id): p async for p in Product.find(In(Product.id, product_ids))
        }

        # BUG FIX: aggregate quantities per product before the stock check.
        # The original checked each line against the full stock, so two
        # lines for the same product could each pass yet oversell in total.
        required: dict[str, int] = {}
        for line_data in lines_data:
            pid = str(line_data["product_id"])
            required[pid] = required.get(pid, 0) + line_data["quantity"]
        for pid, qty in required.items():
            product = products.get(pid)
            if product is None:
                raise ValueError(f"Product {pid} not found")
            if product.stock < qty:
                raise ValueError(f"Insufficient stock for {product.sku}")

        lines = []
        total = 0.0
        for line_data in lines_data:
            product = products[str(line_data["product_id"])]
            line = OrderLine(
                product_id=product.id,
                product_sku=product.sku,
                quantity=line_data["quantity"],
                unit_price=product.price,
            )
            lines.append(line)
            total += line.subtotal

        order = Order(user=user, lines=lines, total=total)
        await order.insert()

        # Decrement stock once per product with the combined quantity.
        for pid, qty in required.items():
            await Product.find_one(Product.id == products[pid].id).update(
                Inc({Product.stock: -qty})
            )
        return order

    async def get_orders_with_users(self, limit: int = 20) -> list[Order]:
        """Fetch the newest orders with their User links resolved.

        BUG FIX: Beanie find queries have no chainable ``.fetch_links()``
        method — link resolution is requested via ``find(fetch_links=True)``.
        """
        return await (
            Order.find(fetch_links=True)
            .sort(-Order.created_at)
            .limit(limit)
            .to_list()
        )

    async def revenue_summary(self) -> dict:
        """Map order status -> ``{"total": revenue, "count": orders}``."""
        pipeline = [
            {"$group": {
                "_id": "$status",
                "total": {"$sum": "$total"},
                "count": {"$sum": 1},
            }},
            {"$sort": {"total": -1}},
        ]
        results = await Order.aggregate(pipeline).to_list()
        return {r["_id"]: {"total": r["total"], "count": r["count"]} for r in results}
# app/config.py — Beanie init and FastAPI integration
from __future__ import annotations
import os
from contextlib import asynccontextmanager
from beanie import init_beanie
from fastapi import FastAPI
from motor.motor_asyncio import AsyncIOMotorClient
from app.documents import Order, Product, Tag, User
MONGODB_URL = os.environ.get("MONGODB_URL", "mongodb://localhost:27017")
MONGODB_DB = os.environ.get("MONGODB_DB", "myapp")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: connect Motor, init Beanie, close client on shutdown.

    BUG FIX (robustness): close the Mongo client in a ``finally`` block so
    the connection is released even if startup after init or shutdown raises.
    """
    client = AsyncIOMotorClient(MONGODB_URL)
    try:
        db = client[MONGODB_DB]
        await init_beanie(
            database=db,
            document_models=[User, Product, Tag, Order],
        )
        yield
    finally:
        client.close()
def create_app() -> FastAPI:
    """Build the FastAPI application wired to the Beanie lifespan."""
    app = FastAPI(lifespan=lifespan)
    return app
For the Motor directly alternative — Motor provides async pymongo with collection.find({}), collection.aggregate([]), and collection.update_one() but all data comes back as raw dicts requiring manual type casting, while Beanie wraps Motor with Pydantic validation so await User.find_one(User.email == email) returns a typed User instance, Indexed(str, unique=True) declares an index in Python that Beanie creates on init_beanie, and .sort(-User.created_at) uses the field directly instead of the magic string "-created_at". For the MongoEngine alternative — MongoEngine is synchronous and not compatible with asyncio/FastAPI, while Beanie’s Link[User] stores a DBRef and .fetch_links() resolves all links in a batch query, @before_event hooks run validation or timestamp updates before insert or replace operations without manual override, and BulkWriter batches multiple inserts and updates into one round-trip for high-throughput ingestion. The Claude Skills 360 bundle includes Beanie skill sets covering Document model definition, Indexed field types, init_beanie with FastAPI lifespan, insert/find/update/delete CRUD, find query chaining with sort/limit/skip, Link and BackLink relationships, aggregate pipeline, before_event hooks, BulkWriter for batch operations, and PydanticObjectId for typed IDs. Start with the free tier to try async MongoDB code generation.