Motor is the official async MongoDB driver for Python. pip install motor. Client: from motor.motor_asyncio import AsyncIOMotorClient; client = AsyncIOMotorClient("mongodb://localhost:27017"). Database: db = client["mydb"]. Collection: col = db["users"]. Insert one: result = await col.insert_one({"name":"Alice","email":"[email protected]"}), result.inserted_id. Insert many: result = await col.insert_many([{...},{...}]), result.inserted_ids. Find: async for doc in col.find({"active":True}): .... Find one: doc = await col.find_one({"email":"[email protected]"}). Project: col.find({}, {"name":1,"email":1,"_id":0}). Sort/limit: col.find().sort("name",1).limit(20).skip(offset). Count: await col.count_documents({"active":True}). Estimate: await col.estimated_document_count(). Update one: await col.update_one({"_id":id}, {"$set":{"name":"Bob"}}). Update many: await col.update_many({"active":False}, {"$set":{"archived":True}}). Find and update: doc = await col.find_one_and_update(filter, update, return_document=ReturnDocument.AFTER). Delete: await col.delete_one({"_id":id}). Delete many: await col.delete_many({"active":False}). Aggregate: await col.aggregate([...]).to_list(length=None). Index: await col.create_index("email", unique=True). await col.create_index([("role",1),("active",1)]). Bulk: from pymongo import InsertOne, UpdateOne; await col.bulk_write([...]). Change stream: async with col.watch() as stream: async for change in stream: .... GridFS: from motor.motor_asyncio import AsyncIOMotorGridFSBucket; fs = AsyncIOMotorGridFSBucket(db). Transaction: async with await client.start_session() as s: async with s.start_transaction(): .... ObjectId: from bson import ObjectId. ObjectId(id_str). Claude Code generates Motor collection abstractions, aggregation pipelines, and change stream processors.
# CLAUDE.md for Motor
## Motor Stack
- Version: motor >= 3.4 | pip install motor
- Client: AsyncIOMotorClient(url) | db = client["name"] | col = db["collection"]
- Insert: await col.insert_one(doc) | insert_many([...]) → result.inserted_ids
- Find: await col.find_one(filter) | col.find(filter).sort().limit().skip()
- Update: update_one/update_many(filter, {"$set":{...}, "$inc":{...}})
- Aggregate: await col.aggregate([{"$match":{}},{"$group":{...}}]).to_list(None)
- Index: await col.create_index(field, unique=True) / IndexModel for compound
- Transactions: async with session.start_transaction(): (replica set required)
# Motor Async MongoDB Pipeline
# app/motor_db.py — Motor async MongoDB client and collection helpers
from __future__ import annotations
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, AsyncIterator
from bson import ObjectId
from motor.motor_asyncio import (
AsyncIOMotorClient,
AsyncIOMotorCollection,
AsyncIOMotorDatabase,
AsyncIOMotorGridFSBucket,
)
from pymongo import ASCENDING, DESCENDING, IndexModel, ReturnDocument
from pymongo.errors import BulkWriteError, DuplicateKeyError
logger = logging.getLogger(__name__)
MONGODB_URL = os.environ.get("MONGODB_URL", "mongodb://localhost:27017")
MONGODB_DB = os.environ.get("MONGODB_DB", "myapp")
# ─────────────────────────────────────────────────────────────────────────────
# Client and pool management
# ─────────────────────────────────────────────────────────────────────────────
# Lazily-created, process-wide Motor client (one connection pool per process).
_client: AsyncIOMotorClient | None = None


def get_client() -> AsyncIOMotorClient:
    """Return the shared Motor client, creating it on first use."""
    global _client
    if _client is not None:
        return _client
    _client = AsyncIOMotorClient(
        MONGODB_URL,
        maxPoolSize=20,
        minPoolSize=5,
        maxIdleTimeMS=300_000,
        serverSelectionTimeoutMS=5_000,
    )
    return _client
def get_db(name: str = MONGODB_DB) -> AsyncIOMotorDatabase:
    """Return a database handle from the shared client (default: MONGODB_DB)."""
    client = get_client()
    return client[name]
async def close_client() -> None:
    """Close the shared client and reset it so the next get_client() reconnects."""
    global _client
    if _client is None:
        return
    _client.close()
    _client = None
