requests is the de-facto Python HTTP library. pip install requests. GET: import requests; r = requests.get("https://api.example.com/users"); r.json(); r.status_code; r.text. POST: r = requests.post(url, json={"key":"val"}). PUT: requests.put(url, json=data). DELETE: requests.delete(url). PATCH: requests.patch(url, json={"field":"val"}). Headers: requests.get(url, headers={"Authorization":"Bearer token"}). Params: requests.get(url, params={"page":1,"per_page":20}) — auto-encodes to query string. Timeout: requests.get(url, timeout=10) — seconds float or (connect, read) tuple. Session: s = requests.Session(); s.headers.update({"Authorization":"Bearer tok"}); s.get(url) — reuses connections, shares cookies. Auth: requests.get(url, auth=("user","pass")) — HTTPBasicAuth. Bearer: headers={"Authorization":f"Bearer {token}"}. Response: r.ok → True if 2xx; r.raise_for_status() raises HTTPError on 4xx/5xx; r.json() parses JSON; r.text decoded; r.content bytes; r.headers["Content-Type"]. Stream: r = requests.get(url, stream=True); r.iter_content(chunk_size=8192). Files: requests.post(url, files={"file":open("f.csv","rb")}). Form: requests.post(url, data={"key":"val"}). Retry: from requests.adapters import HTTPAdapter; from urllib3.util.retry import Retry; s.mount("https://", HTTPAdapter(max_retries=Retry(total=3, backoff_factor=1))). Hooks: r = requests.get(url, hooks={"response": [lambda r,*a,**kw: print(r.status_code)]}). Verify: requests.get(url, verify=False) disables TLS certificate verification (dev only); verify="cert.pem". Claude Code generates requests API clients, retry wrappers, authenticated sessions, and upload helpers.
CLAUDE.md for requests
## requests Stack
- Version: requests >= 2.31 | pip install requests
- GET/POST: requests.get(url, params={}, headers={}, timeout=10) | requests.post(url, json={})
- Session: s = requests.Session() | s.headers.update() | s.mount("https://", HTTPAdapter(max_retries=...))
- Error: r.raise_for_status() | except requests.HTTPError | except requests.RequestException
- Auth: headers={"Authorization": f"Bearer {token}"} | auth=("user", "pass")
- Stream: requests.get(url, stream=True) | r.iter_content(chunk_size=8192)
requests HTTP Client Pipeline
# app/http_client.py — requests session, retry, auth, pagination, upload, stream, mock
from __future__ import annotations

import logging
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Generator, Iterator

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
log = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# 1. Session factory with retry
# ─────────────────────────────────────────────────────────────────────────────
def make_session(
    base_url: str = "",
    token: str | None = None,
    timeout: float | tuple[float, float] = (5.0, 30.0),
    retries: int = 3,
    backoff_factor: float = 0.5,
    status_forcelist: tuple[int, ...] = (429, 500, 502, 503, 504),
    headers: dict[str, str] | None = None,
) -> requests.Session:
    """
    Build a Session with automatic retries, optional bearer auth, and a base URL.

    Args:
        base_url: prefix used by url_for()/get()/post() for relative paths.
            NOTE: a plain ``session.get("/path")`` does NOT apply it — use the
            module-level wrappers.
        token: optional bearer token placed in the Authorization header.
        timeout: default (connect, read) timeout stored on the session and
            read back by the wrapper functions.
        retries: total retry attempts for the mounted HTTP adapters.
        backoff_factor: urllib3 exponential backoff multiplier.
        status_forcelist: HTTP statuses that trigger an adapter-level retry.
        headers: extra default headers merged into the session.

    Example:
        session = make_session("https://api.github.com", token=os.getenv("GH_TOKEN"))
        data = get(session, "/repos/octocat/Hello-World").json()
    """
    session = requests.Session()
    retry = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=list(status_forcelist),
        # NOTE(review): non-idempotent verbs (POST/PATCH/DELETE) are retried
        # too — only safe if the target endpoints tolerate duplicate requests.
        allowed_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({"User-Agent": "python-requests-client/1.0"})
    if token:
        session.headers["Authorization"] = f"Bearer {token}"
    if headers:
        session.headers.update(headers)
    # Stash base_url/timeout as custom attributes; url_for() and the wrapper
    # functions below read them back via getattr().
    session.base_url = base_url  # type: ignore[attr-defined]
    session.default_timeout = timeout  # type: ignore[attr-defined]
    return session
