MongoDB’s aggregation pipeline is a sequence of transformation stages — $match, $group, $lookup, $project — that process documents in order, each stage feeding the next. It handles the kind of complex analytics that SQL expresses with nested subqueries, but works on flexible document schemas that don’t require migrations for new fields. Atlas Search adds full-text and vector search on the same data. Claude Code writes aggregation pipelines, Mongoose schemas with proper indexes, change stream processors, and the embedding-versus-referencing decisions that prevent slow queries.
CLAUDE.md for MongoDB Projects
## MongoDB Stack
- MongoDB 7.x or Atlas (managed)
- ODM: Mongoose 8.x with TypeScript strict types
- Indexes: compound indexes on all query patterns; no collection scans in production
- Aggregations: $facet for multi-faceted results; $lookup for joins (but prefer embedding)
- Transactions: multi-document transactions for atomic cross-collection writes
- Change streams: real-time CDC for downstream services
- Atlas Search: full-text + vector search without separate search cluster
- Schema: always define JSON Schema validation on collections
Mongoose Schema with TypeScript
// models/Order.ts
import mongoose, { Schema, Document, model } from 'mongoose';
/**
 * One line of an order: which product was bought, how many units, and at
 * what per-unit price.
 */
interface IOrderItem {
  /** Identifier of the purchased product. */
  productId: string;
  /** Product name copied onto the order line. */
  productName: string;

  /** Number of units on this line. */
  quantity: number;
  /** Price of a single unit, in cents. */
  unitPriceCents: number;
}
/** States an order can be in. */
type OrderStatus = 'pending' | 'processing' | 'shipped' | 'delivered' | 'cancelled';

/** Destination address stored on an order. */
interface ShippingAddress {
  street: string;
  city: string;
  country: string;
}

/**
 * Plain data shape of an order, independent of any Mongoose document
 * behaviour.
 */
interface IOrder {
  /** Identifier of the customer who placed the order. */
  customerId: string;
  status: OrderStatus;
  /** Line items that make up the order. */
  items: IOrderItem[];
  /** Order total, in cents. */
  totalCents: number;
  shippingAddress: ShippingAddress;
  tags: string[];
  /** Free-form extra data; values must be narrowed before use. */
  metadata: Record<string, unknown>;
  createdAt: Date;
  updatedAt: Date;
}
/**
 * Order as a Mongoose document: the `IOrder` fields combined with Mongoose's
 * `Document` members.
 *
 * NOTE(review): Mongoose's TypeScript guide recommends against having a
 * document interface extend `Document`; consider `HydratedDocument<IOrder>`
 * with `Schema<IOrder>` instead — confirm against the Mongoose version in use.
 */
export interface OrderDocument extends IOrder, Document {}
/**
 * Mongoose schema backing the `Order` model.
 *
 * `customerId` and `status` deliberately carry no field-level `index: true`:
 * the compound indexes declared on this schema (`{ customerId, createdAt }`
 * and `{ status, createdAt }`) lead with those fields and already serve
 * queries on the prefix alone, so single-field indexes would only add write
 * and storage overhead.
 */
const orderSchema = new Schema<OrderDocument>(
  {
    customerId: { type: String, required: true },
    status: {
      type: String,
      enum: ['pending', 'processing', 'shipped', 'delivered', 'cancelled'],
      default: 'pending',
    },
    // Each line item must reference a product and carry a positive quantity and price.
    items: [{
      productId: { type: String, required: true },
      productName: { type: String, required: true },
      quantity: { type: Number, required: true, min: 1 },
      unitPriceCents: { type: Number, required: true, min: 1 },
    }],
    totalCents: { type: Number, required: true, min: 0 },
    shippingAddress: {
      street: String,
      city: String,
      // Exactly two characters.
      country: { type: String, minlength: 2, maxlength: 2 },
    },
    tags: [String],
    // Schema-less bag for extra data; defaults to an empty object.
    metadata: { type: Schema.Types.Mixed, default: {} },
  },
  {
    timestamps: true, // Adds createdAt/updatedAt automatically
    versionKey: false,
  }
);
// Secondary indexes, one per query pattern the application serves.
orderSchema
  .index({ customerId: 1, createdAt: -1 }) // a customer's order history, newest first
  .index({ status: 1, createdAt: -1 }) // admin order list filtered by status
  .index({ 'items.productId': 1 }); // orders that contain a given product

