Alembic is the database migration tool for SQLAlchemy. pip install alembic. alembic init alembic — creates alembic/ directory with env.py and alembic.ini. Set sqlalchemy.url in alembic.ini. Configure target_metadata in env.py: from myapp.models import Base; target_metadata = Base.metadata. Generate migration: alembic revision --autogenerate -m "add users table". Apply: alembic upgrade head. Rollback one: alembic downgrade -1. Full rollback: alembic downgrade base. Check current: alembic current. History: alembic history --verbose. Create empty: alembic revision -m "seed data". Column ops: op.add_column("users", sa.Column("bio", sa.Text())). op.drop_column("users", "old_col"). op.alter_column("users", "name", nullable=False). op.create_index("ix_users_email", "users", ["email"], unique=True). op.drop_index("ix_users_email"). Create table: op.create_table("tags", sa.Column("id", sa.Integer, primary_key=True), sa.Column("name", sa.String(100))). Drop table: op.drop_table("tags"). Rename: op.rename_table("old", "new"). Data migration: op.bulk_insert(table, rows). op.execute("UPDATE users SET role='user' WHERE role IS NULL"). Stamp: alembic stamp head — marks DB as current without running migrations. Branch: alembic revision --head base --branch-label schema_v2. Claude Code generates Alembic migration environments, autogenerate upgrade/downgrade pairs, and data seed migrations.
CLAUDE.md for Alembic
## Alembic Stack
- Version: alembic >= 1.13 | sqlalchemy >= 2.0
- Init: alembic init alembic — creates env.py, alembic.ini, versions/
- Config: set sqlalchemy.url in alembic.ini | target_metadata = Base.metadata
- Generate: alembic revision --autogenerate -m "description"
- Apply: alembic upgrade head | alembic downgrade -1 | alembic downgrade base
- Ops: op.create_table | op.add_column | op.alter_column | op.create_index
- Data: op.bulk_insert | op.execute for raw SQL data migrations
- SQLite: use render_as_batch=True in context.configure for alter support
Alembic Migrations Pipeline
# db/alembic_pipeline.py — database schema migrations with Alembic
# This file shows helpers for programmatic Alembic usage.
# Typical workflow: alembic init → edit env.py → alembic revision --autogenerate → alembic upgrade head
from __future__ import annotations
import os
from logging.config import fileConfig
from pathlib import Path
from typing import Any
import sqlalchemy as sa
from sqlalchemy import engine_from_config, pool, text
from sqlalchemy.orm import DeclarativeBase
from alembic import context, command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.environment import EnvironmentContext
from alembic.runtime.migration import MigrationContext
# ── 0. Models (example schema) ────────────────────────────────────────────────
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
email = sa.Column(sa.String(255), nullable=False, unique=True)
name = sa.Column(sa.String(100), nullable=True)
role = sa.Column(sa.String(50), nullable=False, server_default="user")
created_at = sa.Column(sa.DateTime(timezone=True), server_default=sa.func.now())
is_active = sa.Column(sa.Boolean, nullable=False, server_default=sa.true())
class Post(Base):
__tablename__ = "posts"
id = sa.Column(sa.Integer, primary_key=True)
user_id = sa.Column(sa.Integer, sa.ForeignKey("users.id", ondelete="CASCADE"))
title = sa.Column(sa.String(500), nullable=False)
body = sa.Column(sa.Text, nullable=True)
published = sa.Column(sa.Boolean, nullable=False, server_default=sa.false())
created_at = sa.Column(sa.DateTime(timezone=True), server_default=sa.func.now())
class Tag(Base):
__tablename__ = "tags"
id = sa.Column(sa.Integer, primary_key=True)
name = sa.Column(sa.String(100), nullable=False, unique=True)
class PostTag(Base):
__tablename__ = "post_tags"
post_id = sa.Column(sa.Integer, sa.ForeignKey("posts.id", ondelete="CASCADE"),
primary_key=True)
tag_id = sa.Column(sa.Integer, sa.ForeignKey("tags.id", ondelete="CASCADE"),
primary_key=True)
# ── 1. Alembic config factory ─────────────────────────────────────────────────
def make_alembic_config(
db_url: str,
migrations_dir: str = "alembic",
script_location: str = None,
) -> Config:
"""
Create an Alembic Config object programmatically.
Useful for running migrations from application code (e.g., at startup).
"""
cfg = Config()
cfg.set_main_option("script_location", script_location or migrations_dir)
cfg.set_main_option("sqlalchemy.url", db_url)
# Prevent Alembic from configuring its own logging
cfg.attributes["configure_logger"] = False
return cfg
# ── 2. Programmatic migration runner ─────────────────────────────────────────
def run_migrations(db_url: str, migrations_dir: str = "alembic") -> None:
"""
Run all pending migrations up to head.
Call this at application startup to ensure the DB is always up-to-date.
"""
cfg = make_alembic_config(db_url, migrations_dir)
command.upgrade(cfg, "head")
print("Migrations: applied up to head.")
def rollback_migration(
db_url: str,
target: str = "-1", # "-1", "base", or a revision ID
migrations_dir: str = "alembic",
) -> None:
"""Roll back to a target revision."""
cfg = make_alembic_config(db_url, migrations_dir)
command.downgrade(cfg, target)
print(f"Migrations: rolled back to {target}.")
def get_current_revision(db_url: str) -> str | None:
"""Return the current migration revision applied to the database."""
engine = sa.create_engine(db_url)
with engine.connect() as conn:
migration_ctx = MigrationContext.configure(conn)
return migration_ctx.get_current_revision()
def get_pending_migrations(db_url: str, migrations_dir: str = "alembic") -> list[str]:
"""Return list of revision IDs that have not yet been applied."""
cfg = make_alembic_config(db_url, migrations_dir)
script = ScriptDirectory.from_config(cfg)
engine = sa.create_engine(db_url)
with engine.connect() as conn:
migration_ctx = MigrationContext.configure(conn)
current_rev = migration_ctx.get_current_revision()
pending = []
for rev in script.iterate_revisions("head", current_rev or "base"):
if rev.revision != current_rev:
pending.append(rev.revision)
return pending
def generate_migration(
db_url: str,
message: str,
migrations_dir: str = "alembic",
autogenerate: bool = True,
) -> str:
"""
Generate a new migration script, optionally with autogenerate.
Returns the path to the generated script.
"""
cfg = make_alembic_config(db_url, migrations_dir)
# Set target_metadata so autogenerate can diff the current schema
cfg.attributes["target_metadata"] = Base.metadata
rev = command.revision(cfg, message=message, autogenerate=autogenerate)
return rev.path if hasattr(rev, "path") else str(rev)
# ── 3. Migration script examples (versions/*.py patterns) ────────────────────
# The following is the typical structure of a generated migration file.
# Alembic writes these to versions/ when you run `alembic revision --autogenerate`.
EXAMPLE_MIGRATION = '''
"""add users and posts tables
Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2026-01-15 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = "a1b2c3d4e5f6"
down_revision = None # None = initial migration
branch_labels = None
depends_on = None
def upgrade() -> None:
# Create users table
op.create_table(
"users",
sa.Column("id", sa.Integer(), nullable=False, autoincrement=True),
sa.Column("email", sa.String(255), nullable=False),
sa.Column("name", sa.String(100), nullable=True),
sa.Column("role", sa.String(50), nullable=False, server_default="user"),
sa.Column("created_at", sa.DateTime(timezone=True),
server_default=sa.text("NOW()")),
sa.Column("is_active", sa.Boolean(), nullable=False, server_default=sa.text("true")),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("email", name="uq_users_email"),
)
op.create_index("ix_users_email", "users", ["email"], unique=True)
op.create_index("ix_users_is_active","users", ["is_active"])
# Create posts table
op.create_table(
"posts",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("user_id", sa.Integer(), nullable=True),
sa.Column("title", sa.String(500), nullable=False),
sa.Column("body", sa.Text(), nullable=True),
sa.Column("published", sa.Boolean(), nullable=False, server_default=sa.text("false")),
sa.Column("created_at", sa.DateTime(timezone=True),
server_default=sa.text("NOW()")),
sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_posts_user_id", "posts", ["user_id"])
op.create_index("ix_posts_published", "posts", ["published", "created_at"])
def downgrade() -> None:
op.drop_index("ix_posts_published", table_name="posts")
op.drop_index("ix_posts_user_id", table_name="posts")
op.drop_table("posts")
op.drop_index("ix_users_is_active", table_name="users")
op.drop_index("ix_users_email", table_name="users")
op.drop_table("users")
'''
EXAMPLE_ALTER_MIGRATION = '''
"""add bio column and role index to users
Revision ID: b2c3d4e5f6a7
Revises: a1b2c3d4e5f6
Create Date: 2026-02-01 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = "b2c3d4e5f6a7"
down_revision = "a1b2c3d4e5f6"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add a new column
op.add_column("users", sa.Column("bio", sa.Text(), nullable=True))
# Change column type (PostgreSQL: cast required; SQLite: use batch)
with op.batch_alter_table("users") as batch_op:
batch_op.alter_column(
"name",
existing_type=sa.String(100),
type_=sa.String(200),
nullable=False,
server_default="Anonymous",
)
# Add an index
op.create_index("ix_users_role", "users", ["role"])
# Rename a column
op.alter_column("users", "is_active", new_column_name="active")
def downgrade() -> None:
op.alter_column("users", "active", new_column_name="is_active")
op.drop_index("ix_users_role", table_name="users")
with op.batch_alter_table("users") as batch_op:
batch_op.alter_column("name", type_=sa.String(100), nullable=True,
server_default=None)
op.drop_column("users", "bio")
'''
EXAMPLE_DATA_MIGRATION = '''
"""seed default roles
Revision ID: c3d4e5f6a7b8
Revises: b2c3d4e5f6a7
Create Date: 2026-03-01 09:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
revision = "c3d4e5f6a7b8"
down_revision = "b2c3d4e5f6a7"
branch_labels = None
depends_on = None
def upgrade() -> None:
# Data migration: back-fill role for existing users
op.execute(
"UPDATE users SET role = 'admin' WHERE email LIKE '%@company.com'"
)
# Bulk insert using op.bulk_insert (table reflection approach)
roles_table = table(
"roles",
column("id", sa.Integer()),
column("name", sa.String()),
column("description", sa.String()),
)
op.bulk_insert(roles_table, [
{"id": 1, "name": "admin", "description": "Full access"},
{"id": 2, "name": "editor", "description": "Can publish posts"},
{"id": 3, "name": "viewer", "description": "Read-only access"},
])
# Execute raw SQL for complex transformations
op.execute(sa.text("""
INSERT INTO post_tags (post_id, tag_id)
SELECT p.id, t.id
FROM posts p, tags t
WHERE t.name = 'general' AND p.published = true
ON CONFLICT DO NOTHING
"""))
def downgrade() -> None:
op.execute("DELETE FROM roles WHERE id IN (1, 2, 3)")
op.execute(
"UPDATE users SET role = 'user' WHERE email LIKE '%@company.com'"
)
'''
# ── 4. env.py template ────────────────────────────────────────────────────────
ENV_PY_TEMPLATE = '''
# alembic/env.py — configure migration context
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
# Import your models so metadata is populated
from myapp.models import Base # adjust import to your package
config = context.config
target_metadata = Base.metadata
if config.config_file_name is not None:
fileConfig(config.config_file_name)
def run_migrations_offline() -> None:
"""Emit SQL to stdout without a live DB connection (for DBA review)."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True, # detect column type changes
compare_server_default=True,
render_as_batch=True, # needed for SQLite ALTER support
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations against a live DB connection."""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
render_as_batch=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
'''
# ── 5. Migration health check ─────────────────────────────────────────────────
def check_migration_health(db_url: str, migrations_dir: str = "alembic") -> dict:
"""
Verify the database migration state is consistent.
Returns a dict with current revision, is_up_to_date, and pending count.
"""
current = get_current_revision(db_url)
pending = get_pending_migrations(db_url, migrations_dir)
cfg = make_alembic_config(db_url, migrations_dir)
script = ScriptDirectory.from_config(cfg)
head_rev = script.get_current_head()
return {
"current_revision": current or "none (no migrations applied)",
"head_revision": head_rev,
"is_up_to_date": current == head_rev,
"pending_count": len(pending),
"pending_revisions": pending,
}
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
print("Alembic Database Migrations Demo")
print("=" * 50)
print("\nTypical workflow:")
print(" 1. alembic init alembic # create migrations dir")
print(" 2. Edit alembic/env.py # set target_metadata")
print(" 3. alembic revision --autogenerate -m 'initial'")
print(" 4. alembic upgrade head # apply migrations")
print(" 5. alembic current # show current revision")
print(" 6. alembic history --verbose # show all revisions")
print(" 7. alembic downgrade -1 # roll back one step")
print("\nProgrammatic usage (e.g., app startup):")
print(' db_url = "postgresql+psycopg2://user:pass@localhost/mydb"')
print(" run_migrations(db_url) # upgrade head on startup")
print(" health = check_migration_health(db_url)")
print(" print(health['is_up_to_date']) # True if no pending migrations")
print("\nalembic.ini key settings:")
print(" sqlalchemy.url = postgresql+psycopg2://%(DB_USER)s:%(DB_PASS)s@%(DB_HOST)s/%(DB_NAME)s")
print(" file_template = %%(year)d%%(month).2d%%(day).2d_%%(rev)s_%%(slug)s")
print(" timezone = UTC")
print("\nSQLite note: use render_as_batch=True in env.py")
print(" SQLite does not support ALTER COLUMN — batch mode recreates the table.")
For the raw SQL migration scripts alternative — hand-written SQL files require tracking execution order and applied state manually in a custom table while Alembic’s alembic_version table and ScriptDirectory handle dependency graphs automatically, --autogenerate diffs Base.metadata against the live schema to generate upgrade() / downgrade() pairs in under a second, and batch_alter_table transparently rewrites SQLite tables that don’t support ALTER COLUMN, making the same migration code work across PostgreSQL, MySQL, and SQLite. For the Django migrations alternative — Django migrations are framework-specific and tied to its ORM while Alembic works with any SQLAlchemy schema, alembic revision --autogenerate can be called at deploy time in CI/CD so run_migrations(db_url) at startup ensures zero-downtime schema drift, and compare_type=True in env.py catches column type changes that naive autogenerate would miss, preventing silent truncation bugs in production. The Claude Skills 360 bundle includes Alembic skill sets covering alembic init and env.py setup, autogenerate revision workflow, create_table and add_column/alter_column/drop_column ops, create_index and drop_index, batch_alter_table for SQLite, bulk_insert and execute data migrations, programmatic upgrade/downgrade/current, and migration health checks. Start with the free tier to try schema migration code generation.