pytest-asyncio runs async test functions. pip install pytest-asyncio. Mark: import pytest. @pytest.mark.asyncio async def test_fetch(): result = await fetch_data(); assert result. Auto mode: asyncio_mode = "auto" in pyproject.toml [tool.pytest.ini_options] — all async test functions are run as asyncio tests automatically, without the mark. Fixture: @pytest.fixture async def db(): pool = await create_pool(DSN); yield pool; await pool.close(). asyncio_mode = "auto" makes async fixtures work without extra marks. Event loop: by default, pytest-asyncio creates a new loop per test (function scope). Loop scope: @pytest_asyncio.fixture(loop_scope="session", scope="session") async def session_pool(): ... — share one pool across tests (loop_scope is an argument of pytest_asyncio.fixture; pytest.fixture does not accept it). asyncio_mode="auto" + loop_scope="session" — session-wide loop. uvloop: asyncio_mode = "auto" in pyproject.toml, plus an event_loop_policy fixture in conftest.py that returns uvloop.EventLoopPolicy() (requires pip install uvloop). anyio: pip install anyio[trio] pytest-anyio. @pytest.mark.anyio async def test_fn() — runs once per backend supplied by the anyio_backend fixture, so parametrize it to cover both asyncio and trio: @pytest.fixture(params=["asyncio","trio"]) def anyio_backend(request). AsyncMock: from unittest.mock import AsyncMock. mock_fn = AsyncMock(return_value={"id":1}). await mock_fn() → {"id":1}. mock_fn.assert_awaited_once_with(...). timeout: async def test_timeout(): with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(slow_coroutine(), timeout=1.0). Queue: q = asyncio.Queue(); await q.put(item); item = await q.get(). FastAPI: from httpx import ASGITransport, AsyncClient; async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: resp = await client.get("/"). Claude Code generates pytest-asyncio fixtures, async test suites, and FastAPI async client tests.
CLAUDE.md for pytest-asyncio
## pytest-asyncio Stack
- Version: pytest-asyncio >= 0.24 (first release with loop_scope) | pip install pytest-asyncio
- Config: asyncio_mode = "auto" in pyproject.toml — no marks needed
- Fixture: @pytest.fixture async def name(): ... — async setup/teardown
- Loop scope: @pytest_asyncio.fixture(loop_scope="session") — share loop across tests (pytest.fixture itself has no loop_scope argument)
- Mock: AsyncMock(return_value=val) | assert_awaited_once_with(...)
- FastAPI: AsyncClient(transport=ASGITransport(app=app), base_url="http://test") inside async test
- Timeout: await asyncio.wait_for(coro, timeout=N) inside pytest.raises
pytest-asyncio Test Pipeline
# tests/test_async.py — pytest-asyncio patterns
# pyproject.toml:
# [tool.pytest.ini_options]
# asyncio_mode = "auto"
from __future__ import annotations
import asyncio
from typing import AsyncGenerator
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ─────────────────────────────────────────────────────────────────────────────
# Code under test — async services
# ─────────────────────────────────────────────────────────────────────────────
class DatabasePool:
    """In-memory stand-in for an async database pool.

    Two users (ids 1 and 2) are seeded at construction; rows created later
    receive sequential ids starting at 3.
    """

    def __init__(self) -> None:
        seed_rows = (
            {"id": 1, "email": "[email protected]", "role": "admin"},
            {"id": 2, "email": "[email protected]", "role": "user"},
        )
        self._store: dict[int, dict] = {row["id"]: row for row in seed_rows}
        self._next_id = 3

    async def fetch_user(self, user_id: int) -> dict | None:
        """Return the stored row for *user_id*, or ``None`` when there is none."""
        await asyncio.sleep(0)  # yield to the event loop once, standing in for I/O
        if user_id in self._store:
            return self._store[user_id]
        return None

    async def list_users(self) -> list[dict]:
        """Return every stored row as a new list, in insertion order."""
        await asyncio.sleep(0)
        return [*self._store.values()]

    async def create_user(self, data: dict) -> dict:
        """Store a copy of *data* under the next free id and return that copy.

        Any ``"id"`` supplied in *data* is overwritten with the assigned id.
        """
        await asyncio.sleep(0)
        assigned_id = self._next_id
        record = dict(data)
        record["id"] = assigned_id
        self._store[assigned_id] = record
        self._next_id = assigned_id + 1
        return record

    async def close(self) -> None:
        """Do nothing; the simulator holds no resources to release."""
        return None
