asyncpg is the fastest PostgreSQL driver for Python asyncio. pip install asyncpg. Connect: conn = await asyncpg.connect("postgresql://user:pass@host/db"). Pool: pool = await asyncpg.create_pool(dsn, min_size=5, max_size=20). Execute: await conn.execute("INSERT INTO users(name) VALUES($1)", "Alice"). Fetch all: rows = await conn.fetch("SELECT * FROM users WHERE active=$1", True). One row: row = await conn.fetchrow("SELECT * FROM users WHERE id=$1", 1). Scalar: count = await conn.fetchval("SELECT count(*) FROM users"). Row access: row["id"], row[0]. Executemany: await conn.executemany("INSERT INTO t(a,b) VALUES($1,$2)", [(1,"a"),(2,"b")]). Prepared: stmt = await conn.prepare("SELECT * FROM users WHERE id=$1"), row = await stmt.fetchrow(42). Transaction: async with conn.transaction(): await conn.execute(...). Isolation: async with conn.transaction(isolation="serializable"):. Savepoint: nest transaction blocks — an inner async with conn.transaction(): opened inside an outer one automatically becomes a savepoint. Pool acquire: async with pool.acquire() as conn: .... Codec: await conn.set_type_codec("json", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"). COPY from query: await conn.copy_from_query("SELECT * FROM t", output="data.csv", format="csv"). COPY to table: await conn.copy_to_table("t", source="data.csv", format="csv"). COPY records: await conn.copy_records_to_table("t", records=[(1,"a"),(2,"b")], columns=["id","name"]). LISTEN: await conn.add_listener("channel", callback). NOTIFY: await conn.execute("SELECT pg_notify('channel', 'msg')"). asyncpg.Record behaves like a dict and tuple. Exceptions: asyncpg.UniqueViolationError, asyncpg.ForeignKeyViolationError, asyncpg.CheckViolationError. Claude Code generates asyncpg connection pools, CRUD patterns, COPY bulk loaders, and LISTEN/NOTIFY subscription services.
CLAUDE.md for asyncpg
## asyncpg Stack
- Version: asyncpg >= 0.29 | pip install asyncpg
- Pool: asyncpg.create_pool(dsn, min_size=5, max_size=20, max_inactive_connection_lifetime=300)
- Fetch: conn.fetch("SELECT...$1", val) → list[Record] | fetchrow → Record | fetchval → scalar
- Write: conn.execute("INSERT ... VALUES($1,$2)", a, b) | executemany(sql, [(a1, b1), (a2, b2), ...])
- Prepared: await conn.prepare(sql) → stmt; stmt.fetchrow(args)
- Tx: async with conn.transaction(isolation="read_committed"): ...
- COPY: copy_records_to_table(table, records=[(...),...], columns=[...])
asyncpg PostgreSQL Pipeline
# app/db.py — asyncpg connection pool and query utilities
from __future__ import annotations
import json
import logging
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any, AsyncIterator
import asyncpg
from asyncpg import Pool, Record
logger = logging.getLogger(__name__)
# Connection string; override via the DATABASE_URL environment variable.
# The default below is a local development placeholder.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://user:password@localhost:5432/mydb",
)
# ─────────────────────────────────────────────────────────────────────────────
# Pool setup
# ─────────────────────────────────────────────────────────────────────────────
# Process-wide pool singleton, managed by create_db_pool()/close_db_pool().
_pool: Pool | None = None
async def create_db_pool(
    dsn: str = DATABASE_URL,
    *,
    min_size: int = 5,
    max_size: int = 20,
) -> Pool:
    """
    Create and configure the module-level connection pool.

    Call once at application startup. Each new connection is passed through
    ``_setup_connection`` to register the JSON/UUID type codecs.

    Args:
        dsn: PostgreSQL connection string.
        min_size: Minimum number of pooled connections (keyword-only).
        max_size: Maximum number of pooled connections (keyword-only).

    Returns:
        The configured Pool (also stored in the module-level ``_pool``).
    """
    global _pool
    pool = await asyncpg.create_pool(
        dsn=dsn,
        min_size=min_size,
        max_size=max_size,
        max_inactive_connection_lifetime=300,  # recycle idle connections after 5 min
        command_timeout=60,
        # Run once for each newly established connection
        init=_setup_connection,
    )
    _pool = pool
    # Bug fix: log the actual pool bounds instead of hard-coded literals.
    logger.info("asyncpg pool created min=%d max=%d", min_size, max_size)
    return pool
