
Claude Code for Motor: Async MongoDB with Python

Published: December 27, 2027
Read time: 5 min
By: Claude Skills 360

Motor is the official async MongoDB driver for Python, installed with pip install motor. Quick reference:

- Client: from motor.motor_asyncio import AsyncIOMotorClient; client = AsyncIOMotorClient("mongodb://localhost:27017")
- Database and collection: db = client["mydb"]; col = db["users"]
- Insert one: result = await col.insert_one({"name": "Alice", "email": "[email protected]"}) → result.inserted_id
- Insert many: result = await col.insert_many([{...}, {...}]) → result.inserted_ids
- Find: async for doc in col.find({"active": True}): ...
- Find one: doc = await col.find_one({"email": "[email protected]"})
- Projection: col.find({}, {"name": 1, "email": 1, "_id": 0})
- Sort, limit, skip: col.find().sort("name", 1).limit(20).skip(offset)
- Count: await col.count_documents({"active": True}); estimate: await col.estimated_document_count()
- Update one: await col.update_one({"_id": id}, {"$set": {"name": "Bob"}})
- Update many: await col.update_many({"active": False}, {"$set": {"archived": True}})
- Find and update: doc = await col.find_one_and_update(filter, update, return_document=ReturnDocument.AFTER)
- Delete: await col.delete_one({"_id": id}); many: await col.delete_many({"active": False})
- Aggregate: await col.aggregate([...]).to_list(length=None)
- Index: await col.create_index("email", unique=True); compound: await col.create_index([("role", 1), ("active", 1)])
- Bulk: from pymongo import InsertOne, UpdateOne; await col.bulk_write([...])
- Change stream: async with col.watch() as stream: async for change in stream: ...
- GridFS: from motor.motor_asyncio import AsyncIOMotorGridFSBucket; fs = AsyncIOMotorGridFSBucket(db)
- Transaction: async with await client.start_session() as s: async with s.start_transaction(): ...
- ObjectId: from bson import ObjectId; ObjectId(id_str)

Claude Code generates Motor collection abstractions, aggregation pipelines, and change stream processors.

CLAUDE.md for Motor

## Motor Stack
- Version: motor >= 3.4 | pip install motor
- Client: AsyncIOMotorClient(url) | db = client["name"] | col = db["collection"]
- Insert: await col.insert_one(doc) | insert_many([...]) → result.inserted_ids
- Find: await col.find_one(filter) | col.find(filter).sort().limit().skip()
- Update: update_one/update_many(filter, {"$set":{...}, "$inc":{...}})
- Aggregate: await col.aggregate([{"$match":{}},{"$group":{...}}]).to_list(None)
- Index: await col.create_index(field, unique=True) / IndexModel for compound
- Transactions: async with session.start_transaction(): (replica set required)
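The transactions line above can be sketched as follows — a minimal example, assuming a replica set (or Atlas) connection; transfer_credits, the myapp database name, and the credits field are all hypothetical, and both updates commit or abort together:

```python
# Sketch: multi-document transaction (MongoDB replica set required).
# "client" is a connected AsyncIOMotorClient; "credits" is illustrative.
async def transfer_credits(client, from_id, to_id, amount: int) -> None:
    users = client["myapp"]["users"]
    async with await client.start_session() as session:
        async with session.start_transaction():
            # Debit only if the sender has enough credits.
            await users.update_one(
                {"_id": from_id, "credits": {"$gte": amount}},
                {"$inc": {"credits": -amount}},
                session=session,
            )
            await users.update_one(
                {"_id": to_id},
                {"$inc": {"credits": amount}},
                session=session,
            )
```

Every operation inside the transaction must pass session=..., or it runs outside the transaction and will not be rolled back on abort.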

Motor Async MongoDB Pipeline

# app/motor_db.py — Motor async MongoDB client and collection helpers
from __future__ import annotations

import logging
import os
from datetime import datetime, timezone

from bson import ObjectId
from motor.motor_asyncio import (
    AsyncIOMotorClient,
    AsyncIOMotorCollection,
    AsyncIOMotorDatabase,
    AsyncIOMotorGridFSBucket,
)
from pymongo import ASCENDING, DESCENDING, IndexModel, ReturnDocument
from pymongo.errors import BulkWriteError, DuplicateKeyError

logger = logging.getLogger(__name__)

MONGODB_URL = os.environ.get("MONGODB_URL", "mongodb://localhost:27017")
MONGODB_DB  = os.environ.get("MONGODB_DB",  "myapp")


# ─────────────────────────────────────────────────────────────────────────────
# Client and pool management
# ─────────────────────────────────────────────────────────────────────────────

_client: AsyncIOMotorClient | None = None


def get_client() -> AsyncIOMotorClient:
    global _client
    if _client is None:
        _client = AsyncIOMotorClient(
            MONGODB_URL,
            maxPoolSize=20,
            minPoolSize=5,
            maxIdleTimeMS=300_000,
            serverSelectionTimeoutMS=5_000,
        )
    return _client


def get_db(name: str = MONGODB_DB) -> AsyncIOMotorDatabase:
    return get_client()[name]


async def close_client() -> None:
    global _client
    if _client is not None:
        _client.close()
        _client = None


# ─────────────────────────────────────────────────────────────────────────────
# Schema initialisation — create indexes once at startup
# ─────────────────────────────────────────────────────────────────────────────

async def create_indexes(db: AsyncIOMotorDatabase) -> None:
    """Create all collection indexes. Safe to call multiple times (idempotent)."""

    await db["users"].create_indexes([
        IndexModel([("email", ASCENDING)], unique=True, name="users_email_uniq"),
        IndexModel([("role", ASCENDING), ("is_active", ASCENDING)], name="users_role_active"),
        IndexModel([("$**", "text")], name="users_text"),   # wildcard text search
    ])

    await db["products"].create_indexes([
        IndexModel([("sku", ASCENDING)], unique=True, name="products_sku_uniq"),
        IndexModel([("is_active", ASCENDING), ("price", ASCENDING)], name="products_active_price"),
    ])

    await db["orders"].create_indexes([
        IndexModel([("user_id", ASCENDING)], name="orders_user"),
        IndexModel([("status", ASCENDING)], name="orders_status"),
        IndexModel([("created_at", DESCENDING)], name="orders_created_desc"),
    ])

    logger.info("MongoDB indexes created/verified")


# ─────────────────────────────────────────────────────────────────────────────
# User repository
# ─────────────────────────────────────────────────────────────────────────────

class UserRepository:

    def __init__(self, db: AsyncIOMotorDatabase) -> None:
        self._col: AsyncIOMotorCollection = db["users"]

    async def create(self, email: str, first_name: str, last_name: str,
                     role: str = "user") -> dict:
        doc = {
            "email":      email,
            "first_name": first_name,
            "last_name":  last_name,
            "role":       role,
            "is_active":  True,
            "created_at": datetime.now(timezone.utc),
        }
        try:
            result = await self._col.insert_one(doc)
        except DuplicateKeyError:
            raise ValueError(f"User with email {email!r} already exists")
        doc["_id"] = result.inserted_id
        return doc

    async def get_by_id(self, user_id: str) -> dict | None:
        return await self._col.find_one({"_id": ObjectId(user_id)})

    async def get_by_email(self, email: str) -> dict | None:
        return await self._col.find_one({"email": email})

    async def list_active(self, page: int = 1, page_size: int = 20) -> list[dict]:
        cursor = (
            self._col
            .find({"is_active": True}, {"_id": 1, "email": 1, "first_name": 1, "last_name": 1})
            .sort("last_name", ASCENDING)
            .skip((page - 1) * page_size)
            .limit(page_size)
        )
        return await cursor.to_list(length=page_size)

    async def search(self, query: str) -> list[dict]:
        """Full-text search using MongoDB text index."""
        return await self._col.find(
            {"$text": {"$search": query}},
            {"score": {"$meta": "textScore"}},
        ).sort([("score", {"$meta": "textScore"})]).to_list(length=50)

    async def update(self, user_id: str, updates: dict) -> dict | None:
        updates["updated_at"] = datetime.now(timezone.utc)
        return await self._col.find_one_and_update(
            {"_id": ObjectId(user_id)},
            {"$set": updates},
            return_document=ReturnDocument.AFTER,
        )

    async def deactivate(self, user_id: str) -> bool:
        result = await self._col.update_one(
            {"_id": ObjectId(user_id), "is_active": True},
            {"$set": {"is_active": False, "updated_at": datetime.now(timezone.utc)}},
        )
        return result.matched_count > 0

    async def count_by_role(self) -> list[dict]:
        """Aggregate user counts grouped by role."""
        return await self._col.aggregate([
            {"$group": {"_id": "$role", "count": {"$sum": 1}}},
            {"$sort":  {"count": DESCENDING}},
            {"$project": {"role": "$_id", "count": 1, "_id": 0}},
        ]).to_list(length=None)

    async def bulk_create(self, users: list[dict]) -> int:
        """Bulk insert with ordered=False to continue past duplicate key errors."""
        docs = [{
            "email":      u["email"],
            "first_name": u["first_name"],
            "last_name":  u["last_name"],
            "role":       u.get("role", "user"),
            "is_active":  True,
            "created_at": datetime.now(timezone.utc),
        } for u in users]

        try:
            result = await self._col.insert_many(docs, ordered=False)
            return len(result.inserted_ids)
        except BulkWriteError as exc:
            # Some inserts succeeded even though some failed
            inserted = exc.details.get("nInserted", 0)
            logger.warning("Bulk insert partial: %d succeeded, some failed", inserted)
            return inserted


# ─────────────────────────────────────────────────────────────────────────────
# Change streams — real-time event processing
# ─────────────────────────────────────────────────────────────────────────────

async def watch_order_events(
    db: AsyncIOMotorDatabase,
    handler,  # async callable: await handler(operation_type, document)
) -> None:
    """
    Subscribe to order collection changes.
    Emits events for insert, update, replace operations.
    Requires a MongoDB replica set (or Atlas).
    """
    pipeline = [{"$match": {"operationType": {"$in": ["insert", "update", "replace"]}}}]

    async with db["orders"].watch(pipeline, full_document="updateLookup") as stream:
        logger.info("Watching orders collection for changes")
        async for change in stream:
            op  = change["operationType"]
            doc = change.get("fullDocument") or change.get("documentKey")
            logger.debug("Order event op=%s id=%s", op, doc.get("_id"))
            await handler(op, doc)


# ─────────────────────────────────────────────────────────────────────────────
# GridFS — large file storage
# ─────────────────────────────────────────────────────────────────────────────

class FileRepository:
    """Store and retrieve large files via GridFS."""

    def __init__(self, db: AsyncIOMotorDatabase) -> None:
        self._fs = AsyncIOMotorGridFSBucket(db, bucket_name="uploads")

    async def upload(self, filename: str, data: bytes, metadata: dict | None = None) -> str:
        # upload_from_stream writes the payload and returns the new file's ObjectId
        file_id = await self._fs.upload_from_stream(
            filename,
            data,
            metadata=metadata or {},
        )
        logger.info("Uploaded file=%s id=%s size=%d", filename, file_id, len(data))
        return str(file_id)

    async def download(self, file_id: str) -> bytes:
        grid_out = await self._fs.open_download_stream(ObjectId(file_id))
        return await grid_out.read()

    async def delete(self, file_id: str) -> None:
        await self._fs.delete(ObjectId(file_id))


# ─────────────────────────────────────────────────────────────────────────────
# FastAPI integration
# ─────────────────────────────────────────────────────────────────────────────

FASTAPI_EXAMPLE = """
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from app.motor_db import get_db, close_client, create_indexes, UserRepository

@asynccontextmanager
async def lifespan(app: FastAPI):
    db = get_db()
    await create_indexes(db)
    yield
    await close_client()

app = FastAPI(lifespan=lifespan)

def get_user_repo() -> UserRepository:
    # Build the repository inside the dependency so get_db's optional
    # "name" argument is not exposed to FastAPI as a query parameter.
    return UserRepository(get_db())

@app.get("/users/{user_id}")
async def read_user(user_id: str, repo: UserRepository = Depends(get_user_repo)):
    user = await repo.get_by_id(user_id)
    if user is None:
        raise HTTPException(404, "User not found")
    user["_id"] = str(user["_id"])
    return user
"""
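The route above stringifies _id by hand; a small helper can make any Motor document JSON-safe before it is returned. A minimal sketch — serialize_doc is a hypothetical helper, the try/except import lets it run even where bson is not installed, and it assumes ObjectId and datetime are the only non-JSON types present:

```python
from datetime import datetime

try:
    from bson import ObjectId  # available wherever Motor/PyMongo is installed
except ImportError:            # allow running the sketch without bson
    ObjectId = None


def serialize_doc(value):
    """Recursively convert a MongoDB document to JSON-safe Python values."""
    if isinstance(value, dict):
        return {key: serialize_doc(val) for key, val in value.items()}
    if isinstance(value, list):
        return [serialize_doc(item) for item in value]
    if isinstance(value, datetime):
        return value.isoformat()
    if ObjectId is not None and isinstance(value, ObjectId):
        return str(value)
    return value
```

In the FastAPI route this would replace the manual conversion: return serialize_doc(user).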

For the PyMongo (sync) alternative: PyMongo's synchronous collection.find() blocks its thread during network I/O, so serving concurrent requests requires thread or process pools. Motor wraps the same PyMongo API in asyncio coroutines — async for doc in collection.find({}) yields to the event loop on each network call — so a FastAPI server with a single Motor client handles hundreds of concurrent requests without thread overhead.

For the Beanie ODM alternative: Beanie adds Pydantic validation and a Django-ORM-like query API on top of Motor. Motor's raw collection.aggregate([...]) gives full control over the MongoDB pipeline — $lookup for multi-collection joins, $unwind for array expansion, $facet for parallel aggregation buckets, and pipeline filters passed to watch() for real-time change streams — patterns that are awkward to express through Beanie's typed query builder.

The Claude Skills 360 bundle includes Motor skill sets covering AsyncIOMotorClient configuration, insert_one/insert_many and bulk_write, find with projection, sort, limit, and skip, update_one/update_many with $set/$inc/$push, aggregate pipelines, IndexModel for compound indexes, find_one_and_update with ReturnDocument, change stream watch, GridFSBucket file storage, multi-document transactions with sessions, and FastAPI lifespan integration. Start with the free tier to try async MongoDB code generation.
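The $lookup/$unwind pattern mentioned above can be sketched as a plain pipeline builder — collection and field names are illustrative, matching the orders and users collections used earlier in the article:

```python
def orders_with_users_pipeline(status: str) -> list[dict]:
    """Aggregation pipeline joining orders to their users via $lookup."""
    return [
        {"$match": {"status": status}},   # filter first, so the join stays cheap
        {"$lookup": {
            "from": "users",              # joined collection
            "localField": "user_id",      # field on orders
            "foreignField": "_id",        # field on users
            "as": "user",                 # output array field
        }},
        {"$unwind": "$user"},             # one joined user per order
        {"$project": {"status": 1, "total": 1, "user.email": 1}},
    ]
```

Usage against the repositories above: await db["orders"].aggregate(orders_with_users_pipeline("paid")).to_list(length=None).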
