SQLAlchemy provides both a Pythonic ORM and a low-level SQL expression language. pip install sqlalchemy. from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session. ORM model: class Base(DeclarativeBase): pass. class User(Base): __tablename__="users", id: Mapped[int] = mapped_column(primary_key=True), email: Mapped[str] = mapped_column(unique=True), orders: Mapped[list["Order"]] = relationship(back_populates="user"). Create engine: engine = create_engine("postgresql://user:pass@host/db", echo=False, pool_size=10). Create tables: Base.metadata.create_all(engine). Session: Session = sessionmaker(engine), with Session() as s: s.add(user); s.commit(). Query 2.0: stmt = select(User).where(User.active == True).order_by(User.name), users = session.scalars(stmt).all(). Insert: session.execute(insert(User).values(email="[email protected]")). Update: session.execute(update(User).where(User.id==1).values(active=False)). Delete: session.execute(delete(User).where(User.id==1)). Join: select(User, Order).join(Order.user). Eager loading: select(User).options(selectinload(User.orders)). Async: from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession. Engine: engine = create_async_engine("postgresql+asyncpg://..."). Bulk: session.execute(insert(User), [{"email": e} for e in emails]). Raw SQL: session.execute(text("SELECT * FROM users WHERE id=:id"), {"id":1}). Claude Code generates SQLAlchemy ORM models, async database services, migration scripts, and query builders.
CLAUDE.md for SQLAlchemy
## SQLAlchemy Stack
- Version: sqlalchemy >= 2.0, psycopg2/asyncpg for PostgreSQL
- ORM: DeclarativeBase | Mapped[T] | mapped_column(primary_key, unique, index)
- Session: sessionmaker(engine) | with Session() as s: ... s.commit()
- Query 2.0: select(Model).where(...).order_by(...) | session.scalars(stmt).all()
- Eager: selectinload(rel) | joinedload(rel) in options()
- Async: create_async_engine | AsyncSession | async with session:
- Bulk: session.execute(insert(Model), list_of_dicts)
- Migrations: alembic init alembic | alembic revision --autogenerate
SQLAlchemy ORM Pipeline
# db/sqlalchemy_pipeline.py — Python ORM and SQL toolkit with SQLAlchemy 2.0
from __future__ import annotations
from datetime import datetime
from typing import Optional, Any
import uuid
from sqlalchemy import (
create_engine, String, Integer, Float, Boolean, DateTime, Text,
ForeignKey, UniqueConstraint, Index, CheckConstraint,
select, insert, update, delete, func, text, event,
)
from sqlalchemy.orm import (
DeclarativeBase, Mapped, mapped_column, relationship,
Session, sessionmaker, selectinload, joinedload,
)
from sqlalchemy.pool import QueuePool
# ── 1. Models ─────────────────────────────────────────────────────────────────
class Base(DeclarativeBase):
pass
class User(Base):
"""User account with profile and orders."""
__tablename__ = "users"
__table_args__ = (
UniqueConstraint("email", name="uq_user_email"),
Index("ix_user_email_active", "email", "active"),
)
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), nullable=False)
name: Mapped[str] = mapped_column(String(120))
active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
role: Mapped[str] = mapped_column(String(50), default="user")
# Relationships
orders: Mapped[list["Order"]] = relationship(back_populates="user",
cascade="all, delete-orphan")
profile: Mapped[Optional["UserProfile"]] = relationship(back_populates="user",
uselist=False,
cascade="all, delete-orphan")
def __repr__(self) -> str:
return f"<User id={self.id} email={self.email}>"
class UserProfile(Base):
"""Optional user profile — one-to-one with User."""
__tablename__ = "user_profiles"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), unique=True)
bio: Mapped[Optional[str]] = mapped_column(Text)
score: Mapped[float] = mapped_column(Float, default=0.0)
user: Mapped["User"] = relationship(back_populates="profile")
class Product(Base):
"""Product catalog."""
__tablename__ = "products"
__table_args__ = (
CheckConstraint("price >= 0", name="ck_product_price_positive"),
)
id: Mapped[int] = mapped_column(primary_key=True)
sku: Mapped[str] = mapped_column(String(50), unique=True)
name: Mapped[str] = mapped_column(String(255))
price: Mapped[float] = mapped_column(Float)
stock: Mapped[int] = mapped_column(Integer, default=0)
active: Mapped[bool] = mapped_column(Boolean, default=True)
order_items: Mapped[list["OrderItem"]] = relationship(back_populates="product")
class Order(Base):
"""Purchase order."""
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
total: Mapped[float] = mapped_column(Float, default=0.0)
status: Mapped[str] = mapped_column(String(50), default="pending")
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
user: Mapped["User"] = relationship(back_populates="orders")
items: Mapped[list["OrderItem"]] = relationship(back_populates="order",
cascade="all, delete-orphan")
class OrderItem(Base):
"""Order line item."""
__tablename__ = "order_items"
id: Mapped[int] = mapped_column(primary_key=True)
order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"))
product_id: Mapped[int] = mapped_column(ForeignKey("products.id"))
quantity: Mapped[int] = mapped_column(Integer, default=1)
unit_price: Mapped[float] = mapped_column(Float)
order: Mapped["Order"] = relationship(back_populates="items")
product: Mapped["Product"] = relationship(back_populates="order_items")
# ── 2. Engine and Session factory ────────────────────────────────────────────
def create_db_engine(
url: str,
echo: bool = False,
pool_size: int = 10,
max_overflow: int = 20,
):
"""
Create SQLAlchemy 2.0 engine with connection pool.
url: e.g. "postgresql://user:pass@localhost/mydb" or "sqlite:///./db.sqlite"
"""
return create_engine(
url,
echo=echo,
poolclass=QueuePool,
pool_size=pool_size,
max_overflow=max_overflow,
pool_pre_ping=True, # Verify connection health before use
)
def make_session_factory(engine):
"""Return a sessionmaker for the given engine."""
return sessionmaker(engine, autoflush=False, autocommit=False)
# ── 3. CRUD operations ────────────────────────────────────────────────────────
def create_user(
session: Session,
email: str,
name: str,
role: str = "user",
) -> User:
"""Create and persist a new user. Raises IntegrityError if email exists."""
user = User(email=email, name=name, role=role)
session.add(user)
session.flush() # Populate user.id without committing
return user
def get_user_by_email(session: Session, email: str) -> User | None:
"""Fetch user by email with profile eagerly loaded."""
stmt = (
select(User)
.where(User.email == email)
.options(joinedload(User.profile))
)
return session.scalars(stmt).first()
def get_users_with_orders(
session: Session,
active: bool = True,
limit: int = 100,
offset: int = 0,
) -> list[User]:
"""Fetch users with all orders eagerly loaded (avoids N+1)."""
stmt = (
select(User)
.where(User.active == active)
.options(selectinload(User.orders).selectinload(Order.items))
.order_by(User.created_at.desc())
.limit(limit)
.offset(offset)
)
return session.scalars(stmt).all()
def update_user_status(
session: Session,
user_id: int,
active: bool,
) -> int:
"""Bulk-update active status. Returns rows affected."""
stmt = (
update(User)
.where(User.id == user_id)
.values(active=active)
.execution_options(synchronize_session="fetch")
)
result = session.execute(stmt)
return result.rowcount
def delete_user(session: Session, user_id: int) -> bool:
"""Delete user (cascades to orders and profile)."""
user = session.get(User, user_id)
if user:
session.delete(user)
return True
return False
# ── 4. Aggregation queries ────────────────────────────────────────────────────
def sales_by_user(
session: Session,
limit: int = 10,
) -> list[dict]:
"""Top N users by total order value."""
stmt = (
select(
User.id,
User.email,
func.count(Order.id).label("order_count"),
func.sum(Order.total).label("total_revenue"),
)
.join(Order, Order.user_id == User.id)
.where(User.active == True)
.group_by(User.id, User.email)
.order_by(func.sum(Order.total).desc())
.limit(limit)
)
rows = session.execute(stmt).all()
return [{"id": r.id, "email": r.email,
"order_count": r.order_count, "total_revenue": r.total_revenue}
for r in rows]
def product_inventory_summary(session: Session) -> list[dict]:
"""Products with stock level and total sold quantity."""
stmt = (
select(
Product.sku,
Product.name,
Product.stock,
func.coalesce(func.sum(OrderItem.quantity), 0).label("total_sold"),
)
.outerjoin(OrderItem, OrderItem.product_id == Product.id)
.where(Product.active == True)
.group_by(Product.id, Product.sku, Product.name, Product.stock)
.order_by(func.sum(OrderItem.quantity).desc().nullslast())
)
rows = session.execute(stmt).all()
return [r._asdict() for r in rows]
# ── 5. Bulk operations ────────────────────────────────────────────────────────
def bulk_insert_users(
session: Session,
records: list[dict],
) -> int:
"""
High-throughput insert using Core (no ORM overhead).
records: [{"email": ..., "name": ..., "role": ...}, ...]
Returns number of inserted rows.
"""
if not records:
return 0
result = session.execute(insert(User), records)
return result.rowcount
def upsert_products(
session: Session,
products: list[dict],
) -> None:
"""
Upsert products by SKU.
For PostgreSQL: uses ON CONFLICT DO UPDATE.
"""
from sqlalchemy.dialects.postgresql import insert as pg_insert
stmt = (
pg_insert(Product)
.values(products)
.on_conflict_do_update(
index_elements=["sku"],
set_={"price": pg_insert(Product).excluded.price,
"stock": pg_insert(Product).excluded.stock},
)
)
session.execute(stmt)
# ── 6. Event listeners (audit logging) ────────────────────────────────────────
def attach_audit_listener(session_factory):
"""
Attach SQLAlchemy event listeners to log all INSERT/UPDATE/DELETE.
Useful for audit trails without modifying model code.
"""
@event.listens_for(session_factory, "after_flush")
def log_changes(session, flush_context):
for obj in session.new:
print(f"[AUDIT] INSERT {type(obj).__name__} id={getattr(obj,'id','?')}")
for obj in session.dirty:
print(f"[AUDIT] UPDATE {type(obj).__name__} id={getattr(obj,'id','?')}")
for obj in session.deleted:
print(f"[AUDIT] DELETE {type(obj).__name__} id={getattr(obj,'id','?')}")
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("SQLAlchemy 2.0 ORM Demo")
print("=" * 50)
# In-memory SQLite for demo
engine = create_db_engine("sqlite:///:memory:", echo=False)
Base.metadata.create_all(engine)
Session_ = make_session_factory(engine)
with Session_() as s:
# Create users
alice = create_user(s, "[email protected]", "Alice", "admin")
bob = create_user(s, "[email protected]", "Bob")
# Create products
s.add_all([
Product(sku="P001", name="Widget A", price=29.99, stock=100),
Product(sku="P002", name="Widget B", price=49.99, stock=50),
])
s.flush()
# Create orders
p1 = s.scalars(select(Product).where(Product.sku=="P001")).first()
order = Order(user_id=alice.id, total=59.98, status="completed")
s.add(order)
s.flush()
s.add(OrderItem(order_id=order.id, product_id=p1.id, quantity=2, unit_price=29.99))
s.commit()
# Queries
user = get_user_by_email(s, "[email protected]")
print(f"\nFetched: {user}")
users = get_users_with_orders(s, active=True)
print(f"Active users: {len(users)}")
top_users = sales_by_user(s, limit=5)
print(f"\nTop users by revenue:")
for u in top_users:
print(f" {u['email']}: {u['order_count']} orders, ${u['total_revenue']:.2f}")
# Bulk insert
records = [{"email": f"user{i}@example.com", "name": f"User {i}", "active": True}
for i in range(10)]
n = bulk_insert_users(s, records)
s.commit()
print(f"\nBulk inserted {n} users")
total = s.scalar(select(func.count(User.id)))
print(f"Total users: {total}")
For the Django ORM alternative when a full-stack web framework is needed — Django provides admin UI, forms, and auth while SQLAlchemy 2.0’s select().options(selectinload()) pattern for relationship loading, execution_options(synchronize_session="fetch") for bulk updates, and the Core expression language for cross-database SQL give fine-grained control over query shape that Django’s ORM obscures, making SQLAlchemy the preferred choice for data-heavy microservices and tools where query efficiency is critical. For the raw psycopg2 alternative when direct SQL is preferred — psycopg2 provides no connection pooling or schema management while SQLAlchemy’s QueuePool with pool_pre_ping, metadata.create_all, and alembic migration suite replace an entire infrastructure stack, and the Unit of Work session pattern batches all inserts/updates/deletes in one transaction flush with automatic rollback on exception. The Claude Skills 360 bundle includes SQLAlchemy skill sets covering DeclarativeBase ORM models, relationships with cascade, session transactions, SQLAlchemy 2.0 select/insert/update/delete, eager loading with selectinload and joinedload, aggregation queries with func, bulk inserts, PostgreSQL upsert, event listeners, and async sessions with AsyncEngine. Start with the free tier to try ORM code generation.