DiskCache stores Python objects in SQLite on disk. pip install diskcache. Cache: from diskcache import Cache; cache = Cache("/tmp/mycache"). cache.set("key", value). cache.get("key"). cache["key"] = value. cache["key"]. cache.get("key", default=None). TTL: cache.set("token", "abc", expire=300) — expires in 300s. cache.set("k", v, expire=None) — no expiry. Tags: cache.set("k", v, tag="user"). cache.evict("user") — delete all tagged items. Delete: del cache["key"]. cache.pop("key", default). cache.delete("key"). Check: "key" in cache. Size: len(cache). Stats: cache.stats() → hits, misses. cache.stats(enable=True) to enable. Memoize: @cache.memoize(expire=60). def fn(x): return x*2. FanoutCache: from diskcache import FanoutCache; cache = FanoutCache("/tmp/fc", shards=8) — better concurrent write throughput. Deque: from diskcache import Deque; q = Deque(directory="/tmp/q"). q.append(item). q.popleft(). Index: from diskcache import Index; idx = Index("/tmp/idx") — sorted MutableMapping. Transact: with cache.transact(): cache.set("a", 1); cache.set("b", 2) — atomic. Size limit: Cache("/tmp/c", size_limit=2**30) — 1 GB. Disk: cache.volume() — bytes used. Close: cache.close(). Context: with Cache("/tmp/c") as cache: .... Process-safe: multiple processes can share the same Cache directory. Claude Code generates DiskCache patterns, memoize decorators, and FanoutCache configurations.
CLAUDE.md for DiskCache
## DiskCache Stack
- Version: diskcache >= 5.6 | pip install diskcache
- Cache: Cache("/path/to/dir") — SQLite-backed, process-safe
- Set/Get: cache.set(k, v, expire=N) | cache.get(k, default) | cache[k]
- Memoize: @cache.memoize(expire=300) — auto-key from function and args
- FanoutCache: FanoutCache(path, shards=8) — high-concurrency writes
- Tags: cache.set(k, v, tag="group") | cache.evict("group") for batch delete
- Stats: cache.stats() — enable with cache.stats(enable=True) for hit/miss counts
DiskCache Persistent Cache Pipeline
# app/disk_cache.py — DiskCache patterns for persistent function caching
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from typing import Any
from diskcache import Cache, Deque, FanoutCache, Index
# ─────────────────────────────────────────────────────────────────────────────
# Cache directory — use /tmp for ephemeral, app data dir for persistent
# ─────────────────────────────────────────────────────────────────────────────
CACHE_DIR = Path(os.environ.get("CACHE_DIR", "/tmp/myapp_cache"))
FANOUT_DIR = Path(os.environ.get("FANOUT_DIR", "/tmp/myapp_fanout"))
QUEUE_DIR = Path(os.environ.get("QUEUE_DIR", "/tmp/myapp_queue"))
# Ensure every cache directory exists before any Cache/Deque is opened.
for _dir in (CACHE_DIR, FANOUT_DIR, QUEUE_DIR):
    _dir.mkdir(parents=True, exist_ok=True)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Basic get/set with TTL and tags
# ─────────────────────────────────────────────────────────────────────────────
def demo_basic(cache: Cache) -> None:
    """Exercise core Cache operations: set/get, TTL expiry, defaults,
    dict-style access, and atomic add().

    Args:
        cache: An open ``diskcache.Cache`` instance.
    """
    # Simple set/get round-trip.
    cache.set("config:db_host", "db.example.com")
    assert cache.get("config:db_host") == "db.example.com"

    # TTL expiry — item disappears after 0.1s.
    cache.set("temp_token", "xyz", expire=0.1)
    assert "temp_token" in cache
    time.sleep(0.15)
    assert "temp_token" not in cache

    # Default returned on miss.
    val = cache.get("missing_key", default="fallback")
    assert val == "fallback"

    # Dict-style access mirrors set/get/delete.
    cache["version"] = "1.2.3"
    assert cache["version"] == "1.2.3"
    del cache["version"]
    assert "version" not in cache

    # Atomic set-if-not-exists. BUG FIX: the original comment claimed add()
    # returns None on success / the existing value on collision — per the
    # diskcache API it returns True when the item was stored and False when
    # the key already exists.  Assert that contract instead of ignoring it.
    assert cache.add("lock_key", "holder", expire=5)
    assert not cache.add("lock_key", "other")  # no-op: key already exists
    print("Basic operations: OK")