async def _setup_connection(conn: asyncpg.Connection) -> None:
    """
    Per-connection init hook: register custom type codecs.

    json/jsonb values round-trip through the stdlib ``json`` module so
    dicts are encoded/decoded transparently; uuid columns are returned
    as Python ``uuid.UUID`` objects.
    """
    # json and jsonb share the same encoder/decoder pair.
    for type_name in ("json", "jsonb"):
        await conn.set_type_codec(
            type_name,
            encoder=json.dumps,
            decoder=json.loads,
            schema="pg_catalog",
        )
    await conn.set_type_codec(
        "uuid",
        encoder=str,
        decoder=uuid.UUID,
        schema="pg_catalog",
        format="text",
    )
async def close_db_pool() -> None:
    """Close the module-level pool, if one was created; idempotent."""
    global _pool
    if not _pool:
        return
    await _pool.close()
    _pool = None
    logger.info("asyncpg pool closed")
def get_pool() -> Pool:
    """Return the process-wide pool; raise if startup never created one."""
    pool = _pool
    if pool is None:
        raise RuntimeError("Database pool not initialised")
    return pool
# ─────────────────────────────────────────────────────────────────────────────
# Repository: User CRUD
# ─────────────────────────────────────────────────────────────────────────────
class UserRepository:
    """All DB access for the ``users`` table."""

    def __init__(self, pool: Pool) -> None:
        self._pool = pool

    async def create(self, email: str, name: str) -> Record:
        """Insert a user and return the created row."""
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(
                """
                INSERT INTO users (email, name, created_at)
                VALUES ($1, $2, NOW())
                RETURNING id, email, name, is_active, created_at
                """,
                email, name,
            )

    async def get_by_id(self, user_id: int) -> Record | None:
        """Return the row for ``user_id``, or None if no such user."""
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(
                "SELECT id, email, name, is_active, created_at FROM users WHERE id = $1",
                user_id,
            )

    async def get_by_email(self, email: str) -> Record | None:
        """Return the row matching ``email``, or None if no such user."""
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(
                "SELECT id, email, name, is_active FROM users WHERE email = $1",
                email,
            )

    async def list_active(self, page: int = 1, page_size: int = 20) -> list[Record]:
        """Return one page of active users ordered by name.

        Args:
            page: 1-based page number.
            page_size: Rows per page.

        Raises:
            ValueError: if ``page`` or ``page_size`` is less than 1.
                (Bug fix: previously page=0 produced a negative OFFSET,
                surfacing as an opaque server-side error.)
        """
        if page < 1 or page_size < 1:
            raise ValueError("page and page_size must be >= 1")
        offset = (page - 1) * page_size
        async with self._pool.acquire() as conn:
            return await conn.fetch(
                """
                SELECT id, email, name, created_at
                FROM users
                WHERE is_active = TRUE
                ORDER BY name
                LIMIT $1 OFFSET $2
                """,
                page_size, offset,
            )

    async def count(self, active_only: bool = True) -> int:
        """Count users; only active ones when ``active_only`` is True."""
        sql = "SELECT count(*) FROM users"
        if active_only:
            sql += " WHERE is_active = TRUE"  # static suffix — no injection risk
        async with self._pool.acquire() as conn:
            return await conn.fetchval(sql)

    async def update_name(self, user_id: int, name: str) -> Record | None:
        """Rename a user; returns the updated row, or None if the id is unknown."""
        async with self._pool.acquire() as conn:
            return await conn.fetchrow(
                "UPDATE users SET name=$1, updated_at=NOW() WHERE id=$2 RETURNING *",
                name, user_id,
            )

    async def deactivate(self, user_id: int) -> bool:
        """Soft-delete a user. True only if a row flipped active → inactive."""
        async with self._pool.acquire() as conn:
            result = await conn.execute(
                "UPDATE users SET is_active=FALSE WHERE id=$1 AND is_active=TRUE",
                user_id,
            )
            # result is the status tag "UPDATE N" — N=1 means a row was updated
            return result == "UPDATE 1"

    async def bulk_insert(self, users: list[dict]) -> int:
        """Insert many users with copy_records_to_table — fastest bulk insert.

        Note: COPY bypasses ON CONFLICT handling, so a duplicate email
        raises asyncpg.UniqueViolationError and aborts the whole batch.

        Returns:
            The number of records sent.
        """
        records = [(u["email"], u["name"]) for u in users]
        if not records:
            return 0  # improvement: skip the COPY round-trip for an empty batch
        async with self._pool.acquire() as conn:
            await conn.copy_records_to_table(
                "users",
                records=records,
                columns=["email", "name"],
            )
        return len(records)
