Python’s sqlite3 module provides a zero-configuration embedded SQL database. import: import sqlite3. connect: con = sqlite3.connect("app.db"). in-memory: sqlite3.connect(":memory:"). cursor: cur = con.cursor(); cur.execute("SELECT ..."). fetchone/fetchall: cur.fetchone(); cur.fetchall(). row_factory: con.row_factory = sqlite3.Row — access columns by name as well as by index. transaction context: with con: con.execute("INSERT ...") — commits on success or rolls back on an exception (it does not close the connection). parameterized: cur.execute("SELECT * FROM t WHERE id = ?", (id_val,)) — NEVER build SQL with % formatting or f-strings. executemany: cur.executemany("INSERT INTO t VALUES (?,?)", rows). create_table: con.execute("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, ...)"). WAL mode: con.execute("PRAGMA journal_mode=WAL") — better concurrent reads, because readers and the writer no longer block each other. check_same_thread: sqlite3.connect("app.db", check_same_thread=False) — allows the connection to be used from multiple threads; the caller must serialize access. backup: con.backup(dest_con). register_adapter: sqlite3.register_adapter(type, fn). register_converter: sqlite3.register_converter("typename", fn). detect_types: sqlite3.connect(db, detect_types=sqlite3.PARSE_DECLTYPES). create_function: con.create_function("fn_name", n_args, callable). lastrowid: cur.lastrowid. rowcount: cur.rowcount. isolation_level: sqlite3.connect("app.db", isolation_level=None) — autocommit. close: con.close(). Claude Code generates typed repository classes, migration runners, and in-memory test fixtures.
CLAUDE.md for sqlite3
## sqlite3 Stack
- Stdlib: import sqlite3
- Connect: con = sqlite3.connect("app.db"); con.row_factory = sqlite3.Row
- Query: con.execute("SELECT ... WHERE id = ?", (val,)) — ALWAYS use ? placeholders
- Write: with con: con.execute("INSERT ...") / con.executemany("INSERT ...", rows)
- WAL: con.execute("PRAGMA journal_mode=WAL") — enable for read-heavy workloads
- Test: sqlite3.connect(":memory:") — isolated, ephemeral database for unit tests
sqlite3 Repository Pipeline
# app/db.py — connect, Row factory, typed repository, migrations, in-memory
from __future__ import annotations
import sqlite3
import threading
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Generator, Iterable, Iterator
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection factory
# ─────────────────────────────────────────────────────────────────────────────
def open_db(
    path: str | Path = ":memory:",
    wal: bool = True,
    timeout: float = 30.0,
    row_factory: bool = True,
) -> sqlite3.Connection:
    """
    Create a configured SQLite connection.

    The connection is opened with ``check_same_thread=False``, foreign-key
    enforcement is switched on, and WAL journaling is requested for every
    target except the literal ``":memory:"`` database.

    Args:
        path: Database file, or ``":memory:"`` for an in-memory database.
        wal: Request ``journal_mode = WAL`` (skipped for ``":memory:"``).
        timeout: Forwarded to ``sqlite3.connect`` as ``timeout``.
        row_factory: When True, rows come back as ``sqlite3.Row``.

    Returns:
        The open connection.

    Example:
        con = open_db("app.db")
        con = open_db(":memory:")  # fast test database
    """
    target = str(path)
    connection = sqlite3.connect(target, timeout=timeout, check_same_thread=False)

    if row_factory:
        connection.row_factory = sqlite3.Row

    connection.execute("PRAGMA foreign_keys = ON")

    # WAL is only requested when the target is not the plain in-memory name.
    wants_wal = wal and target != ":memory:"
    if wants_wal:
        connection.execute("PRAGMA journal_mode = WAL")

    return connection