# ─────────────────────────────────────────────────────────────────────────────
# 2. Tag-based group eviction
# ─────────────────────────────────────────────────────────────────────────────
def demo_tags(cache: Cache) -> None:
    """Group cache entries under a shared tag and invalidate them together.

    Tag-based eviction removes every item carrying the tag in one atomic
    call — ideal for wiping all entries belonging to one user or tenant.
    """
    uid = 42
    tag = f"user:{uid}"
    entries = {
        f"user:{uid}:profile": {"name": "Alice"},
        f"user:{uid}:settings": {"theme": "dark"},
        f"user:{uid}:orders": [1, 2, 3],
    }
    for key, value in entries.items():
        cache.set(key, value, tag=tag)
    cache.set("global:config", {"env": "prod"})  # untagged — not evicted
    assert f"user:{uid}:profile" in cache

    # Evict all items tagged with this user — atomic, one call.
    count = cache.evict(tag)
    print(f"Evicted {count} items for user {uid}")
    assert f"user:{uid}:profile" not in cache
    assert f"user:{uid}:settings" not in cache
    assert "global:config" in cache  # untouched
# ─────────────────────────────────────────────────────────────────────────────
# 3. @cache.memoize — persistent function result cache
# ─────────────────────────────────────────────────────────────────────────────
def build_memoized_client(cache: Cache):
    """Create two disk-memoized functions bound to *cache*.

    ``@cache.memoize`` persists each result to disk keyed on the function
    and its arguments, so values survive process restarts — useful for
    expensive one-time computations.

    Returns:
        Tuple of ``(get_exchange_rate, geocode)`` memoized callables.
    """

    @cache.memoize(expire=600, tag="exchange_rate")
    def get_exchange_rate(from_currency: str, to_currency: str) -> float:
        """Simulates an expensive external API call."""
        time.sleep(0.05)
        table = {"USD_EUR": 0.92, "EUR_USD": 1.09, "USD_GBP": 0.79}
        return table.get(f"{from_currency}_{to_currency}", 1.0)

    @cache.memoize(expire=3600, tag="geo")
    def geocode(address: str) -> dict:
        """Geocoding result cached for 1 hour — expensive API call."""
        time.sleep(0.1)
        return {"lat": 51.5074, "lng": -0.1278, "address": address}

    return get_exchange_rate, geocode
# ─────────────────────────────────────────────────────────────────────────────
# 4. FanoutCache — high-concurrency writes
# ─────────────────────────────────────────────────────────────────────────────
def demo_fanout() -> None:
    """Shard writes across multiple SQLite files with FanoutCache.

    Sharding reduces lock contention when several processes write
    concurrently; the API is otherwise identical to ``Cache``.
    """
    with FanoutCache(FANOUT_DIR, shards=4) as fanout:
        for idx in range(20):
            fanout.set(f"item:{idx}", {"id": idx, "value": idx * 2}, expire=60)
        hit_count = 0
        for idx in range(20):
            if fanout.get(f"item:{idx}") is not None:
                hit_count += 1
        print(f"FanoutCache: {hit_count}/20 items cached, {fanout.volume()} bytes on disk")
# ─────────────────────────────────────────────────────────────────────────────
# 5. Deque — persistent FIFO queue
# ─────────────────────────────────────────────────────────────────────────────
def demo_deque() -> None:
    """Push and drain jobs through a persistent FIFO queue on disk.

    DiskCache's Deque survives restarts, and multiple worker processes
    may push/pop against the same directory.
    """
    with Deque(directory=QUEUE_DIR) as queue:
        # Producer side: enqueue five jobs.
        jobs = [{"job_id": n, "payload": f"task_{n}"} for n in range(5)]
        for job in jobs:
            queue.append(job)
        print(f"Queue size: {len(queue)}")
        # Consumer side: drain front-to-back until empty.
        while len(queue) > 0:
            current = queue.popleft()
            print(f" Processing job {current['job_id']}")
# ─────────────────────────────────────────────────────────────────────────────
# 6. Index — persistent sorted mapping
# ─────────────────────────────────────────────────────────────────────────────
def demo_index() -> None:
    """Store a small leaderboard in a persistent, key-sorted mapping.

    ``Index`` is a MutableMapping backed by SQLite whose keys iterate in
    sorted order — useful for leaderboards, ordered job queues, or config.
    """
    idx_dir = Path("/tmp/myapp_index")
    idx_dir.mkdir(exist_ok=True)
    # BUG FIX: Index takes the directory as its first *positional* argument
    # (signature is Index(*args, **kwargs)).  Keyword arguments are treated
    # as initial key/value pairs, so Index(directory="...") created the
    # index in a temporary directory and inserted a bogus "directory" key.
    idx = Index(str(idx_dir))
    idx["user:001"] = {"score": 1500, "name": "Alice"}
    idx["user:002"] = {"score": 2300, "name": "Bob"}
    idx["user:003"] = {"score": 800, "name": "Carol"}
    print(f"Index keys (sorted): {list(idx.keys())}")
    print(f"Bob: {idx['user:002']}")
