aiohttp provides async HTTP client and server built on asyncio. pip install aiohttp. import aiohttp. Client: async with aiohttp.ClientSession() as session: async with session.get(url) as r: data = await r.json(). Methods: session.get/post/put/patch/delete(url, params={}, json={}, headers={}, ssl=True). Response: await r.json(), await r.text(), r.status, r.headers, r.content_length. Raise: r.raise_for_status(). Timeout: timeout = aiohttp.ClientTimeout(total=30, connect=5, sock_read=25). Connector: conn = aiohttp.TCPConnector(limit=100, limit_per_host=10, keepalive_timeout=30). Session with retry: from aiohttp_retry import RetryClient, ExponentialRetry; retry_options = ExponentialRetry(attempts=3, start_timeout=0.5). Stream download: async for chunk in r.content.iter_chunked(8192): f.write(chunk). Auth: auth = aiohttp.BasicAuth("user","pass"). Headers: ClientSession(headers={"Authorization":"Bearer token"}). Server: app = aiohttp.web.Application(), app.router.add_get("/path", handler). Handler: async def handler(request): return web.json_response({"ok": True}). JSON body: body = await request.json(). Path params: request.match_info["id"]. Query: request.rel_url.query["page"]. Middleware: @web.middleware async def cors(request, handler): r = await handler(request); r.headers["Access-Control-Allow-Origin"]="*"; return r. WebSocket: async with session.ws_connect(url) as ws: await ws.send_str("ping"), msg = await ws.receive(). Run: aiohttp.web.run_app(app, port=8080). Claude Code generates aiohttp async API clients, microservices, WebSocket servers, and proxy middleware.
CLAUDE.md for aiohttp
## aiohttp Stack
- Version: aiohttp >= 3.9
- Client: async with aiohttp.ClientSession(connector, timeout, headers) as session
- Request: await session.get/post/put/patch/delete(url, params, json, ssl)
- Response: r.status | await r.json() | await r.text() | r.raise_for_status()
- Connector: TCPConnector(limit=100, limit_per_host=10, keepalive_timeout=30)
- Server: web.Application() | app.router.add_get/post/route(path, handler)
- Handler: async def h(request: web.Request) -> web.Response
- Run: web.run_app(app, port=8080) | AppRunner + TCPSite for embedding
aiohttp Async HTTP Pipeline
# net/aiohttp_pipeline.py — async HTTP client and server with aiohttp
from __future__ import annotations
import asyncio
import json
import logging
import time
from typing import Any, AsyncGenerator, Callable
import aiohttp
from aiohttp import web, ClientTimeout, TCPConnector, BasicAuth
logger = logging.getLogger(__name__)
# ── 1. Client session factory ─────────────────────────────────────────────────
def make_session(
    base_url: str = "",
    total_timeout: float | None = 30.0,
    connect_timeout: float = 10.0,
    headers: dict | None = None,
    auth: BasicAuth | None = None,
    max_connections: int = 100,
    max_per_host: int = 10,
    keepalive_timeout: float = 30.0,
) -> aiohttp.ClientSession:
    """
    Create a configured aiohttp.ClientSession.

    Args:
        base_url: Optional base URL; relative request paths resolve against it.
        total_timeout: Whole-request deadline in seconds, or None to disable
            (useful for long-lived streams and large downloads).
        connect_timeout: TCP connect deadline in seconds.
        headers: Default headers sent with every request.
        auth: Optional basic-auth credentials applied to every request.
        max_connections: Pool-wide connection cap.
        max_per_host: Per-host connection cap (limits pressure on one API).
        keepalive_timeout: Seconds an idle pooled connection stays open.

    Use as: `async with make_session(...) as session:`
    Always close the session — memory leaks if left unclosed.
    """
    connector = TCPConnector(
        limit=max_connections,
        limit_per_host=max_per_host,
        keepalive_timeout=keepalive_timeout,
        ssl=True,
    )
    # sock_read is derived from the total budget.  When total_timeout is None
    # (unbounded), the subtraction would raise TypeError — leave sock_read
    # unbounded as well in that case.
    timeout = ClientTimeout(
        total=total_timeout,
        connect=connect_timeout,
        sock_read=None if total_timeout is None else total_timeout - connect_timeout,
    )
    return aiohttp.ClientSession(
        base_url=base_url or None,
        connector=connector,
        timeout=timeout,
        headers=headers or {},
        auth=auth,
        raise_for_status=True,  # Auto-raises ClientResponseError on 4xx/5xx
    )
