
Claude Code for httpx: Modern Async HTTP Client

Published: November 22, 2027
Read time: 5 min read
By: Claude Skills 360

httpx is a modern, async-first HTTP client for Python with a requests-compatible API. Install: pip install httpx, then import httpx. Sync: with httpx.Client(base_url="https://api.example.com", timeout=30) as client: r = client.get("/path"). Async: async with httpx.AsyncClient() as client: r = await client.get(url). Methods: client.get/post/put/patch/delete(url, params={}, json={}, headers={}, auth=...). Response: r.status_code, r.json(), r.text, r.content (bytes), r.headers. Raise: r.raise_for_status() raises HTTPStatusError for 4xx/5xx. Auth: httpx.BasicAuth("user", "pass"), or a custom subclass: class MyAuth(httpx.Auth): def auth_flow(self, request): request.headers["X-Token"] = "..."; yield request — the generator can yield again to retry after inspecting the response. Timeout: httpx.Timeout(connect=5, read=30, write=10, pool=5). Limits: httpx.Limits(max_connections=100, max_keepalive_connections=20). Follow redirects: client.get(url, follow_redirects=True). Streaming: with client.stream("GET", url) as r: for chunk in r.iter_bytes(1024): .... Async stream: async for line in r.aiter_lines(): .... Event hooks: client = httpx.Client(event_hooks={"request": [log_req], "response": [log_resp]}). Retry: httpx.AsyncClient(transport=httpx.AsyncHTTPTransport(retries=3)) — retries cover connection failures only, not HTTP error responses. Proxy: client = httpx.AsyncClient(proxy="socks5://proxy:1080") (SOCKS needs httpx[socks]). HTTP/2: httpx.AsyncClient(http2=True) (needs httpx[http2]). Mock: with respx.mock: respx.get("https://api.example.com/data").mock(return_value=httpx.Response(200, json={"key": "val"})). Claude Code generates httpx API clients, async scrapers, retry wrappers, and mock-based test suites.

CLAUDE.md for httpx

## httpx Stack
- Version: httpx >= 0.27
- Sync: httpx.Client(base_url, timeout, auth, limits) as context manager
- Async: httpx.AsyncClient(...) — await client.get/post/put/patch/delete(url)
- Response: r.raise_for_status() | r.json() | r.text | r.content | r.headers
- Auth: BasicAuth | BearerAuth | custom Auth subclass with auth_flow generator
- Timeout: httpx.Timeout(connect=5, read=30) — always set explicit timeouts
- Retry: AsyncHTTPTransport(retries=N) retries failed connections only | wrap with tenacity for backoff on 5xx
- Test: respx.mock for mocking httpx requests in unit tests

httpx HTTP Client Pipeline

# net/httpx_pipeline.py — modern async HTTP client with httpx
from __future__ import annotations
import asyncio
import json
import logging
import time
from typing import Any, AsyncGenerator, Callable, Generator

import httpx

logger = logging.getLogger(__name__)


# ── 1. Authentication helpers ─────────────────────────────────────────────────

class BearerAuth(httpx.Auth):
    """
    Bearer token authentication — adds Authorization: Bearer <token> header.
    Supports token refresh via a callable (called when 401 is received).
    """
    def __init__(self, token: str, refresh_fn: Callable | None = None):
        self.token      = token
        self.refresh_fn = refresh_fn

    def auth_flow(self, request: httpx.Request) -> Generator:
        request.headers["Authorization"] = f"Bearer {self.token}"
        response = yield request

        if response.status_code == 401 and self.refresh_fn:
            self.token = self.refresh_fn()
            request.headers["Authorization"] = f"Bearer {self.token}"
            yield request


class ApiKeyAuth(httpx.Auth):
    """API key in a custom header or query param."""
    def __init__(self, api_key: str, header: str = "X-API-Key"):
        self.api_key = api_key
        self.header  = header

    def auth_flow(self, request: httpx.Request) -> Generator:
        request.headers[self.header] = self.api_key
        yield request