# ─────────────────────────────────────────────────────────────────────────────
# Schema initialisation — create indexes once at startup
# ─────────────────────────────────────────────────────────────────────────────
async def create_indexes(db: AsyncIOMotorDatabase) -> None:
    """Create all collection indexes. Safe to call multiple times (idempotent)."""
    # Table-driven: collection name -> index models, created in declaration order.
    index_specs: dict[str, list[IndexModel]] = {
        "users": [
            IndexModel([("email", ASCENDING)], unique=True, name="users_email_uniq"),
            IndexModel([("role", ASCENDING), ("is_active", ASCENDING)], name="users_role_active"),
            IndexModel([("$**", "text")], name="users_text"),  # wildcard text search
        ],
        "products": [
            IndexModel([("sku", ASCENDING)], unique=True, name="products_sku_uniq"),
            IndexModel([("is_active", ASCENDING), ("price", ASCENDING)], name="products_active_price"),
        ],
        "orders": [
            IndexModel([("user_id", ASCENDING)], name="orders_user"),
            IndexModel([("status", ASCENDING)], name="orders_status"),
            IndexModel([("created_at", DESCENDING)], name="orders_created_desc"),
        ],
    }
    for collection_name, models in index_specs.items():
        await db[collection_name].create_indexes(models)
    logger.info("MongoDB indexes created/verified")
# ─────────────────────────────────────────────────────────────────────────────
# User repository
# ─────────────────────────────────────────────────────────────────────────────
class UserRepository:
    """CRUD, search, and aggregation helpers for the ``users`` collection."""

    def __init__(self, db: AsyncIOMotorDatabase) -> None:
        self._col: AsyncIOMotorCollection = db["users"]

    async def create(self, email: str, first_name: str, last_name: str,
                     role: str = "user") -> dict:
        """Insert a new user and return the stored document (with ``_id``).

        Raises:
            ValueError: if a user with this email already exists (backed by
                the unique ``users_email_uniq`` index).
        """
        doc = {
            "email": email,
            "first_name": first_name,
            "last_name": last_name,
            "role": role,
            "is_active": True,
            "created_at": datetime.now(timezone.utc),
        }
        try:
            result = await self._col.insert_one(doc)
        except DuplicateKeyError as exc:
            # Chain the original error so the Mongo diagnostics stay visible.
            raise ValueError(f"User with email {email!r} already exists") from exc
        doc["_id"] = result.inserted_id
        return doc

    async def get_by_id(self, user_id: str) -> dict | None:
        """Fetch one user by ObjectId string; None when not found.

        NOTE: a malformed id string raises ``bson.errors.InvalidId``.
        """
        return await self._col.find_one({"_id": ObjectId(user_id)})

    async def get_by_email(self, email: str) -> dict | None:
        """Fetch one user by exact email; None when not found."""
        return await self._col.find_one({"email": email})

    async def list_active(self, page: int = 1, page_size: int = 20) -> list[dict]:
        """Return one page of active users (projected fields), sorted by last name."""
        cursor = (
            self._col
            .find({"is_active": True}, {"_id": 1, "email": 1, "first_name": 1, "last_name": 1})
            .sort("last_name", ASCENDING)
            .skip((page - 1) * page_size)
            .limit(page_size)
        )
        return await cursor.to_list(length=page_size)

    async def search(self, query: str, limit: int = 50) -> list[dict]:
        """Full-text search using the wildcard text index, best matches first.

        ``limit`` caps the result size (default 50, previously hard-coded).
        """
        return await self._col.find(
            {"$text": {"$search": query}},
            {"score": {"$meta": "textScore"}},
        ).sort([("score", {"$meta": "textScore"})]).to_list(length=limit)

    async def update(self, user_id: str, updates: dict) -> dict | None:
        """Apply a partial update; return the post-update document or None.

        The caller's ``updates`` dict is NOT mutated — the ``updated_at``
        timestamp is added to a copy.
        """
        payload = {**updates, "updated_at": datetime.now(timezone.utc)}
        return await self._col.find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": payload},
            return_document=ReturnDocument.AFTER,
        )

    async def deactivate(self, user_id: str) -> bool:
        """Soft-delete a user. True if an active user matched the id."""
        result = await self._col.update_one(
            {"_id": ObjectId(user_id), "is_active": True},
            {"$set": {"is_active": False, "updated_at": datetime.now(timezone.utc)}},
        )
        return result.matched_count > 0

    async def count_by_role(self) -> list[dict]:
        """Aggregate user counts grouped by role, largest count first."""
        return await self._col.aggregate([
            {"$group": {"_id": "$role", "count": {"$sum": 1}}},
            {"$sort": {"count": DESCENDING}},
            {"$project": {"role": "$_id", "count": 1, "_id": 0}},
        ]).to_list(length=None)

    async def bulk_create(self, users: list[dict]) -> int:
        """Bulk insert with ordered=False to continue past duplicate key errors.

        Returns the number of documents actually inserted. An empty input
        returns 0 (insert_many would raise InvalidOperation on an empty list).
        """
        if not users:
            return 0
        now = datetime.now(timezone.utc)  # one timestamp for the whole batch
        docs = [{
            "email": u["email"],
            "first_name": u["first_name"],
            "last_name": u["last_name"],
            "role": u.get("role", "user"),
            "is_active": True,
            "created_at": now,
        } for u in users]
        try:
            result = await self._col.insert_many(docs, ordered=False)
            return len(result.inserted_ids)
        except BulkWriteError as exc:
            # With ordered=False some inserts succeed even when others fail.
            inserted = exc.details.get("nInserted", 0)
            logger.warning("Bulk insert partial: %d succeeded, some failed", inserted)
            return inserted
