Reactive programming models asynchronous data as streams. Events, HTTP responses, WebSocket messages, user input — all become Observables that you transform, combine, and subscribe to. RxJS provides the operator toolkit: switchMap to cancel in-flight requests, throttleTime to debounce rapid events, combineLatest to merge multiple streams. Claude Code generates correct operator chains and explains why a given pattern avoids race conditions.
Observable Fundamentals
I keep getting confused about when to use switchMap vs mergeMap vs concatMap.
Explain the difference with real examples.
| Operator | Behavior | Use When |
|---|---|---|
mergeMap | All inner observables run in parallel | Multiple concurrent requests OK |
concatMap | Inner observables run strictly in order | Order matters, no parallel |
switchMap | Cancels previous inner observable when new outer emits | Latest value only (search, autocomplete) |
exhaustMap | Ignores new outer emissions while inner is active | Prevent duplicate submits |
// switchMap — search autocomplete: cancel previous request when user types
const search$ = fromEvent<InputEvent>(searchInput, 'input').pipe(
map(e => (e.target as HTMLInputElement).value),
debounceTime(300), // Wait 300ms after last keystroke
distinctUntilChanged(), // Don't search if value didn't change
filter(query => query.length >= 2),
switchMap(query => // Cancel previous HTTP request if new query arrives
from(fetch(`/api/search?q=${encodeURIComponent(query)}`)).pipe(
switchMap(res => from(res.json())),
catchError(err => {
console.error('Search failed:', err);
return of({ results: [] }); // Empty results on error — don't break stream
})
)
)
);
search$.subscribe(({ results }) => renderResults(results));
// mergeMap — upload multiple files in parallel
const uploads$ = from(selectedFiles).pipe(
mergeMap(file => uploadFile(file), 3), // Max 3 concurrent uploads
toArray(), // Collect all results when complete
);
// concatMap — database operations that must run in order
const migrations$ = from(pendingMigrations).pipe(
concatMap(migration => runMigration(migration)), // Never parallel
);
// exhaustMap — form submit: ignore clicks until current submit completes
const submitButton = document.getElementById('submit')!;
fromEvent(submitButton, 'click').pipe(
exhaustMap(() => submitForm()), // Drop clicks while form is submitting
).subscribe(handleResult);
Combining Streams
Build a real-time dashboard that shows live metrics.
Combine: current user data, live price feed (WebSocket), and alerts.
import { combineLatest, interval, fromEvent, merge, Subject, EMPTY } from 'rxjs';
import { map, switchMap, shareReplay, retry, delay, startWith } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';
// WebSocket price feed — auto-reconnects on disconnect
const priceFeed$ = webSocket<PriceUpdate>('wss://api.example.com/prices').pipe(
retry({ count: 5, delay: 2000 }), // Reconnect up to 5 times
shareReplay({ bufferSize: 1, refCount: true }), // Share across subscribers
);
// User data — poll every 60 seconds
const user$ = interval(60_000).pipe(
startWith(0), // Emit immediately, then every 60s
switchMap(() => from(fetch('/api/me').then(r => r.json()))),
shareReplay({ bufferSize: 1, refCount: true }),
);
// Alerts — SSE stream
const alerts$ = new Observable<Alert>(subscriber => {
const eventSource = new EventSource('/api/alerts/stream');
eventSource.onmessage = e => subscriber.next(JSON.parse(e.data));
eventSource.onerror = () => subscriber.error(new Error('SSE connection failed'));
return () => eventSource.close();
}).pipe(
retry({ count: 3, delay: 1000 }),
);
// Dashboard: combine all streams — emits when ANY of them emit
const dashboard$ = combineLatest({
user: user$,
prices: priceFeed$,
alerts: alerts$.pipe(startWith([])),
}).pipe(
map(({ user, prices, alerts }) => ({
userName: user.name,
balance: user.balance,
currentPrice: prices.price,
priceChange: prices.change24h,
unreadAlerts: alerts.filter(a => !a.read).length,
}))
);
// Single subscription drives the whole UI
const subscription = dashboard$.subscribe(
data => updateDashboard(data),
err => showErrorState(err),
);
// Cleanup on component destroy
// subscription.unsubscribe();
Error Handling Patterns
My observable stream breaks on the first error and stops emitting.
How do I handle errors without stopping the stream?
// The problem: catchError by default terminates the stream
const broken$ = source$.pipe(
map(processItem),
catchError(err => {
console.error(err);
return EMPTY; // This TERMINATES the stream after the first error
})
);
// Fix 1: catchError inside switchMap — only kills the inner observable, not the outer
const resilient$ = source$.pipe(
switchMap(item =>
processItem(item).pipe(
catchError(err => {
console.error('Item processing failed:', err);
return of({ error: err.message, item }); // Return error object, continue stream
})
)
)
);
// Fix 2: retry with exponential backoff
const withRetry$ = httpRequest$.pipe(
retry({
count: 3,
delay: (error, retryCount) => {
console.log(`Retry ${retryCount} after ${Math.pow(2, retryCount)}s`);
return timer(Math.pow(2, retryCount) * 1000); // 1s, 2s, 4s
},
resetOnSuccess: true,
})
);
// Fix 3: materialize/dematerialize — convert errors to values
const safe$ = source$.pipe(
materialize(), // Wraps emissions in {kind: 'N'|'E'|'C', value?, error?}
map(notification => {
if (notification.kind === 'E') {
return { success: false, error: notification.error };
}
return { success: true, value: notification.value };
}),
// Stream never errors now — errors are just values
);
Hot vs Cold Observables
My HTTP observable is making 3 requests when I subscribe 3 times.
How do I share the single request result?
// Cold observable: each subscriber gets its own execution
const cold$ = from(fetch('/api/data').then(r => r.json()));
// This makes 3 separate HTTP requests:
cold$.subscribe(data => console.log('Sub 1:', data));
cold$.subscribe(data => console.log('Sub 2:', data));
cold$.subscribe(data => console.log('Sub 3:', data));
// Hot observable: all subscribers share one execution
const hot$ = cold$.pipe(
shareReplay({ bufferSize: 1, refCount: true }),
// bufferSize: 1 — new subscribers immediately get the last value
// refCount: true — unsubscribe from source when no subscribers left
);
// This makes 1 HTTP request and shares result:
hot$.subscribe(data => console.log('Sub 1:', data));
hot$.subscribe(data => console.log('Sub 2:', data));
hot$.subscribe(data => console.log('Sub 3:', data));
Real-World Pattern: Optimistic Updates
Update state immediately on user action, sync with server in background.
Rollback if server rejects.
const saveItem = (item: Item) => {
const save$ = new Subject<void>();
// Optimistic update + server sync
return merge(
// 1. Immediately emit optimistic state
of({ optimistic: true, item }),
// 2. Sync with server
save$.pipe(
exhaustMap(() =>
from(api.saveItem(item)).pipe(
map(serverItem => ({ optimistic: false, item: serverItem })),
catchError(err => {
// Rollback: emit the rollback signal
return of({ optimistic: false, error: err, rollback: true });
})
)
)
)
);
};
For integrating RxJS with Angular, see the Angular guide which covers the async pipe and Angular’s reactive patterns. For React state management that complements reactive streaming, the React Query guide covers server state management. The Claude Skills 360 bundle includes reactive programming skill sets covering RxJS operators, stream architecture, and common patterns. Start with the free tier to try observable chain generation.