pypika is a Python SQL query builder — construct SQL programmatically without string concatenation. pip install pypika. Table: from pypika import Table, Query; users = Table("users"); q = Query.from_(users).select("*"). Select fields: q = Query.from_(users).select(users.id, users.name). Where: .where(users.active == True). And: .where((users.age > 18) & (users.role == "admin")). Join: .join(orders).on(users.id == orders.user_id). Left join: .left_join(...). Group by: .groupby(users.role). Having: .having(fn.Count("*") > 1). Order: .orderby(users.name). .orderby(users.age, order=Order.desc). Limit/offset: .limit(10).offset(20). Alias: users.name.as_("display_name"). Insert: Query.into(users).insert(1, "Alice", True). Insert with explicit columns: Query.into(users).columns(users.id, users.name).insert(1, "Alice"). Update: Query.update(users).set(users.name, "Bob").where(users.id == 1). Delete: Query.from_(users).delete().where(users.active == False). Subquery: .from_(sub_q.as_("sub")). Functions: from pypika import functions as fn; fn.Count("*"), fn.Sum(users.score), fn.Avg(users.score). Case: Case().when(users.age > 18, "adult").else_("minor"). Param: from pypika import Parameter; Parameter("%s"). Dialect: from pypika import PostgreSQLQuery, MySQLQuery. Claude Code generates pypika SELECT/INSERT/UPDATE/DELETE builders, parameterized query factories, and SQL pipeline helpers.
CLAUDE.md for pypika
## pypika Stack
- Version: pypika >= 0.48 | pip install pypika
- Select: t = Table("t"); Query.from_(t).select(t.col).where(t.field == val)
- Joins: .left_join(t2).on(t1.id == t2.fk) | .join() | .right_join()
- DML: Query.into(t).insert(...) | Query.update(t).set(f,v) | .delete()
- Aggregate: fn.Count("*") | fn.Sum(t.col) | fn.Avg | fn.Max | fn.Min
- Dialect: PostgreSQLQuery | MySQLQuery for DB-specific syntax
- Params: Parameter("%s") for psycopg2 | Parameter("?") for sqlite3
pypika SQL Builder Pipeline
# app/query_builder.py — pypika SELECT/INSERT/UPDATE/DELETE, joins, aggregates, subqueries
from __future__ import annotations
from typing import Any
from pypika import (
Case,
Criterion,
Field,
Order,
Parameter,
PostgreSQLQuery,
Query,
Table,
Tables,
functions as fn,
)
from pypika.enums import JoinType
# ─────────────────────────────────────────────────────────────────────────────
# 1. Select builders
# ─────────────────────────────────────────────────────────────────────────────
def select_all(table_name: str) -> str:
    """Render ``SELECT * FROM <table_name>`` as a SQL string."""
    table = Table(table_name)
    query = Query.from_(table).select("*")
    return query.get_sql()
def select_by_id(table_name: str, id_value: Any, id_col: str = "id") -> tuple[str, list]:
    """Build ``SELECT * FROM <table> WHERE <id_col> = %s`` — returns (sql, params)."""
    table = Table(table_name)
    query = (
        Query.from_(table)
        .select("*")
        .where(table.field(id_col) == Parameter("%s"))
    )
    return query.get_sql(), [id_value]
def select_where(
    table_name: str,
    conditions: dict[str, Any],
    columns: list[str] | None = None,
    limit: int | None = None,
    offset: int | None = None,
    order_by: str | None = None,
    order_desc: bool = False,
) -> tuple[str, list]:
    """
    Build a SELECT with exact-match WHERE conditions.

    Each condition becomes ``col = %s`` with the value appended to the
    returned parameter list, so the result is safe to pass to a DB-API
    driver. Returns (sql, params).

    Example:
        sql, params = select_where("users", {"role": "admin", "active": True})
        cursor.execute(sql, params)
    """
    table = Table(table_name)
    # Either an explicit column list or a bare "*" projection.
    if columns:
        query = Query.from_(table).select(*(table.field(name) for name in columns))
    else:
        query = Query.from_(table).select("*")
    params: list[Any] = list(conditions.values())
    for name in conditions:
        query = query.where(table.field(name) == Parameter("%s"))
    if order_by:
        direction = Order.desc if order_desc else Order.asc
        query = query.orderby(table.field(order_by), order=direction)
    if limit is not None:
        query = query.limit(limit)
    if offset is not None:
        query = query.offset(offset)
    return query.get_sql(), params