# ── 2. HTTP client helpers ────────────────────────────────────────────────────
async def fetch_json(
    url: str,
    params: dict | None = None,
    headers: dict | None = None,
    session: aiohttp.ClientSession | None = None,
) -> Any:
    """Async GET that returns the parsed JSON body.

    Args:
        url: Absolute URL (or path relative to the session's base_url).
        params: Optional query-string parameters.
        headers: Optional per-request headers.
        session: Existing session to reuse; when omitted, a temporary session
            is created and closed before returning.
    """
    # Note: `dict | None` replaces the deprecated implicit-Optional
    # `dict = None` annotations (PEP 484).
    owned = session is None
    if owned:
        session = make_session()
    try:
        async with session.get(url, params=params, headers=headers) as r:
            return await r.json()
    finally:
        # Only close sessions we created; caller-owned sessions stay open.
        if owned:
            await session.close()
async def post_json(
    url: str,
    payload: dict,
    headers: dict | None = None,
    session: aiohttp.ClientSession | None = None,
) -> Any:
    """Async POST with a JSON body; returns the parsed JSON response.

    Args:
        url: Absolute URL (or path relative to the session's base_url).
        payload: Object serialized as the JSON request body.
        headers: Optional per-request headers.
        session: Existing session to reuse; when omitted, a temporary session
            is created and closed before returning.
    """
    # `dict | None` replaces the deprecated implicit-Optional annotation,
    # matching fetch_json.
    owned = session is None
    if owned:
        session = make_session()
    try:
        async with session.post(url, json=payload, headers=headers) as r:
            return await r.json()
    finally:
        # Only close sessions we created; caller-owned sessions stay open.
        if owned:
            await session.close()
async def parallel_get(
    urls: list[str],
    params_list: list[dict | None] | None = None,
    max_concurrent: int = 10,
    session: aiohttp.ClientSession | None = None,
) -> list[Any | Exception]:
    """
    Fetch many URLs concurrently with a semaphore limit.

    Args:
        urls: Target URLs.
        params_list: Optional per-URL query parameters; must be the same
            length as `urls` when provided.
        max_concurrent: Maximum in-flight requests at once.
        session: Existing session to reuse; when omitted, a temporary session
            is created and closed before returning.

    Returns results in the same order as input — per-request exceptions are
    kept in-band instead of raised.

    Raises:
        ValueError: If `params_list` length differs from `urls` length.
    """
    if params_list is None:
        params_list = [None] * len(urls)
    elif len(params_list) != len(urls):
        # zip() would silently drop trailing URLs on a length mismatch,
        # returning fewer results than inputs — fail loudly instead.
        raise ValueError("params_list must have the same length as urls")
    semaphore = asyncio.Semaphore(max_concurrent)

    async def _one(session_: aiohttp.ClientSession, url: str, params: dict | None) -> Any | Exception:
        async with semaphore:
            try:
                async with session_.get(url, params=params) as r:
                    # JSON responses are parsed; everything else is returned as text.
                    ct = r.headers.get("Content-Type", "")
                    return await r.json() if "json" in ct else await r.text()
            except Exception as e:
                return e  # in-band: keeps result order aligned with urls

    owned = session is None
    if owned:
        session = make_session(max_connections=max_concurrent)
    try:
        tasks = [_one(session, url, p) for url, p in zip(urls, params_list)]
        return await asyncio.gather(*tasks)
    finally:
        if owned:
            await session.close()