# ─────────────────────────────────────────────────────────────────────────────
# 7. Atomic transact — batch updates
# ─────────────────────────────────────────────────────────────────────────────
def demo_transact(cache: Cache) -> None:
    """Apply several cache writes as one all-or-nothing SQLite transaction.

    Inside ``cache.transact()`` either every operation commits or all are
    rolled back.
    """
    with cache.transact():
        cache.set("balance:alice", 1000)
        cache.set("balance:bob", 500)

    # Simulate a transfer: debit one account, credit the other, atomically.
    amount = 100
    with cache.transact():
        alice_balance = cache.get("balance:alice", default=0)
        bob_balance = cache.get("balance:bob", default=0)
        cache.set("balance:alice", alice_balance - amount)
        cache.set("balance:bob", bob_balance + amount)
    print(f"After transfer: alice={cache['balance:alice']} bob={cache['balance:bob']}")
# ─────────────────────────────────────────────────────────────────────────────
# 8. Cache statistics and size management
# ─────────────────────────────────────────────────────────────────────────────
def demo_stats(cache: Cache) -> None:
    """Track hit/miss counts for capacity planning.

    Statistics collection is off by default.  BUG FIX: the original did
    ``from diskcache import EVICTION_POLICY, STATISTICS`` — diskcache does
    not export a ``STATISTICS`` constant, so that import raises
    ImportError (and EVICTION_POLICY was unused).  The documented way to
    enable counters is ``cache.stats(enable=True)``.
    """
    cache.stats(enable=True)  # start collecting hit/miss counters
    for i in range(10):
        cache.set(f"stat_k:{i}", i)
    for i in range(10):
        _ = cache.get(f"stat_k:{i}")  # 10 hits
    for i in range(10, 15):
        _ = cache.get(f"stat_k:{i}")  # 5 misses
    # stats(reset=True) returns (hits, misses) and zeroes the counters.
    hits, misses = cache.stats(reset=True)
    hit_rate = hits / (hits + misses) if (hits + misses) else 0
    print(f"Stats: hits={hits} misses={misses} hit_rate={hit_rate:.0%}")
    print(f"Disk volume: {cache.volume()} bytes")
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # 256 MB disk quota; the context manager closes the cache cleanly.
    with Cache(CACHE_DIR, size_limit=2**28) as cache:
        print("=== Basic ===")
        demo_basic(cache)

        print("\n=== Tags ===")
        demo_tags(cache)

        print("\n=== Memoize ===")
        get_rate, geocode = build_memoized_client(cache)
        first = get_rate("USD", "EUR")
        second = get_rate("USD", "EUR")  # served from the disk cache
        print(f" USD→EUR: {first} (both={first == second})")
        geo = geocode("10 Downing Street, London")
        print(f" Geocoded: lat={geo['lat']}")

        print("\n=== Transact ===")
        demo_transact(cache)

        print("\n=== Stats ===")
        demo_stats(cache)

        print("\n=== FanoutCache ===")
        demo_fanout()

        print("\n=== Deque ===")
        demo_deque()

        print("\n=== Index ===")
        demo_index()
For the redis caching alternative — Redis requires a running server process, network sockets, and serialization/deserialization of every value, while DiskCache uses SQLite on the local filesystem so values are stored as pickled Python objects, the cache survives process restarts without any external service, @cache.memoize() persists results across application deployments, and FanoutCache handles concurrent multi-process writes without a Redis cluster. For the shelve / pickle files alternative — Python’s shelve stores pickled objects but has no TTL, no eviction policy, no concurrency safety (single writer lock on the file), no size limit, and no tag-based invalidation, while DiskCache’s Cache.set(key, val, expire=N) auto-expires stale items, cache.evict("tag") deletes groups atomically, size_limit enforces a disk quota with LRU eviction, and the cache is process-safe for concurrent readers and writers. The Claude Skills 360 bundle includes DiskCache skill sets covering Cache with TTL expiry and tag-based eviction, @cache.memoize for persistent function caching, FanoutCache sharded for concurrent multiprocess writes, Deque persistent job queue operations, Index sorted mapping use cases, transact atomic batch operations, cache.stats hit/miss rate monitoring, size_limit disk quota with LRU eviction, memoize invalidation by tag, and Flask session or API response cache integration. Start with the free tier to try persistent disk caching code generation.