# ─────────────────────────────────────────────────────────────────────────────
# Transactions
# ─────────────────────────────────────────────────────────────────────────────
class OrderRepository:
    """DB access for the ``orders`` and ``order_lines`` tables."""

    def __init__(self, pool: Pool) -> None:
        self._pool = pool

    async def create_with_lines(
        self,
        user_id: int,
        lines: list[dict],  # [{"product_id": int, "quantity": int, "price": float}]
    ) -> Record:
        """
        Create order + order lines atomically.

        Stock is decremented inside a single transaction; any failure rolls
        the whole order back.

        Raises:
            ValueError: if a referenced product id does not exist.
                (Bug fix: previously the UPDATE silently matched zero rows
                and the order was created against a missing product.)
            asyncpg.CheckViolationError: presumably raised when stock would
                go negative — assumes a CHECK (stock >= 0) constraint on
                products; confirm against the schema.
        """
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                total = sum(line["quantity"] * line["price"] for line in lines)
                order = await conn.fetchrow(
                    """
                    INSERT INTO orders (user_id, total, status, created_at)
                    VALUES ($1, $2, 'pending', NOW())
                    RETURNING id, user_id, total, status, created_at
                    """,
                    user_id, total,
                )
                for line in lines:
                    # Decrement stock atomically; a DB constraint is expected
                    # to reject a negative result and abort the transaction.
                    status = await conn.execute(
                        """
                        UPDATE products
                        SET stock = stock - $1
                        WHERE id = $2
                        """,
                        line["quantity"], line["product_id"],
                    )
                    # conn.execute returns the status tag "UPDATE N"; anything
                    # other than exactly one row means the product is missing.
                    if status != "UPDATE 1":
                        raise ValueError(
                            f"Product {line['product_id']} not found"
                        )
                    await conn.execute(
                        """
                        INSERT INTO order_lines (order_id, product_id, quantity, unit_price)
                        VALUES ($1, $2, $3, $4)
                        """,
                        order["id"], line["product_id"], line["quantity"], line["price"],
                    )
                return order

    async def get_with_lines(self, order_id: int) -> dict:
        """Fetch order and its lines in two queries (avoids column name collisions).

        Raises:
            ValueError: if the order does not exist.
        """
        async with self._pool.acquire() as conn:
            order = await conn.fetchrow(
                "SELECT * FROM orders WHERE id = $1", order_id
            )
            if order is None:
                raise ValueError(f"Order {order_id} not found")
            lines = await conn.fetch(
                """
                SELECT ol.*, p.name product_name, p.sku
                FROM order_lines ol
                JOIN products p ON p.id = ol.product_id
                WHERE ol.order_id = $1
                """,
                order_id,
            )
            return {"order": dict(order), "lines": [dict(l) for l in lines]}
# ─────────────────────────────────────────────────────────────────────────────
# COPY — bulk data transfer
# ─────────────────────────────────────────────────────────────────────────────
async def export_users_csv(pool: Pool, output_path: str) -> int:
    """Dump active users to a CSV file via the PostgreSQL COPY protocol.

    Returns 0 unconditionally — asyncpg's copy_from_query does not report
    a row count.
    """
    async with pool.acquire() as conn:
        await conn.copy_from_query(
            "SELECT id, email, name, created_at FROM users WHERE is_active = TRUE",
            output=output_path,
            format="csv",
            header=True,
        )
        logger.info("Exported users to %s", output_path)
        return 0