async def stream_download(
    url: str,
    dest_path: str,
    chunk_size: int = 65_536,
    session: aiohttp.ClientSession | None = None,
) -> int:
    """Stream a large file download to disk. Returns total bytes written.

    Args:
        url: Download URL.
        dest_path: Local file path to write (truncated if it exists).
        chunk_size: Bytes per read chunk.
        session: Existing session to reuse; when omitted, a temporary
            no-total-timeout session is created and closed before returning.
    """
    owned = session is None
    if owned:
        # Build the session directly rather than via make_session(): downloads
        # need total=None, but make_session() computes
        # sock_read = total_timeout - connect_timeout, which raises TypeError
        # when total_timeout is None.
        session = aiohttp.ClientSession(
            timeout=ClientTimeout(total=None, connect=10.0),
            raise_for_status=True,
        )
    total = 0
    try:
        async with session.get(url) as r:
            with open(dest_path, "wb") as f:
                async for chunk in r.content.iter_chunked(chunk_size):
                    # NOTE: blocking file write on the event loop — acceptable
                    # for local disks; consider a thread executor for slow media.
                    f.write(chunk)
                    total += len(chunk)
    finally:
        if owned:
            await session.close()
    return total
async def stream_ndjson(
    url: str,
    session: aiohttp.ClientSession | None = None,
) -> AsyncGenerator[dict, None]:
    """Yield JSON objects from a newline-delimited JSON stream (SSE or NDJSON).

    Handles both raw NDJSON lines and SSE `data: ...` framing; the SSE
    terminator `[DONE]` and unparseable lines are skipped silently.

    Args:
        url: Streaming endpoint URL.
        session: Existing session to reuse; when omitted, a temporary
            no-total-timeout session is created and closed when the
            generator finishes or is closed.
    """
    owned = session is None
    if owned:
        # Build the session directly rather than via make_session(): streams
        # need total=None, but make_session() computes
        # sock_read = total_timeout - connect_timeout, which raises TypeError
        # when total_timeout is None.
        session = aiohttp.ClientSession(
            timeout=ClientTimeout(total=None, connect=10.0),
            raise_for_status=True,
        )
    try:
        async with session.get(url) as r:
            async for line in r.content:
                line = line.decode().strip()
                if line.startswith("data: "):
                    line = line[6:]  # strip SSE framing prefix
                if line and line != "[DONE]":
                    try:
                        yield json.loads(line)
                    except json.JSONDecodeError:
                        pass  # best-effort: skip malformed/partial lines
    finally:
        if owned:
            await session.close()
# ── 3. WebSocket client ───────────────────────────────────────────────────────
async def websocket_client(
    url: str,
    messages: list[str],
    session: aiohttp.ClientSession | None = None,
) -> list[str]:
    """
    Connect to a WebSocket, send each message, and collect the replies.

    Sends one message at a time and waits (up to 10s) for its reply before
    sending the next. Returns the list of text replies received; stops early
    if the server closes the connection or errors.
    """
    owns_session = session is None
    if owns_session:
        session = make_session()
    collected: list[str] = []
    try:
        async with session.ws_connect(url) as ws:
            for outgoing in messages:
                await ws.send_str(outgoing)
                incoming = await ws.receive(timeout=10)
                if incoming.type == aiohttp.WSMsgType.TEXT:
                    collected.append(incoming.data)
                    continue
                if incoming.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                    break
    finally:
        # Close only sessions this function created.
        if owns_session:
            await session.close()
    return collected
# ── 4. Web server ─────────────────────────────────────────────────────────────
def create_app(
    routes: list[tuple[str, str, Callable]],
    middlewares: list[Callable] | None = None,
) -> web.Application:
    """
    Create an aiohttp web Application.

    Args:
        routes: List of (method, path, handler) tuples; method is
            case-insensitive (e.g. "get" or "GET").
        middlewares: Optional list of @web.middleware coroutine functions,
            applied in order.

    Returns:
        A configured web.Application ready for run_app or AppRunner.
    """
    # `list[tuple[...]]` and `| None` replace the vague `list[tuple]` and
    # deprecated implicit-Optional annotations.
    app = web.Application(middlewares=middlewares or [])
    for method, path, handler in routes:
        app.router.add_route(method.upper(), path, handler)
    return app