@contextmanager
def db_transaction(con: sqlite3.Connection) -> Generator[sqlite3.Connection, None, None]:
    """
    Context manager that commits on success and rolls back on any exception.

    Yields the same connection that was passed in. When the body finishes
    normally the connection is committed; when the body (or the commit
    itself) raises, the connection is rolled back and the exception is
    re-raised unchanged.

    Example:
        with db_transaction(con) as c:
            c.execute("INSERT INTO orders VALUES (?,?)", (order_id, amount))
            c.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, user_id))
    """
    try:
        yield con
        con.commit()
    except BaseException:
        # BaseException rather than Exception: KeyboardInterrupt / SystemExit
        # must not leave half-written work pending on the connection, where a
        # later unrelated commit() would persist it.
        con.rollback()
        raise
@contextmanager
def temp_db() -> Generator[sqlite3.Connection, None, None]:
    """
    Provide a throwaway in-memory database for the duration of a ``with`` block.

    The connection comes from ``open_db(":memory:")`` and is closed when the
    block exits, whether or not the body raised.

    Example:
        with temp_db() as con:
            setup_schema(con)
            assert count_users(con) == 0
    """
    memory_con = open_db(":memory:")
    try:
        yield memory_con
    finally:
        # Always release the connection, even if the body raised.
        memory_con.close()
# ─────────────────────────────────────────────────────────────────────────────
# 2. Schema migrations
# ─────────────────────────────────────────────────────────────────────────────
def get_schema_version(con: sqlite3.Connection) -> int:
    """Read ``PRAGMA user_version``; fall back to 0 if the pragma yields no row."""
    result = con.execute("PRAGMA user_version").fetchone()
    if not result:
        return 0
    return result[0]
def set_schema_version(con: sqlite3.Connection, version: int) -> None:
    """
    Write ``version`` to ``PRAGMA user_version``.

    The value is embedded in the statement text after being coerced with
    ``int()``, since this pragma is issued without a ``?`` placeholder.
    """
    pragma = "PRAGMA user_version = {}".format(int(version))
    con.execute(pragma)
def _split_sql_statements(script: str) -> list[str]:
    """Split *script* into statements without cutting literals or trigger bodies."""
    statements: list[str] = []
    pending = ""
    for fragment in script.split(";"):
        pending += fragment + ";"
        # complete_statement() stays False while the accumulated text still
        # ends inside a string literal, a comment, or a BEGIN ... END body,
        # so those semicolons do not terminate the statement.
        if sqlite3.complete_statement(pending):
            statement = pending.strip().rstrip(";").strip()
            if statement:
                statements.append(statement)
            pending = ""
    # Anything left over never became a complete statement; hand it to
    # SQLite anyway so a malformed migration is reported, not dropped.
    leftover = pending.strip().rstrip(";").strip()
    if leftover:
        statements.append(leftover)
    return statements


def run_migrations(
    con: sqlite3.Connection,
    migrations: list[tuple[int, str]],
) -> int:
    """
    Run pending migrations in version order.

    Each migration is a (version, sql) tuple; ``sql`` may hold several
    ``;``-separated statements. A migration is applied when its version is
    greater than the ``PRAGMA user_version`` read at the start of the call.

    Every migration runs inside one explicit transaction together with the
    ``user_version`` bump, so a failing statement rolls the whole migration
    back (schema changes included) and the exception propagates. Migrations
    applied earlier in the same call stay committed.

    Args:
        con: Open connection to migrate.
        migrations: (version, sql) pairs in any order.

    Returns:
        Number of migrations applied.

    Example:
        MIGRATIONS = [
            (1, "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT UNIQUE)"),
            (2, "ALTER TABLE users ADD COLUMN created_at TEXT"),
        ]
        run_migrations(con, MIGRATIONS)
    """
    row = con.execute("PRAGMA user_version").fetchone()
    current = row[0] if row else 0
    applied = 0
    for version, sql in sorted(migrations, key=lambda m: m[0]):
        if version <= current:
            continue
        statements = _split_sql_statements(sql)
        try:
            # sqlite3 only opens implicit transactions for DML; without an
            # explicit BEGIN each DDL statement would commit on its own.
            if not con.in_transaction:
                con.execute("BEGIN")
            for stmt in statements:
                con.execute(stmt)
            # PRAGMA takes no ? placeholder; int() keeps the text safe.
            con.execute(f"PRAGMA user_version = {int(version)}")
            con.commit()
        except BaseException:
            con.rollback()
            raise
        applied += 1
    return applied