/** Mongoose model for the `Order` schema. */
export const Order = model<OrderDocument>('Order', orderSchema);
Aggregation Pipelines
// repositories/orders.ts
/** Result document produced by {@link getOrderStats}; one array per `$facet` branch. */
export interface OrderStats {
  /** Revenue and order count per day (`_id` is a `YYYY-MM-DD` string), ascending by day. */
  dailyRevenue: { _id: string; revenue: number; orderCount: number }[];
  /** Up to ten products, highest revenue first; `_id` is the product id. */
  topProducts: { _id: string; productName: string; totalRevenue: number; unitsSold: number }[];
  /** Number of orders per status; `_id` is the status value. */
  statusBreakdown: { _id: string; count: number }[];
  /** Zero or one overall summary row (empty when no order matched). */
  summary: {
    _id: null;
    totalRevenue: number;
    totalOrders: number;
    avgOrderValue: number;
    uniqueCustomers: number;
  }[];
}

/**
 * Computes order analytics for a date range in a single aggregation.
 *
 * Orders created between `startDate` and `endDate` (both inclusive) are
 * considered; cancelled orders are excluded. The four result sets are computed
 * from the same matched documents via `$facet`.
 *
 * @param startDate - Lower bound on `createdAt` (inclusive).
 * @param endDate - Upper bound on `createdAt` (inclusive).
 * @returns The faceted statistics document.
 * @throws Error if the aggregation yields no result document.
 */
export async function getOrderStats(startDate: Date, endDate: Date): Promise<OrderStats> {
  const [stats] = await Order.aggregate<OrderStats>([
    {
      $match: {
        createdAt: { $gte: startDate, $lte: endDate },
        status: { $ne: 'cancelled' },
      },
    },
    {
      $facet: {
        // Revenue by day
        dailyRevenue: [
          {
            $group: {
              _id: { $dateToString: { format: '%Y-%m-%d', date: '$createdAt' } },
              revenue: { $sum: '$totalCents' },
              orderCount: { $sum: 1 },
            },
          },
          { $sort: { _id: 1 } },
        ],
        // Top products by revenue
        topProducts: [
          { $unwind: '$items' },
          {
            $group: {
              _id: '$items.productId',
              productName: { $first: '$items.productName' },
              totalRevenue: { $sum: { $multiply: ['$items.quantity', '$items.unitPriceCents'] } },
              unitsSold: { $sum: '$items.quantity' },
            },
          },
          { $sort: { totalRevenue: -1 } },
          { $limit: 10 },
        ],
        // Order status breakdown
        statusBreakdown: [
          { $group: { _id: '$status', count: { $sum: 1 } } },
        ],
        // Summary
        summary: [
          {
            $group: {
              _id: null,
              totalRevenue: { $sum: '$totalCents' },
              totalOrders: { $sum: 1 },
              avgOrderValue: { $avg: '$totalCents' },
              // Collect distinct customer ids; reduced to a count in the $project below.
              uniqueCustomers: { $addToSet: '$customerId' },
            },
          },
          {
            $project: {
              totalRevenue: 1,
              totalOrders: 1,
              avgOrderValue: { $round: ['$avgOrderValue', 0] },
              uniqueCustomers: { $size: '$uniqueCustomers' },
            },
          },
        ],
      },
    },
  ]);

  // $facet emits a single document even when nothing matched, so this guard
  // exists only to keep the return type honest.
  if (!stats) {
    throw new Error('getOrderStats: aggregation returned no result document');
  }
  return stats;
}
/**
 * Returns the newest pending orders, each joined with a trimmed-down customer
 * record via `$lookup`.
 *
 * Orders whose customer cannot be found are still returned, without a
 * `customer` field.
 *
 * @param limit - Maximum number of orders to return (default 20).
 * @returns Order documents, newest first, each with an optional `customer`
 *   object holding `name`, `email` and `tier`.
 */