# ── 5. Handler helpers ────────────────────────────────────────────────────────
async def json_handler_example(request: web.Request) -> web.Response:
    """Example handler: parse the JSON body and echo it back with a timestamp."""
    try:
        body = await request.json()
    except json.JSONDecodeError:
        # Malformed body is a client error, not a server crash.
        raise web.HTTPBadRequest(reason="Invalid JSON body")
    return web.json_response(
        {"received": body, "timestamp": time.time()},
        status=200,
    )
async def path_param_handler(request: web.Request) -> web.Response:
    """Example: extract and validate a path parameter and a query parameter.

    Raises:
        HTTPBadRequest: If the `id` path segment or `page` query value is not
            an integer.
    """
    try:
        item_id = int(request.match_info["id"])
    except (KeyError, ValueError):
        raise web.HTTPBadRequest(reason="id must be an integer")
    try:
        page = int(request.rel_url.query.get("page", "1"))
    except ValueError:
        # Previously an unhandled ValueError here surfaced as a 500;
        # a bad query-string value is the client's fault → 400.
        raise web.HTTPBadRequest(reason="page must be an integer")
    return web.json_response({"id": item_id, "page": page})
async def streaming_handler(request: web.Request) -> web.StreamResponse:
    """Server-Sent Events streaming handler: emits 10 timestamped chunks."""
    sse_headers = {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    response = web.StreamResponse(status=200, reason="OK", headers=sse_headers)
    # Headers are sent once prepare() runs; only body writes follow.
    await response.prepare(request)
    chunk_index = 0
    while chunk_index < 10:
        payload = json.dumps({"chunk": chunk_index, "ts": time.time()})
        await response.write(f"data: {payload}\n\n".encode())
        await asyncio.sleep(0.5)
        chunk_index += 1
    return response
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
    """Echo WebSocket server handler: replies `echo: <text>` to each text frame."""
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    async for frame in ws:
        if frame.type == aiohttp.WSMsgType.ERROR:
            logger.error("WebSocket error: %s", ws.exception())
            break
        if frame.type == aiohttp.WSMsgType.TEXT:
            await ws.send_str(f"echo: {frame.data}")
    return ws
# ── 6. Middleware ─────────────────────────────────────────────────────────────
@web.middleware
async def cors_middleware(request: web.Request, handler: Callable) -> web.Response:
    """Add CORS headers to all responses; answer OPTIONS preflights directly."""
    preflight_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET,POST,PUT,DELETE,OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type,Authorization",
    }
    if request.method == "OPTIONS":
        # Preflight never reaches the handler.
        return web.Response(headers=preflight_headers)
    response = await handler(request)
    response.headers["Access-Control-Allow-Origin"] = "*"
    return response
@web.middleware
async def request_logger_middleware(request: web.Request, handler: Callable) -> web.Response:
    """Log request timing and status code (including HTTP-exception responses)."""
    started = time.perf_counter()

    def _log(status: int) -> None:
        # Single formatting point for both the success and exception paths.
        elapsed_ms = (time.perf_counter() - started) * 1000
        logger.info("%s %s → %d %.1f ms", request.method, request.path, status, elapsed_ms)

    try:
        response = await handler(request)
    except web.HTTPException as exc:
        _log(exc.status_code)
        raise
    _log(response.status)
    return response
@web.middleware
async def error_handler_middleware(request: web.Request, handler: Callable) -> web.Response:
    """Convert unhandled exceptions into JSON 500 responses."""
    try:
        return await handler(request)
    except web.HTTPException:
        # Framework responses (redirects, 4xx, etc.) pass through untouched.
        raise
    except Exception as exc:
        logger.exception("Unhandled error for %s %s", request.method, request.path)
        return web.json_response({"error": str(exc)}, status=500)
