tinydb is a pure-Python embedded document database backed by JSON. pip install tinydb. Create: from tinydb import TinyDB, Query; db = TinyDB("data.json"). Insert: db.insert({"name": "Alice", "age": 30}). Insert many: db.insert_multiple([{...}, {...}]). Query: User = Query(); db.search(User.name == "Alice"). Operators: ==, !=, <, >, <=, >=, User.tags.any(["admin"]). And/or: (User.age > 18) & (User.active == True). | for or, ~ for not. Contains: db.contains(User.name == "Alice"). Get: db.get(User.name == "Alice") — first match. Count: db.count(User.active == True). All: db.all(). Search regex: User.name.matches(r"A.*"). Update: db.update({"age": 31}, User.name == "Alice"). Update func: db.update(lambda d: d.__setitem__("age", d["age"]+1), User.age < 20). Remove: db.remove(User.active == False). Upsert: db.upsert({"name":"Bob","age":25}, User.name=="Bob"). Tables: db.table("users"); db.table("products"). db.tables(). Truncate: db.truncate(). Close: db.close(). Storage: TinyDB("db.json", storage=CachingMiddleware(JSONStorage)). Memory: TinyDB(storage=MemoryStorage). Serialization: SerializationMiddleware with custom serializers. doc_id field — auto-incremented int. db.get(doc_id=1). db.remove(doc_ids=[1,2]). Claude Code generates tinydb document stores, embedded config databases, and test fixture helpers.
CLAUDE.md for tinydb
## tinydb Stack
- Version: tinydb >= 4.8 | pip install tinydb
- Create: TinyDB("file.json") | TinyDB(storage=MemoryStorage) for in-memory
- Insert: db.insert(dict) → doc_id | db.insert_multiple(list)
- Query: db.search(Q.field == val) | db.get(Q.field == val) | db.all()
- Update: db.update({"field": val}, Q.cond) | db.upsert(dict, Q.cond)
- Tables: db.table("name") — separate namespace | db.tables()
- Caching: CachingMiddleware(JSONStorage) — batch writes for performance
tinydb Document Database Pipeline
# app/db.py — tinydb CRUD, queries, tables, upsert, and typed document helpers
from __future__ import annotations
import copy
import datetime
import json
from pathlib import Path
from typing import Any, Callable
from tinydb import Query, TinyDB, table
from tinydb.middlewares import CachingMiddleware
from tinydb.storages import JSONStorage, MemoryStorage
# ─────────────────────────────────────────────────────────────────────────────
# 1. Database factory
# ─────────────────────────────────────────────────────────────────────────────
def open_db(path: str | Path, cache: bool = True) -> TinyDB:
"""
Open a TinyDB database file.
cache=True wraps storage in CachingMiddleware for better write performance.
Example:
db = open_db("data/app.json")
db.close()
"""
if cache:
return TinyDB(str(path), storage=CachingMiddleware(JSONStorage))
return TinyDB(str(path))
def in_memory_db() -> TinyDB:
"""Create an in-memory TinyDB database (no file I/O)."""
return TinyDB(storage=MemoryStorage)
# ─────────────────────────────────────────────────────────────────────────────
# 2. CRUD helpers
# ─────────────────────────────────────────────────────────────────────────────
def insert(tbl: table.Table, doc: dict) -> int:
"""Insert a document and return its doc_id."""
return tbl.insert(doc)
def insert_many(tbl: table.Table, docs: list[dict]) -> list[int]:
"""Insert multiple documents and return their doc_ids."""
return list(tbl.insert_multiple(docs))
def get_by_id(tbl: table.Table, doc_id: int) -> dict | None:
"""Retrieve a document by its doc_id."""
return tbl.get(doc_id=doc_id)
def get_by(tbl: table.Table, condition) -> dict | None:
"""Retrieve the first document matching condition."""
return tbl.get(condition)
def find(tbl: table.Table, condition=None) -> list[dict]:
"""
Return all documents matching condition.
If condition is None, return all documents.
"""
if condition is None:
return tbl.all()
return tbl.search(condition)
def find_all(tbl: table.Table) -> list[dict]:
"""Return every document in the table."""
return tbl.all()
def count(tbl: table.Table, condition=None) -> int:
"""Count documents matching condition (or all if None)."""
if condition is None:
return len(tbl)
return tbl.count(condition)
def exists(tbl: table.Table, condition) -> bool:
"""Return True if at least one document matches condition."""
return tbl.contains(condition)
def update(tbl: table.Table, updates: dict, condition) -> list[int]:
"""
Update documents matching condition.
Returns list of updated doc_ids.
"""
return tbl.update(updates, condition)
def update_fn(tbl: table.Table, fn: Callable, condition) -> list[int]:
"""Update documents by applying fn(doc) in-place."""
return tbl.update(fn, condition)
def upsert(tbl: table.Table, doc: dict, condition) -> list[int]:
"""Insert if no match; update existing if match found."""
return tbl.upsert(doc, condition)
def remove(tbl: table.Table, condition) -> list[int]:
"""Remove documents matching condition. Returns removed doc_ids."""
return tbl.remove(condition)
def remove_by_ids(tbl: table.Table, doc_ids: list[int]) -> None:
"""Remove specific documents by doc_id."""
tbl.remove(doc_ids=doc_ids)
def truncate(tbl: table.Table) -> None:
"""Remove all documents from the table."""
tbl.truncate()
# ─────────────────────────────────────────────────────────────────────────────
# 3. Query helpers
# ─────────────────────────────────────────────────────────────────────────────
def find_where(tbl: table.Table, **kwargs) -> list[dict]:
"""
Find documents matching exact field values.
Example:
find_where(users, role="admin", active=True)
"""
Q = Query()
conditions = [getattr(Q, key) == val for key, val in kwargs.items()]
if not conditions:
return tbl.all()
combined = conditions[0]
for c in conditions[1:]:
combined = combined & c
return tbl.search(combined)
def find_sorted(
tbl: table.Table,
condition=None,
key: str = "name",
reverse: bool = False,
) -> list[dict]:
"""Find and sort results by a field."""
items = find(tbl, condition)
return sorted(items, key=lambda d: d.get(key, ""), reverse=reverse)
def paginate(
tbl: table.Table,
condition=None,
page: int = 1,
per_page: int = 20,
sort_key: str | None = None,
) -> dict[str, Any]:
"""
Return a page of results.
Returns {"items": [...], "total": n, "page": p, "per_page": pp, "pages": total_pages}.
"""
items = find(tbl, condition)
if sort_key:
items = sorted(items, key=lambda d: d.get(sort_key, ""))
total = len(items)
pages = max(1, (total + per_page - 1) // per_page)
offset = (page - 1) * per_page
return {
"items": items[offset:offset + per_page],
"total": total,
"page": page,
"per_page": per_page,
"pages": pages,
}
# ─────────────────────────────────────────────────────────────────────────────
# 4. Typed document repository
# ─────────────────────────────────────────────────────────────────────────────
class Repository:
"""
Generic typed repository over a TinyDB table.
Usage:
db = open_db("app.json")
users = Repository(db, "users", id_field="email")
users.save({"email": "[email protected]", "name": "Alice", "role": "admin"})
alice = users.find_one("email", "[email protected]")
"""
def __init__(self, db: TinyDB, table_name: str, id_field: str = "id"):
self._tbl = db.table(table_name)
self._id = id_field
self._Q = Query()
def save(self, doc: dict) -> int | list[int]:
"""Upsert by id_field; returns doc_id(s)."""
val = doc.get(self._id)
if val is not None:
return upsert(self._tbl, doc, getattr(self._Q, self._id) == val)
return insert(self._tbl, doc)
def save_many(self, docs: list[dict]) -> None:
"""Upsert each document."""
for doc in docs:
self.save(doc)
def find_one(self, field: str, value: Any) -> dict | None:
"""Find first document where field == value."""
return get_by(self._tbl, getattr(self._Q, field) == value)
def find_all(self, **kwargs) -> list[dict]:
"""Find all documents matching keyword field==value pairs."""
return find_where(self._tbl, **kwargs)
def all(self) -> list[dict]:
return self._tbl.all()
def delete(self, **kwargs) -> int:
"""Delete documents matching kwargs. Returns count deleted."""
before = len(self._tbl)
Q = Query()
conditions = [getattr(Q, k) == v for k, v in kwargs.items()]
if conditions:
combined = conditions[0]
for c in conditions[1:]:
combined = combined & c
self._tbl.remove(combined)
return before - len(self._tbl)
def count(self, **kwargs) -> int:
return len(find_where(self._tbl, **kwargs))
def update_field(self, id_value: Any, **updates) -> bool:
"""Update fields on the document with id_field == id_value."""
ids = update(self._tbl, updates, getattr(self._Q, self._id) == id_value)
return bool(ids)
def page(self, page: int = 1, per_page: int = 20) -> dict[str, Any]:
return paginate(self._tbl, page=page, per_page=per_page)
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import tempfile, os
print("=== In-memory DB ===")
db = in_memory_db()
users = db.table("users")
insert_many(users, [
{"name": "Alice", "age": 30, "role": "admin", "active": True},
{"name": "Bob", "age": 25, "role": "user", "active": True},
{"name": "Carol", "age": 28, "role": "moderator", "active": True},
{"name": "Dave", "age": 22, "role": "user", "active": False},
{"name": "Eve", "age": 35, "role": "admin", "active": True},
])
print(f"Total users: {count(users)}")
Q = Query()
print("\n=== Queries ===")
admins = find(users, Q.role == "admin")
print("Admins:", [u["name"] for u in admins])
active_adults = find(users, (Q.active == True) & (Q.age > 24))
print("Active adults:", [u["name"] for u in active_adults])
print("\n=== find_where ===")
active_users = find_where(users, role="user", active=True)
print("Active non-admin users:", [u["name"] for u in active_users])
print("\n=== find_sorted ===")
by_age = find_sorted(users, key="age")
print("By age:", [(u["name"], u["age"]) for u in by_age])
print("\n=== upsert ===")
upsert(users, {"name": "Frank", "age": 40, "role": "user", "active": True}, Q.name == "Frank")
print("After upsert Frank:", count(users, Q.name == "Frank"))
upsert(users, {"name": "Frank", "age": 41, "role": "admin", "active": True}, Q.name == "Frank")
frank = get_by(users, Q.name == "Frank")
print("Frank updated:", frank)
print("\n=== update ===")
update(users, {"active": False}, Q.role == "user")
inactive = count(users, Q.active == False)
print("Inactive after update:", inactive)
print("\n=== paginate ===")
page = paginate(users, page=1, per_page=3, sort_key="name")
print(f"Page 1 of {page['pages']}: {[u['name'] for u in page['items']]}")
print("\n=== Repository ===")
db2 = in_memory_db()
products = Repository(db2, "products", id_field="sku")
products.save({"sku": "WDG-001", "name": "Widget", "price": 9.99, "stock": 100})
products.save({"sku": "GDG-002", "name": "Gadget", "price": 19.99, "stock": 50})
products.save({"sku": "WDG-001", "name": "Widget Pro", "price": 14.99, "stock": 80}) # update
print("Products:", [(p["sku"], p["name"], p["price"]) for p in products.all()])
products.update_field("GDG-002", stock=45)
print("Gadget stock:", products.find_one("sku", "GDG-002")["stock"])
db.close()
db2.close()
For the sqlite3 alternative — sqlite3 (stdlib) is a full relational database with SQL queries, transactions, and indexes; tinydb is simpler for small datasets where you want JSON files and Python dict queries without writing SQL — ideal for configs, test fixtures, CLI tool state, and small apps with <100k records. For the shelve alternative — shelve persists Python objects keyed by string using pickle; tinydb uses plain JSON (human-readable, portable, git-friendly), supports structured queries with Query(), and doesn’t have pickle’s security/portability issues. The Claude Skills 360 bundle includes tinydb skill sets covering open_db()/in_memory_db() factory, insert/insert_many/get_by_id/get_by/find/find_all/count/exists CRUD, update/update_fn/upsert/remove/truncate mutations, find_where()/find_sorted()/paginate() query helpers, CachingMiddleware/MemoryStorage/JSONStorage, and Repository generic typed wrapper with save/find_one/find_all/delete/update_field/page. Start with the free tier to try embedded JSON database code generation.