# ── 2. Sync client ────────────────────────────────────────────────────────────

def make_sync_client(
    base_url:     str = "",
    timeout:      float | httpx.Timeout = 30.0,
    auth:         httpx.Auth | None = None,
    headers:      dict | None = None,
    max_connections: int = 20,
    retries:      int = 0,
    follow_redirects: bool = True,
    verify:       bool = True,
) -> httpx.Client:
    """
    Create a configured httpx.Client (sync).
    Use as context manager: `with make_sync_client(...) as client:`
    """
    limits = httpx.Limits(
        max_connections=max_connections,
        max_keepalive_connections=max(5, max_connections // 4),
    )
    # Pool limits and TLS verification must go on the transport: the
    # client-level `limits`/`verify` arguments are ignored once a custom
    # transport is supplied.
    transport = httpx.HTTPTransport(retries=retries, limits=limits, verify=verify)
    if isinstance(timeout, (int, float)):
        timeout = httpx.Timeout(connect=10, read=timeout, write=timeout, pool=5)

    return httpx.Client(
        base_url=base_url,
        timeout=timeout,
        auth=auth,
        headers=headers or {},
        transport=transport,
        follow_redirects=follow_redirects,
    )


def get_json(
    url:     str,
    params:  dict | None = None,
    headers: dict | None = None,
    auth:    httpx.Auth | None = None,
    timeout: float = 30.0,
) -> dict:
    """One-shot GET that returns parsed JSON. Raises on HTTP errors."""
    with httpx.Client(timeout=timeout) as client:
        r = client.get(url, params=params, headers=headers, auth=auth)
        r.raise_for_status()
        return r.json()


def post_json(
    url:     str,
    payload: dict,
    headers: dict | None = None,
    auth:    httpx.Auth | None = None,
    timeout: float = 30.0,
) -> dict:
    """One-shot POST with JSON body. Returns parsed response JSON."""
    with httpx.Client(timeout=timeout) as client:
        r = client.post(url, json=payload, headers=headers, auth=auth)
        r.raise_for_status()
        return r.json()


# ── 3. Async client ───────────────────────────────────────────────────────────

def make_async_client(
    base_url:     str = "",
    timeout:      float | httpx.Timeout = 30.0,
    auth:         httpx.Auth | None = None,
    headers:      dict | None = None,
    max_connections: int = 100,
    retries:      int = 3,
    follow_redirects: bool = True,
    http2:        bool = False,
) -> httpx.AsyncClient:
    """
    Create configured httpx.AsyncClient.
    Use as: `async with make_async_client(...) as client:`
    """
    limits = httpx.Limits(
        max_connections=max_connections,
        max_keepalive_connections=max(10, max_connections // 5),
        keepalive_expiry=30,
    )
    # limits and http2 belong on the transport; client-level values are
    # ignored when a custom transport is supplied.
    transport = httpx.AsyncHTTPTransport(retries=retries, http2=http2, limits=limits)
    if isinstance(timeout, (int, float)):
        timeout = httpx.Timeout(connect=10, read=timeout, write=timeout, pool=5)

    return httpx.AsyncClient(
        base_url=base_url,
        timeout=timeout,
        auth=auth,
        headers=headers or {},
        transport=transport,
        follow_redirects=follow_redirects,
    )


async def async_get_json(
    url:     str,
    params:  dict | None = None,
    headers: dict | None = None,
    auth:    httpx.Auth | None = None,
) -> dict:
    """Async one-shot GET returning JSON."""
    async with httpx.AsyncClient(timeout=30) as client:
        r = await client.get(url, params=params, headers=headers, auth=auth)
        r.raise_for_status()
        return r.json()


async def parallel_requests(
    requests: list[dict],
    max_concurrent: int = 10,
) -> list[dict | Exception]:
    """
    Execute many requests concurrently with a semaphore limit.
    requests: [{"method": "GET", "url": "...", "params": {}, ...}, ...]
    Returns list of json results or exceptions in the same order.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def _one(client: httpx.AsyncClient, req: dict) -> dict | Exception:
        async with semaphore:
            try:
                method = req.pop("method", "GET").upper()
                r = await client.request(method, **req)
                r.raise_for_status()
                return r.json()
            except Exception as e:
                return e

    async with make_async_client(max_connections=max_concurrent) as client:
        tasks = [_one(client, dict(req)) for req in requests]
        return await asyncio.gather(*tasks)


# ── 4. Streaming ──────────────────────────────────────────────────────────────

def stream_download(
    url:        str,
    dest_path:  str,
    chunk_size: int = 65_536,   # 64 KB
    headers:    dict | None = None,
) -> int:
    """
    Stream large file download to disk.
    Returns total bytes written.
    """
    total = 0
    with httpx.Client(timeout=httpx.Timeout(None, connect=10)) as client:  # no read timeout for large files
        with client.stream("GET", url, headers=headers or {}, follow_redirects=True) as r:
            r.raise_for_status()
            with open(dest_path, "wb") as f:
                for chunk in r.iter_bytes(chunk_size):
                    f.write(chunk)
                    total += len(chunk)
    return total


async def stream_ndjson(
    url:    str,
    client: httpx.AsyncClient | None = None,
) -> AsyncGenerator[dict, None]:
    """
    Async generator that yields parsed JSON objects from an NDJSON stream.
    Useful for Server-Sent Events or streaming API responses.
    """
    owned = client is None
    if owned:
        client = make_async_client(timeout=httpx.Timeout(None, connect=10))

    try:
        async with client.stream("GET", url) as r:
            r.raise_for_status()
            async for line in r.aiter_lines():
                line = line.strip()
                if line.startswith("data: "):
                    line = line[6:]
                if line and line != "[DONE]":
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError:
                        pass
    finally:
        if owned:
            await client.aclose()


# ── 5. Event hooks (logging, timing) ─────────────────────────────────────────

def make_logging_client(
    base_url: str = "",
    level:    int = logging.DEBUG,
    **kwargs,
) -> httpx.Client:
    """
    Create a sync client that logs all requests and response timing.
    """
    def _log_request(request: httpx.Request) -> None:
        # response.elapsed is only set after the body has been read, so it
        # cannot be used inside a response hook — time the request ourselves.
        request.extensions["start_time"] = time.monotonic()
        logger.log(level, "→ %s %s", request.method, request.url)

    def _log_response(response: httpx.Response) -> None:
        start = response.request.extensions.get("start_time")
        elapsed_ms = (time.monotonic() - start) * 1000 if start else 0.0
        logger.log(level, "← %s %s %.1f ms",
                   response.status_code, response.url, elapsed_ms)

    return httpx.Client(
        base_url=base_url,
        event_hooks={"request": [_log_request], "response": [_log_response]},
        **kwargs,
    )


# ── 6. REST API client pattern ────────────────────────────────────────────────

class ApiClient:
    """
    Reusable async API client with token auth, base URL, and error handling.
    Usage:
        async with ApiClient("https://api.example.com", token="...") as api:
            data = await api.get("/users", params={"page": 1})
    """

    def __init__(
        self,
        base_url:    str,
        token:       str = "",
        api_key:     str = "",
        timeout:     float = 30.0,
        retries:     int   = 3,
        rate_limit:  int   = 50,   # max in-flight requests (concurrency cap, not per-second)
    ):
        self._base_url   = base_url.rstrip("/")
        self._auth       = BearerAuth(token) if token else (ApiKeyAuth(api_key) if api_key else None)
        self._timeout    = timeout
        self._retries    = retries
        self._semaphore  = asyncio.Semaphore(rate_limit)
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self):
        self._client = make_async_client(
            base_url=self._base_url,
            timeout=self._timeout,
            auth=self._auth,
            retries=self._retries,
        )
        return self

    async def __aexit__(self, *_):
        if self._client:
            await self._client.aclose()

    async def _request(self, method: str, path: str, **kwargs) -> Any:
        if self._client is None:
            raise RuntimeError("use 'async with ApiClient(...)' to open the client")
        async with self._semaphore:
            r = await self._client.request(method, path, **kwargs)
            r.raise_for_status()
            ct = r.headers.get("content-type", "")
            return r.json() if "json" in ct else r.text

    async def get(self, path: str, params: dict | None = None, **kw) -> Any:
        return await self._request("GET", path, params=params, **kw)

    async def post(self, path: str, payload: dict | None = None, **kw) -> Any:
        return await self._request("POST", path, json=payload, **kw)

    async def put(self, path: str, payload: dict | None = None, **kw) -> Any:
        return await self._request("PUT", path, json=payload, **kw)

    async def patch(self, path: str, payload: dict | None = None, **kw) -> Any:
        return await self._request("PATCH", path, json=payload, **kw)

    async def delete(self, path: str, **kw) -> Any:
        return await self._request("DELETE", path, **kw)

    async def paginate(self, path: str, page_key: str = "page",
                       results_key: str = "results") -> AsyncGenerator[Any, None]:
        """Async generator that follows pagination until empty results."""
        page = 1
        while True:
            data = await self.get(path, params={page_key: page})
            items = data.get(results_key, []) if isinstance(data, dict) else data
            if not items:
                break
            for item in items:
                yield item
            page += 1


# ── Demo ──────────────────────────────────────────────────────────────────────

if __name__ == "__main__":

    print("httpx HTTP Client Demo")
    print("=" * 50)

    # Sync GET
    print("\nSync GET (httpbin.org):")
    try:
        r = get_json("https://httpbin.org/get", params={"hello": "world"}, timeout=10)
        print(f"  URL: {r.get('url')}")
        print(f"  Args: {r.get('args')}")
    except httpx.RequestError as e:
        print(f"  Connection error: {e}")
    except httpx.HTTPStatusError as e:
        print(f"  HTTP error: {e.response.status_code}")

    # Async parallel
    async def demo_async():
        print("\nAsync parallel requests (httpbin.org):")
        reqs = [
            {"method": "GET", "url": "https://httpbin.org/status/200"},
            {"method": "GET", "url": "https://httpbin.org/json"},
            {"method": "POST", "url": "https://httpbin.org/post", "json": {"n": 1}},
        ]
        try:
            results = await parallel_requests(reqs, max_concurrent=3)
            for i, r in enumerate(results):
                if isinstance(r, Exception):
                    print(f"  [{i}] Error: {r}")
                else:
                    print(f"  [{i}] OK — keys: {list(r.keys())[:3]}")
        except Exception as e:
            print(f"  Could not reach httpbin.org: {e}")

    asyncio.run(demo_async())
    print("\nDone. Use ApiClient for a production REST client pattern.")

Compared with requests for synchronous HTTP: requests lacks native async support, requires manual session management for connection pooling, and has no built-in HTTP/2. httpx's AsyncClient with asyncio.gather runs hundreds of independent API calls concurrently without threads, the auth_flow generator pattern handles token refresh inside the auth object without tangling retry logic with business code, and the requests-compatible API makes migration close to a single import swap.

Compared with aiohttp for async HTTP: aiohttp requires more boilerplate (ClientSession, custom connector setup) and its API differs significantly from requests, while httpx's AsyncClient mirrors the sync Client exactly, so the same test helper can drive both sync and async endpoints, and AsyncHTTPTransport(retries=3) adds automatic connect-level retries without a separate library.

The Claude Skills 360 bundle includes httpx skill sets covering sync and async clients, BearerAuth and ApiKeyAuth custom auth, connection pool configuration, parallel requests with semaphores, file streaming with iter_bytes, NDJSON streaming, event hooks for logging, retry transports, and a reusable paginating ApiClient class. Start with the free tier to try HTTP client code generation.
