aiosqlite wraps Python’s sqlite3 with async/await support. Install: pip install aiosqlite. Open: import aiosqlite; async with aiosqlite.connect("db.sqlite3") as db:. Execute: await db.execute("SELECT * FROM t WHERE id = ?", (1,)). Fetch: async with await db.execute("SELECT * FROM t") as cursor: rows = await cursor.fetchall(). Also available: await cursor.fetchone() and await cursor.fetchmany(10). Commit: await db.commit(). Row: db.row_factory = aiosqlite.Row → access by column name row["name"]. aiosqlite.Row is sqlite3.Row, so positional access (row[0], in column order) works too. Execute many: await db.executemany("INSERT INTO t VALUES (?,?)", rows). Script: await db.executescript("CREATE TABLE t(id INT); INSERT INTO t VALUES(1)"). WAL: await db.execute("PRAGMA journal_mode=WAL") — allows concurrent reads while a write is in progress. Pragma: await db.execute("PRAGMA foreign_keys=ON"). In-memory: aiosqlite.connect(":memory:"). Isolation: async with aiosqlite.connect("db.sqlite3", isolation_level=None) as db: — autocommit; manage transactions explicitly with await db.execute("BEGIN") / await db.execute("COMMIT"). Path: aiosqlite.connect(Path("data") / "app.db"). Timeout: aiosqlite.connect("db.sqlite3", timeout=30). FastAPI: configure the database once per app instance and pass a connection to each handler via dependency injection. Claude Code generates aiosqlite CRUD layers, async FastAPI handlers, and embedded database pipelines.
CLAUDE.md for aiosqlite
## aiosqlite Stack
- Version: aiosqlite >= 0.19 | pip install aiosqlite
- Open: async with aiosqlite.connect("app.db") as db: | Path("data/app.db")
- Query: await db.execute(sql, params) | cursor = await db.execute(...)
- Fetch: await cursor.fetchone() | fetchall() | fetchmany(n)
- Commit: await db.commit() | isolation_level=None for autocommit
- Row: db.row_factory = aiosqlite.Row → row["col"] dict-like access
- Batch: await db.executemany(sql, list_of_params)
aiosqlite Async SQLite Pipeline
# app/sqlite_db.py — aiosqlite async CRUD, migrations, and FastAPI integration
from __future__ import annotations
import contextlib
import sqlite3
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
import aiosqlite
# ─────────────────────────────────────────────────────────────────────────────
# 1. Connection helpers
# ─────────────────────────────────────────────────────────────────────────────
# Pragma statements applied, in order, to a new connection when the caller
# does not supply its own list. Each entry is a complete SQL statement.
DEFAULT_PRAGMAS = [
    f"PRAGMA {setting}={value}"
    for setting, value in (
        ("journal_mode", "WAL"),      # concurrent reads + one writer
        ("foreign_keys", "ON"),       # enforce FK constraints
        ("synchronous", "NORMAL"),    # balance durability vs speed
        ("cache_size", "-64000"),     # negative = size in KiB (~64 MB page cache)
        ("temp_store", "MEMORY"),     # store temp tables in RAM
    )
]
async def open_db(
    path: str | Path,
    pragmas: list[str] | None = None,
) -> aiosqlite.Connection:
    """
    Open an aiosqlite connection and apply connection-level pragmas.

    Args:
        path: Database location; converted with ``str()`` before connecting,
            so both ``str`` and ``Path`` are accepted.
        pragmas: Complete ``PRAGMA ...`` statements to run, in order.
            ``None`` selects ``DEFAULT_PRAGMAS``; pass ``[]`` to run none.

    Returns:
        The open connection with ``row_factory`` set to ``aiosqlite.Row``
        (rows support ``row["col"]`` access). The caller owns it and is
        responsible for closing it.

    Raises:
        Whatever ``aiosqlite.connect`` or ``db.execute`` raise. If setup
        fails after the connection was opened, the connection is closed
        before the error propagates.
    """
    db = await aiosqlite.connect(str(path))
    try:
        db.row_factory = aiosqlite.Row  # row["col"] access
        for pragma in (DEFAULT_PRAGMAS if pragmas is None else pragmas):
            cursor = await db.execute(pragma)
            # Some pragmas return a result row; release the cursor right away.
            await cursor.close()
    except BaseException:
        # Nobody else holds a reference yet, so close here or the connection
        # would leak. BaseException also covers task cancellation.
        await db.close()
        raise
    return db