def select_in(
    table_name: str,
    column: str,
    values: list[Any],
    extra_conditions: dict[str, Any] | None = None,
) -> tuple[str, list]:
    """
    SELECT * FROM t WHERE col IN (%s, ...) plus optional exact-match filters.

    Returns (sql, params), with the IN values first followed by any
    extra-condition values, matching placeholder order.

    Raises:
        ValueError: if *values* is empty — an ``IN ()`` clause is not
            valid SQL, so the original silently produced a broken query.
    """
    if not values:
        raise ValueError("values must not be empty: 'IN ()' is not valid SQL")
    t = Table(table_name)
    placeholders = [Parameter("%s") for _ in values]
    q = Query.from_(t).select("*").where(t.field(column).isin(placeholders))
    params: list[Any] = list(values)
    for col, val in (extra_conditions or {}).items():
        q = q.where(t.field(col) == Parameter("%s"))
        params.append(val)
    return q.get_sql(), params
def select_paginated(
    table_name: str,
    page: int = 1,
    per_page: int = 20,
    conditions: dict[str, Any] | None = None,
    sort_by: str = "id",
    sort_desc: bool = False,
) -> tuple[str, list]:
    """
    Paginated SELECT with optional exact-match conditions.

    Pages are 1-based: page=1 yields OFFSET 0. Returns (sql, params).

    Raises:
        ValueError: if page or per_page is < 1 — the original computed a
            negative OFFSET for page=0, which PostgreSQL rejects.
    """
    if page < 1:
        raise ValueError("page must be >= 1")
    if per_page < 1:
        raise ValueError("per_page must be >= 1")
    return select_where(
        table_name,
        conditions=conditions or {},
        limit=per_page,
        offset=(page - 1) * per_page,
        order_by=sort_by,
        order_desc=sort_desc,
    )
# ─────────────────────────────────────────────────────────────────────────────
# 2. Join builders
# ─────────────────────────────────────────────────────────────────────────────
def select_with_join(
    base_table: str,
    join_table: str,
    join_on: tuple[str, str],  # (base_col, join_col)
    columns: dict[str, list[str]] | None = None,  # {table_name: [cols]}
    conditions: list | None = None,
    left_join: bool = True,
) -> str:
    """
    Build a SELECT joining two tables on a single column pair.

    columns maps table name -> column names to project; when omitted,
    both tables contribute ``*``. conditions is a list of pre-built
    pypika Criterion objects ANDed into the WHERE clause. Returns the
    rendered SQL string (no parameters).

    Example:
        sql = select_with_join(
            "users", "profiles",
            join_on=("id", "user_id"),
            columns={"users": ["id","name"], "profiles": ["bio","avatar"]},
        )
    """
    base = Table(base_table)
    joined = Table(join_table)
    if columns:
        selected = [base.field(name) for name in columns.get(base_table, [])]
        selected += [joined.field(name) for name in columns.get(join_table, [])]
    else:
        selected = [base.star, joined.star]
    how = JoinType.left if left_join else JoinType.inner
    left_col, right_col = join_on
    query = (
        Query.from_(base)
        .join(joined, how=how)
        .on(base.field(left_col) == joined.field(right_col))
        .select(*selected)
    )
    if conditions:
        for criterion in conditions:
            query = query.where(criterion)
    return query.get_sql()
# ─────────────────────────────────────────────────────────────────────────────
# 3. Aggregate queries
# ─────────────────────────────────────────────────────────────────────────────
def count_by_group(
    table_name: str,
    group_col: str,
    count_alias: str = "count",
    conditions: dict[str, Any] | None = None,
) -> tuple[str, list]:
    """
    SELECT group_col, COUNT(*) AS <count_alias> FROM t GROUP BY group_col.

    Optional exact-match conditions become parameterized WHERE clauses.
    Returns (sql, params).
    """
    table = Table(table_name)
    grouped = table.field(group_col)
    query = (
        Query.from_(table)
        .select(grouped, fn.Count("*").as_(count_alias))
        .groupby(grouped)
    )
    filters = conditions or {}
    params: list[Any] = list(filters.values())
    for name in filters:
        query = query.where(table.field(name) == Parameter("%s"))
    return query.get_sql(), params