# ─────────────────────────────────────────────────────────────────────────────
# 3. Query helpers
# ─────────────────────────────────────────────────────────────────────────────
def fetch_one(
    con: sqlite3.Connection,
    sql: str,
    params: tuple = (),
) -> sqlite3.Row | None:
    """
    Run ``sql`` with ``params`` and hand back only the first result row.

    Returns None when the query produces no rows.

    Example:
        user = fetch_one(con, "SELECT * FROM users WHERE email = ?", (email,))
        if user:
            print(user["name"])
    """
    cursor = con.execute(sql, params)
    return cursor.fetchone()
def fetch_all(
    con: sqlite3.Connection,
    sql: str,
    params: tuple = (),
) -> list[sqlite3.Row]:
    """Run ``sql`` with ``params`` and return every result row as a list."""
    cursor = con.execute(sql, params)
    return cursor.fetchall()
def fetch_scalar(
    con: sqlite3.Connection,
    sql: str,
    params: tuple = (),
    default: Any = None,
) -> Any:
    """
    Run ``sql`` and return the first column of the first row.

    ``default`` is returned only when the query yields no row at all; a row
    whose first column is NULL is returned as None.

    Example:
        count = fetch_scalar(con, "SELECT COUNT(*) FROM users WHERE active = 1")
        total = fetch_scalar(con, "SELECT SUM(amount) FROM orders WHERE user_id = ?", (uid,))
    """
    first_row = con.execute(sql, params).fetchone()
    if not first_row:
        return default
    return first_row[0]
def execute_script(con: sqlite3.Connection, sql: str) -> None:
    """
    Run a script containing several SQL statements (for example the
    contents of a schema or migration file) via ``Connection.executescript``.

    Example:
        execute_script(con, Path("schema.sql").read_text())
    """
    # The cursor returned by executescript() is intentionally discarded.
    con.executescript(sql)
def iter_rows(
    con: sqlite3.Connection,
    sql: str,
    params: tuple = (),
) -> Iterator[sqlite3.Row]:
    """
    Return an iterator over the rows of a query instead of a full list.

    The statement is executed when this function is called; rows are then
    pulled from the cursor one at a time as the iterator is consumed.

    Example:
        for row in iter_rows(con, "SELECT * FROM events ORDER BY ts"):
            process(row)
    """
    cursor = con.execute(sql, params)
    return iter(cursor)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Typed repository pattern