# ── 7. Server lifecycle ───────────────────────────────────────────────────────
async def run_server_in_background(app: web.Application, port: int = 8080) -> web.AppRunner:
    """
    Start the server in the background (useful for embedding in larger async apps).

    Args:
        app: Configured web.Application.
        port: TCP port to bind on all interfaces (0.0.0.0).

    Returns:
        The started AppRunner; call `await runner.cleanup()` to stop the server.
    """
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "0.0.0.0", port)
    await site.start()
    # Use the module logger instead of print() for consistency with the
    # middleware and the rest of this module's logging.
    logger.info("Server started on http://0.0.0.0:%d", port)
    return runner
# ── Demo ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    async def main():
        # Demo driver: exercises the parallel client against httpbin.org,
        # then starts a local server and issues one request against it.
        print("aiohttp Async HTTP Demo")
        print("=" * 50)
        # Parallel HTTP requests
        print("\nParallel GET requests (httpbin.org):")
        urls = [
            "https://httpbin.org/get",
            "https://httpbin.org/json",
            "https://httpbin.org/status/200",
        ]
        try:
            # Exceptions are returned in-band by parallel_get, so the per-URL
            # loop distinguishes failures from JSON dicts and plain text.
            results = await parallel_get(urls, max_concurrent=3)
            for url, r in zip(urls, results):
                if isinstance(r, Exception):
                    print(f" ERROR {url}: {r}")
                elif isinstance(r, dict):
                    print(f" OK {url}: keys={list(r.keys())[:3]}")
                else:
                    print(f" OK {url}: status response")
        except Exception as e:
            # Network-level failure before any request completed.
            print(f" Could not reach httpbin.org: {e}")
        # Server demo
        print("\nStarting echo server on port 8765...")
        # NOTE(review): "/{id}" is registered before "/ws" — depending on the
        # aiohttp resolver's plain-vs-dynamic matching order, the dynamic
        # route may shadow "/ws"; confirm route precedence if /ws misbehaves.
        # Also, GET "/" maps to json_handler_example, which expects a JSON
        # body and returns 400 for a body-less GET.
        routes = [
            ("GET", "/", json_handler_example),
            ("GET", "/{id}", path_param_handler),
            ("GET", "/ws", websocket_handler),
        ]
        app = create_app(routes, middlewares=[cors_middleware, request_logger_middleware])
        runner = await run_server_in_background(app, port=8765)
        # Test the server
        try:
            # make_session sets raise_for_status=True, so a non-2xx test
            # response lands in the except branch below.
            async with make_session("http://localhost:8765") as client:
                async with client.get("/42?page=3") as r:
                    data = await r.json()
                    print(f"Test request: {data}")
        except Exception as e:
            print(f"Server test: {e}")
        finally:
            # Always tear the server down, even if the test request failed.
            await runner.cleanup()
            print("Server stopped")
    asyncio.run(main())
Compared with the requests + Flask alternative for simple HTTP work: requests and Flask mix synchronous and async code, while aiohttp’s ClientSession with asyncio.gather sends hundreds of API calls concurrently without threads, the TCPConnector with limit_per_host controls connection pressure to third-party APIs, and the same aiohttp.web.Application handles both REST endpoints and WebSocket connections without adding a separate WebSocket library. Compared with the FastAPI alternative for REST APIs: FastAPI wraps starlette/uvicorn and adds OpenAPI schema generation, while aiohttp’s web.Application with AppRunner starts in-process in two lines with zero additional dependencies, @web.middleware for CORS and logging adds cross-cutting concerns without decorating every handler, and streaming responses via StreamResponse and SSE are first-class patterns without plugin libraries. The Claude Skills 360 bundle includes aiohttp skill sets covering ClientSession with TCPConnector, parallel GET with semaphores, streaming file download, NDJSON streaming, WebSocket client, web Application routing, JSON and streaming handlers, CORS and logger middleware, error handler middleware, and AppRunner in-process server. Start with the free tier to try async HTTP code generation.