httpx is a modern, async-first HTTP client with a requests-compatible API. pip install httpx. import httpx. Sync: with httpx.Client(base_url="https://api.example.com", timeout=30) as client: r = client.get("/path"). Async: async with httpx.AsyncClient() as client: r = await client.get(url). Methods: client.get/post/put/patch/delete(url, params={}, json={}, headers={}, auth=...). Response: r.status_code, r.json(), r.text, r.content (bytes), r.headers. Raise: r.raise_for_status() — raises HTTPStatusError for 4xx/5xx. Auth: httpx.BasicAuth("user","pass"), custom: class MyAuth(httpx.Auth): def auth_flow(self, r): yield r.copy_with(headers={"X-Token":"..."}). Timeout: httpx.Timeout(connect=5, read=30, write=10, pool=5). Limits: httpx.Limits(max_connections=100, max_keepalive_connections=20). Follow redirects: client.get(url, follow_redirects=True). Streaming: with client.stream("GET", url) as r: for chunk in r.iter_bytes(1024): .... Async stream: async for line in r.aiter_lines(): .... Event hooks: client = httpx.Client(event_hooks={"request":[log_req], "response":[log_resp]}). Retry: from httpx import AsyncHTTPTransport; transport = AsyncHTTPTransport(retries=3) — retries failed connection attempts only. Proxy: client = httpx.AsyncClient(proxy="socks5://proxy:1080") (the older proxies= argument is deprecated and removed in recent httpx). HTTP/2: httpx.AsyncClient(http2=True). Mock: with respx.mock: respx.get("https://api.example.com/data").mock(return_value=httpx.Response(200, json={"key":"val"})). Claude Code generates httpx API clients, async scrapers, retry wrappers, and mock-based test suites.
CLAUDE.md for httpx
## httpx Stack
- Version: httpx >= 0.27
- Sync: httpx.Client(base_url, timeout, auth, limits) as context manager
- Async: httpx.AsyncClient(...) — await client.get/post/put/patch/delete(url)
- Response: r.raise_for_status() | r.json() | r.text | r.content | r.headers
- Auth: httpx.BasicAuth (built-in) | BearerAuth (custom, defined in this module) | custom Auth subclass with auth_flow generator
- Timeout: httpx.Timeout(connect=5, read=30) — always set explicit timeouts
- Retry: AsyncHTTPTransport(retries=N) | wrap with tenacity for backoff
- Test: respx.mock for mocking httpx requests in unit tests
httpx HTTP Client Pipeline
# net/httpx_pipeline.py — modern async HTTP client with httpx
from __future__ import annotations
import asyncio
import json
import logging
import time
from typing import Any, AsyncGenerator, Generator, Callable
from collections.abc import Iterator
import httpx
logger = logging.getLogger(__name__)
# ── 1. Authentication helpers ─────────────────────────────────────────────────
class BearerAuth(httpx.Auth):
    """
    Attach an ``Authorization: Bearer <token>`` header to every request.

    When ``refresh_fn`` is given and the server answers 401, the token is
    refreshed once (by calling ``refresh_fn``) and the same request is
    replayed with the new token.
    """

    def __init__(self, token: str, refresh_fn: Callable | None = None):
        self.token = token
        self.refresh_fn = refresh_fn

    def auth_flow(self, request: httpx.Request) -> Generator:
        request.headers["Authorization"] = f"Bearer {self.token}"
        response = yield request
        # Nothing more to do unless we got a 401 and know how to refresh.
        if response.status_code != 401 or not self.refresh_fn:
            return
        self.token = self.refresh_fn()
        request.headers["Authorization"] = f"Bearer {self.token}"
        yield request
class ApiKeyAuth(httpx.Auth):
    """
    API-key authentication in a custom header or query parameter.

    By default the key is sent in a request header (``X-API-Key`` unless
    ``header`` overrides it).  Pass ``in_query=True`` to send it as a query
    parameter of the same name instead — the previous docstring promised
    query-param support but the code only implemented the header form.
    """

    def __init__(self, api_key: str, header: str = "X-API-Key",
                 in_query: bool = False):
        self.api_key = api_key
        self.header = header  # header name, or query-param name if in_query
        self.in_query = in_query

    def auth_flow(self, request: httpx.Request) -> Generator:
        if self.in_query:
            # Merge the key into the existing query string without
            # clobbering other parameters.
            request.url = request.url.copy_merge_params({self.header: self.api_key})
        else:
            request.headers[self.header] = self.api_key
        yield request