async def import_users_csv(pool: Pool, source_path: str) -> None:
    """Import users from CSV via a temp staging table, skipping duplicates.

    The staging table is dropped automatically at commit (ON COMMIT DROP);
    duplicate emails are ignored via ON CONFLICT DO NOTHING.

    Args:
        pool: Connection pool to draw from.
        source_path: Path to a CSV file with a header and email,name columns.
    """
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute("""
                CREATE TEMP TABLE users_staging (
                    email TEXT, name TEXT
                ) ON COMMIT DROP
            """)
            await conn.copy_to_table(
                "users_staging",
                source=source_path,
                format="csv",
                header=True,
                columns=["email", "name"],
            )
            # Bug fix: PostgreSQL forbids aggregate functions in a RETURNING
            # clause, so the previous `RETURNING count(*)` raised an error.
            # conn.execute() returns the command status tag instead, e.g.
            # "INSERT 0 5" — the trailing number is the inserted row count.
            status = await conn.execute("""
                INSERT INTO users (email, name, created_at)
                SELECT email, name, NOW()
                FROM users_staging
                ON CONFLICT (email) DO NOTHING
            """)
            inserted = int(status.rsplit(" ", 1)[-1])
            logger.info("Imported %s new users", inserted)
# ─────────────────────────────────────────────────────────────────────────────
# LISTEN / NOTIFY — real-time events
# ─────────────────────────────────────────────────────────────────────────────
async def start_event_listener(pool: Pool, channel: str, handler) -> asyncpg.Connection:
    """
    Subscribe *handler* to a PostgreSQL NOTIFY channel.

    The returned connection carries the subscription and must be kept
    alive — do NOT return it to the pool while listening. The caller is
    responsible for removing the listener and releasing the connection.
    """
    listener_conn = await pool.acquire()
    await listener_conn.add_listener(channel, handler)
    logger.info("Listening on channel=%s", channel)
    return listener_conn
async def demo_listener():
    """End-to-end LISTEN/NOTIFY demo: subscribe, notify, receive, clean up."""
    import asyncio

    pool = await create_db_pool()

    def order_notification(connection, pid, channel, payload):
        logger.info("Event channel=%s payload=%s", channel, payload)

    conn = await start_event_listener(pool, "order_events", order_notification)

    # Fire a test notification from a second pooled connection.
    async with pool.acquire() as send_conn:
        await send_conn.execute(
            "SELECT pg_notify('order_events', $1)",
            json.dumps({"type": "order_placed", "order_id": 42}),
        )

    await asyncio.sleep(1)  # give the listener time to receive the event
    await conn.remove_listener("order_events", order_notification)
    await pool.release(conn)
    await close_db_pool()
# ─────────────────────────────────────────────────────────────────────────────
# FastAPI integration
# ─────────────────────────────────────────────────────────────────────────────
# Reference snippet showing pool lifecycle management with FastAPI's lifespan
# context. Kept as a string so this module itself has no FastAPI dependency.
# Bug fix: the example raises HTTPException but never imported it — added to
# the fastapi import line.
FASTAPI_EXAMPLE = """
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, HTTPException
from asyncpg import Pool
from app.db import create_db_pool, close_db_pool, get_pool, UserRepository

@asynccontextmanager
async def lifespan(app: FastAPI):
    await create_db_pool()
    yield
    await close_db_pool()

app = FastAPI(lifespan=lifespan)

async def get_user_repo() -> UserRepository:
    return UserRepository(get_pool())

@app.get("/users/{user_id}")
async def get_user(user_id: int, repo: UserRepository = Depends(get_user_repo)):
    row = await repo.get_by_id(user_id)
    if row is None:
        raise HTTPException(404, "User not found")
    return dict(row)
"""
For the psycopg2 (sync) alternative — psycopg2 uses blocking I/O: cursor.execute() halts the entire thread while waiting for PostgreSQL, while asyncpg’s conn.fetch() yields the event loop during the network round-trip so a FastAPI server with a 20-connection pool handles hundreds of concurrent requests on 20 connections — the same queries run 2–3× faster on asyncpg because it uses the binary PostgreSQL wire protocol instead of text serialization. For the databases library alternative — databases wraps asyncpg/aiosqlite/aiomysql in a common interface but loses asyncpg-specific features: copy_records_to_table is 10–100× faster than executemany for bulk inserts, add_listener is the only way to receive push notifications from PostgreSQL without polling, and conn.prepare() caches the query plan server-side for high-frequency parameterized queries. The Claude Skills 360 bundle includes asyncpg skill sets covering create_pool configuration, fetch/fetchrow/fetchval/execute patterns, executemany for batch writes, prepared statements, transaction context managers with isolation levels, copy_records_to_table for bulk insert, copy_from_query for CSV export, LISTEN/NOTIFY subscription, custom type codecs for JSON and UUID, and FastAPI lifespan pool management. Start with the free tier to try async PostgreSQL code generation.