class NotificationService:
    """Outbound notification channel used by the services under test."""

    async def send_email(self, to: str, subject: str, body: str) -> bool:
        """Pretend to deliver an email and report success.

        The call is deliberately slow, so tests should replace it with a mock.
        """
        simulated_network_delay = 0.1
        await asyncio.sleep(simulated_network_delay)
        return True
class UserService:
    """User operations composed from a database pool and a notifier."""

    def __init__(self, db: DatabasePool, notifier: NotificationService) -> None:
        self._db = db
        self._notifier = notifier

    async def get_user(self, user_id: int) -> dict | None:
        """Return whatever the pool's ``fetch_user`` yields for *user_id*."""
        return await self._db.fetch_user(user_id)

    async def create_user(self, data: dict) -> dict:
        """Persist *data* as a new user, send the welcome email, return the user.

        Raises:
            KeyError: if *data* has no ``"email"`` key. This is raised before
                the database write so a payload that cannot be notified does
                not leave a stored user behind.

        Exceptions raised by the pool or the notifier propagate unchanged.
        """
        if "email" not in data:
            # Same exception the notification step would raise, but before
            # anything has been written.
            raise KeyError("email")
        user = await self._db.create_user(data)
        await self._notifier.send_email(
            to=user["email"],
            subject="Welcome!",
            body=f"Hi {user.get('first_name', 'there')}",
        )
        return user

    async def list_users(self) -> list[dict]:
        """Return the pool's full user listing."""
        return await self._db.list_users()
class EventBus:
    """Topic-based fan-out where each subscriber owns an ``asyncio.Queue``."""

    def __init__(self) -> None:
        self._queues: dict[str, list[asyncio.Queue]] = {}

    def subscribe(self, topic: str) -> asyncio.Queue:
        """Register a new queue for *topic* and hand it to the caller."""
        inbox: asyncio.Queue = asyncio.Queue()
        subscribers = self._queues.setdefault(topic, [])
        subscribers.append(inbox)
        return inbox

    async def publish(self, topic: str, event: dict) -> None:
        """Put *event* on every queue subscribed to *topic*, in subscription order."""
        subscribers = self._queues.get(topic)
        if not subscribers:
            return
        for inbox in subscribers:
            await inbox.put(event)
# ─────────────────────────────────────────────────────────────────────────────
# Async fixtures
# ─────────────────────────────────────────────────────────────────────────────
@pytest.fixture
async def db() -> AsyncGenerator[DatabasePool, None]:
    """Provide a fresh DatabasePool and close it once the test is done.

    Everything before the ``yield`` is setup and everything after is teardown.
    With asyncio_mode="auto", a plain ``@pytest.fixture`` is enough for an
    async generator fixture.
    """
    fresh_pool = DatabasePool()
    yield fresh_pool
    await fresh_pool.close()
@pytest.fixture
def mock_notifier() -> AsyncMock:
    """Provide a NotificationService double whose ``send_email`` is an AsyncMock.

    ``send_email`` resolves to True unless a test overrides ``.return_value``
    or ``.side_effect`` on it.
    """
    send_email_stub = AsyncMock(return_value=True)
    notifier = AsyncMock(spec=NotificationService)
    notifier.send_email = send_email_stub
    return notifier
@pytest.fixture
async def user_service(db: DatabasePool, mock_notifier: AsyncMock) -> UserService:
    """Provide a UserService wired to the ``db`` and ``mock_notifier`` fixtures."""
    service = UserService(db=db, notifier=mock_notifier)
    return service
@pytest.fixture
def event_bus() -> EventBus:
    """Provide a new EventBus with no subscribers."""
    bus = EventBus()
    return bus
# ─────────────────────────────────────────────────────────────────────────────
# 1. Basic async tests — asyncio_mode="auto"
# ─────────────────────────────────────────────────────────────────────────────
class TestGetUser:
    """Read-path checks run through the ``user_service`` fixture."""

    async def test_existing_user(self, user_service: UserService) -> None:
        found = await user_service.get_user(1)
        assert found is not None
        assert found["email"] == "[email protected]"

    async def test_missing_user_returns_none(self, user_service: UserService) -> None:
        assert await user_service.get_user(999) is None

    async def test_list_all_users(self, user_service: UserService) -> None:
        listed = await user_service.list_users()
        assert len(listed) == 2
        assert "[email protected]" in {row["email"] for row in listed}
