Claude Code for Reactive Programming: RxJS Patterns and Stream-Based Architecture — Claude Skills 360 Blog

Published: September 25, 2026
Read time: 8 min read
By: Claude Skills 360

Reactive programming models asynchronous data as streams. Events, HTTP responses, WebSocket messages, and user input all become Observables that you transform, combine, and subscribe to. RxJS provides the operator toolkit: switchMap to drop stale in-flight requests, debounceTime to wait out bursts of rapid events (throttleTime rate-limits them instead), and combineLatest to combine the latest values from multiple streams. Claude Code can generate operator chains and explain why a given pattern avoids race conditions.
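
Debouncing and throttling are easy to mix up, so here is a toy model in plain TypeScript (no RxJS involved) that takes event timestamps and reports which events would get through. The function names `debounceSurvivors` and `throttleSurvivors` are hypothetical, and the throttle model assumes leading-edge behavior, which is what `throttleTime` does with its default config.

```typescript
// Toy timeline model, not RxJS itself. Inputs are event timestamps in ms, sorted ascending.

// Debounce: an event survives only if no newer event arrives within `wait` ms after it.
function debounceSurvivors(times: number[], wait: number): number[] {
  return times.filter((t, i) => i === times.length - 1 || times[i + 1] - t >= wait);
}

// Throttle (leading edge): let an event through, then ignore everything for `wait` ms.
function throttleSurvivors(times: number[], wait: number): number[] {
  const out: number[] = [];
  let openAt = -Infinity; // earliest time at which the next event is allowed through
  for (const t of times) {
    if (t >= openAt) {
      out.push(t);
      openAt = t + wait;
    }
  }
  return out;
}

const keystrokes = [0, 100, 200, 900, 1000];
console.log(debounceSurvivors(keystrokes, 300)); // [200, 1000]
console.log(throttleSurvivors(keystrokes, 300)); // [0, 900]
```

Debounce keeps the last event of each burst, which suits search boxes. Throttle keeps the first event of each window, which suits scroll or resize handlers.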

Observable Fundamentals

I keep getting confused about when to use switchMap vs mergeMap vs concatMap.
Explain the difference with real examples.
| Operator | Behavior | Use When |
| --- | --- | --- |
| mergeMap | All inner observables run in parallel | Multiple concurrent requests OK |
| concatMap | Inner observables run strictly in order | Order matters, no parallel |
| switchMap | Cancels previous inner observable when new outer emits | Latest value only (search, autocomplete) |
| exhaustMap | Ignores new outer emissions while inner is active | Prevent duplicate submits |

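import { fromEvent, from, of } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import {
  catchError, concatMap, debounceTime, distinctUntilChanged,
  exhaustMap, filter, map, mergeMap, switchMap, toArray,
} from 'rxjs/operators';

// switchMap: search autocomplete that drops the previous request when the user types again
const searchInput = document.getElementById('search') as HTMLInputElement; // element id is an assumption
const search$ = fromEvent<InputEvent>(searchInput, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  debounceTime(300),          // Wait 300ms after last keystroke
  distinctUntilChanged(),     // Don't search if value didn't change
  filter(query => query.length >= 2),
  switchMap(query =>          // Unsubscribe from the previous request if a new query arrives
    // fromFetch aborts the underlying fetch on unsubscribe;
    // from(fetch(...)) would only ignore the stale response
    fromFetch(`/api/search?q=${encodeURIComponent(query)}`, {
      selector: res => res.json(),
    }).pipe(
      catchError(err => {
        console.error('Search failed:', err);
        return of({ results: [] }); // Empty results on error so the outer stream keeps running
      })
    )
  )
);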

search$.subscribe(({ results }) => renderResults(results));

// mergeMap — upload multiple files in parallel
const uploads$ = from(selectedFiles).pipe(
  mergeMap(file => uploadFile(file), 3),  // Max 3 concurrent uploads
  toArray(),  // Collect all results when complete
);

// concatMap — database operations that must run in order
const migrations$ = from(pendingMigrations).pipe(
  concatMap(migration => runMigration(migration)),  // Never parallel
);

// exhaustMap — form submit: ignore clicks until current submit completes
const submitButton = document.getElementById('submit')!;
fromEvent(submitButton, 'click').pipe(
  exhaustMap(() => submitForm()),  // Drop clicks while form is submitting
).subscribe(handleResult);
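
The four behaviors in the table can also be compared side by side on a toy timeline. The sketch below is a plain TypeScript model rather than RxJS itself: `simulate`, `Emission`, and `Delivery` are hypothetical names, every inner operation is assumed to take the same fixed time and yield one result, and outer emissions are assumed to be sorted by time.

```typescript
type Strategy = 'mergeMap' | 'concatMap' | 'switchMap' | 'exhaustMap';
interface Emission { value: string; at: number }  // outer value and its arrival time (ms)
interface Delivery { value: string; at: number }  // inner result and its delivery time (ms)

function simulate(strategy: Strategy, outer: Emission[], innerDuration: number): Delivery[] {
  const out: Delivery[] = [];
  let busyUntil = 0; // when the currently running inner finishes (concatMap / exhaustMap)
  outer.forEach((e, i) => {
    switch (strategy) {
      case 'mergeMap': // every inner starts immediately and runs in parallel
        out.push({ value: e.value, at: e.at + innerDuration });
        break;
      case 'concatMap': { // queue behind the inner that is still running
        const start = Math.max(e.at, busyUntil);
        busyUntil = start + innerDuration;
        out.push({ value: e.value, at: busyUntil });
        break;
      }
      case 'switchMap': { // dropped if a newer outer value arrives before this inner finishes
        const next = outer[i + 1];
        if (!next || next.at >= e.at + innerDuration) {
          out.push({ value: e.value, at: e.at + innerDuration });
        }
        break;
      }
      case 'exhaustMap': // ignored while an earlier inner is still running
        if (e.at >= busyUntil) {
          busyUntil = e.at + innerDuration;
          out.push({ value: e.value, at: busyUntil });
        }
        break;
    }
  });
  return out;
}

const outer: Emission[] = [
  { value: 'A', at: 0 },
  { value: 'B', at: 100 },
  { value: 'C', at: 200 },
];
const show = (s: Strategy) => simulate(s, outer, 150).map(d => `${d.value}@${d.at}`).join(' ');
console.log(show('mergeMap'));   // A@150 B@250 C@350
console.log(show('concatMap'));  // A@150 B@300 C@450
console.log(show('switchMap'));  // C@350
console.log(show('exhaustMap')); // A@150 C@350
```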

Combining Streams

Build a real-time dashboard that shows live metrics.
Combine: current user data, live price feed (WebSocket), and alerts.
import { combineLatest, from, interval, Observable } from 'rxjs';
import { map, retry, scan, shareReplay, startWith, switchMap } from 'rxjs/operators';
import { webSocket } from 'rxjs/webSocket';

// WebSocket price feed: retry re-subscribes (reconnects) when the socket errors
const priceFeed$ = webSocket<PriceUpdate>('wss://api.example.com/prices').pipe(
  retry({ count: 5, delay: 2000 }),  // Reconnect up to 5 times, 2s apart
  shareReplay({ bufferSize: 1, refCount: true }),  // Share one socket across subscribers
);

// User data: poll every 60 seconds
const user$ = interval(60_000).pipe(
  startWith(0),  // Emit immediately, then every 60s
  switchMap(() => from(fetch('/api/me').then(r => r.json()))),
  shareReplay({ bufferSize: 1, refCount: true }),
);

// Alerts: SSE stream, one alert per message
const alerts$ = new Observable<Alert>(subscriber => {
  const eventSource = new EventSource('/api/alerts/stream');
  eventSource.onmessage = e => subscriber.next(JSON.parse(e.data));
  eventSource.onerror = () => subscriber.error(new Error('SSE connection failed'));
  return () => eventSource.close();
}).pipe(
  retry({ count: 3, delay: 1000 }),
);

// Dashboard: combineLatest waits until EVERY source has emitted once,
// then emits whenever ANY of them emits
const dashboard$ = combineLatest({
  user: user$,
  prices: priceFeed$,
  // Accumulate single alerts into a list; startWith lets the dashboard
  // render before the first alert arrives
  alerts: alerts$.pipe(
    scan((all, alert) => [...all, alert], [] as Alert[]),
    startWith([] as Alert[]),
  ),
}).pipe(
  map(({ user, prices, alerts }) => ({
    userName: user.name,
    balance: user.balance,
    currentPrice: prices.price,
    priceChange: prices.change24h,
    unreadAlerts: alerts.filter(a => !a.read).length,
  }))
);

// Single subscription drives the whole UI
// (observer object form; passing separate callbacks is deprecated in RxJS 7)
const subscription = dashboard$.subscribe({
  next: data => updateDashboard(data),
  error: err => showErrorState(err),
});

// Cleanup on component destroy
// subscription.unsubscribe();
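
The map step reads a handful of fields from each stream without showing their types. Below is a sketch of the shapes it seems to assume, with the projection pulled out as a pure function so it can be unit-tested without any streams. Only `name`, `balance`, `price`, `change24h`, and `read` come from the code above; the `id` and `message` fields and the names `DashboardView` and `toDashboardView` are hypothetical.

```typescript
interface User { name: string; balance: number }
interface PriceUpdate { price: number; change24h: number }
interface Alert { id: string; message: string; read: boolean } // id and message are assumptions

interface DashboardView {
  userName: string;
  balance: number;
  currentPrice: number;
  priceChange: number;
  unreadAlerts: number;
}

// Same projection as the map() callback, expressed as a pure function.
function toDashboardView(user: User, prices: PriceUpdate, alerts: Alert[]): DashboardView {
  return {
    userName: user.name,
    balance: user.balance,
    currentPrice: prices.price,
    priceChange: prices.change24h,
    unreadAlerts: alerts.filter(a => !a.read).length,
  };
}

const view = toDashboardView(
  { name: 'Ada', balance: 1200 },
  { price: 101.5, change24h: -0.8 },
  [
    { id: 'a1', message: 'Price dropped', read: false },
    { id: 'a2', message: 'Login from new device', read: true },
  ],
);
console.log(view.unreadAlerts); // 1
```

Keeping the projection pure means the stream code only wires sources together, and the display logic can be tested with plain objects.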

Error Handling Patterns

My observable stream breaks on the first error and stops emitting.
How do I handle errors without stopping the stream?
import { EMPTY, of, timer } from 'rxjs';
import { catchError, filter, map, materialize, retry, switchMap } from 'rxjs/operators';

// The problem: once source$ errors it is finished. catchError only swaps in a
// replacement observable, and EMPTY completes immediately.
const broken$ = source$.pipe(
  map(processItem),
  catchError(err => {
    console.error(err);
    return EMPTY;  // The stream completes after the first error; no further items arrive
  })
);

// Fix 1: catchError inside switchMap, so an error only ends the inner observable, not the outer
// (here processItem is assumed to return an Observable, for example an HTTP call)
const resilient$ = source$.pipe(
  switchMap(item =>
    processItem(item).pipe(
      catchError(err => {
        console.error('Item processing failed:', err);
        return of({ error: err.message, item });  // Return error object, continue stream
      })
    )
  )
);

// Fix 2: retry with exponential backoff
const withRetry$ = httpRequest$.pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => {
      // retryCount is 1-based: the first retry receives 1
      const delayMs = 2 ** (retryCount - 1) * 1000;  // 1s, 2s, 4s
      console.log(`Retry ${retryCount} after ${delayMs / 1000}s`);
      return timer(delayMs);
    },
    resetOnSuccess: true,
  })
);

// Fix 3: materialize, which converts errors to values
const safe$ = source$.pipe(
  materialize(),  // Wraps emissions in {kind: 'N'|'E'|'C', value?, error?}
  filter(notification => notification.kind !== 'C'),  // Drop the completion notification
  map(notification => {
    if (notification.kind === 'E') {
      return { success: false, error: notification.error };
    }
    return { success: true, value: notification.value };
  }),
  // Subscribers never get an error callback; the error arrives as a value.
  // The source still stops after it errors, so combine with Fix 1 or Fix 2 to keep it alive.
);
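
RxJS passes a 1-based `retryCount` to the `delay` callback (the first retry receives 1), so the exponent has to be shifted by one for the schedule to start at one second. Here is a sketch of that arithmetic as a standalone function; `backoffDelayMs`, `baseMs`, and the `maxMs` cap are assumptions for illustration, not part of RxJS.

```typescript
// Delay before retry number `retryCount` (1-based), doubling each time and capped at maxMs.
function backoffDelayMs(retryCount: number, baseMs = 1000, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** (retryCount - 1), maxMs);
}

console.log([1, 2, 3].map(n => backoffDelayMs(n))); // [1000, 2000, 4000]
console.log(backoffDelayMs(6));                     // 30000 (32000 capped at maxMs)
```

Inside a `retry` config this could be used as `delay: (_err, retryCount) => timer(backoffDelayMs(retryCount))`. The cap keeps a long retry sequence from waiting minutes between attempts.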

Hot vs Cold Observables

My HTTP observable is making 3 requests when I subscribe 3 times.
How do I share the single request result?
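import { defer } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

// Cold observable: each subscriber gets its own execution.
// defer() delays the fetch call until subscription; from(fetch(...)) alone would
// start one request immediately and hand every subscriber the same promise.
const cold$ = defer(() => fetch('/api/data').then(r => r.json()));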

// This makes 3 separate HTTP requests:
cold$.subscribe(data => console.log('Sub 1:', data));
cold$.subscribe(data => console.log('Sub 2:', data));
cold$.subscribe(data => console.log('Sub 3:', data));

// Hot observable: all subscribers share one execution
const hot$ = cold$.pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
  // bufferSize: 1 — new subscribers immediately get the last value
  // refCount: true — unsubscribe from source when no subscribers left
);

// This makes 1 HTTP request and shares result:
hot$.subscribe(data => console.log('Sub 1:', data));
hot$.subscribe(data => console.log('Sub 2:', data));
hot$.subscribe(data => console.log('Sub 3:', data));
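
The cold versus shared distinction comes down to whether the work runs once per subscriber or once in total. The sketch below models that in plain TypeScript without RxJS: `fakeRequest` stands in for the HTTP call and `shareLast` is a hypothetical helper that loosely mimics `shareReplay(1)` on a source that emits once and completes.

```typescript
// Toy model, not RxJS: a "cold" source is a factory that does the work on every call.
let requests = 0;
const fakeRequest = (): string => {
  requests += 1;
  return `response #${requests}`;
};

// Run the factory on first use, then replay the cached value to later callers.
function shareLast<T>(factory: () => T): () => T {
  let cache: { value: T } | undefined;
  return () => {
    if (cache === undefined) cache = { value: factory() };
    return cache.value;
  };
}

const cold = fakeRequest;
cold(); cold(); cold();
const requestsAfterCold = requests; // 3: one execution per caller

const shared = shareLast(fakeRequest);
const replies = [shared(), shared(), shared()];
const requestsAfterShared = requests; // 4: only one more execution for three callers

console.log(requestsAfterCold, requestsAfterShared, replies);
```

The model leaves out what `refCount: true` adds in RxJS, namely tearing down the source when the last subscriber leaves.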

Real-World Pattern: Optimistic Updates

Update state immediately on user action, sync with server in background.
Rollback if server rejects.
import { defer, merge, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

const saveItem = (item: Item) =>
  // Optimistic update + server sync
  merge(
    // 1. Immediately emit optimistic state
    of({ optimistic: true, item }),

    // 2. Sync with the server; defer() starts the request when the result is subscribed to
    defer(() => api.saveItem(item)).pipe(
      map(serverItem => ({ optimistic: false, item: serverItem })),
      catchError(err =>
        // Rollback: emit the rollback signal instead of erroring the stream
        of({ optimistic: false, error: err, rollback: true })
      )
    )
  );
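
On the consuming side, the three event shapes emitted by saveItem (optimistic, confirmed, rollback) have to be folded into UI state. One way to do that is a small reducer that remembers the last server-confirmed value so a rollback has something to restore. This is a sketch under assumptions: the `Item` fields, `ItemState`, and `applySaveEvent` are hypothetical names, not part of the pattern above.

```typescript
interface Item { id: string; title: string } // hypothetical shape

type SaveEvent =
  | { optimistic: true; item: Item }
  | { optimistic: false; item: Item }
  | { optimistic: false; error: unknown; rollback: true };

interface ItemState {
  current: Item;        // what the UI shows right now
  lastConfirmed: Item;  // last value the server accepted
  pending: boolean;     // true while a save is in flight
  error?: unknown;
}

function applySaveEvent(state: ItemState, event: SaveEvent): ItemState {
  if ('rollback' in event) {
    // Server rejected the change: restore the last confirmed value
    return { current: state.lastConfirmed, lastConfirmed: state.lastConfirmed, pending: false, error: event.error };
  }
  if (event.optimistic) {
    // Show the user's change immediately, keep the old value for a possible rollback
    return { current: event.item, lastConfirmed: state.lastConfirmed, pending: true };
  }
  // Server accepted: its copy becomes the new baseline
  return { current: event.item, lastConfirmed: event.item, pending: false };
}

const original: Item = { id: '1', title: 'Old title' };
const edited: Item = { id: '1', title: 'New title' };
let state: ItemState = { current: original, lastConfirmed: original, pending: false };

state = applySaveEvent(state, { optimistic: true, item: edited });
console.log(state.current.title, state.pending); // New title true

state = applySaveEvent(state, { optimistic: false, error: new Error('rejected'), rollback: true });
console.log(state.current.title, state.pending); // Old title false
```

With RxJS, the same function could serve as the accumulator of a `scan` over the saveItem stream.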

For integrating RxJS with Angular, see the Angular guide which covers the async pipe and Angular’s reactive patterns. For React state management that complements reactive streaming, the React Query guide covers server state management. The Claude Skills 360 bundle includes reactive programming skill sets covering RxJS operators, stream architecture, and common patterns. Start with the free tier to try observable chain generation.