@contextlib.asynccontextmanager
async def get_db(
    path: str | Path,
    pragmas: list[str] | None = None,
) -> AsyncIterator[aiosqlite.Connection]:
    """
    Async context manager yielding a connection opened via ``open_db``.

    On a clean exit the pending work is committed. If the managed body (or
    the commit itself) raises an ``Exception``, the transaction is rolled
    back and the error re-raised. The connection is closed in every case.
    """
    conn = await open_db(path, pragmas=pragmas)
    try:
        try:
            yield conn
            await conn.commit()
        except Exception:
            await conn.rollback()
            raise
    finally:
        await conn.close()
# ─────────────────────────────────────────────────────────────────────────────
# 2. Query helpers
# ─────────────────────────────────────────────────────────────────────────────
async def fetch_all(
    db: aiosqlite.Connection,
    sql: str,
    params: tuple | list | None = None,
) -> list[dict[str, Any]]:
    """
    Run a query and return every result row converted with ``dict(row)``.

    ``params`` defaults to no bound parameters. Rows must be convertible by
    ``dict()`` — e.g. the ``aiosqlite.Row`` factory installed by ``open_db``.
    """
    bound = params if params else ()
    cursor = await db.execute(sql, bound)
    async with cursor:
        records = await cursor.fetchall()
    return [dict(record) for record in records]
async def fetch_one(
    db: aiosqlite.Connection,
    sql: str,
    params: tuple | list | None = None,
) -> dict[str, Any] | None:
    """
    Run a query and return its first row as a dict.

    Returns ``None`` when the query produces no row. ``params`` defaults to
    no bound parameters.
    """
    bound = params if params else ()
    cursor = await db.execute(sql, bound)
    async with cursor:
        first = await cursor.fetchone()
    if not first:
        return None
    return dict(first)
async def fetch_scalar(
    db: aiosqlite.Connection,
    sql: str,
    params: tuple | list | None = None,
) -> Any:
    """
    Run a query and return only the first column of its first row.

    Intended for single-value queries such as COUNT(*) or MAX(id). Returns
    ``None`` when the query produces no row.
    """
    bound = params if params else ()
    cursor = await db.execute(sql, bound)
    async with cursor:
        first = await cursor.fetchone()
    if not first:
        return None
    return first[0]
async def execute_write(
    db: aiosqlite.Connection,
    sql: str,
    params: tuple | list | None = None,
) -> int:
    """
    Execute an INSERT/UPDATE/DELETE statement.

    Args:
        db: Open connection.
        sql: A single DML statement.
        params: Values bound to the statement's placeholders (default: none).

    Returns:
        For statements starting with UPDATE or DELETE: the cursor's
        ``rowcount``. For anything else (INSERT, REPLACE, ...): the cursor's
        ``lastrowid``, falling back to ``rowcount`` when that is falsy.

    The caller must commit (or use the get_db context manager).
    """
    async with await db.execute(sql, params or ()) as cursor:
        words = sql.split(None, 1)
        verb = words[0].upper() if words else ""
        if verb in ("UPDATE", "DELETE"):
            # lastrowid can still carry the rowid of an earlier INSERT on this
            # connection, so `lastrowid or rowcount` would return that stale
            # id here instead of the number of affected rows.
            return cursor.rowcount
        return cursor.lastrowid or cursor.rowcount
