
Claude Code for psycopg3: Modern PostgreSQL Client for Python

Published: March 13, 2028
Read time: 5 min read
By: Claude Skills 360

psycopg3 (imported as psycopg) is the modern PostgreSQL adapter for Python, with native async support. Install it with pip install "psycopg[binary]". Sync usage: import psycopg; conn = psycopg.connect("postgresql://user:pw@host/db"); then conn.execute("SELECT * FROM users WHERE id = %s", (user_id,)) and fetch with .fetchone(), .fetchmany(10), or .fetchall(), or iterate directly: for row in conn.execute("SELECT * FROM t"):. Async: async with await psycopg.AsyncConnection.connect(dsn) as conn: await conn.execute("SELECT 1"). Row factories: conn.row_factory = psycopg.rows.dict_row returns dicts, namedtuple_row returns namedtuples, and class_row(MyClass) returns instances of your own class. Transactions: with conn.transaction(): conn.execute(...), or set conn.autocommit = True. Bulk load: with conn.cursor() as cur, cur.copy("COPY t FROM STDIN") as copy: copy.write_row((1, "a")). Pipeline mode batches round trips: with conn.pipeline(): conn.execute(...). Server-side cursors handle large result sets: conn.cursor("name").execute("SELECT ..."). Pooling: from psycopg_pool import ConnectionPool; pool = ConnectionPool(dsn); then with pool.connection() as conn:. Prepared statements: conn.execute(sql, params, prepare=True). Custom type adaptation uses psycopg.adapt.Loader, psycopg.adapt.Dumper, and psycopg.types.TypeInfo. Claude Code generates psycopg3 database layers, async FastAPI handlers, and bulk COPY pipelines.

CLAUDE.md for psycopg3

## psycopg3 Stack
- Version: psycopg >= 3.1 | pip install "psycopg[binary,pool]"
- Sync: psycopg.connect(dsn) | conn.execute(sql, params) | cursor.fetchall()
- Async: await psycopg.AsyncConnection.connect(dsn) | await conn.execute()
- Rows: conn.row_factory = dict_row | namedtuple_row | class_row(Model)
- Transaction: with conn.transaction(): ... | conn.autocommit=True
- Pool: ConnectionPool(dsn) / AsyncConnectionPool(dsn) | with pool.connection()
- COPY: with conn.cursor() as cur, cur.copy("COPY t FROM STDIN") as copy: copy.write_row(row)
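A note on DSNs before the pipeline code: passwords containing URL-reserved characters must be percent-encoded in a postgresql:// DSN. The build_dsn helper below is illustrative, not part of psycopg (psycopg.connect also accepts host=, user=, and password= keywords, which sidestep encoding entirely):

```python
from urllib.parse import quote


def build_dsn(user: str, password: str, host: str, dbname: str, port: int = 5432) -> str:
    """Compose a postgresql:// DSN, percent-encoding the user and password."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{dbname}"
    )


dsn = build_dsn("app", "p@ss:w/rd", "localhost", "testdb")
print(dsn)  # postgresql://app:p%40ss%3Aw%2Frd@localhost:5432/testdb
```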

psycopg3 PostgreSQL Access Pipeline

# app/db.py — psycopg3 sync/async connection, CRUD, transactions, and bulk COPY
from __future__ import annotations

import contextlib
import logging
from collections.abc import AsyncIterator, Iterable, Iterator
from typing import Any

import psycopg
from psycopg.rows import dict_row, namedtuple_row

logger = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# 1. Synchronous helpers
# ─────────────────────────────────────────────────────────────────────────────

def connect(dsn: str, autocommit: bool = False, **kwargs) -> psycopg.Connection:
    """
    Open a synchronous psycopg3 connection.
    autocommit=True: each statement is immediately committed (no explicit BEGIN).
    row_factory defaults to dict_row so rows come back as {column: value} dicts.
    """
    conn = psycopg.connect(dsn, row_factory=dict_row, **kwargs)
    if autocommit:
        conn.autocommit = True
    return conn


@contextlib.contextmanager
def get_conn(dsn: str, autocommit: bool = False) -> Iterator[psycopg.Connection]:
    """Context manager: open a connection and close on exit."""
    conn = connect(dsn, autocommit=autocommit)
    try:
        yield conn
    finally:
        conn.close()


def execute(
    conn: psycopg.Connection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> list[dict[str, Any]]:
    """
    Execute a SELECT query and return all rows as dicts.
    Uses %s positional placeholders (psycopg3 style).
    """
    cur = conn.execute(sql, params)
    return cur.fetchall()


def execute_one(
    conn: psycopg.Connection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> dict[str, Any] | None:
    """Execute a query and return the first row, or None."""
    cur = conn.execute(sql, params)
    return cur.fetchone()


def execute_scalar(
    conn: psycopg.Connection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> Any:
    """Return the first column of the first row (scalar)."""
    row = execute_one(conn, sql, params)
    if row is None:
        return None
    return next(iter(row.values()))

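execute_scalar leans on two guarantees: dict_row builds each row dict in SELECT column order, and Python dicts preserve insertion order, so next(iter(row.values())) is the first selected column. A stand-in dict (hypothetical values, no database needed) makes this concrete:

```python
# Stand-in for a dict_row result of: SELECT count(*) AS count, max(id) AS max_id ...
row = {"count": 42, "max_id": 9001}

first = next(iter(row.values()))  # first selected column
print(first)  # 42
```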

def execute_many(
    conn: psycopg.Connection,
    sql: str,
    params_list: list[tuple | dict],
) -> int:
    """
    Execute a DML statement (INSERT/UPDATE/DELETE) for multiple parameter sets.
    Returns total rows affected.
    """
    with conn.cursor() as cur:
        cur.executemany(sql, params_list)
        return cur.rowcount


# ─────────────────────────────────────────────────────────────────────────────
# 2. CRUD helpers
# ─────────────────────────────────────────────────────────────────────────────

def insert_returning(
    conn: psycopg.Connection,
    table: str,
    data: dict[str, Any],
) -> dict[str, Any]:
    """
    INSERT a row and return the full inserted record (RETURNING *).
    table: bare table name (trusted — don't accept from user input).
    data: column → value dict.
    """
    cols  = list(data.keys())
    vals  = list(data.values())
    ph    = ", ".join(["%s"] * len(cols))
    col_names = ", ".join(cols)
    sql   = f"INSERT INTO {table} ({col_names}) VALUES ({ph}) RETURNING *"
    return conn.execute(sql, vals).fetchone()

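To preview what insert_returning sends to the server, here is the same composition logic in isolation (identifiers are interpolated as trusted strings; for untrusted names, psycopg.sql.Identifier is the safe quoting tool):

```python
def compose_insert(table: str, data: dict) -> tuple[str, list]:
    """Mirror of insert_returning's SQL composition, without executing."""
    cols = list(data)
    placeholders = ", ".join(["%s"] * len(cols))
    sql = f"INSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders}) RETURNING *"
    return sql, list(data.values())


sql, params = compose_insert("users", {"name": "ada", "email": "ada@example.com"})
print(sql)     # INSERT INTO users (name, email) VALUES (%s, %s) RETURNING *
print(params)  # ['ada', 'ada@example.com']
```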

def update_returning(
    conn: psycopg.Connection,
    table: str,
    data: dict[str, Any],
    where: str,
    where_params: tuple | list,
) -> list[dict[str, Any]]:
    """
    UPDATE rows matching `where` and return updated records.
    data: columns to set.
    where: SQL condition fragment e.g. "id = %s".
    """
    set_clause = ", ".join(f"{col} = %s" for col in data.keys())
    sql = f"UPDATE {table} SET {set_clause} WHERE {where} RETURNING *"
    return conn.execute(sql, list(data.values()) + list(where_params)).fetchall()

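Parameter order is the subtle part of update_returning: SET values must precede WHERE values, matching the left-to-right order of %s placeholders in the composed statement. In isolation:

```python
def compose_update(table: str, data: dict, where: str) -> str:
    """Mirror of update_returning's SQL composition, without executing."""
    set_clause = ", ".join(f"{col} = %s" for col in data)
    return f"UPDATE {table} SET {set_clause} WHERE {where} RETURNING *"


data = {"name": "ada", "active": True}
sql = compose_update("users", data, "id = %s")
params = list(data.values()) + [7]  # SET params first, then WHERE params
print(sql)     # UPDATE users SET name = %s, active = %s WHERE id = %s RETURNING *
print(params)  # ['ada', True, 7]
```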

def delete_returning(
    conn: psycopg.Connection,
    table: str,
    where: str,
    where_params: tuple | list,
) -> list[dict[str, Any]]:
    """DELETE rows and return deleted records."""
    sql = f"DELETE FROM {table} WHERE {where} RETURNING *"
    return conn.execute(sql, list(where_params)).fetchall()


# ─────────────────────────────────────────────────────────────────────────────
# 3. Bulk COPY
# ─────────────────────────────────────────────────────────────────────────────

def copy_from_rows(
    conn: psycopg.Connection,
    table: str,
    columns: list[str],
    rows: Iterable[tuple],
) -> int:
    """
    Bulk-load rows into `table` using PostgreSQL COPY.
    Significantly faster than executemany for large datasets.
    Accepts any iterable of row tuples (e.g. a generator), so huge datasets
    can be streamed without materializing them in memory.
    Returns the number of rows copied.
    """
    col_list = ", ".join(columns)
    count = 0
    # COPY lives on the cursor in psycopg3, not on the connection.
    with conn.cursor() as cur:
        with cur.copy(f"COPY {table} ({col_list}) FROM STDIN") as copy:
            for row in rows:
                copy.write_row(row)
                count += 1
    return count

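copy.write_row() adapts each value for you; PostgreSQL's COPY text format only matters when writing raw data with copy.write(): one line per row, tab-separated columns, \N for NULL. A simplified formatter (hypothetical, and it omits the escaping of tabs, newlines, and backslashes inside values that a real encoder must perform):

```python
def copy_text_line(row: tuple) -> str:
    """Render one row in (simplified) COPY text format."""
    return "\t".join(r"\N" if value is None else str(value) for value in row) + "\n"


line = copy_text_line((1, "alice", None))
print(repr(line))  # '1\talice\t\\N\n'
```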

def copy_to_csv(
    conn: psycopg.Connection,
    query: str,
    output_path: str,
    params: tuple | None = None,
) -> int:
    """
    Export query results to a CSV file using COPY TO.
    Returns the number of data rows exported (header line excluded).
    """
    newlines = 0
    with open(output_path, "w", encoding="utf-8", newline="") as fh:
        sql = f"COPY ({query}) TO STDOUT WITH CSV HEADER"
        with conn.cursor() as cur, cur.copy(sql, params) as copy:
            for chunk in copy:
                # psycopg yields buffer objects (bytes/memoryview); normalize to str.
                text = bytes(chunk).decode("utf-8")
                fh.write(text)
                newlines += text.count("\n")
    return max(newlines - 1, 0)  # subtract the CSV header line


# ─────────────────────────────────────────────────────────────────────────────
# 4. Async helpers
# ─────────────────────────────────────────────────────────────────────────────

async def async_connect(dsn: str, autocommit: bool = False) -> psycopg.AsyncConnection:
    """Open an async psycopg3 connection with dict rows."""
    conn = await psycopg.AsyncConnection.connect(dsn, row_factory=dict_row)
    if autocommit:
        await conn.set_autocommit(True)
    return conn


@contextlib.asynccontextmanager
async def get_async_conn(dsn: str, autocommit: bool = False) -> AsyncIterator[psycopg.AsyncConnection]:
    """Async context manager: open and close a psycopg3 async connection."""
    conn = await async_connect(dsn, autocommit=autocommit)
    try:
        yield conn
    finally:
        await conn.close()


async def async_execute(
    conn: psycopg.AsyncConnection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> list[dict[str, Any]]:
    """Execute SELECT and return all rows as dicts."""
    cur = await conn.execute(sql, params)
    return await cur.fetchall()


async def async_execute_one(
    conn: psycopg.AsyncConnection,
    sql: str,
    params: tuple | list | dict | None = None,
) -> dict[str, Any] | None:
    """Execute and return first row, or None."""
    cur = await conn.execute(sql, params)
    return await cur.fetchone()

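The open/yield/close lifecycle in get_async_conn is independent of psycopg. A stdlib-only sketch with a dummy resource confirms the shape: cleanup runs even when the body raises:

```python
import asyncio
import contextlib

events: list[str] = []


@contextlib.asynccontextmanager
async def get_resource():
    events.append("open")
    try:
        yield "resource"
    finally:
        events.append("close")  # runs even if the body raised


async def main():
    with contextlib.suppress(RuntimeError):
        async with get_resource() as res:
            events.append(f"use {res}")
            raise RuntimeError("query failed")


asyncio.run(main())
print(events)  # ['open', 'use resource', 'close']
```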

# ─────────────────────────────────────────────────────────────────────────────
# 5. Connection pool helpers
# ─────────────────────────────────────────────────────────────────────────────

def make_sync_pool(dsn: str, min_size: int = 2, max_size: int = 10):
    """
    Create a synchronous connection pool.
    pip install psycopg_pool
    """
    from psycopg_pool import ConnectionPool
    return ConnectionPool(
        dsn,
        min_size=min_size,
        max_size=max_size,
        kwargs={"row_factory": dict_row},
    )


async def make_async_pool(dsn: str, min_size: int = 2, max_size: int = 10):
    """Create and open an async connection pool."""
    from psycopg_pool import AsyncConnectionPool
    pool = AsyncConnectionPool(
        dsn,
        min_size=min_size,
        max_size=max_size,
        kwargs={"row_factory": dict_row},
        open=False,  # opening in the constructor is deprecated for async pools
    )
    await pool.open()
    return pool


# ─────────────────────────────────────────────────────────────────────────────
# 6. FastAPI dependency example
# ─────────────────────────────────────────────────────────────────────────────

def make_fastapi_db_dependency(pool):
    """
    Return a FastAPI dependency that yields a pooled async connection.

    Usage:
        pool = await make_async_pool(DSN)
        get_db = make_fastapi_db_dependency(pool)

        @app.get("/users/{user_id}")
        async def get_user(user_id: int, conn=Depends(get_db)):
            return await async_execute_one(conn, "SELECT * FROM users WHERE id=%s", (user_id,))
    """
    # A plain async generator: FastAPI manages `yield` dependencies itself;
    # wrapping it in @asynccontextmanager would hand Depends() the context
    # manager object instead of the connection.
    async def _get_db():
        async with pool.connection() as conn:
            yield conn

    return _get_db

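Why a plain async generator works: FastAPI drives yield-dependencies itself, advancing the generator to obtain the value for Depends() and resuming it at teardown. A stdlib-only simulation, using a hypothetical DummyPool in place of AsyncConnectionPool:

```python
import asyncio
import contextlib


class DummyPool:
    """Stands in for AsyncConnectionPool; hands out a fake connection."""
    @contextlib.asynccontextmanager
    async def connection(self):
        yield "fake-conn"


def make_db_dependency(pool):
    async def _get_db():  # plain async generator, no decorator
        async with pool.connection() as conn:
            yield conn
    return _get_db


async def main() -> str:
    get_db = make_db_dependency(DummyPool())
    agen = get_db()
    conn = await agen.__anext__()  # what Depends() hands to the endpoint
    await agen.aclose()            # teardown after the response is sent
    return conn


print(asyncio.run(main()))  # fake-conn
```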

# ─────────────────────────────────────────────────────────────────────────────
# Demo (requires a running PostgreSQL or will show the API)
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Example DSN; the demo below only prints the API, it does not connect.
    DSN = "postgresql://postgres:postgres@localhost/testdb"

    print("=== psycopg3 API demo (requires PostgreSQL) ===")
    print("  Sync connect:")
    print("    conn = psycopg.connect(DSN, row_factory=dict_row)")
    print("    rows = conn.execute('SELECT * FROM t WHERE x=%s', (1,)).fetchall()")
    print("    conn.execute('INSERT INTO t (x) VALUES (%s)', (42,))")
    print("    conn.commit()")

    print("\n  Transaction:")
    print("    with conn.transaction():")
    print("        conn.execute(sql1, params1)")
    print("        conn.execute(sql2, params2)")

    print("\n  COPY bulk load:")
    print("    with conn.cursor() as cur:")
    print("        with cur.copy('COPY t (a,b) FROM STDIN') as copy:")
    print("            for row in data:")
    print("                copy.write_row(row)")

    print("\n  Async:")
    print("    conn = await psycopg.AsyncConnection.connect(DSN, row_factory=dict_row)")
    print("    row  = await (await conn.execute('SELECT * FROM t')).fetchone()")

    print("\n  Pipeline mode (batch multiple queries, fewer round trips):")
    print("    with conn.pipeline():")
    print("        conn.execute('INSERT ...')")
    print("        conn.execute('UPDATE ...')")
    print("        conn.execute('DELETE ...')")
    print("    # all 3 sent in a single network round trip")

    print("\n  Row factories:")
    print("    conn.row_factory = dict_row      # {col: val}")
    print("    conn.row_factory = namedtuple_row # Row(col=val)")
    print("    from psycopg.rows import class_row")
    print("    conn.row_factory = class_row(MyDataclass)  # MyDataclass(col=val)")

For the psycopg2 alternative: psycopg2 is the older, widely used PostgreSQL adapter; psycopg3 (imported as psycopg) is its modernized successor, with native async/await support (no psycopg2-binary plus aiopg combination needed) and a cleaner API for connection pooling, server-side cursors, COPY, and pipeline mode. psycopg3 moves from client-side parameter interpolation to server-side binding while keeping the same %s placeholder syntax, so migration is often straightforward.

For the asyncpg alternative: asyncpg is async-only and extremely fast (a compiled extension that skips the DB-API layer); psycopg3 supports both sync and async in one package, follows the DB-API 2.0 interface, and integrates better with SQLAlchemy's async ORM. Choose asyncpg for raw performance in async-only services, and psycopg3 when you need both sync and async paths or ORM compatibility.

The Claude Skills 360 bundle includes psycopg3 skill sets covering psycopg.connect() and AsyncConnection.connect(), dict_row/namedtuple_row/class_row factories, execute()/fetchone()/fetchall(), execute_many() batch DML, insert_returning()/update_returning() helpers, copy_from_rows() bulk COPY, copy_to_csv() export, async_execute()/async_execute_one(), ConnectionPool and AsyncConnectionPool, make_sync_pool()/make_async_pool(), FastAPI dependency injection, and pipeline mode batching. Start with the free tier to try PostgreSQL client code generation.