def url_for(session: requests.Session, path: str) -> str:
    """Join *path* onto the session's base_url unless it is already absolute."""
    if path.startswith(("http://", "https://")):
        return path
    base = getattr(session, "base_url", "")
    return f"{base.rstrip('/')}/{path.lstrip('/')}"
def get(
    session: requests.Session,
    path: str,
    params: dict | None = None,
    **kwargs,
) -> requests.Response:
    """
    GET a (possibly relative) path, raising on 4xx/5xx.

    Uses the session's default_timeout unless the caller passes timeout=.
    Example:
        r = get(session, "/users", params={"page": 1})
        users = r.json()["results"]
    """
    kwargs.setdefault("timeout", getattr(session, "default_timeout", 30))
    resp = session.get(url_for(session, path), params=params, **kwargs)
    resp.raise_for_status()
    return resp
def post(
    session: requests.Session,
    path: str,
    json: Any = None,
    data: Any = None,
    **kwargs,
) -> requests.Response:
    """
    POST a JSON or form body to a (possibly relative) path, raising on 4xx/5xx.

    Uses the session's default_timeout unless the caller passes timeout=.
    Example:
        r = post(session, "/users", json={"name": "Alice", "email": "alice@example.com"})
        created = r.json()
    """
    kwargs.setdefault("timeout", getattr(session, "default_timeout", 30))
    resp = session.post(url_for(session, path), json=json, data=data, **kwargs)
    resp.raise_for_status()
    return resp
def put(session: requests.Session, path: str, json: Any = None, **kwargs) -> requests.Response:
    """PUT a JSON body with base URL and default timeout; raises on 4xx/5xx."""
    kwargs.setdefault("timeout", getattr(session, "default_timeout", 30))
    resp = session.put(url_for(session, path), json=json, **kwargs)
    resp.raise_for_status()
    return resp
def patch(session: requests.Session, path: str, json: Any = None, **kwargs) -> requests.Response:
    """PATCH a JSON body with base URL and default timeout; raises on 4xx/5xx."""
    kwargs.setdefault("timeout", getattr(session, "default_timeout", 30))
    resp = session.patch(url_for(session, path), json=json, **kwargs)
    resp.raise_for_status()
    return resp
def delete(session: requests.Session, path: str, **kwargs) -> requests.Response:
    """DELETE with base URL and default timeout; raises on 4xx/5xx."""
    kwargs.setdefault("timeout", getattr(session, "default_timeout", 30))
    resp = session.delete(url_for(session, path), **kwargs)
    resp.raise_for_status()
    return resp
# ─────────────────────────────────────────────────────────────────────────────
# 2. Pagination helpers
# ─────────────────────────────────────────────────────────────────────────────
def paginate_offset(
    session: requests.Session,
    path: str,
    params: dict | None = None,
    page_param: str = "page",
    per_page_param: str = "per_page",
    per_page: int = 100,
    results_key: str = "results",
    max_pages: int = 500,
) -> Iterator[Any]:
    """
    Yield items from an offset/page-paginated API.

    Stops when a page is empty, or (for dict responses) when a page comes
    back shorter than per_page, or after max_pages pages.
    Example:
        for user in paginate_offset(session, "/users", results_key="data"):
            process(user)
    """
    query = {**(params or {}), per_page_param: per_page}
    page = 1
    while page <= max_pages:
        query[page_param] = page
        payload = get(session, path, params=query).json()
        if isinstance(payload, dict):
            items = payload.get(results_key, payload)
        else:
            items = payload
        if not items:
            return
        yield from items
        # A short page signals the last page for dict-shaped responses.
        if isinstance(payload, dict) and len(items) < per_page:
            return
        page += 1
def paginate_cursor(
    session: requests.Session,
    path: str,
    params: dict | None = None,
    cursor_param: str = "cursor",
    next_key: str = "next_cursor",
    results_key: str = "results",
    max_pages: int = 500,
) -> Iterator[Any]:
    """
    Yield items from a cursor-paginated API.

    Follows next_key from each response body until it is missing/falsy,
    or max_pages pages have been fetched.
    Example:
        for event in paginate_cursor(session, "/events", next_key="next"):
            process(event)
    """
    query = dict(params or {})
    pages = 0
    while pages < max_pages:
        payload = get(session, path, params=query).json()
        yield from payload.get(results_key, [])
        token = payload.get(next_key)
        if not token:
            return
        query[cursor_param] = token
        pages += 1