async def execute_many_write(
    db: aiosqlite.Connection,
    sql: str,
    params_list: list[tuple | list],
) -> int:
    """
    Execute one DML statement once per parameter set.

    Args:
        db: Open connection.
        sql: DML statement with placeholders.
        params_list: One parameter sequence per execution.

    Returns:
        Total rows affected across all parameter sets, never negative.

    The caller must commit (or use the get_db context manager).
    """
    async with await db.executemany(sql, params_list) as cursor:
        # For executemany the cursor's rowcount is the sum over every
        # parameter set. `SELECT changes()` would report only the most
        # recently completed statement, i.e. the last parameter set.
        return max(cursor.rowcount, 0)
# ─────────────────────────────────────────────────────────────────────────────
# 3. Schema management
# ─────────────────────────────────────────────────────────────────────────────
async def apply_schema(db: aiosqlite.Connection, ddl: str) -> None:
    """
    Execute a DDL script (CREATE TABLE, CREATE INDEX, etc.).

    Args:
        db: Open connection.
        ddl: One or more SQL statements separated by semicolons.

    Runs the script through ``executescript``. NOTE: per the sqlite3
    documentation, ``executescript`` first commits any pending transaction
    under the default transaction handling and adds no other transaction
    control, so call this before starting work you may want to roll back.
    """
    # Release the script's cursor deterministically, like the other helpers.
    async with await db.executescript(ddl):
        pass
async def table_exists(db: aiosqlite.Connection, table_name: str) -> bool:
    """
    Report whether ``sqlite_master`` lists a table named ``table_name``.

    The name is passed as a bound parameter and compared against entries of
    type 'table' only.
    """
    lookup = "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?"
    matches = await fetch_scalar(db, lookup, (table_name,))
    return bool(matches)
async def get_table_info(db: aiosqlite.Connection, table_name: str) -> list[dict[str, Any]]:
    """
    Return column info for a table, one dict per column.

    Each dict has the ``table_info`` pragma columns: cid, name, type,
    notnull, dflt_value, pk. An unknown table yields an empty list.

    The table name is bound as a parameter to the ``pragma_table_info``
    table-valued function (SQLite >= 3.16) instead of being spliced into the
    statement, so names that need quoting work and cannot inject SQL.
    """
    return await fetch_all(
        db,
        "SELECT * FROM pragma_table_info(?)",
        (table_name,),
    )
# ─────────────────────────────────────────────────────────────────────────────
# 4. CRUD helpers
# ─────────────────────────────────────────────────────────────────────────────
async def insert(
    db: aiosqlite.Connection,
    table: str,
    data: dict[str, Any],
) -> int:
    """
    INSERT a single row and return the result of ``execute_write``.

    Args:
        db: Open connection.
        table: Target table name. Interpolated into the SQL text as-is, so
            it must come from trusted code, never from user input.
        data: Mapping of column name to value. Column names are interpolated
            as-is (trusted); values are bound as parameters. An empty
            mapping inserts a row of column defaults.

    Returns:
        Whatever ``execute_write`` returns for the INSERT (the new rowid).

    The caller must commit (or use the get_db context manager).
    """
    if not data:
        # "INSERT INTO t () VALUES ()" is not valid SQLite; DEFAULT VALUES is
        # the supported spelling for a row made entirely of defaults.
        return await execute_write(db, f"INSERT INTO {table} DEFAULT VALUES")
    columns = ", ".join(data)
    placeholders = ", ".join("?" * len(data))
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
    return await execute_write(db, sql, list(data.values()))
