psycopg3 (psycopg) is the modern PostgreSQL adapter for Python with native async support. pip install psycopg[binary]. Sync: import psycopg; conn = psycopg.connect("postgresql://user:pw@host/db"). Query: conn.execute("SELECT * FROM users WHERE id = %s", (user_id,)). Fetch: conn.execute(...).fetchone() fetchmany(10) fetchall(). for row in conn.execute("SELECT * FROM t"):. Async: async with await psycopg.AsyncConnection.connect(dsn) as conn: await conn.execute("SELECT 1"). Row factory: conn.row_factory = psycopg.rows.dict_row → dicts. namedtuple_row → namedtuples. class_row(MyClass). Transaction: with conn.transaction(): conn.execute(...). conn.autocommit = True. Copy: with conn.copy("COPY t FROM STDIN") as copy: copy.write_row((1,"a")). Pipeline: with conn.pipeline(): conn.execute(...) — batches round trips. Server cursor: conn.cursor("name").execute("SELECT ...") — server-side, for large results. Pool: from psycopg_pool import ConnectionPool; pool = ConnectionPool(dsn). with pool.connection() as conn:. Prepare: conn.execute("SELECT %s::int", (1,), prepare=True). psycopg.adapt.Loader and .Dumper for custom types. psycopg.types.TypeInfo. Claude Code generates psycopg3 database layers, async FastAPI handlers, and bulk COPY pipelines.
CLAUDE.md for psycopg3
## psycopg3 Stack
- Version: psycopg >= 3.1 | pip install "psycopg[binary]" psycopg_pool
- Sync: psycopg.connect(dsn) | conn.execute(sql, params) | cursor.fetchall()
- Async: await psycopg.AsyncConnection.connect(dsn) | await conn.execute()
- Rows: conn.row_factory = dict_row | namedtuple_row | class_row(Model)
- Transaction: with conn.transaction(): ... | conn.autocommit=True
- Pool: ConnectionPool(dsn) / AsyncConnectionPool(dsn) | with pool.connection()
- COPY: with conn.copy("COPY t FROM STDIN") as copy: copy.write_row(row)
psycopg3 PostgreSQL Access Pipeline
# app/db.py — psycopg3 sync/async connection, CRUD, transactions, and bulk COPY
from __future__ import annotations
import contextlib
import logging
from collections.abc import AsyncIterator, Iterator
from typing import Any
import psycopg
from psycopg.rows import dict_row, namedtuple_row
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Synchronous helpers
# ─────────────────────────────────────────────────────────────────────────────
def connect(dsn: str, autocommit: bool = False, **kwargs) -> psycopg.Connection:
    """
    Open a synchronous psycopg3 connection.

    Args:
        dsn: PostgreSQL connection string, e.g. "postgresql://user:pw@host/db".
        autocommit: if True, each statement is committed immediately
            (no implicit BEGIN).
        **kwargs: extra keyword arguments forwarded to psycopg.connect();
            may include row_factory to override the dict_row default.

    Returns:
        An open connection whose rows come back as {column: value} dicts,
        unless a different row_factory was supplied.
    """
    # setdefault (instead of a hard-coded keyword argument) lets callers pass
    # their own row_factory without triggering
    # "got multiple values for keyword argument 'row_factory'".
    kwargs.setdefault("row_factory", dict_row)
    conn = psycopg.connect(dsn, **kwargs)
    if autocommit:
        conn.autocommit = True
    return conn
@contextlib.contextmanager
def get_conn(dsn: str, autocommit: bool = False) -> Iterator[psycopg.Connection]:
    """Yield a fresh connection for the with-block and close it on exit."""
    connection = connect(dsn, autocommit=autocommit)
    try:
        yield connection
    finally:
        connection.close()