# ─────────────────────────────────────────────────────────────────────────────
# 2. AsyncMock — mock coroutines and verify awaits
# ─────────────────────────────────────────────────────────────────────────────
class TestCreateUser:
    """Write-path checks: persistence, notification, and failure propagation."""

    async def test_creates_and_notifies(
        self,
        user_service: UserService,
        mock_notifier: AsyncMock,
    ) -> None:
        new_user = await user_service.create_user(
            {"email": "[email protected]", "first_name": "Carol", "role": "user"}
        )

        # The stored record carries the assigned id and the submitted email.
        assert (new_user["id"], new_user["email"]) == (3, "[email protected]")

        # Exactly one welcome email was awaited, with these keyword arguments.
        mock_notifier.send_email.assert_awaited_once_with(
            to="[email protected]",
            subject="Welcome!",
            body="Hi Carol",
        )

    async def test_notification_failure_propagates(
        self,
        user_service: UserService,
        mock_notifier: AsyncMock,
    ) -> None:
        """An exception raised by the notifier surfaces from create_user."""
        smtp_down = ConnectionError("SMTP unavailable")
        mock_notifier.send_email.side_effect = smtp_down

        with pytest.raises(ConnectionError, match="SMTP unavailable"):
            await user_service.create_user({"email": "[email protected]", "role": "user"})

    async def test_notifier_not_called_on_db_failure(
        self,
        mock_notifier: AsyncMock,
    ) -> None:
        """When the database write raises, no email is sent."""
        broken_db = AsyncMock(spec=DatabasePool)
        broken_db.create_user.side_effect = RuntimeError("DB write failed")
        service_under_test = UserService(db=broken_db, notifier=mock_notifier)

        with pytest.raises(RuntimeError):
            await service_under_test.create_user({"email": "[email protected]", "role": "user"})

        mock_notifier.send_email.assert_not_awaited()
# ─────────────────────────────────────────────────────────────────────────────
# 3. asyncio.Queue / EventBus testing
# ─────────────────────────────────────────────────────────────────────────────
class TestEventBus:
    """Delivery behaviour of the queue-backed EventBus."""

    async def test_publish_delivers_to_subscriber(self, event_bus: EventBus) -> None:
        inbox = event_bus.subscribe("orders")
        await event_bus.publish("orders", {"type": "order_placed", "order_id": "ord-001"})

        # wait_for bounds the wait so a missing delivery fails instead of hanging.
        delivered = await asyncio.wait_for(inbox.get(), timeout=1.0)
        assert delivered["order_id"] == "ord-001"

    async def test_multiple_subscribers_receive_event(self, event_bus: EventBus) -> None:
        inboxes = [event_bus.subscribe("orders"), event_bus.subscribe("orders")]
        await event_bus.publish("orders", {"type": "order_placed", "id": "x"})

        received_ids = [
            (await asyncio.wait_for(inbox.get(), timeout=1.0))["id"]
            for inbox in inboxes
        ]
        assert received_ids == ["x", "x"]

    async def test_no_delivery_to_different_topic(self, event_bus: EventBus) -> None:
        users_inbox = event_bus.subscribe("users")
        await event_bus.publish("orders", {"type": "order_placed"})
        assert users_inbox.empty()
# ─────────────────────────────────────────────────────────────────────────────
# 4. Timeout testing
# ─────────────────────────────────────────────────────────────────────────────
class TestTimeouts:
    """``asyncio.wait_for`` behaviour on both sides of the deadline."""

    async def test_slow_operation_times_out(self) -> None:
        async def outlasts_deadline() -> None:
            await asyncio.sleep(10)

        # The 10 s sleep is abandoned once the 50 ms deadline passes.
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(outlasts_deadline(), timeout=0.05)

    async def test_fast_operation_completes(self) -> None:
        async def answers_immediately() -> int:
            await asyncio.sleep(0)
            return 42

        assert await asyncio.wait_for(answers_immediately(), timeout=1.0) == 42