async def upsert(
    db: aiosqlite.Connection,
    table: str,
    data: dict[str, Any],
    conflict_cols: list[str],
) -> int:
    """
    INSERT a row, or update the existing row on a unique-constraint conflict.

    Builds ``INSERT ... ON CONFLICT (<conflict_cols>) DO UPDATE SET
    col=excluded.col`` for every column in ``data`` that is not a conflict
    column. When ``data`` holds only conflict columns there is nothing to
    update, so ``DO NOTHING`` is emitted instead of an empty SET clause,
    which would be a syntax error.

    Args:
        db: Open connection.
        table: Target table name (interpolated as-is; must be trusted).
        data: Mapping of column name to value; names are interpolated as-is,
            values are bound as parameters.
        conflict_cols: Column(s) that form the unique constraint for upsert.

    Returns:
        Whatever ``execute_write`` returns for the statement.

    The caller must commit (or use the get_db context manager).
    """
    columns = list(data)
    conflict_set = set(conflict_cols)
    placeholders = ", ".join("?" * len(columns))
    column_list = ", ".join(columns)
    assignments = ", ".join(
        f"{col}=excluded.{col}" for col in columns if col not in conflict_set
    )
    action = f"DO UPDATE SET {assignments}" if assignments else "DO NOTHING"
    conflict_target = ", ".join(conflict_cols)
    sql = (
        f"INSERT INTO {table} ({column_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict_target}) {action}"
    )
    return await execute_write(db, sql, list(data.values()))
async def update(
    db: aiosqlite.Connection,
    table: str,
    data: dict[str, Any],
    where: str,
    where_params: tuple | list,
) -> int:
    """
    UPDATE the rows selected by ``where`` and return the cursor's rowcount.

    ``data`` maps column names to new values. ``table``, the column names and
    the ``where`` fragment are placed into the SQL text as-is; the new values
    are bound first, followed by ``where_params``.
    """
    assignments = ", ".join(f"{column}=?" for column in data)
    statement = f"UPDATE {table} SET {assignments} WHERE {where}"
    bound = [*data.values(), *where_params]
    cursor = await db.execute(statement, bound)
    async with cursor:
        return cursor.rowcount
async def delete(
    db: aiosqlite.Connection,
    table: str,
    where: str,
    where_params: tuple | list,
) -> int:
    """
    DELETE the rows selected by ``where`` and return the cursor's rowcount.

    ``table`` and the ``where`` fragment are placed into the SQL text as-is;
    ``where_params`` supplies the values bound to its placeholders.
    """
    statement = f"DELETE FROM {table} WHERE {where}"
    bound = list(where_params)
    cursor = await db.execute(statement, bound)
    async with cursor:
        return cursor.rowcount