def execute(
    conn: psycopg.Connection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> list[dict[str, Any]]:
    """
    Run a SELECT and return every row as a dict.

    Placeholders use psycopg3's %s positional style (or %(name)s together
    with a dict of params).
    """
    return conn.execute(sql, params).fetchall()
def execute_one(
    conn: psycopg.Connection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> dict[str, Any] | None:
    """Run a query and return its first row as a dict, or None when empty."""
    return conn.execute(sql, params).fetchone()
def execute_scalar(
    conn: psycopg.Connection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> Any:
    """Return the first column of the first row, or None when no rows match."""
    row = execute_one(conn, sql, params)
    # Rows are dicts (dict_row factory); grab the first value in column order.
    return None if row is None else next(iter(row.values()))
def execute_many(
    conn: psycopg.Connection,
    sql: str,
    params_list: list[tuple | dict],
) -> int:
    """
    Run one DML statement (INSERT/UPDATE/DELETE) once per parameter set.

    Returns:
        Total number of rows affected across all executions.
    """
    with conn.cursor() as cur:
        cur.executemany(sql, params_list)
        affected = cur.rowcount
    return affected
# ─────────────────────────────────────────────────────────────────────────────
# 2. CRUD helpers
# ─────────────────────────────────────────────────────────────────────────────
def insert_returning(
    conn: psycopg.Connection,
    table: str,
    data: dict[str, Any],
) -> dict[str, Any]:
    """
    INSERT a row and return the full inserted record (RETURNING *).

    Args:
        table: bare table name (trusted — don't accept from user input).
        data: column → value dict; must contain at least one column.

    Returns:
        The inserted row as a dict.

    Raises:
        ValueError: if data is empty — would otherwise build the invalid
            SQL "INSERT INTO t () VALUES ()" and fail with an opaque
            database error.
    """
    if not data:
        raise ValueError("insert_returning() requires at least one column in data")
    cols = list(data)
    placeholders = ", ".join(["%s"] * len(cols))
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders}) RETURNING *"
    # Values travel as bind parameters; only identifiers are interpolated.
    return conn.execute(sql, list(data.values())).fetchone()
def update_returning(
    conn: psycopg.Connection,
    table: str,
    data: dict[str, Any],
    where: str,
    where_params: tuple | list,
) -> list[dict[str, Any]]:
    """
    UPDATE rows matching `where` and return the updated records.

    Args:
        table: bare table name (trusted — don't accept from user input).
        data: columns to set; must contain at least one column.
        where: SQL condition fragment, e.g. "id = %s".
        where_params: bind parameters for the where fragment.

    Raises:
        ValueError: if data is empty — would otherwise build the invalid
            SQL "UPDATE t SET  WHERE ...".
    """
    if not data:
        raise ValueError("update_returning() requires at least one column in data")
    set_clause = ", ".join(f"{col} = %s" for col in data)
    sql = f"UPDATE {table} SET {set_clause} WHERE {where} RETURNING *"
    # SET values first, then WHERE params, matching placeholder order.
    return conn.execute(sql, [*data.values(), *where_params]).fetchall()
def delete_returning(
    conn: psycopg.Connection,
    table: str,
    where: str,
    where_params: tuple | list,
) -> list[dict[str, Any]]:
    """Delete every row matching `where` and return the deleted records."""
    statement = f"DELETE FROM {table} WHERE {where} RETURNING *"
    return conn.execute(statement, list(where_params)).fetchall()
# ─────────────────────────────────────────────────────────────────────────────
# 3. Bulk COPY
# ─────────────────────────────────────────────────────────────────────────────
def copy_from_rows(
    conn: psycopg.Connection,
    table: str,
    columns: list[str],
    rows: list[tuple],
) -> int:
    """
    Bulk-load `rows` into `table` via PostgreSQL COPY.

    COPY is significantly faster than executemany for large datasets.

    Returns:
        Number of rows written.
    """
    statement = f"COPY {table} ({', '.join(columns)}) FROM STDIN"
    written = 0
    with conn.copy(statement) as copy:
        for written, row in enumerate(rows, start=1):
            copy.write_row(row)
    return written
def copy_to_csv(
    conn: psycopg.Connection,
    query: str,
    output_path: str,
    params: tuple | None = None,
) -> int:
    """
    Export query results to a CSV file using COPY ... TO STDOUT WITH CSV HEADER.

    Args:
        query: SELECT statement to export (interpolated into COPY — trusted
            input only; COPY takes no bind parameters).
        output_path: destination file path.
        params: unused by COPY; kept for interface compatibility.

    Returns:
        Number of data rows exported. Bug fix: the original counted every
        newline, which included the CSV header line, over-reporting by one.
    """
    newlines = 0
    with open(output_path, "w", encoding="utf-8") as fh:
        sql = f"COPY ({query}) TO STDOUT WITH CSV HEADER"
        with conn.copy(sql) as copy:
            for chunk in copy:
                text = chunk.decode() if isinstance(chunk, bytes) else chunk
                fh.write(text)
                # Every COPY output line (header included) ends in "\n",
                # so counting newlines counts lines even across chunk splits.
                newlines += text.count("\n")
    # Subtract the header line; clamp for the no-output edge case.
    return max(newlines - 1, 0)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Async helpers
# ─────────────────────────────────────────────────────────────────────────────
async def async_connect(dsn: str, autocommit: bool = False) -> psycopg.AsyncConnection:
    """Open an async psycopg3 connection whose rows come back as dicts."""
    connection = await psycopg.AsyncConnection.connect(dsn, row_factory=dict_row)
    if autocommit:
        # Async connections must set autocommit via the awaitable setter.
        await connection.set_autocommit(True)
    return connection
@contextlib.asynccontextmanager
async def get_async_conn(dsn: str, autocommit: bool = False) -> AsyncIterator[psycopg.AsyncConnection]:
    """Async context manager: yield an async connection, closing it on exit."""
    connection = await async_connect(dsn, autocommit=autocommit)
    try:
        yield connection
    finally:
        await connection.close()
async def async_execute(
    conn: psycopg.AsyncConnection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> list[dict[str, Any]]:
    """Run a SELECT asynchronously and return every row as a dict."""
    cursor = await conn.execute(sql, params)
    return await cursor.fetchall()
async def async_execute_one(
    conn: psycopg.AsyncConnection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> dict[str, Any] | None:
    """Run a query asynchronously and return its first row, or None."""
    cursor = await conn.execute(sql, params)
    return await cursor.fetchone()
# ─────────────────────────────────────────────────────────────────────────────
# 5. Connection pool helpers
# ─────────────────────────────────────────────────────────────────────────────
def make_sync_pool(dsn: str, min_size: int = 2, max_size: int = 10):
    """
    Create a synchronous connection pool (requires: pip install psycopg_pool).

    Connections handed out by the pool use dict_row, matching the sync
    helpers above.
    """
    from psycopg_pool import ConnectionPool

    # open=True is explicit: relying on the implicit default emits a
    # deprecation warning in recent psycopg_pool releases, which will
    # eventually flip the default to open=False.
    return ConnectionPool(
        dsn,
        min_size=min_size,
        max_size=max_size,
        open=True,
        kwargs={"row_factory": dict_row},
    )
async def make_async_pool(dsn: str, min_size: int = 2, max_size: int = 10):
    """
    Create and open an async connection pool.

    The pool is constructed closed (open=False) and then opened with an
    awaited pool.open(): opening an AsyncConnectionPool from its constructor
    is deprecated because the constructor cannot await connection setup.
    The original both opened in the constructor (deprecated) and called
    pool.open() again.
    """
    from psycopg_pool import AsyncConnectionPool

    pool = AsyncConnectionPool(
        dsn,
        min_size=min_size,
        max_size=max_size,
        open=False,
        kwargs={"row_factory": dict_row},
    )
    await pool.open()
    return pool
# ─────────────────────────────────────────────────────────────────────────────
# 6. FastAPI dependency example
# ─────────────────────────────────────────────────────────────────────────────
def make_fastapi_db_dependency(pool):
    """
    Return a FastAPI dependency that yields a pooled async connection.

    Bug fix: the dependency must be a plain async generator function —
    FastAPI manages yield-dependencies itself. The original wrapped it in
    @contextlib.asynccontextmanager, so Depends() received a context-manager
    factory instead of a generator and the connection was never yielded.

    Usage:
        pool = await make_async_pool(DSN)
        get_db = make_fastapi_db_dependency(pool)

        @app.get("/users/{user_id}")
        async def get_user(user_id: int, conn=Depends(get_db)):
            return await async_execute_one(conn, "SELECT * FROM users WHERE id=%s", (user_id,))
    """
    async def _get_db():
        # pool.connection() returns the connection to the pool when the
        # request scope ends.
        async with pool.connection() as conn:
            yield conn

    return _get_db
# ─────────────────────────────────────────────────────────────────────────────
# Demo (requires a running PostgreSQL or will show the API)
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Demo: print the psycopg3 API surface. No live PostgreSQL is needed to
    # run this; the printed snippets assume one.
    # Fix: the original imported asyncio and defined a DSN constant that
    # were never used — both removed.
    print("=== psycopg3 API demo (requires PostgreSQL) ===")
    print("  Sync connect:")
    print("    conn = psycopg.connect(DSN, row_factory=dict_row)")
    print("    rows = conn.execute('SELECT * FROM t WHERE x=%s', (1,)).fetchall()")
    print("    conn.execute('INSERT INTO t (x) VALUES (%s)', (42,))")
    print("    conn.commit()")
    print("\n  Transaction:")
    print("    with conn.transaction():")
    print("        conn.execute(sql1, params1)")
    print("        conn.execute(sql2, params2)")
    print("\n  COPY bulk load:")
    print("    with conn.copy('COPY t (a,b) FROM STDIN') as copy:")
    print("        for row in data:")
    print("            copy.write_row(row)")
    print("\n  Async:")
    print("    conn = await psycopg.AsyncConnection.connect(DSN, row_factory=dict_row)")
    print("    row = await (await conn.execute('SELECT * FROM t')).fetchone()")
    print("\n  Pipeline mode (batch multiple queries, fewer round trips):")
    print("    with conn.pipeline():")
    print("        conn.execute('INSERT ...')")
    print("        conn.execute('UPDATE ...')")
    print("        conn.execute('DELETE ...')")
    print("        # all 3 sent in a single network round trip")
    print("\n  Row factories:")
    print("    conn.row_factory = dict_row        # {col: val}")
    print("    conn.row_factory = namedtuple_row  # Row(col=val)")
    print("    from psycopg.rows import class_row")
    print("    conn.row_factory = class_row(MyDataclass)  # MyDataclass(col=val)")
For the psycopg2 alternative — psycopg2 is the older, widely-used PostgreSQL adapter; psycopg3 (import as psycopg) is its modernized successor with native async/await support (no psycopg2-binary + aiopg combination needed), a cleaner API for connection pooling, server-side cursors, COPY, and pipeline mode; psycopg3 moves to server-side parameter binding while keeping the same %s placeholder syntax, so migration is often straightforward. For the asyncpg alternative — asyncpg is async-only and extremely fast for PostgreSQL (compiled C extension, no DB-API layer); psycopg3 supports both sync and async in one package, follows the DB-API 2.0 interface, and integrates better with SQLAlchemy’s async ORM; choose asyncpg for raw performance in async-only services, psycopg3 when you need both sync/async paths or ORM compatibility. The Claude Skills 360 bundle includes psycopg3 skill sets covering psycopg.connect() and AsyncConnection.connect(), dict_row/namedtuple_row/class_row factories, execute()/fetchone()/fetchall(), execute_many() batch DML, insert_returning()/update_returning() helpers, copy_from_rows() bulk COPY, copy_to_csv() export, async_execute()/async_execute_one(), ConnectionPool and AsyncConnectionPool, make_sync_pool()/make_async_pool(), FastAPI dependency injection, and pipeline mode batching. Start with the free tier to try PostgreSQL client code generation.