def aggregate_stats(
    table_name: str,
    numeric_col: str,
    group_by: str | None = None,
    conditions: dict[str, Any] | None = None,
) -> tuple[str, list]:
    """
    SELECT COUNT, SUM, AVG, MIN, MAX for a numeric column, optionally
    grouped by another column. Returns (sql, params).

    Bug fixed: the original rebuilt the query from scratch when group_by
    was set, which (a) discarded the WHERE placeholders while leaving
    their values in params — a sql/params mismatch that breaks execution
    — and (b) silently dropped the MIN/MAX aggregates promised by the
    docstring. The query is now built once, then filtered.
    """
    t = Table(table_name)
    nc = t.field(numeric_col)
    aggregates = [
        fn.Count("*").as_("count"),
        fn.Sum(nc).as_("total"),
        fn.Avg(nc).as_("average"),
        fn.Min(nc).as_("minimum"),
        fn.Max(nc).as_("maximum"),
    ]
    if group_by:
        gc = t.field(group_by)
        q = Query.from_(t).select(gc, *aggregates).groupby(gc)
    else:
        q = Query.from_(t).select(*aggregates)
    # Apply filters after shaping the projection so placeholders and
    # params always stay aligned.
    params: list[Any] = []
    for col, val in (conditions or {}).items():
        q = q.where(t.field(col) == Parameter("%s"))
        params.append(val)
    return q.get_sql(), params
# ─────────────────────────────────────────────────────────────────────────────
# 4. DML builders
# ─────────────────────────────────────────────────────────────────────────────
def insert_row(table_name: str, data: dict[str, Any]) -> tuple[str, list]:
    """
    INSERT INTO table (cols...) VALUES (%s, ...) — returns (sql, params).

    Column order and parameter order are both derived from the same
    snapshot of the dict keys, so they cannot drift apart.

    Raises:
        ValueError: if *data* is empty — an INSERT with no columns is
            not valid SQL.
    """
    if not data:
        raise ValueError("data must not be empty")
    t = Table(table_name)
    cols = list(data)
    q = (Query.into(t)
         .columns(*[t.field(c) for c in cols])
         .insert(*[Parameter("%s")] * len(cols)))
    # Index by the captured column list rather than data.values() so the
    # param order provably matches the column order.
    return q.get_sql(), [data[c] for c in cols]
def insert_rows(table_name: str, rows: list[dict[str, Any]]) -> tuple[str, list]:
    """
    Multi-row INSERT. All rows must have the same keys.
    Returns (sql, flat_params).

    Bug fixed: the original flattened each row with ``row.values()``,
    so a row whose keys were inserted in a different order (or with
    extra/missing keys) silently misaligned values with the column
    list. Values are now extracted by column name, and mismatched keys
    raise instead of corrupting data.

    Raises:
        ValueError: if rows is empty, or any row's keys differ from the
            first row's keys.
    """
    if not rows:
        raise ValueError("rows must not be empty")
    cols = list(rows[0].keys())
    expected = set(cols)
    for i, row in enumerate(rows):
        if set(row) != expected:
            raise ValueError(
                f"row {i} keys {sorted(row)} do not match expected {sorted(expected)}"
            )
    t = Table(table_name)
    q = Query.into(t).columns(*[t.field(c) for c in cols])
    for _ in rows:
        q = q.insert(*[Parameter("%s")] * len(cols))
    flat_params = [row[c] for row in rows for c in cols]
    return q.get_sql(), flat_params
def update_row(
    table_name: str,
    data: dict[str, Any],
    conditions: dict[str, Any],
) -> tuple[str, list]:
    """
    UPDATE table SET col=%s WHERE cond_col=%s — returns (sql, params).

    SET values come first in params, followed by WHERE values, matching
    placeholder order in the rendered SQL.

    Raises:
        ValueError: if *data* is empty — ``UPDATE t SET`` with no
            assignments is not valid SQL.
    """
    if not data:
        raise ValueError("data must not be empty")
    t = Table(table_name)
    q = Query.update(t)
    params: list[Any] = []
    for col, val in data.items():
        q = q.set(t.field(col), Parameter("%s"))
        params.append(val)
    for col, val in conditions.items():
        q = q.where(t.field(col) == Parameter("%s"))
        params.append(val)
    return q.get_sql(), params