# ─────────────────────────────────────────────────────────────────────────────
# 5. Concurrency — parallel tasks
# ─────────────────────────────────────────────────────────────────────────────
class TestConcurrency:
    """Running coroutines side by side and cancelling a task."""

    async def test_gather_multiple_coroutines(self, db: DatabasePool) -> None:
        """asyncio.gather returns results in the order the awaitables were passed."""
        first, second, absent = await asyncio.gather(
            db.fetch_user(1),
            db.fetch_user(2),
            db.fetch_user(999),
        )
        assert first["email"] == "[email protected]"
        assert second["email"] == "[email protected]"
        assert absent is None

    async def test_task_cancellation(self) -> None:
        async def sleeps_for_ten_seconds() -> None:
            await asyncio.sleep(10)

        background = asyncio.create_task(sleeps_for_ten_seconds())
        await asyncio.sleep(0)  # give the task one turn so it has started
        background.cancel()

        with pytest.raises(asyncio.CancelledError):
            await background
# ─────────────────────────────────────────────────────────────────────────────
# 6. FastAPI async endpoint testing (httpx.AsyncClient)
# ─────────────────────────────────────────────────────────────────────────────
# Only the optional imports are guarded. The definitions live in the ``else``
# branch so an ImportError raised while building the app or the test class is
# not silently swallowed.
try:
    from fastapi import FastAPI, HTTPException
    from httpx import ASGITransport, AsyncClient
except ImportError:
    pass  # FastAPI/httpx not installed — the endpoint tests below are not defined
else:
    demo_app = FastAPI()

    @demo_app.get("/health")
    async def health() -> dict:
        """Liveness endpoint; always reports ok."""
        return {"status": "ok"}

    @demo_app.get("/users/{user_id}")
    async def get_user_endpoint(user_id: int) -> dict:
        """Return a canned user; id 999 stands for a missing record."""
        if user_id == 999:
            raise HTTPException(status_code=404, detail="Not found")
        return {"id": user_id, "email": "[email protected]"}

    def _demo_client() -> AsyncClient:
        """Build an AsyncClient that calls ``demo_app`` in-process over ASGI."""
        return AsyncClient(
            transport=ASGITransport(app=demo_app),
            base_url="http://test",
        )

    class TestFastAPIEndpoints:
        """Endpoint checks against ``demo_app`` through httpx's ASGI transport."""

        async def test_health_check(self) -> None:
            async with _demo_client() as client:
                resp = await client.get("/health")
                assert resp.status_code == 200
                assert resp.json()["status"] == "ok"

        async def test_get_existing_user(self) -> None:
            async with _demo_client() as client:
                resp = await client.get("/users/1")
                assert resp.status_code == 200
                assert resp.json()["id"] == 1

        async def test_get_missing_user(self) -> None:
            async with _demo_client() as client:
                resp = await client.get("/users/999")
                assert resp.status_code == 404
For the asynctest alternative — asynctest was the go-to async testing library for Python 3.5–3.7 but is unmaintained, while pytest-asyncio is the current standard — asyncio_mode = "auto" in pyproject.toml makes every async def test_* function run on the event loop automatically, and unittest.mock.AsyncMock (added in Python 3.8) replaces asynctest.mock.CoroutineMock with a stdlib class that supports assert_awaited_once_with, assert_awaited_with, and call_args_list. For the trio / anyio alternative — anyio + pytest-anyio runs the same test suite on both asyncio and trio backends by parametrizing the anyio_backend fixture, which is useful for library authors who need to support both concurrency frameworks, while @pytest.mark.asyncio (pytest-asyncio) is simpler for application code that targets asyncio only — the two are not mutually exclusive and can coexist in the same test suite. The Claude Skills 360 bundle includes pytest-asyncio skill sets covering asyncio_mode="auto" configuration, async fixtures with yield cleanup, loop_scope session for shared pools, AsyncMock assert_awaited/not_awaited, asyncio.Queue and asyncio.Event testing, asyncio.gather and task cancellation, asyncio.wait_for timeout assertions, FastAPI async endpoint testing with AsyncClient, conftest.py event_loop_policy for uvloop, and anyio backend parametrization. Start with the free tier to try async testing code generation.