# ─────────────────────────────────────────────────────────────────────────────
def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string."""
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware "now"
    # and drop tzinfo so the text keeps the format utcnow().isoformat()
    # produced (no "+00:00" suffix).
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


@dataclass
class User:
    """
    In-memory representation of one user record.

    ``id`` defaults to 0 and ``active`` to True. ``created_at`` defaults to
    the current UTC time as an ISO-8601 string without a UTC offset.
    """

    name: str
    email: str
    id: int = 0
    active: bool = True
    created_at: str = field(default_factory=_utc_now_iso)
class UserRepository:
    """
    CRUD repository for the users table.

    Creating a repository makes sure the ``users`` table exists on the
    given connection. Write methods run inside ``db_transaction``.

    Example:
        repo = UserRepository(con)
        uid = repo.create(User(name="Alice", email="alice@example.com"))
        user = repo.get(uid)
        repo.update(uid, active=False)
        users = repo.list_active(limit=10)
    """

    SCHEMA = """
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        email TEXT NOT NULL UNIQUE,
        active INTEGER NOT NULL DEFAULT 1,
        created_at TEXT NOT NULL
    )
    """

    # Column names update() may interpolate into SQL; mirrors SCHEMA.
    _COLUMNS = frozenset({"id", "name", "email", "active", "created_at"})

    def __init__(self, con: sqlite3.Connection) -> None:
        """Store the connection and create the users table if it is missing."""
        self.con = con
        con.execute(self.SCHEMA)
        con.commit()

    def create(self, user: User) -> int:
        """Insert user; return new row ID. ``user.id`` is not written."""
        with db_transaction(self.con):
            cur = self.con.execute(
                "INSERT INTO users (name, email, active, created_at) VALUES (?,?,?,?)",
                (user.name, user.email, int(user.active), user.created_at),
            )
        return cur.lastrowid

    def get(self, user_id: int) -> User | None:
        """Fetch user by ID or return None."""
        row = fetch_one(self.con, "SELECT * FROM users WHERE id = ?", (user_id,))
        return self._to_user(row) if row else None

    def get_by_email(self, email: str) -> User | None:
        """Fetch user by email address or return None."""
        row = fetch_one(self.con, "SELECT * FROM users WHERE email = ?", (email,))
        return self._to_user(row) if row else None

    def update(self, user_id: int, **fields) -> bool:
        """
        Update specific columns of one user.

        Args:
            user_id: ID of the row to change.
            **fields: Column name -> new value.

        Returns:
            True if a row matched ``user_id``; False if none did or if no
            fields were given.

        Raises:
            ValueError: If a keyword is not a column of the users table.
        """
        if not fields:
            return False
        # Keyword names are spliced into the statement text, so they must be
        # checked against the known columns; values stay bound through ?.
        unknown = sorted(set(fields) - self._COLUMNS)
        if unknown:
            raise ValueError(f"unknown users column(s): {', '.join(unknown)}")
        assignments = ", ".join(f"{column} = ?" for column in fields)
        values = [*fields.values(), user_id]
        with db_transaction(self.con):
            cur = self.con.execute(
                f"UPDATE users SET {assignments} WHERE id = ?", values
            )
        return cur.rowcount > 0

    def delete(self, user_id: int) -> bool:
        """Delete user by ID; return True if a row was removed."""
        with db_transaction(self.con):
            cur = self.con.execute("DELETE FROM users WHERE id = ?", (user_id,))
        return cur.rowcount > 0

    def list_active(self, limit: int = 100, offset: int = 0) -> list[User]:
        """Return active users ordered by ID, paged with limit/offset."""
        rows = fetch_all(
            self.con,
            "SELECT * FROM users WHERE active = 1 ORDER BY id LIMIT ? OFFSET ?",
            (limit, offset),
        )
        return [self._to_user(r) for r in rows]

    def count(self) -> int:
        """Return the total number of rows in the users table."""
        return fetch_scalar(self.con, "SELECT COUNT(*) FROM users", default=0)

    def bulk_insert(self, users: Iterable[User]) -> int:
        """
        Bulk insert; return number of rows inserted.

        Rows that conflict with an existing row (for example a duplicate
        email) are skipped by ``INSERT OR IGNORE`` and are not counted.
        """
        data = [(u.name, u.email, int(u.active), u.created_at) for u in users]
        if not data:
            return 0
        with db_transaction(self.con):
            cur = self.con.executemany(
                "INSERT OR IGNORE INTO users (name, email, active, created_at) VALUES (?,?,?,?)",
                data,
            )
        # rowcount sums the rows actually written across the batch, so
        # ignored duplicates are excluded (len(data) would include them).
        return max(cur.rowcount, 0)

    @staticmethod
    def _to_user(row: sqlite3.Row) -> User:
        """Build a User from a users-table row, reading columns by name."""
        return User(
            id=row["id"], name=row["name"], email=row["email"],
            active=bool(row["active"]), created_at=row["created_at"],
        )
# ─────────────────────────────────────────────────────────────────────────────
# 5. Custom adapters/converters
# ─────────────────────────────────────────────────────────────────────────────
def register_json_column(con: sqlite3.Connection | None = None) -> None:
    """
    Install module-wide JSON support for sqlite3.

    Python ``dict`` and ``list`` parameters are serialised with
    ``json.dumps``, and columns declared as ``JSON`` are decoded with
    ``json.loads``. The registration is made on the ``sqlite3`` module; the
    ``con`` argument is accepted but not used. Call once at app startup.

    Example:
        register_json_column()
        con = sqlite3.connect("app.db", detect_types=sqlite3.PARSE_DECLTYPES)
        con.execute("CREATE TABLE t (data JSON)")
        con.execute("INSERT INTO t VALUES (?)", ({"key": "value"},))
        row = con.execute("SELECT data FROM t").fetchone()
        print(type(row[0]))  # dict
    """
    import json

    for container_type in (dict, list):
        sqlite3.register_adapter(container_type, json.dumps)
    sqlite3.register_converter("JSON", json.loads)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Walk-through of the helpers above against a throwaway in-memory database.
    print("=== sqlite3 demo ===")
    with temp_db() as con:
        repo = UserRepository(con)

        print("\n--- create users ---")
        # users.email is UNIQUE, so every demo user needs its own address.
        uid1 = repo.create(User("Alice", "alice@example.com"))
        uid2 = repo.create(User("Bob", "bob@example.com"))
        uid3 = repo.create(User("Carol", "carol@example.com"))
        print(f" inserted IDs: {uid1}, {uid2}, {uid3}")
        print(f" total count: {repo.count()}")

        print("\n--- get by ID ---")
        user = repo.get(uid1)
        print(f" user {uid1}: name={user.name}, email={user.email}, active={user.active}")

        print("\n--- update ---")
        ok = repo.update(uid2, active=False)
        print(f" deactivated {uid2}: {ok}")

        print("\n--- list_active ---")
        active = repo.list_active()
        print(f" active users: {[u.name for u in active]}")

        print("\n--- bulk_insert ---")
        extras = [User(f"User{i}", f"user{i}@example.com") for i in range(5)]
        inserted = repo.bulk_insert(extras)
        print(f" bulk inserted: {inserted}, total: {repo.count()}")

        print("\n--- raw queries ---")
        count = fetch_scalar(con, "SELECT COUNT(*) FROM users WHERE active = 1")
        print(f" active count via scalar: {count}")

        print("\n--- iter_rows ---")
        for row in iter_rows(con, "SELECT id, name FROM users WHERE active = 1 ORDER BY id LIMIT 3"):
            print(f" {dict(row)}")

    print("\n=== done ===")
For the sqlalchemy alternative — SQLAlchemy provides a full ORM (Object-Relational Mapper) supporting PostgreSQL, MySQL, SQLite, and more with declarative model classes, relationship mapping, lazy loading, and a query language (Core + ORM layers); stdlib sqlite3 is SQLite-only, requires manual SQL, and returns plain tuples by default (or dict-like sqlite3.Row objects when row_factory is set) — use sqlalchemy for production applications needing to support multiple database backends, complex joins, or migration tooling (with Alembic), and stdlib sqlite3 for embedded SQLite in apps, scripts, CLI tools, and anywhere zero-dependency lightweight storage is the requirement. For the peewee alternative — peewee is a small, expressive ORM with Model subclasses, a fluent query API, and support for SQLite, PostgreSQL, and MySQL with a much smaller footprint than SQLAlchemy; stdlib sqlite3 has lower overhead and no abstractions — use peewee for projects that need ORM ergonomics without SQLAlchemy’s complexity, or for macOS/Linux command-line tools that embed a database, and stdlib sqlite3 for scripts and utilities where adding any ORM dependency is unwanted. The Claude Skills 360 bundle includes sqlite3 skill sets covering open_db()/db_transaction()/temp_db() connection helpers, get_schema_version()/run_migrations() migration runner, fetch_one()/fetch_all()/fetch_scalar()/iter_rows() query helpers, User dataclass + UserRepository CRUD with bulk_insert(), and register_json_column() custom JSON column adapter. Start with the free tier to try embedded database and sqlite3 repository pipeline code generation.