Prerequisites
- Basic TypeScript syntax and types
- Understanding of Promises and async/await
- Event handling and callback patterns
What you'll learn
- Master RxJS observables for reactive programming patterns
- Build complex data streams with operators and transformations
- Create real-time applications with type-safe reactive patterns
- Handle error propagation and resource management in reactive systems
Introduction
Welcome to reactive programming with RxJS and TypeScript! In this guide, we'll explore how to build data streams that react to changes, handle complex async operations, and keep applications responsive.
You'll discover how to turn events, user interactions, and data flows into composable reactive patterns. Whether you're building real-time dashboards, handling user input streams, or orchestrating complex API workflows, RxJS observables are a core tool for modern, reactive TypeScript applications.
By the end of this tutorial, you'll be comfortable creating, transforming, and combining streams. Let's dive in!
Understanding Observables
What are Observables?
An Observable is a lazy stream that can emit any number of values over time and then complete or fail with an error. Nothing runs until you subscribe. Think of it as a conveyor belt of data that you can observe, transform, and react to as items pass by!
In TypeScript with RxJS, observables provide:
- Reactive patterns - respond to data changes as they happen
- Powerful operators - transform, filter, and combine data streams
- Type safety - strong typing for all stream operations
- Composability - easily combine and chain operations
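To make the conveyor-belt picture concrete, here is a minimal hand-rolled sketch of the idea behind an observable. It is not RxJS's implementation: `MiniObservable`, `MiniObserver`, and `countTo` are hypothetical names used only for illustration. It shows that the producer does not run until `subscribe` is called, and that subscribing hands back a function for releasing resources.

```typescript
// A deliberately tiny stand-in for the observable concept (not the RxJS class).
interface MiniObserver<T> {
  next: (value: T) => void;
  complete?: () => void;
}

type Teardown = () => void;

class MiniObservable<T> {
  // The producer is stored, not executed: observables are lazy.
  constructor(private producer: (observer: MiniObserver<T>) => Teardown | void) {}

  subscribe(observer: MiniObserver<T>): Teardown {
    const teardown = this.producer(observer);
    // Always hand back a function so callers can release resources.
    return typeof teardown === 'function' ? teardown : () => {};
  }
}

let producerRuns = 0;

// Emits 1..n synchronously, then completes.
const countTo = (n: number): MiniObservable<number> =>
  new MiniObservable<number>((observer) => {
    producerRuns++;
    for (let i = 1; i <= n; i++) observer.next(i);
    observer.complete?.();
  });

const numbers = countTo(3);
const runsBeforeSubscribe = producerRuns; // nothing has executed yet

const received: number[] = [];
const unsubscribe = numbers.subscribe({ next: (v) => received.push(v) });
unsubscribe();
```

The real `Observable` adds error handling, operators, and safety checks on top of this shape, but the subscribe-then-teardown contract is the same one used in the examples that follow.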
Why Use Observables?
Here's why observables are worth learning:
- Reactive Design: Applications respond to changes as they happen
- Complex Async Flows: Coordinate multiple async operations declaratively
- Event Handling: Transform user interactions into data streams
- Real-Time Features: Build live updates and streaming interfaces
- Cancellation: Built-in subscription management and cleanup
Real-world example: A search box that fetches suggestions as you type, debounces input, cancels outdated requests, and shows loading states, all with a few operators!
Basic Observable Patterns
Creating and Subscribing to Observables
Let's start with fundamental observable patterns:
// Basic observable creation and subscription patterns
// TypeScript provides excellent type inference for observables
import { Observable, Observer, of, from, interval, fromEvent } from 'rxjs';
import { map, filter, take, debounceTime, distinctUntilChanged } from 'rxjs/operators';

// Creating observables from various sources
// Each observable is strongly typed

// Simple value observable
const simpleNumbers$ = of(1, 2, 3, 4, 5);
console.log('Created simple numbers observable');

// Array to observable
const fromArray$ = from(['apple', 'banana', 'cherry', 'date']);
console.log('Created observable from array');

// Timer observable
const timer$ = interval(1000).pipe(take(5)); // Emit 5 values, 1 second apart
console.log('Created timer observable');

// Event observable (in browser environment)
// const clickObservable$ = fromEvent(document, 'click');

// Custom observable with type safety
interface UserAction {
  type: 'click' | 'scroll' | 'keypress';
  timestamp: Date;
  target: string;
  data?: any;
}

const createUserActionStream = (): Observable<UserAction> => {
  return new Observable<UserAction>((observer: Observer<UserAction>) => {
    console.log('User action stream subscribed');
    let counter = 0;
    const actions: UserAction['type'][] = ['click', 'scroll', 'keypress'];

    // Simulate user actions
    const intervalId = setInterval(() => {
      const action: UserAction = {
        type: actions[counter % actions.length],
        timestamp: new Date(),
        target: `element-${counter}`,
        data: { value: Math.random() * 100 }
      };
      console.log(`Emitting user action: ${action.type}`);
      observer.next(action); // Emit the action
      counter++;

      // Complete after 10 actions
      if (counter >= 10) {
        console.log('User action stream completed');
        observer.complete();
        clearInterval(intervalId);
      }
    }, 800);

    // Cleanup function (called when unsubscribed)
    return () => {
      console.log('Cleaning up user action stream');
      clearInterval(intervalId);
    };
  });
};

// Basic subscription patterns
const demonstrateBasicSubscriptions = (): void => {
  console.log('Starting basic subscription demonstration...');

  // Subscribe to simple observable
  simpleNumbers$.subscribe({
    next: (value) => console.log(`Number: ${value}`),
    error: (error) => console.error('Error:', error),
    complete: () => console.log('Numbers complete')
  });

  // Subscribe to array observable with transformation
  fromArray$.pipe(
    map(fruit => fruit.toUpperCase()), // Transform to uppercase
    filter(fruit => fruit.length > 5) // Keep names longer than 5 characters
  ).subscribe({
    next: (fruit) => console.log(`Filtered fruit: ${fruit}`),
    complete: () => console.log('Fruits complete')
  });

  // Subscribe to timer with logging
  timer$.subscribe({
    next: (count) => console.log(`Timer tick: ${count}`),
    complete: () => console.log('Timer complete')
  });

  // Subscribe to user actions
  const userActionSubscription = createUserActionStream().subscribe({
    next: (action) => {
      console.log(`User ${action.type} on ${action.target} at ${action.timestamp.toISOString()}`);
    },
    error: (error) => console.error('User action error:', error),
    complete: () => console.log('User actions complete')
  });

  // Demonstrate subscription management: unsubscribing after 5 seconds
  // stops the stream before it reaches its 10th action
  setTimeout(() => {
    console.log('Unsubscribing from user actions...');
    userActionSubscription.unsubscribe();
  }, 5000);
};

// Example usage
// demonstrateBasicSubscriptions();
Observable Operators and Transformations
Let's explore powerful operators for data transformation:
// Advanced operators for data stream transformation
// Real-world patterns with type safety
import {
  Observable, of, interval, merge, combineLatest, zip,
  BehaviorSubject, Subject, ReplaySubject
} from 'rxjs';
import {
  map, filter, scan, reduce, switchMap, mergeMap, concatMap,
  debounceTime, throttleTime, distinctUntilChanged,
  startWith, catchError, retry, finalize,
  tap, shareReplay, withLatestFrom
} from 'rxjs/operators';

// Data interfaces for type safety
interface SearchQuery {
  term: string;
  timestamp: Date;
  userId: string;
}

interface SearchResult {
  id: string;
  title: string;
  description: string;
  relevance: number;
  category: string;
}

interface ApiResponse<T> {
  data: T[];
  total: number;
  page: number;
  hasMore: boolean;
}

// Advanced search observable with debouncing and caching
class TypeSafeSearchService {
  private searchSubject = new Subject<string>();
  private resultsCache = new Map<string, SearchResult[]>();

  // Create search stream with advanced operators
  readonly searchResults$: Observable<ApiResponse<SearchResult>> = this.searchSubject.pipe(
    // Transform input
    map(term => term.trim().toLowerCase()),
    // Skip searches shorter than 2 characters
    filter(term => term.length >= 2),
    // Debounce user input (wait 300ms after user stops typing)
    debounceTime(300),
    // Only search if term changed
    distinctUntilChanged(),
    // Log search attempts
    tap(term => console.log(`Searching for: "${term}"`)),
    // Switch to new search (cancel previous).
    // Errors are caught inside the inner stream so that one failed request
    // does not terminate searchResults$ for later searches.
    switchMap(term => this.performSearch(term).pipe(
      catchError(error => {
        console.error('Search error:', error.message);
        return of({ data: [], total: 0, page: 1, hasMore: false });
      })
    )),
    // Log results
    tap(results => console.log(`Found ${results.data.length} results`)),
    // Share results among multiple subscribers
    shareReplay(1)
  );

  // Trigger search
  search(term: string): void {
    console.log(`Search triggered: "${term}"`);
    this.searchSubject.next(term);
  }

  // Simulate API search with caching
  private performSearch(term: string): Observable<ApiResponse<SearchResult>> {
    return new Observable<ApiResponse<SearchResult>>(observer => {
      console.log(`Performing API search for: "${term}"`);

      // Check cache first
      if (this.resultsCache.has(term)) {
        console.log('Returning cached results');
        const cachedResults = this.resultsCache.get(term)!;
        observer.next({
          data: cachedResults,
          total: cachedResults.length,
          page: 1,
          hasMore: false
        });
        observer.complete();
        return;
      }

      // Simulate API delay
      const delayMs = 500 + Math.random() * 1000; // 0.5-1.5 seconds
      const timeoutId = setTimeout(() => {
        // Simulate API results
        const mockResults: SearchResult[] = Array.from(
          { length: Math.floor(Math.random() * 10) + 1 },
          (_, i) => ({
            id: `result-${term}-${i}`,
            title: `${term} Result ${i + 1}`,
            description: `This is a search result for "${term}" with some description text.`,
            relevance: Math.random(),
            category: ['tech', 'business', 'science', 'entertainment'][Math.floor(Math.random() * 4)]
          })
        );

        // Cache results
        this.resultsCache.set(term, mockResults);
        observer.next({
          data: mockResults,
          total: mockResults.length,
          page: 1,
          hasMore: false
        });
        observer.complete();
      }, delayMs);

      // Cancel the pending simulated request when switchMap unsubscribes
      return () => clearTimeout(timeoutId);
    });
  }

  // Clear cache
  clearCache(): void {
    console.log('Clearing search cache');
    this.resultsCache.clear();
  }
}
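
Where `catchError` sits in a search pipeline decides whether the stream survives a failed request. The sketch below is a small synchronous illustration under stated assumptions: `lookup` and `runPipeline` are hypothetical helpers, and the failing term `'bad'` is invented for the demo. It compares catching inside the `switchMap` inner stream with catching on the outer stream.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Hypothetical lookup: fails for the term 'bad', succeeds otherwise.
const lookup = (term: string): Observable<string> =>
  term === 'bad'
    ? new Observable<string>((subscriber) => subscriber.error(new Error('lookup failed')))
    : of(`result:${term}`);

// Feeds three terms through a pipeline and records everything it emits.
const runPipeline = (catchInside: boolean): string[] => {
  const term$ = new Subject<string>();
  const seen: string[] = [];

  const results$: Observable<string> = catchInside
    ? term$.pipe(switchMap((term) => lookup(term).pipe(catchError(() => of('fallback')))))
    : term$.pipe(switchMap((term) => lookup(term)), catchError(() => of('fallback')));

  results$.subscribe((value) => seen.push(value));
  ['a', 'bad', 'c'].forEach((term) => term$.next(term));
  return seen;
};

const caughtInside = runPipeline(true);   // the stream keeps handling later terms
const caughtOutside = runPipeline(false); // the fallback is the last value emitted
```

With the handler on the outer stream, the error ends the subscription to `term$`, so the third term is never searched. Handling the error per request keeps a long-lived search box working after a single failure.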

// Real-time data aggregation with scan operator
class RealTimeMetricsService {
  private metricsSubject = new Subject<MetricEvent>();

  // Running totals and aggregations
  readonly aggregatedMetrics$ = this.metricsSubject.pipe(
    // Accumulate metrics over time
    scan((acc, event) => {
      // Copy the nested map as well, so earlier emissions are never mutated
      const updated = { ...acc, eventsByType: { ...acc.eventsByType } };

      // Update counters
      updated.totalEvents++;
      updated.eventsByType[event.type] = (updated.eventsByType[event.type] || 0) + 1;

      // Update value total if applicable
      if (event.value) {
        updated.totalValue += event.value;
      }
      // Average value per event, recomputed on every event so it never goes stale
      updated.averageValue = updated.totalValue / updated.totalEvents;

      // Track last update
      updated.lastUpdated = new Date();
      return updated;
    }, {
      totalEvents: 0,
      totalValue: 0,
      averageValue: 0,
      eventsByType: {} as Record<string, number>,
      lastUpdated: new Date()
    } as AggregatedMetrics),
    // Log updates
    tap(metrics => console.log('Metrics updated:', {
      total: metrics.totalEvents,
      types: Object.keys(metrics.eventsByType).length,
      value: metrics.totalValue.toFixed(2)
    }))
  );

  // Emit metric event
  recordEvent(type: string, value?: number): void {
    const event: MetricEvent = {
      type,
      value,
      timestamp: new Date(),
      id: `event-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`
    };
    console.log(`Recording metric: ${type}${value ? ` (${value})` : ''}`);
    this.metricsSubject.next(event);
  }
}

// Type definitions for metrics
interface MetricEvent {
  id: string;
  type: string;
  value?: number;
  timestamp: Date;
}

interface AggregatedMetrics {
  totalEvents: number;
  totalValue: number;
  averageValue: number;
  eventsByType: Record<string, number>;
  lastUpdated: Date;
}
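
The accumulator passed to `scan` is easiest to reason about as a pure function. The following dependency-free sketch uses simplified, hypothetical types (`SimpleEvent`, `SimpleMetrics`) and a hypothetical `scanArray` helper that mimics what `scan` does for a stream: it applies the reducer to each item and keeps every intermediate result rather than only the final one.

```typescript
interface SimpleEvent {
  type: string;
  value?: number;
}

interface SimpleMetrics {
  totalEvents: number;
  totalValue: number;
  eventsByType: Record<string, number>;
}

const emptyMetrics: SimpleMetrics = { totalEvents: 0, totalValue: 0, eventsByType: {} };

// Pure reducer: builds a new object and a new nested map, never mutating `acc`.
const applyMetric = (acc: SimpleMetrics, event: SimpleEvent): SimpleMetrics => ({
  totalEvents: acc.totalEvents + 1,
  totalValue: acc.totalValue + (event.value ?? 0),
  eventsByType: {
    ...acc.eventsByType,
    [event.type]: (acc.eventsByType[event.type] ?? 0) + 1,
  },
});

// Array analogue of scan: returns every intermediate accumulator.
const scanArray = <T, A>(items: T[], reducer: (acc: A, item: T) => A, seed: A): A[] => {
  const out: A[] = [];
  let acc = seed;
  for (const item of items) {
    acc = reducer(acc, item);
    out.push(acc);
  }
  return out;
};

const sampleEvents: SimpleEvent[] = [
  { type: 'click' },
  { type: 'purchase', value: 20 },
  { type: 'click' },
];

const metricSnapshots = scanArray(sampleEvents, applyMetric, emptyMetrics);
```

Because each snapshot is a fresh object, a subscriber that stored an earlier emission will not see it change when later events arrive.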

// Combining multiple streams with combineLatest
// Note: combineLatest emits only after every source has emitted at least once,
// so the first dashboard value appears after about 5 seconds (the slowest interval)
const createDashboardStream = () => {
  const userActivity$ = interval(2000).pipe(
    map(() => ({ activeUsers: 50 + Math.floor(Math.random() * 100), timestamp: new Date() }))
  );

  const systemHealth$ = interval(3000).pipe(
    map(() => ({
      cpuUsage: Math.random() * 100,
      memoryUsage: Math.random() * 100,
      responseTime: 100 + Math.random() * 200,
      timestamp: new Date()
    }))
  );

  const notifications$ = interval(5000).pipe(
    map(() => ({ count: Math.floor(Math.random() * 10), timestamp: new Date() }))
  );

  // Combine all dashboard data
  return combineLatest([userActivity$, systemHealth$, notifications$]).pipe(
    map(([activity, health, notifications]) => ({
      userActivity: activity,
      systemHealth: health,
      notifications: notifications,
      combinedAt: new Date()
    })),
    tap(dashboard => console.log('Dashboard updated:', {
      users: dashboard.userActivity.activeUsers,
      cpu: dashboard.systemHealth.cpuUsage.toFixed(1) + '%',
      notifications: dashboard.notifications.count
    }))
  );
};
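
A detail of `combineLatest` that is easy to miss: it stays silent until every source has produced at least one value, and from then on it emits whenever any source emits. The sketch below uses two hypothetical subjects, `users$` and `cpu$`, pushed by hand so the ordering is easy to follow.

```typescript
import { Subject, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

const users$ = new Subject<number>();
const cpu$ = new Subject<number>();

const dashboardLines: string[] = [];

combineLatest([users$, cpu$])
  .pipe(map(([users, cpu]) => `users=${users} cpu=${cpu}`))
  .subscribe((line) => dashboardLines.push(line));

users$.next(10); // nothing emitted yet: cpu$ has no value
users$.next(12); // still nothing
cpu$.next(40);   // first emission, paired with the latest users value
users$.next(15); // each later source emission produces a new combined value
```

If a dashboard should render immediately, one common option is to give each source an initial value with `startWith`, at the cost of showing placeholder data first.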
Real-World RxJS Applications
Complete Reactive Application
Let's build a comprehensive real-time application:
// Complete reactive application architecture
// Real-world patterns with advanced RxJS usage
import { Observable, Subject, BehaviorSubject, of, merge, interval } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
  map, filter, tap, scan, distinctUntilChanged, startWith, withLatestFrom,
  share, takeUntil, switchMap, mergeMap, concatMap,
  bufferTime, groupBy, mergeAll, delay, timeout
} from 'rxjs/operators';

// Application data types
interface ChatMessage {
  id: string;
  userId: string;
  username: string;
  content: string;
  timestamp: Date;
  type: 'text' | 'image' | 'file' | 'system';
}

interface UserStatus {
  userId: string;
  username: string;
  status: 'online' | 'offline' | 'away';
  lastSeen: Date;
}

interface TypingIndicator {
  userId: string;
  username: string;
  isTyping: boolean;
}

interface NotificationEvent {
  id: string;
  type: 'message' | 'user_joined' | 'user_left' | 'system';
  title: string;
  message: string;
  timestamp: Date;
  priority: 'low' | 'medium' | 'high';
}

// Real-time chat service with WebSocket integration
class ReactiveChatService {
  private socket$: WebSocketSubject<any>;
  private destroy$ = new Subject<void>();

  // Message streams
  readonly messages$: Observable<ChatMessage>;
  readonly userStatuses$: Observable<UserStatus>;
  readonly typingIndicators$: Observable<TypingIndicator>;
  readonly notifications$: Observable<NotificationEvent>;

  // Derived streams
  readonly onlineUsers$: Observable<UserStatus[]>;
  readonly messageHistory$: BehaviorSubject<ChatMessage[]> = new BehaviorSubject<ChatMessage[]>([]);

  constructor(private wsUrl: string = 'ws://localhost:8080/chat') {
    this.socket$ = webSocket(this.wsUrl);

    // Set up message streams
    this.messages$ = this.createMessageStream();
    this.userStatuses$ = this.createUserStatusStream();
    this.typingIndicators$ = this.createTypingStream();
    this.notifications$ = this.createNotificationStream();
    this.onlineUsers$ = this.createOnlineUsersStream();

    // Initialize connection
    this.initializeConnection();
  }

  // Send message
  sendMessage(content: string, type: ChatMessage['type'] = 'text'): void {
    const message = {
      action: 'send_message',
      data: {
        content,
        type,
        timestamp: new Date().toISOString()
      }
    };
    console.log('Sending message:', content);
    this.socket$.next(message);
  }

  // Send typing indicator
  sendTypingIndicator(isTyping: boolean): void {
    this.socket$.next({
      action: 'typing',
      data: { isTyping }
    });
  }

  // Create message stream
  private createMessageStream(): Observable<ChatMessage> {
    return this.socket$.pipe(
      filter((data: any) => data.type === 'message'),
      map((data: any) => ({
        id: data.id,
        userId: data.userId,
        username: data.username,
        content: data.content,
        timestamp: new Date(data.timestamp),
        type: data.messageType || 'text'
      } as ChatMessage)),
      tap(message => {
        console.log(`New message from ${message.username}: ${message.content}`);
        // Update message history
        const current = this.messageHistory$.value;
        this.messageHistory$.next([...current, message]);
      }),
      takeUntil(this.destroy$),
      // Share one execution so the history update above runs once per message,
      // no matter how many consumers subscribe
      share()
    );
  }

  // Create user status stream
  private createUserStatusStream(): Observable<UserStatus> {
    return this.socket$.pipe(
      filter((data: any) => data.type === 'user_status'),
      map((data: any) => ({
        userId: data.userId,
        username: data.username,
        status: data.status,
        lastSeen: new Date(data.lastSeen)
      } as UserStatus)),
      tap(status => console.log(`${status.username} is ${status.status}`)),
      takeUntil(this.destroy$),
      // Shared between the notification and online-user streams
      share()
    );
  }

  // Create typing indicator stream
  private createTypingStream(): Observable<TypingIndicator> {
    return this.socket$.pipe(
      filter((data: any) => data.type === 'typing'),
      map((data: any) => ({
        userId: data.userId,
        username: data.username,
        isTyping: data.isTyping
      } as TypingIndicator)),
      // Auto-clear typing after 3 seconds: emit the indicator now, then a
      // synthetic "stopped typing" value unless a newer indicator arrives first.
      // Note: switchMap keeps a single timer across all users; group by userId
      // if each user needs an independent timer.
      switchMap(indicator =>
        indicator.isTyping
          ? of({ ...indicator, isTyping: false }).pipe(
              delay(3000),
              startWith(indicator)
            )
          : of(indicator)
      ),
      distinctUntilChanged((a, b) =>
        a.userId === b.userId && a.isTyping === b.isTyping
      ),
      takeUntil(this.destroy$)
    );
  }

  // Create notification stream
  private createNotificationStream(): Observable<NotificationEvent> {
    return merge(
      // Message notifications
      this.messages$.pipe(
        map((message): NotificationEvent => ({
          id: `notif-${message.id}`,
          type: 'message',
          title: `New message from ${message.username}`,
          message: message.content.substring(0, 50) + (message.content.length > 50 ? '...' : ''),
          timestamp: new Date(),
          priority: 'medium'
        }))
      ),
      // User join/leave notifications
      this.userStatuses$.pipe(
        filter(status => status.status === 'online' || status.status === 'offline'),
        // The explicit return type keeps the literal union for `type`
        map((status): NotificationEvent => ({
          id: `notif-${Date.now()}-${status.userId}`,
          type: status.status === 'online' ? 'user_joined' : 'user_left',
          title: status.status === 'online' ? 'User Joined' : 'User Left',
          message: `${status.username} ${status.status === 'online' ? 'joined' : 'left'} the chat`,
          timestamp: new Date(),
          priority: 'low'
        }))
      )
    ).pipe(
      tap(notification => console.log(`${notification.title}: ${notification.message}`)),
      takeUntil(this.destroy$)
    );
  }

  // Create online users stream
  private createOnlineUsersStream(): Observable<UserStatus[]> {
    return this.userStatuses$.pipe(
      // Accumulate user statuses
      scan((users, status) => {
        const updated = users.filter(u => u.userId !== status.userId);
        if (status.status !== 'offline') {
          updated.push(status);
        }
        return updated;
      }, [] as UserStatus[]),
      distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
      tap(users => console.log(`Online users: ${users.length}`)),
      takeUntil(this.destroy$)
    );
  }

  // Initialize connection
  private initializeConnection(): void {
    console.log('Connecting to chat service...');
    // Handle connection events
    this.socket$.subscribe({
      next: (message) => {
        if (message.type === 'connection') {
          console.log('Connected to chat service');
        }
      },
      error: (error) => {
        console.error('Chat service error:', error);
        this.handleReconnection();
      },
      complete: () => {
        console.log('Chat service connection closed');
      }
    });
  }

  // Handle reconnection
  private handleReconnection(): void {
    console.log('Attempting to reconnect...');
    // Reconnect after a fixed 1-second delay (no backoff in this simplified version).
    // Resubscribing to the same WebSocketSubject opens a new connection, so the
    // streams built from socket$ in the constructor keep pointing at the right
    // subject. Consumers whose subscriptions ended with the error still need to
    // resubscribe.
    of(null).pipe(
      delay(1000),
      takeUntil(this.destroy$)
    ).subscribe(() => this.initializeConnection());
  }

  // Cleanup
  destroy(): void {
    console.log('Destroying chat service...');
    this.destroy$.next();
    this.destroy$.complete();
    this.socket$.complete();
  }
}

// Advanced reactive game loop
class ReactiveGameEngine {
  private gameLoop$: Observable<number>;
  private input$ = new Subject<GameInput>();
  private gameState$ = new BehaviorSubject<GameState>(this.getInitialState());

  // Game streams
  readonly entities$: Observable<GameEntity[]>;
  readonly score$: Observable<number>;
  readonly fps$: Observable<number>;

  constructor() {
    // Game loop targeting 60 FPS (interval timing is approximate)
    this.gameLoop$ = interval(1000 / 60).pipe(
      map(() => performance.now()),
      share()
    );
    this.entities$ = this.createEntityStream();
    this.score$ = this.createScoreStream();
    this.fps$ = this.createFPSStream();
    this.initializeGameLoop();
  }

  // Handle input
  handleInput(input: GameInput): void {
    this.input$.next(input);
  }

  // Create entity update stream
  private createEntityStream(): Observable<GameEntity[]> {
    return this.gameLoop$.pipe(
      // Note: withLatestFrom drops ticks until input$ has emitted at least once,
      // so entities do not update before the first call to handleInput()
      withLatestFrom(this.gameState$, this.input$),
      map(([timestamp, state, input]) => this.updateEntities(state.entities, input, timestamp)),
      tap(entities => {
        // Update game state
        const currentState = this.gameState$.value;
        this.gameState$.next({
          ...currentState,
          entities,
          lastUpdate: Date.now()
        });
      })
    );
  }

  // Create score stream
  private createScoreStream(): Observable<number> {
    return this.gameState$.pipe(
      map(state => state.score),
      distinctUntilChanged(),
      tap(score => console.log(`Score: ${score}`))
    );
  }

  // Create FPS monitoring stream
  private createFPSStream(): Observable<number> {
    return this.gameLoop$.pipe(
      bufferTime(1000),
      map(timestamps => timestamps.length),
      tap(fps => console.log(`FPS: ${fps}`))
    );
  }

  // Initialize game loop
  private initializeGameLoop(): void {
    this.entities$.subscribe();
    this.score$.subscribe();
    this.fps$.subscribe();
  }

  // Update entities (simplified)
  private updateEntities(entities: GameEntity[], input: GameInput, timestamp: number): GameEntity[] {
    return entities.map(entity => ({
      ...entity,
      position: {
        x: entity.position.x + entity.velocity.x,
        y: entity.position.y + entity.velocity.y
      },
      lastUpdate: timestamp
    }));
  }

  // Get initial game state
  private getInitialState(): GameState {
    return {
      entities: [],
      score: 0,
      level: 1,
      isActive: true,
      lastUpdate: Date.now()
    };
  }
}

// Game type definitions
interface GameEntity {
  id: string;
  type: 'player' | 'enemy' | 'projectile' | 'powerup';
  position: { x: number; y: number };
  velocity: { x: number; y: number };
  health: number;
  lastUpdate: number;
}

interface GameInput {
  type: 'move' | 'shoot' | 'pause';
  data: any;
  timestamp: number;
}

interface GameState {
  entities: GameEntity[];
  score: number;
  level: number;
  isActive: boolean;
  lastUpdate: number;
}
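
One subtlety in the game loop above: `withLatestFrom` ignores source values until the other stream has emitted at least once, so ticks that arrive before the first input are dropped. The sketch below demonstrates this with hand-driven subjects and shows one possible remedy, seeding the input stream with `startWith`. The `PlayerInput` type, the `'idle'` seed value, and `runLoop` are assumptions made for this example.

```typescript
import { Observable, Subject } from 'rxjs';
import { startWith, withLatestFrom } from 'rxjs/operators';

type PlayerInput = 'idle' | 'move';

// Runs two ticks through the loop, with the first tick arriving before any input.
const runLoop = (seedInput: boolean): string[] => {
  const tick$ = new Subject<number>();
  const input$ = new Subject<PlayerInput>();
  const frames: string[] = [];

  const latestInput$: Observable<PlayerInput> = seedInput
    ? input$.pipe(startWith('idle' as PlayerInput))
    : input$;

  tick$
    .pipe(withLatestFrom(latestInput$))
    .subscribe(([tick, input]) => frames.push(`${tick}:${input}`));

  tick$.next(1); // no real input has arrived yet
  input$.next('move');
  tick$.next(2);
  return frames;
};

const unseededFrames = runLoop(false); // the first tick is dropped
const seededFrames = runLoop(true);    // the first tick pairs with the seed value
```

Whether a neutral seed input is appropriate depends on the game; the point is only that some initial value is needed if frames must render before the player does anything.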
Advanced RxJS Patterns
Let's explore sophisticated reactive patterns:
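// Advanced RxJS patterns and utilities
// Production-ready reactive programming techniques
import { Observable, Subject, BehaviorSubject, defer, timer, throwError } from 'rxjs';
import {
  map, filter, scan, switchMap, mergeMap, retryWhen, bufferTime, groupBy,
  distinctUntilChanged, tap, finalize, catchError
} from 'rxjs/operators';

// Retry with exponential backoff
// The type parameter sits on the returned function so it is inferred from the
// stream the operator is applied to
const retryWithBackoff = (maxRetries: number = 3, baseDelay: number = 1000) => {
  return <T>(source: Observable<T>): Observable<T> => source.pipe(
    retryWhen(errors =>
      errors.pipe(
        scan((acc, error) => ({ count: acc.count + 1, error }), { count: 0, error: null }),
        switchMap(({ count, error }) => {
          if (count <= maxRetries) {
            const delayMs = baseDelay * Math.pow(2, count - 1);
            console.log(`Retry attempt ${count}/${maxRetries} in ${delayMs}ms`);
            return timer(delayMs);
          } else {
            console.error(`Max retries (${maxRetries}) reached`);
            // RxJS 7 prefers the factory form: throwError(() => error)
            return throwError(error);
          }
        })
      )
    )
  );
};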
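
The delay formula used by `retryWithBackoff` is `baseDelay * 2^(attempt - 1)`. As a quick worked example, the dependency-free sketch below computes the full schedule for a retry budget; `backoffDelay` and `backoffSchedule` are hypothetical helper names, not part of RxJS.

```typescript
// Delay before retry number `attempt` (1-based): baseDelay * 2^(attempt - 1).
const backoffDelay = (attempt: number, baseDelay: number): number =>
  baseDelay * Math.pow(2, attempt - 1);

// Full list of delays for a given retry budget.
const backoffSchedule = (maxRetries: number, baseDelay: number): number[] =>
  Array.from({ length: maxRetries }, (_, i) => backoffDelay(i + 1, baseDelay));

const schedule = backoffSchedule(3, 1000); // [1000, 2000, 4000]
const totalWaitMs = schedule.reduce((sum, ms) => sum + ms, 0); // 7000
```

With the defaults of 3 retries and a 1-second base delay, a caller can wait up to 7 seconds in delays alone before the final error surfaces, which is worth keeping in mind when choosing a timeout.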

// Time-based window operations
const createTimeWindowOperator = (windowDuration: number) => {
  return <T>(source: Observable<T>) => source.pipe(
    bufferTime(windowDuration),
    filter(buffer => buffer.length > 0),
    map(buffer => ({
      items: buffer,
      count: buffer.length,
      // Approximate window bounds, measured when the buffer is emitted
      windowStart: new Date(Date.now() - windowDuration),
      windowEnd: new Date()
    }))
  );
};

// Rate limiting operator
const rateLimit = (maxEmissions: number, timeWindow: number) => {
  // defer gives every subscription its own emission history
  return <T>(source: Observable<T>): Observable<T> => defer(() => {
    let emissions: number[] = [];
    return source.pipe(
      filter(() => {
        const now = Date.now();
        // Remove old emissions outside the time window
        emissions = emissions.filter(time => now - time < timeWindow);
        // Check if we can emit
        if (emissions.length < maxEmissions) {
          emissions.push(now);
          return true;
        } else {
          console.warn(`Rate limit exceeded: ${maxEmissions}/${timeWindow}ms`);
          return false;
        }
      })
    );
  });
};
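
The sliding-window check inside `rateLimit` can be tested in isolation if the clock is injected. Here is a minimal sketch, assuming a hypothetical `SlidingWindowLimiter` class with a replaceable `now` function; it is one way to structure the logic, not the only one.

```typescript
class SlidingWindowLimiter {
  private timestamps: number[] = [];

  constructor(
    private readonly maxEmissions: number,
    private readonly windowMs: number,
    // Injectable clock so tests do not depend on real time.
    private readonly now: () => number = () => Date.now()
  ) {}

  // Returns true when the caller may emit, and records the emission time.
  tryAcquire(): boolean {
    const current = this.now();
    // Forget emissions that have left the window.
    this.timestamps = this.timestamps.filter((t) => current - t < this.windowMs);
    if (this.timestamps.length >= this.maxEmissions) return false;
    this.timestamps.push(current);
    return true;
  }
}

let fakeTime = 0;
const limiter = new SlidingWindowLimiter(2, 1000, () => fakeTime);

const decisions: boolean[] = [];
decisions.push(limiter.tryAcquire()); // t=0: allowed
decisions.push(limiter.tryAcquire()); // t=0: allowed
decisions.push(limiter.tryAcquire()); // t=0: rejected, window is full
fakeTime = 1000;
decisions.push(limiter.tryAcquire()); // t=1000: allowed, earlier entries expired
```

Note that rejected values are dropped, not delayed. If every value must eventually be delivered, a queueing approach is needed instead of a filter.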

// State management with reactive patterns
class ReactiveStateManager<T> {
  private state$: BehaviorSubject<T>;
  private actions$ = new Subject<Action<T>>();

  // Public state observable
  readonly state: Observable<T>;

  constructor(private initialState: T, private reducer: Reducer<T>) {
    // Create the state subject here so it never reads initialState before
    // the constructor parameter has been assigned
    this.state$ = new BehaviorSubject<T>(initialState);
    this.state = this.state$.asObservable();

    // Process actions and update state
    this.actions$.pipe(
      scan((state, action) => {
        console.log(`Processing action: ${action.type}`);
        const newState = this.reducer(state, action);
        console.log('State updated:', {
          action: action.type,
          hasChanges: JSON.stringify(state) !== JSON.stringify(newState)
        });
        return newState;
      }, this.initialState)
    ).subscribe(this.state$);
  }

  // Dispatch action
  dispatch(action: Action<T>): void {
    console.log(`Dispatching action: ${action.type}`);
    this.actions$.next(action);
  }

  // Get current state
  getCurrentState(): T {
    return this.state$.value;
  }

  // Select specific state properties
  select<K extends keyof T>(key: K): Observable<T[K]> {
    return this.state$.pipe(
      map(state => state[key]),
      distinctUntilChanged()
    );
  }
}

// State management types
interface Action<T> {
  type: string;
  payload?: any;
}

type Reducer<T> = (state: T, action: Action<T>) => T;
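
The `Action<T>` shape above leaves `payload` as `any`. A discriminated union is one way to get compile-time checking inside a reducer. The stand-alone sketch below uses a hypothetical counter (`CounterState`, `CounterAction`, `counterReducer`); plugging it into `ReactiveStateManager` as written would require making the manager generic over the action type, which this sketch does not attempt.

```typescript
interface CounterState {
  count: number;
  history: number[];
}

// Each action variant carries exactly the payload it needs.
type CounterAction =
  | { type: 'increment'; payload: number }
  | { type: 'reset' };

const initialCounter: CounterState = { count: 0, history: [] };

const counterReducer = (state: CounterState, action: CounterAction): CounterState => {
  switch (action.type) {
    case 'increment':
      // `action.payload` is known to be a number in this branch.
      return { count: state.count + action.payload, history: [...state.history, state.count] };
    case 'reset':
      return initialCounter;
  }
};

const counterActions: CounterAction[] = [
  { type: 'increment', payload: 2 },
  { type: 'increment', payload: 3 },
  { type: 'reset' },
  { type: 'increment', payload: 1 },
];

// Folding the actions mirrors what scan does inside the state manager.
const finalCounter = counterActions.reduce(counterReducer, initialCounter);
```

Because the reducer is a pure function, it can be unit tested without creating any subjects or subscriptions.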

// Performance monitoring with RxJS
class PerformanceMonitor {
  private metrics$ = new Subject<PerformanceMetric>();

  // Aggregated performance data
  readonly performanceStats$ = this.metrics$.pipe(
    groupBy(metric => metric.type),
    mergeMap(group =>
      group.pipe(
        scan((acc, metric) => ({
          type: metric.type,
          count: acc.count + 1,
          totalDuration: acc.totalDuration + metric.duration,
          averageDuration: (acc.totalDuration + metric.duration) / (acc.count + 1),
          minDuration: Math.min(acc.minDuration, metric.duration),
          maxDuration: Math.max(acc.maxDuration, metric.duration),
          lastUpdate: new Date()
        }), {
          type: group.key,
          count: 0,
          totalDuration: 0,
          averageDuration: 0,
          minDuration: Infinity,
          maxDuration: -Infinity,
          lastUpdate: new Date()
        })
      )
    ),
    tap(stats => console.log(`Performance stats for ${stats.type}:`, {
      count: stats.count,
      avg: stats.averageDuration.toFixed(2) + 'ms',
      min: stats.minDuration.toFixed(2) + 'ms',
      max: stats.maxDuration.toFixed(2) + 'ms'
    }))
  );

  // Measure operation performance
  measure<T>(operation: () => Observable<T>, type: string): Observable<T> {
    // defer starts the clock at subscription time, not when measure() is called
    return defer(() => {
      const startTime = performance.now();
      return operation().pipe(
        finalize(() => {
          const duration = performance.now() - startTime;
          this.metrics$.next({
            type,
            duration,
            timestamp: new Date()
          });
        })
      );
    });
  }

  // Record custom metric
  recordMetric(type: string, duration: number): void {
    this.metrics$.next({
      type,
      duration,
      timestamp: new Date()
    });
  }
}

interface PerformanceMetric {
  type: string;
  duration: number;
  timestamp: Date;
}

// Auto-retry observable creator
const createRetryableObservable = <T>(
  operation: () => Observable<T>,
  options: {
    maxRetries?: number;
    retryDelay?: number;
    retryCondition?: (error: any) => boolean;
  } = {}
): Observable<T> => {
  const { maxRetries = 3, retryDelay = 1000, retryCondition = () => true } = options;
  return defer(() => operation()).pipe(
    retryWhen(errors =>
      errors.pipe(
        scan((acc, error) => ({ count: acc.count + 1, error }), { count: 0, error: null }),
        switchMap(({ count, error }) => {
          if (count <= maxRetries && retryCondition(error)) {
            console.log(`Retrying operation (attempt ${count}/${maxRetries})`);
            // Linear backoff: the delay grows by retryDelay on each attempt
            return timer(retryDelay * count);
          } else {
            console.error('Operation failed after retries');
            return throwError(error);
          }
        })
      )
    ),
    // Logs once per emitted value
    tap(() => console.log('Operation succeeded')),
    catchError(error => {
      console.error('Operation finally failed:', error.message);
      return throwError(error);
    })
  );
};
Best Practices & Performance
RxJS Best Practices
Here are essential patterns for production RxJS code:
// Best practices for RxJS in production
// Memory management, performance, and maintainability
import {
  Observable, Subject, BehaviorSubject, ReplaySubject, Subscription,
  defer, from, fromEvent, interval, merge, of, timer
} from 'rxjs';
import {
  bufferCount, catchError, concatMap, distinctUntilChanged, map, scan,
  shareReplay, switchMap, takeUntil, timeout, toArray
} from 'rxjs/operators';

// DO: Always unsubscribe to prevent memory leaks
class ComponentWithSubscriptions {
  private destroy$ = new Subject<void>();

  constructor() {
    // Use takeUntil for automatic cleanup
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(value => {
      console.log('Timer:', value);
    });

    // Multiple subscriptions with same cleanup
    merge(
      fromEvent(window, 'resize'),
      fromEvent(window, 'scroll')
    ).pipe(
      takeUntil(this.destroy$)
    ).subscribe(event => {
      console.log('Window event:', event.type);
    });
  }

  // Cleanup method (named after Angular's OnDestroy lifecycle hook)
  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// DO: Use shareReplay for expensive operations
const expensiveOperation$ = defer(() => {
  console.log('Performing expensive calculation...');
  return timer(2000).pipe(
    map(() => Math.random() * 1000)
  );
}).pipe(
  shareReplay(1) // Cache result for multiple subscribers
);

// DON'T: Create new observables in templates or frequently called functions
const badExample = () => {
  // Creates new observable on every call
  return interval(1000).pipe(map(x => x * 2));
};

// DO: Create observables once and reuse
// For a source that never completes, refCount: true stops the interval
// once the last subscriber leaves
const goodExample$ = interval(1000).pipe(
  map(x => x * 2),
  shareReplay({ bufferSize: 1, refCount: true })
);

// DO: Use appropriate operators for performance
const efficientDataProcessing$ = from(Array.from({ length: 10000 }, (_, i) => i)).pipe(
  // Use bufferCount for batch processing
  bufferCount(100),
  // Use concatMap for ordered processing
  concatMap(batch =>
    from(batch).pipe(
      map(x => x * 2),
      toArray()
    )
  ),
  // Use scan for cumulative operations
  scan((acc, batch) => [...acc, ...batch], [] as number[]),
  // Use distinctUntilChanged to avoid unnecessary updates
  distinctUntilChanged((a, b) => a.length === b.length)
);

// DO: Handle errors gracefully
const robustApiCall$ = (url: string) => {
  // defer issues a fresh fetch on every (re)subscription, so retries actually
  // repeat the request instead of replaying the same rejected promise
  return defer(() => fetch(url)).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }
      return response.json();
    }),
    // Time out each attempt so a hanging request is retried and then handled
    // by catchError below
    timeout(10000),
    // Retry with exponential backoff
    retryWithBackoff(3, 1000),
    // Provide fallback value
    catchError(error => {
      console.error('API call failed:', error.message);
      return of({ data: null, error: error.message });
    })
  );
};

// DO: Use subjects appropriately
// Placeholder payload types for this example (the shapes are illustrative).
// AppNotification avoids clashing with the browser's global Notification type.
interface AppNotification {
  title: string;
  message: string;
}

interface Message {
  id: string;
  content: string;
}

class NotificationService {
  // Use Subject for events
  private notificationSubject = new Subject<AppNotification>();
  // Use BehaviorSubject for state
  private isConnectedSubject = new BehaviorSubject<boolean>(false);
  // Use ReplaySubject for history
  private messageHistorySubject = new ReplaySubject<Message>(50);

  readonly notifications$ = this.notificationSubject.asObservable();
  readonly isConnected$ = this.isConnectedSubject.asObservable();
  readonly messageHistory$ = this.messageHistorySubject.asObservable();

  sendNotification(notification: AppNotification): void {
    this.notificationSubject.next(notification);
  }

  updateConnectionStatus(connected: boolean): void {
    this.isConnectedSubject.next(connected);
  }

  addMessage(message: Message): void {
    this.messageHistorySubject.next(message);
  }
}

// Memory leak detection utility
class SubscriptionTracker {
  private subscriptions = new Map<string, Subscription>();
  private subscriptionCounts = new Map<string, number>();

  track(name: string, subscription: Subscription): Subscription {
    // Clean up existing subscription
    if (this.subscriptions.has(name)) {
      this.subscriptions.get(name)!.unsubscribe();
    }

    // Track subscription count
    const count = (this.subscriptionCounts.get(name) || 0) + 1;
    this.subscriptionCounts.set(name, count);
    console.log(`Tracking subscription: ${name} (count: ${count})`);

    // Wrap subscription for automatic cleanup tracking
    const wrappedSubscription = new Subscription(() => {
      subscription.unsubscribe();
      this.subscriptions.delete(name);
      console.log(`Cleaned up subscription: ${name}`);
    });
    this.subscriptions.set(name, wrappedSubscription);
    return wrappedSubscription;
  }

  unsubscribeAll(): void {
    console.log(`Cleaning up ${this.subscriptions.size} subscriptions`);
    // Iterate over a copy: each unsubscribe removes its entry from the map
    for (const [name, subscription] of Array.from(this.subscriptions)) {
      subscription.unsubscribe();
      console.log(`Unsubscribed: ${name}`);
    }
    this.subscriptions.clear();
  }

  getReport(): SubscriptionReport {
    return {
      activeSubscriptions: this.subscriptions.size,
      subscriptionHistory: Object.fromEntries(this.subscriptionCounts),
      activeSubscriptionNames: Array.from(this.subscriptions.keys())
    };
  }
}

interface SubscriptionReport {
  activeSubscriptions: number;
  subscriptionHistory: Record<string, number>;
  activeSubscriptionNames: string[];
}

// Performance testing utility
const benchmarkObservable = <T>(
  name: string,
  observable: Observable<T>,
  expectedItems: number = 1000
): Observable<BenchmarkResult> => {
  return new Observable<BenchmarkResult>(observer => {
    const startTime = performance.now();
    let itemCount = 0;
    let totalSize = 0;
    console.log(`Starting benchmark: ${name}`);

    const subscription = observable.subscribe({
      next: (value) => {
        itemCount++;
        // JSON.stringify returns undefined for values such as undefined or functions
        totalSize += (JSON.stringify(value) ?? '').length;
        if (itemCount % 100 === 0) {
          console.log(`Processed ${itemCount}/${expectedItems} items`);
        }
      },
      error: (error) => {
        const duration = performance.now() - startTime;
        observer.next({
          name,
          duration,
          itemCount,
          totalSize,
          // Guard against division by zero when nothing was emitted
          averageItemSize: itemCount > 0 ? totalSize / itemCount : 0,
          itemsPerSecond: itemCount / (duration / 1000),
          success: false,
          error: error.message
        });
        observer.complete();
      },
      complete: () => {
        const duration = performance.now() - startTime;
        const result: BenchmarkResult = {
          name,
          duration,
          itemCount,
          totalSize,
          averageItemSize: itemCount > 0 ? totalSize / itemCount : 0,
          itemsPerSecond: itemCount / (duration / 1000),
          success: true
        };
        console.log(`Benchmark complete: ${name}`, result);
        observer.next(result);
        observer.complete();
      }
    });

    return () => subscription.unsubscribe();
  });
};

interface BenchmarkResult {
  name: string;
  duration: number;
  itemCount: number;
  totalSize: number;
  averageItemSize: number;
  itemsPerSecond: number;
  success: boolean;
  error?: string;
}
Summary
Congratulations! You've worked through RxJS observables and reactive programming in TypeScript. Here's what you've covered:
Key Takeaways
- Reactive Programming: Build applications that respond to data changes as they happen
- Stream Composition: Combine and transform data streams with operators
- Real-Time Applications: Create live, responsive user experiences
- Memory Management: Prevent memory leaks with proper subscription handling
- Performance Optimization: Share, batch, and rate-limit streams so reactive systems scale
What You Can Build
- Real-Time Dashboards: Live data visualization with streaming updates
- Chat Applications: WebSocket-based messaging with reactive patterns
- Search Interfaces: Debounced, cached search with responsive results
- Game Engines: Reactive game loops and state management
- IoT Monitoring: Real-time sensor data processing and alerts
Next Steps
Ready to take your reactive skills even further? Consider exploring:
- Node.js Streams: Server-side streaming data processing
- Web Workers: Reactive patterns in parallel processing
- State Management: Advanced reactive state patterns with NgRx/Redux
- Real-Time APIs: WebSocket and Server-Sent Events integration
You're now equipped to build reactive applications that handle complex data flows cleanly and efficiently. Keep streaming!