def paginate_link_header(
    session: requests.Session,
    url: str,
    results_key: str | None = None,
    max_pages: int = 500,
) -> Iterator[Any]:
    """
    Yield items while following Link-header pagination (GitHub-style).

    Parses the rel="next" target out of the Link header after each page;
    stops when there is no next link or max_pages is reached.
    Example:
        for repo in paginate_link_header(session, "https://api.github.com/orgs/python/repos"):
            print(repo["name"])
    """
    pages = 0
    while pages < max_pages:
        resp = session.get(url, timeout=getattr(session, "default_timeout", 30))
        resp.raise_for_status()
        payload = resp.json()
        if isinstance(payload, dict) and results_key:
            items = payload.get(results_key, payload)
        else:
            items = payload
        if isinstance(items, list):
            yield from items
        else:
            yield items
        next_url = None
        for segment in resp.headers.get("Link", "").split(","):
            if 'rel="next"' in segment:
                next_url = segment.strip().split(";")[0].strip("<>")
                break
        if not next_url:
            return
        url = next_url
        pages += 1
# ─────────────────────────────────────────────────────────────────────────────
# 3. File upload and download
# ─────────────────────────────────────────────────────────────────────────────
def upload_file(
    session: requests.Session,
    path: str,
    local_path: str | Path,
    field_name: str = "file",
    extra_data: dict | None = None,
) -> requests.Response:
    """
    Upload a local file via multipart/form-data, raising on 4xx/5xx.

    Example:
        r = upload_file(session, "/documents", "report.pdf", extra_data={"title": "Q1 Report"})
        doc_id = r.json()["id"]
    """
    source = Path(local_path)
    with source.open("rb") as fh:
        # Stream the open handle directly; requests builds the multipart body.
        resp = session.post(
            url_for(session, path),
            files={field_name: (source.name, fh, "application/octet-stream")},
            data=extra_data or {},
            timeout=getattr(session, "default_timeout", 60),
        )
    resp.raise_for_status()
    return resp
def download_file(
    session: requests.Session,
    url: str,
    dest: str | Path,
    chunk_size: int = 32_768,
    progress: bool = False,
) -> Path:
    """
    Stream a URL to disk in chunks and return the destination Path.

    Creates parent directories as needed; with progress=True, logs percent
    complete at DEBUG level when the server sends Content-Length.
    Example:
        path = download_file(session, "https://example.com/dataset.csv.gz", "data/dataset.csv.gz")
    """
    target = Path(dest)
    target.parent.mkdir(parents=True, exist_ok=True)
    timeout = getattr(session, "default_timeout", (5, 120))
    with session.get(url, stream=True, timeout=timeout) as resp:
        resp.raise_for_status()
        expected = int(resp.headers.get("Content-Length", 0))
        written = 0
        with target.open("wb") as out:
            for chunk in resp.iter_content(chunk_size=chunk_size):
                if not chunk:  # skip keep-alive chunks
                    continue
                out.write(chunk)
                written += len(chunk)
                if progress and expected:
                    log.debug(
                        "Download %.1f%% (%d/%d bytes)",
                        written / expected * 100, written, expected,
                    )
    return target
# ─────────────────────────────────────────────────────────────────────────────
# 4. Error handling and response utilities
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class APIError(Exception):
    """Raised for non-2xx API responses; carries status, message, and body."""

    # HTTP status code of the failing response
    status_code: int
    # human-readable error extracted from the response body or reason phrase
    message: str
    # parsed JSON error body; empty dict when the body was not JSON
    body: dict = field(default_factory=dict)

    def __str__(self) -> str:
        return "HTTP {0}: {1}".format(self.status_code, self.message)
def safe_json(r: requests.Response, default: Any = None) -> Any:
    """
    Parse the response body as JSON, returning *default* when it is not JSON.

    Example:
        data = safe_json(r, default={})
    """
    try:
        parsed = r.json()
    except Exception:  # deliberately broad: any parse failure means "not JSON"
        return default
    return parsed