def delete_rows(
    table_name: str,
    conditions: dict[str, Any],
) -> tuple[str, list]:
    """
    DELETE FROM table WHERE ... — returns (sql, params).

    Raises:
        ValueError: if *conditions* is empty — the original silently
            built an unqualified ``DELETE FROM table`` that would wipe
            every row; a full-table delete must be written explicitly.
    """
    if not conditions:
        raise ValueError(
            "refusing to build an unconditional DELETE; pass at least one condition"
        )
    t = Table(table_name)
    q = Query.from_(t).delete()
    params: list[Any] = []
    for col, val in conditions.items():
        q = q.where(t.field(col) == Parameter("%s"))
        params.append(val)
    return q.get_sql(), params
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke-test every builder and print the SQL it renders.
    print("=== SELECT builders ===")
    print(select_all("users"))
    stmt, args = select_by_id("users", 42)
    print(stmt, args)
    stmt, args = select_where(
        "users",
        {"role": "admin", "active": True},
        columns=["id", "name", "email"],
        limit=10,
        order_by="name",
    )
    print(stmt)
    print("params:", args)
    print("\n=== SELECT IN ===")
    stmt, args = select_in("users", "role", ["admin", "moderator"])
    print(stmt, args)
    print("\n=== Paginated ===")
    stmt, args = select_paginated("users", page=2, per_page=20, sort_by="created_at", sort_desc=True)
    print(stmt, args)
    print("\n=== JOIN ===")
    join_sql = select_with_join(
        "users", "profiles",
        join_on=("id", "user_id"),
        columns={"users": ["id","name","email"], "profiles": ["bio","avatar_url"]},
    )
    print(join_sql)
    print("\n=== Aggregates ===")
    stmt, args = count_by_group("users", "role")
    print(stmt)
    stmt, args = aggregate_stats("orders", "total", group_by="status")
    print(stmt)
    print("\n=== INSERT ===")
    stmt, args = insert_row("users", {"name": "Alice", "email": "[email protected]", "role": "user"})
    print(stmt)
    print("params:", args)
    stmt, args = insert_rows("users", [
        {"name": "Bob", "email": "[email protected]", "role": "user"},
        {"name": "Carol", "email": "[email protected]", "role": "admin"},
    ])
    print(stmt)
    print("params:", args)
    print("\n=== UPDATE ===")
    stmt, args = update_row("users", {"role": "moderator"}, {"id": 42})
    print(stmt, args)
    print("\n=== DELETE ===")
    stmt, args = delete_rows("users", {"active": False, "role": "user"})
    print(stmt, args)
    print("\n=== Case expression ===")
    from pypika import Case, Query, Table  # demo-local re-import of names already bound above
    users_t = Table("users")
    age_label = (
        Case()
        .when(users_t.age >= 18, "adult")
        .else_("minor")
        .as_("age_group")
    )
    case_query = Query.from_(users_t).select(users_t.name, users_t.age, age_label)
    print(case_query.get_sql())
For the SQLAlchemy Core alternative — SQLAlchemy Core is more feature-complete (dialect-aware DDL, connection pooling, transactions, reflection), but pypika is lighter (~50 kB), has no dependencies, and generates SQL strings without needing a connection — ideal for building query objects that you pass to raw drivers like psycopg2, asyncpg, or aiopg. For the SQLObject / Peewee ORM alternative — ORMs map rows to objects and handle the full query execution lifecycle; pypika only builds SQL strings, giving you full control over query execution and working well alongside async drivers (asyncpg, aiopg) that don't support ORM session patterns. The Claude Skills 360 bundle includes pypika skill sets covering select_all/select_by_id/select_where/select_in/select_paginated, select_with_join() left/inner JOIN builder, count_by_group()/aggregate_stats() aggregates, insert_row()/insert_rows()/update_row()/delete_rows() DML, parameterized queries with Parameter("%s"), PostgreSQLQuery dialect, and Case/fn.Count/fn.Sum/fn.Avg expressions. Start with the free tier to try SQL query builder code generation.