# ─────────────────────────────────────────────────────────────────────────────
# 5. FastAPI dependency
# ─────────────────────────────────────────────────────────────────────────────
def make_fastapi_db_dependency(db_path: str | Path, pragmas: list[str] | None = None):
    """
    Return a FastAPI dependency that yields an open aiosqlite connection.

    Args:
        db_path: Database location forwarded to ``get_db``.
        pragmas: Optional pragma list forwarded to ``get_db``.

    Returns:
        An async generator function suitable for ``Depends``. Each request
        gets its own connection from ``get_db``, which commits on success,
        rolls back on error and always closes.

    The dependency is deliberately a plain async generator function, not an
    ``@asynccontextmanager``: FastAPI wraps yield-dependencies itself, and a
    pre-wrapped callable would make it inject the context-manager object
    instead of the connection.

    Usage:
        get_db_dep = make_fastapi_db_dependency("app.db")

        @app.get("/users/{user_id}")
        async def get_user(user_id: int, db=Depends(get_db_dep)):
            return await fetch_one(db, "SELECT * FROM users WHERE id=?", (user_id,))
    """
    async def _dep() -> AsyncIterator[aiosqlite.Connection]:
        async with get_db(db_path, pragmas=pragmas) as db:
            yield db

    return _dep
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
import asyncio
# Demo schema: a `users` table with a UNIQUE email column, and a `tags` table
# keyed by (user_id, tag) whose user_id references users(id). Both statements
# use IF NOT EXISTS, so the script can be applied more than once.
SCHEMA = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
score REAL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS tags (
user_id INTEGER REFERENCES users(id),
tag TEXT NOT NULL,
PRIMARY KEY (user_id, tag)
);
"""
async def demo():
    """
    Walk through the helpers against an in-memory database, printing results.

    Covers schema creation, insert, fetch (all / one / scalar), upsert, batch
    insert, update and delete. Each user gets a distinct email address because
    users.email is UNIQUE.
    """
    async with get_db(":memory:") as db:
        print("=== Schema ===")
        await apply_schema(db, SCHEMA)
        exists = await table_exists(db, "users")
        print(f" users table exists: {exists}")

        print("\n=== Insert ===")
        uid1 = await insert(db, "users", {"name": "Alice", "email": "alice@example.com", "score": 98.5})
        uid2 = await insert(db, "users", {"name": "Bob", "email": "bob@example.com", "score": 87.0})
        print(f" Inserted user IDs: {uid1}, {uid2}")
        await db.commit()

        print("\n=== Fetch all ===")
        users = await fetch_all(db, "SELECT * FROM users")
        for u in users:
            print(f" {u}")

        print("\n=== Fetch one ===")
        alice = await fetch_one(db, "SELECT * FROM users WHERE name=?", ("Alice",))
        print(f" Alice: {alice}")

        print("\n=== Scalar ===")
        count = await fetch_scalar(db, "SELECT COUNT(*) FROM users")
        avg = await fetch_scalar(db, "SELECT AVG(score) FROM users")
        print(f" count={count} avg_score={avg:.1f}")

        print("\n=== Upsert ===")
        # Same email as Alice's row, so this takes the ON CONFLICT update path.
        await upsert(
            db,
            "users",
            {"name": "Alice Updated", "email": "alice@example.com", "score": 99.0},
            conflict_cols=["email"],
        )
        await db.commit()
        alice2 = await fetch_one(db, "SELECT * FROM users WHERE email=?", ("alice@example.com",))
        print(f" After upsert: {alice2}")

        print("\n=== Batch insert ===")
        tags = [(uid1, "python"), (uid1, "asyncio"), (uid2, "javascript")]
        await execute_many_write(db, "INSERT INTO tags VALUES (?,?)", tags)
        await db.commit()
        tag_rows = await fetch_all(db, "SELECT * FROM tags")
        for t in tag_rows:
            print(f" tag: {t}")

        print("\n=== Update + Delete ===")
        rows_updated = await update(db, "users", {"score": 0.0}, "name=?", ("Bob",))
        print(f" rows updated: {rows_updated}")
        await db.commit()
        # tags.user_id references users(id) and foreign keys are enforced by
        # the default pragmas, so Bob's tag rows must go before Bob does.
        await delete(db, "tags", "user_id=?", (uid2,))
        rows_deleted = await delete(db, "users", "name=?", ("Bob",))
        print(f" rows deleted: {rows_deleted}")
        await db.commit()
        final = await fetch_all(db, "SELECT * FROM users")
        print(f" users after delete: {final}")


if __name__ == "__main__":
    asyncio.run(demo())
For the sqlite3 (stdlib) alternative — Python’s built-in sqlite3 module is synchronous; in an async FastAPI or asyncio application, every connection.execute() call blocks the event loop thread; aiosqlite wraps sqlite3 in a background thread and exposes an async/await interface, so your FastAPI routes remain non-blocking even during database queries. For the SQLAlchemy async alternative — SQLAlchemy’s async ORM supports aiosqlite as a dialect (create_async_engine("sqlite+aiosqlite:///app.db")), giving you full ORM models with async queries; use the raw aiosqlite API when you want a lightweight layer without ORM overhead, and use SQLAlchemy when you need model relationships, migrations via Alembic, or a path to switch to PostgreSQL later. The Claude Skills 360 bundle includes aiosqlite skill sets covering aiosqlite.connect() with pragmas, WAL mode and foreign keys setup, fetch_all()/fetch_one()/fetch_scalar() helpers, execute_write() and execute_many_write(), insert()/upsert()/update()/delete() CRUD, apply_schema() DDL execution, table_exists()/get_table_info() introspection, aiosqlite.Row dict-like access, get_db() auto-commit/rollback context manager, and FastAPI dependency injection. Start with the free tier to try async SQLite code generation.