# ─────────────────────────────────────────────────────────────────────────────
# Change streams — real-time event processing
# ─────────────────────────────────────────────────────────────────────────────
async def watch_order_events(
    db: AsyncIOMotorDatabase,
    handler,
) -> None:
    """
    Subscribe to order collection changes.
    Emits events for insert, update, replace operations.
    Requires a MongoDB replica set (or Atlas).
    """
    match_stage = {
        "$match": {"operationType": {"$in": ["insert", "update", "replace"]}},
    }
    # full_document="updateLookup" asks the server to attach the current
    # document to update events (otherwise only the delta is delivered).
    async with db["orders"].watch([match_stage], full_document="updateLookup") as stream:
        logger.info("Watching orders collection for changes")
        async for event in stream:
            operation = event["operationType"]
            # Fall back to the documentKey (just the _id) when no full doc.
            document = event.get("fullDocument") or event.get("documentKey")
            logger.debug("Order event op=%s id=%s", operation, document.get("_id"))
            await handler(operation, document)
# ─────────────────────────────────────────────────────────────────────────────
# GridFS — large file storage
# ─────────────────────────────────────────────────────────────────────────────
class FileRepository:
    """Store and retrieve large files via GridFS (bucket ``uploads``)."""

    def __init__(self, db: AsyncIOMotorDatabase) -> None:
        self._fs = AsyncIOMotorGridFSBucket(db, bucket_name="uploads")

    async def upload(self, filename: str, data: bytes, metadata: dict | None = None) -> str:
        """Upload *data* under *filename*; return the new file id as a string.

        upload_from_stream handles open/write/close in one call and returns
        the ObjectId directly — no need to reach into the private
        ``grid_in._id`` attribute of a manually-managed upload stream.
        """
        file_id = await self._fs.upload_from_stream(
            filename,
            data,
            metadata=metadata or {},
        )
        logger.info("Uploaded file=%s id=%s size=%d", filename, file_id, len(data))
        return str(file_id)

    async def download(self, file_id: str) -> bytes:
        """Return the full contents of the stored file."""
        grid_out = await self._fs.open_download_stream(ObjectId(file_id))
        return await grid_out.read()

    async def delete(self, file_id: str) -> None:
        """Remove the file and all its chunks from the bucket."""
        await self._fs.delete(ObjectId(file_id))
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI integration
# ─────────────────────────────────────────────────────────────────────────────
# Example FastAPI wiring: the lifespan hook creates indexes on startup and
# closes the shared client on shutdown. Fix over previous revision: the
# example raised HTTPException without importing it.
FASTAPI_EXAMPLE = """
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from app.motor_db import get_db, close_client, create_indexes, UserRepository

@asynccontextmanager
async def lifespan(app: FastAPI):
    db = get_db()
    await create_indexes(db)
    yield
    await close_client()

app = FastAPI(lifespan=lifespan)

async def get_users(db: AsyncIOMotorDatabase = Depends(get_db)):
    return UserRepository(db)

@app.get("/users/{user_id}")
async def read_user(user_id: str, repo: UserRepository = Depends(get_users)):
    user = await repo.get_by_id(user_id)
    if user is None:
        raise HTTPException(404, "User not found")
    user["_id"] = str(user["_id"])
    return user
"""
For the PyMongo (sync) alternative — PyMongo’s synchronous collection.find() blocks the thread during network I/O, requiring thread pools or process pools for concurrent requests, while Motor wraps the exact same PyMongo API in asyncio coroutines so async for doc in collection.find({}) yields the event loop on each network call — a FastAPI server with one Motor client handles hundreds of concurrent requests without thread overhead. For the Beanie ODM alternative — Beanie adds Pydantic validation and a Django-ORM-like query API on top of Motor, while Motor’s raw collection.aggregate([...]) pipeline gives full control over MongoDB query plan: $lookup for multi-collection joins, $unwind for array expansion, $facet for parallel aggregation buckets, and $changeStream stage for real-time pipeline filtering — patterns that need custom Python to express in Beanie’s typed query builder. The Claude Skills 360 bundle includes Motor skill sets covering AsyncIOMotorClient configuration, insert_one/many and bulk_write, find with projection sort limit skip, update_one/many with $set/$inc/$push, aggregate pipelines, IndexModel for compound indexes, find_one_and_update with ReturnDocument, change stream watch, GridFSBucket file storage, multi-document transactions with sessions, and FastAPI lifespan integration. Start with the free tier to try async MongoDB code generation.