tRPC subscriptions extend the end-to-end type-safe RPC model to real-time data streams. The server exports typed subscription procedures that push events via async generators. The client subscribes with api.procedure.useSubscription(), and TypeScript validates both the input and received event types. No separate WebSocket message type management or manual schema synchronization. Claude Code generates tRPC routers with subscriptions, WebSocket server setup, React hooks for real-time data, and the authentication middleware for secure subscription connections.
CLAUDE.md for tRPC Subscriptions
## tRPC Subscription Stack
- Version: @trpc/server 11.x, @trpc/client 11.x, @trpc/react-query 11.x
- Transport: WebSocket (ws package) for subscriptions, HTTP for queries/mutations
- Auth: wss subscription context from cookie/bearer token
- Events: observable from @trpc/server for async generators
- Testing: vitest with custom WebSocket mock
- Next.js: App Router with tRPC on Vercel Edge or Node.js runtime
- React Query: v5 with tRPC adapter — useSuspenseQuery preferred
Router with Queries, Mutations, and Subscriptions
// server/routers/orders.ts
import { z } from 'zod'
import { router, protectedProcedure } from '../trpc'
import { observable } from '@trpc/server/observable'
import { EventEmitter } from 'events'
import { db } from '../db'
// In-memory event emitter — replace with Redis pub/sub in production
const orderEvents = new EventEmitter()
orderEvents.setMaxListeners(100)
interface OrderEvent {
type: 'CREATED' | 'STATUS_CHANGED' | 'CANCELLED'
order: Order
userId: string
}
export const ordersRouter = router({
// Query — fetches current state
list: protectedProcedure
.input(z.object({
status: z.string().optional(),
cursor: z.string().optional(),
limit: z.number().int().min(1).max(100).default(20),
}))
.query(async ({ input, ctx }) => {
const orders = await db.orders.findMany({
where: {
userId: ctx.user.id,
...(input.status && { status: input.status }),
...(input.cursor && { id: { gt: input.cursor } }),
},
take: input.limit + 1,
orderBy: { createdAt: 'desc' },
})
const hasMore = orders.length > input.limit
return {
items: orders.slice(0, input.limit),
nextCursor: hasMore ? orders[input.limit - 1].id : null,
}
}),
// Mutation — creates an order
create: protectedProcedure
.input(z.object({
items: z.array(z.object({
productId: z.string(),
quantity: z.number().int().positive(),
})),
}))
.mutation(async ({ input, ctx }) => {
const order = await db.orders.create({
data: {
userId: ctx.user.id,
items: { create: input.items },
status: 'PENDING',
},
})
// Emit event to all subscribers
orderEvents.emit('order', {
type: 'CREATED',
order,
userId: ctx.user.id,
} satisfies OrderEvent)
return order
}),
// Subscription — streams order events to this user
onOrderUpdate: protectedProcedure
.input(z.object({
orderIds: z.array(z.string()).optional(), // Filter to specific orders
}))
.subscription(({ input, ctx }) => {
return observable<OrderEvent>(emit => {
const handler = (event: OrderEvent) => {
// Security check: only emit events for this user's orders
if (event.userId !== ctx.user.id) return
// Optional: filter by specific order IDs
if (input.orderIds && !input.orderIds.includes(event.order.id)) return
emit.next(event)
}
orderEvents.on('order', handler)
// Cleanup when client disconnects
return () => {
orderEvents.off('order', handler)
}
})
}),
// Subscription with cursor — resumes from last seen event
realtimeFeed: protectedProcedure
.input(z.object({
tenantId: z.string(),
since: z.date().optional(),
}))
.subscription(async function* ({ input, ctx }) {
// Async generator style (tRPC 11+)
const queue: Order[] = []
// First, replay missed events since cursor
if (input.since) {
const missed = await db.orders.findMany({
where: { tenantId: input.tenantId, createdAt: { gt: input.since } },
orderBy: { createdAt: 'asc' },
})
queue.push(...missed)
}
// Then listen for new events
const listener = (event: OrderEvent) => {
queue.push(event.order)
}
orderEvents.on('order', listener)
try {
while (true) {
if (queue.length > 0) {
yield queue.shift()!
} else {
// Wait for next event
await new Promise(resolve => orderEvents.once('order', resolve))
}
}
} finally {
orderEvents.off('order', listener)
}
}),
})
tRPC Server Setup with WebSocket
// server/index.ts — Express + WebSocket server
import { createHTTPServer } from '@trpc/server/adapters/standalone'
import { applyWSSHandler } from '@trpc/server/adapters/ws'
import { WebSocketServer } from 'ws'
import { createContext } from './context'
import { appRouter } from './routers'
// HTTP server for queries and mutations
const { server, listen } = createHTTPServer({
router: appRouter,
createContext,
middleware: cors({ origin: process.env.CLIENT_URL }),
})
// WebSocket server for subscriptions (same port)
const wss = new WebSocketServer({ server })
applyWSSHandler({
wss,
router: appRouter,
createContext: async ({ req }) => {
// Extract auth from WebSocket upgrade request headers
const token = req.headers.authorization?.replace('Bearer ', '')
const user = token ? await verifyToken(token) : null
return { user }
},
})
listen(3001)
console.log('tRPC server on :3001 (HTTP + WS)')
// server/context.ts
import { inferAsyncReturnType } from '@trpc/server'
import type { CreateNextContextOptions } from '@trpc/server/adapters/next'
export async function createContext({ req, res }: CreateNextContextOptions) {
const session = await getServerSession(req, res, authOptions)
return { user: session?.user ?? null, req, res }
}
export type Context = inferAsyncReturnType<typeof createContext>
// server/trpc.ts — procedure builder with auth middleware
import { initTRPC, TRPCError } from '@trpc/server'
import type { Context } from './context'
import superjson from 'superjson'
const t = initTRPC.context<Context>().create({
transformer: superjson, // Handles Date, Map, Set serialization
})
const isAuthed = t.middleware(({ ctx, next }) => {
if (!ctx.user) {
throw new TRPCError({ code: 'UNAUTHORIZED' })
}
return next({ ctx: { ...ctx, user: ctx.user } })
})
export const router = t.router
export const publicProcedure = t.procedure
export const protectedProcedure = t.procedure.use(isAuthed)
React Client with Subscriptions
// lib/trpc.ts — client configuration
import { createTRPCReact } from '@trpc/react-query'
import { httpBatchLink, splitLink, wsLink, createWSClient } from '@trpc/client'
import superjson from 'superjson'
import type { AppRouter } from '../server/routers'
export const api = createTRPCReact<AppRouter>()
// WebSocket client — reconnects automatically
const wsClient = createWSClient({
url: () => {
const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws'
return `${protocol}://${window.location.host}/api/trpc`
},
onOpen: () => console.log('WebSocket connected'),
onClose: () => console.log('WebSocket disconnected'),
})
export const trpcClient = api.createClient({
transformer: superjson,
links: [
splitLink({
// Subscriptions go over WebSocket
condition: op => op.type === 'subscription',
true: wsLink({ client: wsClient }),
// Queries and mutations go over HTTP with batching
false: httpBatchLink({
url: '/api/trpc',
headers: () => ({
authorization: `Bearer ${getToken()}`,
}),
}),
}),
],
})
// components/OrderFeed.tsx — real-time order list
import { api } from '@/lib/trpc'
import { useState, useEffect } from 'react'
export function OrderFeed() {
const [events, setEvents] = useState<string[]>([])
// Initial data query
const { data: orders } = api.orders.list.useQuery({ limit: 20 })
// Real-time subscription — automatically reconnects
api.orders.onOrderUpdate.useSubscription(
{ orderIds: orders?.items.map(o => o.id) },
{
enabled: !!orders,
onData: (event) => {
setEvents(prev => [`${event.type}: ${event.order.id}`, ...prev.slice(0, 19)])
},
onError: (err) => {
console.error('Subscription error:', err)
},
}
)
return (
<div>
<h2>Live Order Feed</h2>
<ul className="space-y-2">
{events.map((e, i) => (
<li key={i} className="text-sm px-3 py-2 bg-gray-50 rounded animate-fade-in">
{e}
</li>
))}
</ul>
<div className="mt-6">
<h3>Orders</h3>
{orders?.items.map(order => (
<OrderCard key={order.id} order={order} />
))}
</div>
</div>
)
}
Next.js App Router Integration
// app/api/trpc/[trpc]/route.ts — App Router handler
import { fetchRequestHandler } from '@trpc/server/adapters/fetch'
import { appRouter } from '@/server/routers'
import { createContext } from '@/server/context'
const handler = (req: Request) =>
fetchRequestHandler({
endpoint: '/api/trpc',
req,
router: appRouter,
createContext: () => createContext({ req }),
})
export { handler as GET, handler as POST }
// app/providers.tsx — wrap with tRPC and React Query
'use client'
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import { api, trpcClient } from '@/lib/trpc'
import { useState } from 'react'
export function TRPCProvider({ children }: { children: React.ReactNode }) {
const [queryClient] = useState(() => new QueryClient({
defaultOptions: {
queries: {
staleTime: 30 * 1000,
retry: 2,
},
},
}))
return (
<api.Provider client={trpcClient} queryClient={queryClient}>
<QueryClientProvider client={queryClient}>
{children}
</QueryClientProvider>
</api.Provider>
)
}
For the existing tRPC basics with queries and mutations, see the tRPC guide for router setup and procedure patterns. For the WebSocket scaling patterns needed when running subscriptions in production with multiple server instances, the WebSocket scaling guide covers Redis pub/sub for distributed subscriptions. The Claude Skills 360 bundle includes tRPC skill sets covering subscriptions, WebSocket setup, and real-time React Query integration. Start with the free tier to try tRPC subscription generation.