export async function getOrdersWithCustomers(limit = 20) {
  return Order.aggregate([
    { $match: { status: 'pending' } },
    // Sort and cap before joining so the lookup runs only for the rows that
    // are returned. Each order joins to at most one customer, so the result
    // is the same as sorting and limiting after the join.
    { $sort: { createdAt: -1 } },
    { $limit: limit },
    {
      $lookup: {
        from: 'customers',
        // NOTE(review): assumes customers._id holds the same string value and
        // type as orders.customerId — confirm; a string never equals an ObjectId.
        localField: 'customerId',
        foreignField: '_id',
        as: 'customer',
        pipeline: [
          { $project: { name: 1, email: 1, tier: 1 } }, // Only fetch needed fields
        ],
      },
    },
    // Flatten the one-element `customer` array; keep orders with no match.
    { $unwind: { path: '$customer', preserveNullAndEmptyArrays: true } },
  ]);
}
Transactions
// services/order-service.ts — multi-document transactions
import mongoose from 'mongoose';
/**
 * Places an order atomically: decrements stock for every item and creates the
 * order inside one multi-document transaction, so either all writes commit or
 * none do.
 *
 * @param input - Order to create; `input.items` supplies the product ids and
 *   quantities to reserve.
 * @returns The created order document.
 * @throws Error when any item has insufficient stock; other database errors
 *   are rethrown unchanged after the transaction is aborted.
 */
export async function placeOrderTransactionally(input: CreateOrderInput) {
  const session = await mongoose.startSession();
  try {
    session.startTransaction({
      readConcern: { level: 'snapshot' },
      writeConcern: { w: 'majority' },
    });

    // Decrement inventory for each item. Kept sequential on purpose: all
    // operations share one session/transaction.
    for (const item of input.items) {
      // The stock filter makes the decrement conditional, so stock cannot go negative.
      const result = await ProductInventory.findOneAndUpdate(
        { productId: item.productId, stock: { $gte: item.quantity } },
        { $inc: { stock: -item.quantity } },
        { session, new: true },
      );
      if (!result) {
        throw new Error(`Insufficient stock for product ${item.productId}`);
      }
    }

    // Create the order (array form is required to pass options such as `session`).
    const [order] = await Order.create([input], { session });

    await session.commitTransaction();
    return order;
  } catch (err) {
    // Abort only while the transaction is still open. After a failed commit
    // it no longer is, and abortTransaction() would throw and hide `err`.
    if (session.inTransaction()) {
      await session.abortTransaction();
    }
    throw err;
  } finally {
    await session.endSession();
  }
}
Change Streams
// streams/order-changes.ts — real-time CDC
/**
 * Opens a change stream on orders and triggers downstream side effects when an
 * order becomes `shipped` (shipping notification) or `delivered` (loyalty
 * points).
 *
 * Only events that actually set the status are handled: inserts created in one
 * of those states, and updates whose changed fields include `status`. Later
 * updates to an already shipped or delivered order (tags, metadata, ...) do
 * not re-trigger the side effects.
 *
 * @returns The open change stream; the caller owns it and should close it.
 */
export async function watchOrderChanges() {
  const notifiableStatuses = ['shipped', 'delivered'];

  const changeStream = Order.watch(
    [
      {
        $match: {
          $or: [
            { operationType: 'insert', 'fullDocument.status': { $in: notifiableStatuses } },
            {
              operationType: 'update',
              'updateDescription.updatedFields.status': { $in: notifiableStatuses },
            },
          ],
        },
      },
    ],
    {
      fullDocument: 'updateLookup', // Include the full updated document
    },
  );

  changeStream.on('change', async (change) => {
    // The emitter ignores the returned promise, so failures are caught here
    // rather than surfacing as unhandled rejections.
    try {
      if (change.operationType !== 'update' && change.operationType !== 'insert') {
        return;
      }

      const order = change.fullDocument as OrderDocument | null | undefined;
      if (!order) {
        // With updateLookup the document can be missing if it was deleted
        // before the lookup ran; nothing to act on.
        return;
      }

      // For updates, use the status written by this event; the looked-up
      // document may already reflect a later change.
      const status =
        change.operationType === 'insert'
          ? order.status
          : change.updateDescription?.updatedFields?.status;

      if (status === 'shipped') {
        await notificationService.sendShippingNotification(order);
      } else if (status === 'delivered') {
        await loyaltyService.awardPoints(order.customerId, order.totalCents);
      }
    } catch (err) {
      logger.error('Change stream handler error:', err);
    }
  });

  changeStream.on('error', (err) => {
    logger.error('Change stream error:', err);
  });

  return changeStream;
}
For the SQL database alternative when complex queries matter more than flexibility, the PostgreSQL advanced guide covers window functions and indexes for relational workloads. For the data engineering pipelines that consume MongoDB change streams downstream, the change data capture guide covers Debezium-based CDC architectures. The Claude Skills 360 bundle includes MongoDB skill sets covering aggregation pipelines, compound indexing, transactions, and change stream consumers. Start with the free tier to try MongoDB aggregation pipeline generation.