def raise_for_api_error(r: requests.Response) -> None:
    """
    Raise APIError for 4xx/5xx responses with a message parsed from the body.

    Looks for common error keys ("message", "error", "detail") in a JSON
    object body, falling back to the HTTP reason phrase.

    Example:
        raise_for_api_error(r)

    Raises:
        APIError: when r.ok is False (status outside 2xx/3xx).
    """
    if r.ok:
        return
    body = safe_json(r, default={})
    # BUG FIX: safe_json can legitimately return a non-dict (JSON arrays,
    # strings, numbers parse fine) — previously body.get(...) then raised
    # AttributeError instead of APIError. Guard before key lookups.
    details = body if isinstance(body, dict) else {}
    message = (
        details.get("message")
        or details.get("error")
        or details.get("detail")
        or r.reason
        or "API error"
    )
    raise APIError(status_code=r.status_code, message=str(message), body=details)
def with_rate_limit_retry(
    fn: Callable[[], Any],
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> Any:
    """
    Call fn(), retrying on HTTP 429 using Retry-After or exponential backoff.

    Makes up to max_retries + 1 calls in total: max_retries retried attempts
    plus one final call whose exceptions propagate to the caller.

    Args:
        fn: zero-argument callable performing the request.
        max_retries: number of retried attempts before the final call.
        base_delay: base of the exponential backoff (base_delay * 2**attempt).

    Example:
        data = with_rate_limit_retry(lambda: get(session, "/expensive").json())
    """
    for attempt in range(max_retries):
        try:
            return fn()
        except requests.HTTPError as exc:
            if exc.response is None or exc.response.status_code != 429:
                raise
            fallback = base_delay * (2 ** attempt)
            # BUG FIX: Retry-After may be an HTTP-date (RFC 7231), not just
            # delta-seconds — float() on a date crashed with ValueError.
            # Fall back to exponential backoff for any non-numeric value.
            header = exc.response.headers.get("Retry-After")
            try:
                delay = float(header) if header is not None else fallback
            except (TypeError, ValueError):
                delay = fallback
            log.warning("Rate limited; waiting %.1fs (attempt %d)", delay, attempt + 1)
            time.sleep(delay)
    return fn()  # final attempt, let exceptions propagate
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Live demo against httpbin.org — requires network access.
    print("=== requests demo using httpbin.org ===")
    session = make_session("https://httpbin.org", timeout=(5, 15))

    print("\n--- GET with params ---")
    resp = get(session, "/get", params={"hello": "world", "num": 42})
    print(f" args: {resp.json()['args']}")

    print("\n--- POST JSON ---")
    resp = post(session, "/post", json={"name": "Alice", "role": "admin"})
    echoed = resp.json()["json"]
    print(f" echoed json: {echoed}")

    print("\n--- response headers ---")
    resp = session.get("https://httpbin.org/headers")
    print(f" User-Agent: {resp.json()['headers'].get('User-Agent', '-')}")

    print("\n--- status code handling ---")
    try:
        session.get("https://httpbin.org/status/404", timeout=5).raise_for_status()
    except requests.HTTPError as err:
        print(f" Caught HTTPError: {err.response.status_code}")

    print("\n--- stream download (small) ---")
    import os
    import tempfile
    with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as handle:
        tmp_path = handle.name
    download_file(session, "https://httpbin.org/json", tmp_path)
    n_bytes = os.path.getsize(tmp_path)
    os.unlink(tmp_path)
    print(f" Downloaded {n_bytes} bytes to temp file")

    print("\n=== done ===")
For the httpx alternative — httpx is a modern HTTP client with an API very similar to requests but with native async support (async with httpx.AsyncClient()), HTTP/2, and stricter timeouts by default; requests is synchronous-only but simpler to use in non-async code and has broader ecosystem compatibility (auth plugins, adapters, test mocking) — use requests for synchronous scripts, CLI tools, and codebases not using asyncio, httpx when you need async or HTTP/2 or want the same API across sync and async contexts. For the aiohttp alternative — aiohttp is an async-first HTTP client/server framework that does not try to match the requests API; it provides a ClientSession with fine-grained connection pool control and is typically faster at high concurrency than httpx — use aiohttp for high-throughput async HTTP workloads and when building async servers, requests for synchronous use cases, httpx for async code that needs a requests-compatible API. The Claude Skills 360 bundle includes requests skill sets covering make_session() with retry/auth/base_url, get()/post()/put()/patch()/delete() with raise_for_status, paginate_offset()/paginate_cursor()/paginate_link_header() iterators, upload_file()/download_file() with streaming, safe_json()/raise_for_api_error()/with_rate_limit_retry() error helpers, and APIError dataclass. Start with the free tier to try HTTP client automation and REST API integration code generation.