# ── 2. Sync client ────────────────────────────────────────────────────────────
def make_sync_client(
    base_url: str = "",
    timeout: float | httpx.Timeout = 30.0,
    auth: httpx.Auth | None = None,
    headers: dict | None = None,
    max_connections: int = 20,
    retries: int = 0,
    follow_redirects: bool = True,
    verify: bool = True,
) -> httpx.Client:
    """
    Create a configured httpx.Client (sync).

    Use as context manager: `with make_sync_client(...) as client:`

    Args:
        timeout: A plain number (used as read/write timeout with a fixed
            10 s connect timeout) or a fully specified httpx.Timeout.
        retries: Transport-level retries of failed *connection attempts*
            (not of HTTP error responses).
        verify: Enable/disable TLS certificate verification.
    """
    # BUG FIX: when a custom transport is supplied, httpx ignores the
    # client-level ``verify`` argument — TLS verification must be
    # configured on the transport itself, otherwise verify=False was
    # silently a no-op.
    transport = httpx.HTTPTransport(retries=retries, verify=verify)
    limits = httpx.Limits(
        max_connections=max_connections,
        max_keepalive_connections=max(5, max_connections // 4),
    )
    if isinstance(timeout, (int, float)):
        timeout = httpx.Timeout(connect=10, read=timeout, write=timeout, pool=5)
    return httpx.Client(
        base_url=base_url,
        timeout=timeout,
        auth=auth,
        headers=headers or {},
        transport=transport,
        limits=limits,
        follow_redirects=follow_redirects,
    )
def get_json(
    url: str,
    params: dict | None = None,
    headers: dict | None = None,
    auth: httpx.Auth | None = None,
    timeout: float = 30.0,
) -> dict:
    """
    One-shot GET that returns the parsed JSON body.

    Raises:
        httpx.HTTPStatusError: for 4xx/5xx responses.
        httpx.RequestError: for connection-level failures.
    """
    # Annotation fix: ``dict = None`` defaults were not valid dicts;
    # the file's convention elsewhere is the explicit ``| None`` union.
    with httpx.Client(timeout=timeout) as client:
        r = client.get(url, params=params, headers=headers, auth=auth)
        r.raise_for_status()
        return r.json()
def post_json(
    url: str,
    payload: dict,
    headers: dict | None = None,
    auth: httpx.Auth | None = None,
    timeout: float = 30.0,
) -> dict:
    """
    One-shot POST with a JSON body; returns the parsed response JSON.

    Raises:
        httpx.HTTPStatusError: for 4xx/5xx responses.
        httpx.RequestError: for connection-level failures.
    """
    # Annotation fix: ``headers: dict = None`` default was not a valid dict.
    with httpx.Client(timeout=timeout) as client:
        r = client.post(url, json=payload, headers=headers, auth=auth)
        r.raise_for_status()
        return r.json()
# ── 3. Async client ───────────────────────────────────────────────────────────
def make_async_client(
    base_url: str = "",
    timeout: float | httpx.Timeout = 30.0,
    auth: httpx.Auth | None = None,
    headers: dict | None = None,
    max_connections: int = 100,
    retries: int = 3,
    follow_redirects: bool = True,
    http2: bool = False,
    verify: bool = True,
) -> httpx.AsyncClient:
    """
    Create a configured httpx.AsyncClient.

    Use as: `async with make_async_client(...) as client:`

    Args:
        timeout: A plain number (used as read/write timeout with a fixed
            10 s connect timeout) or a fully specified httpx.Timeout.
        retries: Transport-level retries of failed *connection attempts*
            (not of HTTP error responses).
        http2: Enable HTTP/2 negotiation on the transport.
        verify: TLS certificate verification (added for consistency with
            make_sync_client; defaults to True, so existing callers are
            unaffected).
    """
    # Because a custom transport is supplied, client-level ``http2``/
    # ``verify`` arguments would be ignored — both must be configured
    # on the transport itself.
    transport = httpx.AsyncHTTPTransport(retries=retries, http2=http2, verify=verify)
    limits = httpx.Limits(
        max_connections=max_connections,
        max_keepalive_connections=max(10, max_connections // 5),
        keepalive_expiry=30,
    )
    if isinstance(timeout, (int, float)):
        timeout = httpx.Timeout(connect=10, read=timeout, write=timeout, pool=5)
    return httpx.AsyncClient(
        base_url=base_url,
        timeout=timeout,
        auth=auth,
        headers=headers or {},
        transport=transport,
        limits=limits,
        follow_redirects=follow_redirects,
    )
async def async_get_json(
    url: str,
    params: dict | None = None,
    headers: dict | None = None,
    auth: httpx.Auth | None = None,
    timeout: float = 30.0,
) -> dict:
    """
    Async one-shot GET returning parsed JSON.

    ``timeout`` was previously hard-coded to 30 s; it is now a parameter
    (defaulting to 30.0) for consistency with the sync ``get_json``.

    Raises:
        httpx.HTTPStatusError: for 4xx/5xx responses.
        httpx.RequestError: for connection-level failures.
    """
    async with httpx.AsyncClient(timeout=timeout) as client:
        r = await client.get(url, params=params, headers=headers, auth=auth)
        r.raise_for_status()
        return r.json()
async def parallel_requests(
    requests: list[dict],
    max_concurrent: int = 10,
) -> list[dict | Exception]:
    """
    Fire many HTTP requests concurrently, capped by a semaphore.

    Args:
        requests: Request specs, e.g.
            ``[{"method": "GET", "url": "...", "params": {}, ...}, ...]``.
        max_concurrent: Maximum number of in-flight requests.

    Returns:
        Parsed JSON bodies (or the raised Exception) in input order —
        failures never abort the batch.
    """
    gate = asyncio.Semaphore(max_concurrent)

    async def _execute(client: httpx.AsyncClient, spec: dict) -> dict | Exception:
        async with gate:
            try:
                # ``spec`` is a per-task copy, so popping is safe.
                verb = spec.pop("method", "GET").upper()
                response = await client.request(verb, **spec)
                response.raise_for_status()
                return response.json()
            except Exception as exc:
                return exc

    async with make_async_client(max_connections=max_concurrent) as client:
        return await asyncio.gather(
            *(_execute(client, dict(spec)) for spec in requests)
        )
# ── 4. Streaming ──────────────────────────────────────────────────────────────
def stream_download(
    url: str,
    dest_path: str,
    chunk_size: int = 65_536,  # 64 KB
    headers: dict | None = None,
) -> int:
    """
    Stream a large download to disk without buffering it in memory.

    Returns:
        Total bytes written to ``dest_path``.

    Raises:
        httpx.HTTPStatusError: for 4xx/5xx responses.
    """
    total = 0
    # BUG FIX: httpx.Timeout(connect=10, read=None) raises ValueError —
    # Timeout must either include a default or set all four parameters.
    # Timeout(10.0, read=None) gives a 10 s default for connect/write/pool
    # and disables only the read timeout (unbounded download time).
    timeout = httpx.Timeout(10.0, read=None)
    with httpx.Client(timeout=timeout) as client:
        with client.stream("GET", url, headers=headers or {}, follow_redirects=True) as r:
            r.raise_for_status()
            with open(dest_path, "wb") as f:
                for chunk in r.iter_bytes(chunk_size):
                    f.write(chunk)
                    total += len(chunk)
    return total
async def stream_ndjson(
    url: str,
    client: httpx.AsyncClient | None = None,
) -> AsyncGenerator[dict, None]:
    """
    Yield parsed JSON objects from an NDJSON (or SSE) stream, one per line.

    ``data: `` prefixes are stripped (Server-Sent Events framing); the
    ``[DONE]`` sentinel, blank lines, and malformed JSON lines are skipped.

    Args:
        client: Optional externally managed AsyncClient.  When omitted, a
            temporary client is created here and closed when the generator
            finishes or is discarded.
    """
    owned = client is None
    if owned:
        # BUG FIX: httpx.Timeout(connect=10, read=None) raises ValueError —
        # Timeout needs a default or all four fields.  Timeout(10.0,
        # read=None) keeps a 10 s default and disables only the read
        # timeout, as required for long-lived streams.
        client = make_async_client(timeout=httpx.Timeout(10.0, read=None))
    try:
        async with client.stream("GET", url) as r:
            r.raise_for_status()
            async for line in r.aiter_lines():
                line = line.strip()
                if line.startswith("data: "):
                    line = line[6:]
                if line and line != "[DONE]":
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError:
                        # Tolerate partial/garbled lines rather than abort
                        # the whole stream.
                        pass
    finally:
        if owned:
            await client.aclose()
# ── 5. Event hooks (logging, timing) ─────────────────────────────────────────
def make_logging_client(
    base_url: str = "",
    level: int = logging.DEBUG,
    **kwargs,
) -> httpx.Client:
    """
    Create a sync client that logs all requests and response timing.

    Args:
        level: Logging level used for both request and response lines.
        **kwargs: Passed straight through to httpx.Client.
    """
    def _log_request(request: httpx.Request) -> None:
        # Stash the start time on the request so the response hook can
        # compute wall-clock latency.
        request.extensions["_t0"] = time.perf_counter()
        logger.log(level, "→ %s %s", request.method, request.url)

    def _log_response(response: httpx.Response) -> None:
        # BUG FIX: ``response.elapsed`` raises RuntimeError inside a
        # response event hook — it is only available after the body has
        # been read or the response closed, and hooks fire before that.
        # Measure latency ourselves via the timestamp set in the request
        # hook instead.
        t0 = response.request.extensions.get("_t0")
        elapsed_ms = (time.perf_counter() - t0) * 1000 if t0 is not None else -1.0
        logger.log(level, "← %s %s %.1f ms",
                   response.status_code, response.url, elapsed_ms)

    return httpx.Client(
        base_url=base_url,
        event_hooks={"request": [_log_request], "response": [_log_response]},
        **kwargs,
    )
# ── 6. REST API client pattern ────────────────────────────────────────────────
class ApiClient:
    """
    Reusable async API client with token auth, base URL, and error handling.

    Usage:
        async with ApiClient("https://api.example.com", token="...") as api:
            data = await api.get("/users", params={"page": 1})

    Notes:
        - If both ``token`` and ``api_key`` are supplied, the bearer token
          takes precedence.
        - ``rate_limit`` caps *concurrent in-flight* requests via a
          semaphore; it is not a true requests-per-second limiter.
        - Must be used as an async context manager; calling a request
          method before ``__aenter__`` raises RuntimeError.
    """

    def __init__(
        self,
        base_url: str,
        token: str = "",
        api_key: str = "",
        timeout: float = 30.0,
        retries: int = 3,
        rate_limit: int = 50,  # max concurrent in-flight requests (soft cap)
    ):
        self._base_url = base_url.rstrip("/")
        # Bearer token wins over API key when both are provided.
        self._auth = BearerAuth(token) if token else (ApiKeyAuth(api_key) if api_key else None)
        self._timeout = timeout
        self._retries = retries
        self._semaphore = asyncio.Semaphore(rate_limit)
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = make_async_client(
            base_url=self._base_url,
            timeout=self._timeout,
            auth=self._auth,
            retries=self._retries,
        )
        return self

    async def __aexit__(self, *_):
        if self._client:
            await self._client.aclose()
            self._client = None  # mark as closed so reuse fails loudly

    async def _request(self, method: str, path: str, **kwargs) -> Any:
        # Fail with a clear message instead of an opaque AttributeError on
        # None when the client is used outside ``async with``.
        if self._client is None:
            raise RuntimeError(
                "ApiClient must be used as an async context manager: "
                "async with ApiClient(...) as api: ..."
            )
        async with self._semaphore:
            r = await self._client.request(method, path, **kwargs)
            r.raise_for_status()
            ct = r.headers.get("content-type", "")
            # JSON-ish content types get parsed; everything else comes back
            # as raw text.
            return r.json() if "json" in ct else r.text

    async def get(self, path: str, params: dict | None = None, **kw) -> Any:
        return await self._request("GET", path, params=params, **kw)

    async def post(self, path: str, payload: dict | None = None, **kw) -> Any:
        return await self._request("POST", path, json=payload, **kw)

    async def put(self, path: str, payload: dict | None = None, **kw) -> Any:
        return await self._request("PUT", path, json=payload, **kw)

    async def patch(self, path: str, payload: dict | None = None, **kw) -> Any:
        return await self._request("PATCH", path, json=payload, **kw)

    async def delete(self, path: str, **kw) -> Any:
        return await self._request("DELETE", path, **kw)

    async def paginate(self, path: str, page_key: str = "page",
                       results_key: str = "results") -> AsyncGenerator[Any, None]:
        """Follow numbered pagination, yielding items until a page is empty."""
        page = 1
        while True:
            data = await self.get(path, params={page_key: page})
            items = data.get(results_key, []) if isinstance(data, dict) else data
            if not items:
                break
            for item in items:
                yield item
            page += 1
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # NOTE: the redundant ``import asyncio`` that shadowed the module-level
    # import has been removed; asyncio is already imported at the top of
    # the file.
    print("httpx HTTP Client Demo")
    print("=" * 50)

    # Sync GET — demonstrates get_json and the two httpx error families.
    print("\nSync GET (httpbin.org):")
    try:
        r = get_json("https://httpbin.org/get", params={"hello": "world"}, timeout=10)
        print(f" URL: {r.get('url')}")
        print(f" Args: {r.get('args')}")
    except httpx.RequestError as e:
        # Connection-level failure (DNS, refused, timeout...).
        print(f" Connection error: {e}")
    except httpx.HTTPStatusError as e:
        # Server answered with 4xx/5xx.
        print(f" HTTP error: {e.response.status_code}")

    # Async parallel — demonstrates parallel_requests with mixed methods.
    async def demo_async():
        print("\nAsync parallel requests (httpbin.org):")
        # f-prefixes removed from placeholder-free string literals.
        reqs = [
            {"method": "GET", "url": "https://httpbin.org/status/200"},
            {"method": "GET", "url": "https://httpbin.org/json"},
            {"method": "POST", "url": "https://httpbin.org/post", "json": {"n": 1}},
        ]
        try:
            results = await parallel_requests(reqs, max_concurrent=3)
            for i, r in enumerate(results):
                if isinstance(r, Exception):
                    print(f" [{i}] Error: {r}")
                else:
                    print(f" [{i}] OK — keys: {list(r.keys())[:3]}")
        except Exception as e:
            print(f" Could not reach httpbin.org: {e}")

    asyncio.run(demo_async())
    print("\nDone. Use ApiClient for a production REST client pattern.")
For the requests alternative for synchronous HTTP — requests lacks native async support, requires manual connection pooling, and has no built-in HTTP/2, while httpx’s AsyncClient with asyncio.gather runs hundreds of independent API calls concurrently without threads, the auth_flow generator pattern handles token refresh inside the auth object without tangling retry logic with business code, and the requests-compatible API makes migration a single import swap. For the aiohttp alternative for async HTTP — aiohttp requires more boilerplate (ClientSession, custom connector setup) and its API differs significantly from requests while httpx’s AsyncClient mirrors the sync Client exactly, enabling the same test helper to drive both sync and async endpoints, and AsyncHTTPTransport(retries=3) adds automatic retry of failed connection attempts (it does not retry HTTP error responses) without a separate library. The Claude Skills 360 bundle includes httpx skill sets covering sync and async clients, BearerAuth and ApiKey custom auth, connection pool configuration, parallel requests with semaphores, file streaming with iter_bytes, NDJSON streaming, event hooks for logging, retry transport, and a reusable paginating ApiClient class. Start with the free tier to try HTTP client code generation.