Part 107 of 355

📡 Observables with TypeScript: RxJS Integration Mastery

Master reactive programming with RxJS observables in TypeScript for powerful data streams, real-time applications, and complex async flows 🌊

🚀 Intermediate
25 min read

Prerequisites

  • Basic TypeScript syntax and types 📝
  • Understanding of Promises and async/await ⚡
  • Event handling and callback patterns 🔄

What you'll learn

  • Master RxJS observables for reactive programming patterns 📡
  • Build complex data streams with operators and transformations 🌊
  • Create real-time applications with type-safe reactive patterns 🚀
  • Handle error propagation and resource management in reactive systems 🛡️

🎯 Introduction

Welcome to the powerful world of reactive programming with RxJS and TypeScript! 🎉 In this guide, we’ll explore how to build sophisticated data streams that react to changes, handle complex async operations, and create responsive applications.

You’ll discover how to transform events, user interactions, and data flows into elegant reactive patterns that scale beautifully. Whether you’re building real-time dashboards 📊, handling user input streams 📝, or orchestrating complex API workflows 🌐, mastering RxJS observables is essential for creating modern, reactive TypeScript applications.

By the end of this tutorial, you’ll be streaming data like a reactive programming wizard! 📡 Let’s dive in! 🏊‍♂️

📚 Understanding Observables

🤔 What are Observables?

Observables are like smart data streams that can emit multiple values over time 🌊. Think of them as a conveyor belt of data that you can observe, transform, and react to as items pass by!

In TypeScript with RxJS, observables provide:

  • ✨ Reactive patterns - respond to data changes as they happen
  • 🚀 Powerful operators - transform, filter, and combine data streams
  • 🛡️ Type safety - strong typing for all stream operations
  • 📦 Composability - easily combine and chain operations
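
To make the list above concrete, here is a minimal sketch of a typed stream that emits several values and is transformed by chained operators. The variable names (`evenTimesTen`, `finished`) are illustrative assumptions, not part of RxJS.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Illustrative names only: collect what the stream delivers
const evenTimesTen: number[] = [];
let finished = false;

of(1, 2, 3, 4, 5).pipe(
  filter((n: number) => n % 2 === 0), // keep 2 and 4
  map((n: number) => n * 10)          // 2 -> 20, 4 -> 40
).subscribe({
  next: value => evenTimesTen.push(value),
  complete: () => { finished = true; }
});

// of() emits synchronously, so the results are available right away
console.log(evenTimesTen, finished); // [20, 40] true
```

Because `of` produces an `Observable<number>`, TypeScript infers `n` and `value` as numbers at every step of the chain; the explicit annotations are only there for readability.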

💡 Why Use Observables?

Here’s why observables are game-changers:

  1. Reactive Design ⚡: Applications that respond instantly to changes
  2. Complex Async Flows 🌊: Handle multiple async operations elegantly
  3. Event Handling 🎯: Transform user interactions into data streams
  4. Real-Time Features 📡: Build live updates and streaming interfaces
  5. Cancellation 🚫: Built-in subscription management and cleanup

Real-world example: A search box that automatically fetches suggestions as you type, debounces input, cancels previous requests, and shows loading states - all with a few operators! 🔍
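
The "cancels previous requests" part of that search box can be sketched with `switchMap`. This is a simplified illustration under stated assumptions: `fakeRequest`, `slowResponse$`, and `fastResponse$` are hypothetical stand-ins for real HTTP calls, driven by hand so the order of events is easy to follow.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const query$ = new Subject<string>();

// Hypothetical in-flight "requests" that we resolve manually
const slowResponse$ = new Subject<string>();
const fastResponse$ = new Subject<string>();
const fakeRequest = (q: string) => (q === 'a' ? slowResponse$ : fastResponse$);

const shown: string[] = [];
query$.pipe(
  // switchMap unsubscribes from the previous inner stream on each new query
  switchMap(q => fakeRequest(q))
).subscribe(result => shown.push(result));

query$.next('a');   // request for "a" starts
query$.next('ab');  // request for "ab" starts; the "a" request is dropped

slowResponse$.next('results for a');   // arrives late and is ignored
fastResponse$.next('results for ab');  // delivered to the subscriber

console.log(shown); // ["results for ab"]
```

With `mergeMap` in place of `switchMap`, both responses would be delivered, which is how stale suggestions end up on screen.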

🔧 Basic Observable Patterns

📝 Creating and Subscribing to Observables

Let’s start with fundamental observable patterns:

// ๐ŸŽฏ Basic observable creation and subscription patterns
// TypeScript provides excellent type inference for observables

import { Observable, Observer, of, from, interval, fromEvent } from 'rxjs';
import { map, filter, take, debounceTime, distinctUntilChanged } from 'rxjs/operators';

// ๐Ÿ—๏ธ Creating observables from various sources
// Each observable is strongly typed

// ๐Ÿ“Š Simple value observable
const simpleNumbers$ = of(1, 2, 3, 4, 5);
console.log('๐Ÿ“Š Created simple numbers observable');

// ๐Ÿ“ฆ Array to observable
const fromArray$ = from(['apple', 'banana', 'cherry', 'date']);
console.log('๐Ÿ“ฆ Created observable from array');

// โฐ Timer observable
const timer$ = interval(1000).pipe(take(5)); // Emit 5 values, 1 second apart
console.log('โฐ Created timer observable');

// ๐ŸŽฎ Event observable (in browser environment)
// const clickObservable$ = fromEvent(document, 'click');

// ๐Ÿ”ง Custom observable with type safety
interface UserAction {
  type: 'click' | 'scroll' | 'keypress';
  timestamp: Date;
  target: string;
  data?: any;
}

const createUserActionStream = (): Observable<UserAction> => {
  return new Observable<UserAction>((observer: Observer<UserAction>) => {
    console.log('๐Ÿ”ง User action stream subscribed');
    
    let counter = 0;
    const actions: UserAction['type'][] = ['click', 'scroll', 'keypress'];
    
    // ๐ŸŽญ Simulate user actions
    const intervalId = setInterval(() => {
      const action: UserAction = {
        type: actions[counter % actions.length],
        timestamp: new Date(),
        target: `element-${counter}`,
        data: { value: Math.random() * 100 }
      };
      
      console.log(`๐Ÿ‘ค Emitting user action: ${action.type}`);
      observer.next(action); // ๐Ÿ“ค Emit the action
      
      counter++;
      
      // ๐Ÿ”š Complete after 10 actions
      if (counter >= 10) {
        console.log('โœ… User action stream completed');
        observer.complete();
        clearInterval(intervalId);
      }
    }, 800);
    
    // ๐Ÿงน Cleanup function (called when unsubscribed)
    return () => {
      console.log('๐Ÿงน Cleaning up user action stream');
      clearInterval(intervalId);
    };
  });
};

// โœจ Basic subscription patterns
const demonstrateBasicSubscriptions = (): void => {
  console.log('๐Ÿš€ Starting basic subscription demonstration...');
  
  // ๐Ÿ“Š Subscribe to simple observable
  simpleNumbers$.subscribe({
    next: (value) => console.log(`๐Ÿ“Š Number: ${value}`),
    error: (error) => console.error('๐Ÿ’ฅ Error:', error),
    complete: () => console.log('โœ… Numbers complete')
  });
  
  // ๐Ÿ“ฆ Subscribe to array observable with transformation
  fromArray$.pipe(
    map(fruit => fruit.toUpperCase()), // ๐Ÿ”ค Transform to uppercase
    filter(fruit => fruit.length > 5)  // ๐Ÿ“ Filter by length
  ).subscribe({
    next: (fruit) => console.log(`๐ŸŽ Filtered fruit: ${fruit}`),
    complete: () => console.log('โœ… Fruits complete')
  });
  
  // โฐ Subscribe to timer with logging
  timer$.subscribe({
    next: (count) => console.log(`โฐ Timer tick: ${count}`),
    complete: () => console.log('โœ… Timer complete')
  });
  
  // ๐Ÿ‘ค Subscribe to user actions
  const userActionSubscription = createUserActionStream().subscribe({
    next: (action) => {
      console.log(`๐Ÿ‘ค User ${action.type} on ${action.target} at ${action.timestamp.toISOString()}`);
    },
    error: (error) => console.error('๐Ÿ’ฅ User action error:', error),
    complete: () => console.log('โœ… User actions complete')
  });
  
  // ๐Ÿ”„ Demonstrate subscription management
  setTimeout(() => {
    console.log('๐Ÿšซ Unsubscribing from user actions...');
    userActionSubscription.unsubscribe();
  }, 5000);
};

// ๐ŸŽฎ Example usage
// demonstrateBasicSubscriptions();
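
One detail of the custom stream above is worth isolating: an observable built with `new Observable(...)` is cold, so its producer function runs once per subscriber and its cleanup function runs once per unsubscribe. A minimal sketch, with `producerRuns`, `teardownRuns`, and `cold$` as illustrative names:

```typescript
import { Observable } from 'rxjs';

let producerRuns = 0;
let teardownRuns = 0;

// Hypothetical cold source: the constructor callback runs for every subscriber
const cold$ = new Observable<number>(subscriber => {
  producerRuns++;
  subscriber.next(producerRuns);
  // Teardown logic, invoked when this subscriber unsubscribes
  return () => { teardownRuns++; };
});

const seen: number[] = [];
const first = cold$.subscribe(value => seen.push(value));
const second = cold$.subscribe(value => seen.push(value));

first.unsubscribe();
second.unsubscribe();

console.log(producerRuns, seen, teardownRuns); // 2 [1, 2] 2
```

Applied to `createUserActionStream`, this means two subscribers would start two independent `setInterval` timers, each cleared by its own unsubscribe.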

๐ŸŒŠ Observable Operators and Transformations

Letโ€™s explore powerful operators for data transformation:

// 🌊 Advanced operators for data stream transformation
// Real-world patterns with type safety

import { 
  Observable, of, interval,
  merge, combineLatest, zip, 
  BehaviorSubject, Subject, ReplaySubject 
} from 'rxjs';
import { 
  map, filter, scan, reduce, switchMap, mergeMap, concatMap,
  debounceTime, throttleTime, distinctUntilChanged, 
  startWith, catchError, retry, finalize,
  tap, shareReplay, withLatestFrom
} from 'rxjs/operators';

// ๐Ÿ—๏ธ Data interfaces for type safety
interface SearchQuery {
  term: string;
  timestamp: Date;
  userId: string;
}

interface SearchResult {
  id: string;
  title: string;
  description: string;
  relevance: number;
  category: string;
}

interface ApiResponse<T> {
  data: T[];
  total: number;
  page: number;
  hasMore: boolean;
}

// ๐Ÿ” Advanced search observable with debouncing and caching
class TypeSafeSearchService {
  private searchSubject = new Subject<string>();
  private resultsCache = new Map<string, SearchResult[]>();
  
  // ๐Ÿ“ก Create search stream with advanced operators
  readonly searchResults$: Observable<ApiResponse<SearchResult>> = this.searchSubject.pipe(
    // ๐Ÿ”ค Transform input
    map(term => term.trim().toLowerCase()),
    
    // ๐Ÿšซ Skip empty searches
    filter(term => term.length >= 2),
    
    // โฐ Debounce user input (wait 300ms after user stops typing)
    debounceTime(300),
    
    // ๐Ÿ”„ Only search if term changed
    distinctUntilChanged(),
    
    // ๐Ÿ“Š Log search attempts
    tap(term => console.log(`๐Ÿ” Searching for: "${term}"`)),
    
    // 🔄 Switch to new search (cancel previous)
    // 🛡️ Handle errors inside the inner stream so one failed search
    //    does not terminate searchResults$ for later queries
    switchMap(term => this.performSearch(term).pipe(
      catchError(error => {
        console.error('💥 Search error:', error.message);
        return of({ data: [], total: 0, page: 1, hasMore: false });
      })
    )),
    
    // ๐Ÿ“Š Log results
    tap(results => console.log(`โœ… Found ${results.data.length} results`)),
    
    // ๐Ÿ“ฆ Share results among multiple subscribers
    shareReplay(1)
  );
  
  // ๐Ÿ” Trigger search
  search(term: string): void {
    console.log(`๐ŸŽฏ Search triggered: "${term}"`);
    this.searchSubject.next(term);
  }
  
  // ๐ŸŒ Simulate API search with caching
  private performSearch(term: string): Observable<ApiResponse<SearchResult>> {
    return new Observable<ApiResponse<SearchResult>>(observer => {
      console.log(`๐ŸŒ Performing API search for: "${term}"`);
      
      // ๐Ÿ“ฆ Check cache first
      if (this.resultsCache.has(term)) {
        console.log('๐Ÿ“ฆ Returning cached results');
        const cachedResults = this.resultsCache.get(term)!;
        observer.next({
          data: cachedResults,
          total: cachedResults.length,
          page: 1,
          hasMore: false
        });
        observer.complete();
        return;
      }
      
      // ๐ŸŽญ Simulate API delay
      const delay = 500 + Math.random() * 1000; // 0.5-1.5 seconds
      
      const timeoutId = setTimeout(() => {
        // 🎲 Simulate API results
        const mockResults: SearchResult[] = Array.from(
          { length: Math.floor(Math.random() * 10) + 1 },
          (_, i) => ({
            id: `result-${term}-${i}`,
            title: `${term} Result ${i + 1}`,
            description: `This is a search result for "${term}" with some description text.`,
            relevance: Math.random(),
            category: ['tech', 'business', 'science', 'entertainment'][Math.floor(Math.random() * 4)]
          })
        );
        
        // 📦 Cache results
        this.resultsCache.set(term, mockResults);
        
        observer.next({
          data: mockResults,
          total: mockResults.length,
          page: 1,
          hasMore: false
        });
        observer.complete();
      }, delay);
      
      // 🧹 Cancel the pending "request" when switchMap unsubscribes
      return () => clearTimeout(timeoutId);
    });
  }
  
  // ๐Ÿงน Clear cache
  clearCache(): void {
    console.log('๐Ÿงน Clearing search cache');
    this.resultsCache.clear();
  }
}

// ๐Ÿ“Š Real-time data aggregation with scan operator
class RealTimeMetricsService {
  private metricsSubject = new Subject<MetricEvent>();
  
  // ๐Ÿ“ˆ Running totals and aggregations
  readonly aggregatedMetrics$ = this.metricsSubject.pipe(
    // ๐Ÿ“Š Accumulate metrics over time
    scan((acc, event) => {
      // 📈 Build a new accumulator; copy the nested map too,
      //    so previously emitted snapshots are never mutated
      const updated: AggregatedMetrics = {
        ...acc,
        totalEvents: acc.totalEvents + 1,
        eventsByType: {
          ...acc.eventsByType,
          [event.type]: (acc.eventsByType[event.type] || 0) + 1
        },
        lastUpdated: new Date() // ⏰ Track last update
      };
      
      // 💰 Update value totals if applicable (the average is taken over all events)
      if (event.value) {
        updated.totalValue += event.value;
        updated.averageValue = updated.totalValue / updated.totalEvents;
      }
      
      return updated;
    }, {
      totalEvents: 0,
      totalValue: 0,
      averageValue: 0,
      eventsByType: {} as Record<string, number>,
      lastUpdated: new Date()
    } as AggregatedMetrics),
    
    // ๐Ÿ“Š Log updates
    tap(metrics => console.log('๐Ÿ“Š Metrics updated:', {
      total: metrics.totalEvents,
      types: Object.keys(metrics.eventsByType).length,
      value: metrics.totalValue.toFixed(2)
    }))
  );
  
  // ๐Ÿ“ค Emit metric event
  recordEvent(type: string, value?: number): void {
    const event: MetricEvent = {
      type,
      value,
      timestamp: new Date(),
      id: `event-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`
    };
    
    console.log(`📤 Recording metric: ${type}${value !== undefined ? ` (${value})` : ''}`);
    this.metricsSubject.next(event);
  }
}

// ๐Ÿ—๏ธ Type definitions for metrics
interface MetricEvent {
  id: string;
  type: string;
  value?: number;
  timestamp: Date;
}

interface AggregatedMetrics {
  totalEvents: number;
  totalValue: number;
  averageValue: number;
  eventsByType: Record<string, number>;
  lastUpdated: Date;
}

// ๐Ÿ”„ Combining multiple streams with combineLatest
const createDashboardStream = () => {
  const userActivity$ = interval(2000).pipe(
    map(i => ({ activeUsers: 50 + Math.floor(Math.random() * 100), timestamp: new Date() }))
  );
  
  const systemHealth$ = interval(3000).pipe(
    map(i => ({ 
      cpuUsage: Math.random() * 100, 
      memoryUsage: Math.random() * 100,
      responseTime: 100 + Math.random() * 200,
      timestamp: new Date()
    }))
  );
  
  const notifications$ = interval(5000).pipe(
    map(i => ({ count: Math.floor(Math.random() * 10), timestamp: new Date() }))
  );
  
  // ๐Ÿ”— Combine all dashboard data
  return combineLatest([userActivity$, systemHealth$, notifications$]).pipe(
    map(([activity, health, notifications]) => ({
      userActivity: activity,
      systemHealth: health,
      notifications: notifications,
      combinedAt: new Date()
    })),
    tap(dashboard => console.log('๐Ÿ“Š Dashboard updated:', {
      users: dashboard.userActivity.activeUsers,
      cpu: dashboard.systemHealth.cpuUsage.toFixed(1) + '%',
      notifications: dashboard.notifications.count
    }))
  );
};
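
A behaviour of `combineLatest` that matters for the dashboard above: it stays silent until every source has emitted at least once, then emits on each subsequent change. The sketch below uses hand-driven subjects (`users$`, `cpu$`, both illustrative) instead of timers so the sequence is deterministic.

```typescript
import { Subject, combineLatest } from 'rxjs';

// Hypothetical sources, pushed manually for the demonstration
const users$ = new Subject<number>();
const cpu$ = new Subject<number>();

const snapshots: string[] = [];
combineLatest([users$, cpu$]).subscribe(([users, cpu]) => {
  snapshots.push(`${users} users / ${cpu}% cpu`);
});

users$.next(10); // nothing yet: cpu$ has not emitted
users$.next(11); // still nothing
cpu$.next(50);   // first combined value uses the latest from each source
users$.next(12); // any later change produces a new combined value

console.log(snapshots); // ["11 users / 50% cpu", "12 users / 50% cpu"]
```

In `createDashboardStream` this means the first dashboard value appears only after the slowest interval (5 seconds) has ticked once; adding `startWith(...)` to each source is one common way to get an immediate initial snapshot.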

๐Ÿš€ Real-World RxJS Applications

๐Ÿ“ฑ Complete Reactive Application

Letโ€™s build a comprehensive real-time application:

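// 📱 Complete reactive application architecture
// Real-world patterns with advanced RxJS usage

import { Observable, Subject, BehaviorSubject, merge, of, interval } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { 
  map, filter, tap, scan, distinctUntilChanged, withLatestFrom, retry,
  share, takeUntil, switchMap, mergeMap, concatMap,
  bufferTime, groupBy, mergeAll, delay, timeout
} from 'rxjs/operators';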

// ๐Ÿ—๏ธ Application data types
interface ChatMessage {
  id: string;
  userId: string;
  username: string;
  content: string;
  timestamp: Date;
  type: 'text' | 'image' | 'file' | 'system';
}

interface UserStatus {
  userId: string;
  username: string;
  status: 'online' | 'offline' | 'away';
  lastSeen: Date;
}

interface TypingIndicator {
  userId: string;
  username: string;
  isTyping: boolean;
}

interface NotificationEvent {
  id: string;
  type: 'message' | 'user_joined' | 'user_left' | 'system';
  title: string;
  message: string;
  timestamp: Date;
  priority: 'low' | 'medium' | 'high';
}

// ๐ŸŒ Real-time chat service with WebSocket integration
class ReactiveChatService {
  private socket$: WebSocketSubject<any>;
  private destroy$ = new Subject<void>();
  
  // ๐Ÿ“ก Message streams
  readonly messages$: Observable<ChatMessage>;
  readonly userStatuses$: Observable<UserStatus>;
  readonly typingIndicators$: Observable<TypingIndicator>;
  readonly notifications$: Observable<NotificationEvent>;
  
  // ๐Ÿ“Š Derived streams
  readonly onlineUsers$: Observable<UserStatus[]>;
  readonly messageHistory$: BehaviorSubject<ChatMessage[]> = new BehaviorSubject<ChatMessage[]>([]);
  
  constructor(private wsUrl: string = 'ws://localhost:8080/chat') {
    this.socket$ = webSocket(this.wsUrl);
    
    // ๐ŸŽฏ Set up message streams
    this.messages$ = this.createMessageStream();
    this.userStatuses$ = this.createUserStatusStream();
    this.typingIndicators$ = this.createTypingStream();
    this.notifications$ = this.createNotificationStream();
    this.onlineUsers$ = this.createOnlineUsersStream();
    
    // ๐Ÿ”„ Initialize connection
    this.initializeConnection();
  }
  
  // ๐Ÿ“ค Send message
  sendMessage(content: string, type: ChatMessage['type'] = 'text'): void {
    const message = {
      action: 'send_message',
      data: {
        content,
        type,
        timestamp: new Date().toISOString()
      }
    };
    
    console.log('๐Ÿ“ค Sending message:', content);
    this.socket$.next(message);
  }
  
  // โŒจ๏ธ Send typing indicator
  sendTypingIndicator(isTyping: boolean): void {
    this.socket$.next({
      action: 'typing',
      data: { isTyping }
    });
  }
  
  // ๐Ÿ”— Create message stream
  private createMessageStream(): Observable<ChatMessage> {
    return this.socket$.pipe(
      filter((data: any) => data.type === 'message'),
      map((data: any) => ({
        id: data.id,
        userId: data.userId,
        username: data.username,
        content: data.content,
        timestamp: new Date(data.timestamp),
        type: data.messageType || 'text'
      } as ChatMessage)),
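      tap(message => {
        console.log(`💬 New message from ${message.username}: ${message.content}`);
        // 📚 Update message history
        const current = this.messageHistory$.value;
        this.messageHistory$.next([...current, message]);
      }),
      takeUntil(this.destroy$),
      // Share one upstream subscription so the history update above runs once
      // per message, even though notifications$ also subscribes to messages$
      share()
    );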
  }
  
  // ๐Ÿ‘ฅ Create user status stream
  private createUserStatusStream(): Observable<UserStatus> {
    return this.socket$.pipe(
      filter((data: any) => data.type === 'user_status'),
      map((data: any) => ({
        userId: data.userId,
        username: data.username,
        status: data.status,
        lastSeen: new Date(data.lastSeen)
      } as UserStatus)),
      tap(status => console.log(`๐Ÿ‘ค ${status.username} is ${status.status}`)),
      takeUntil(this.destroy$)
    );
  }
  
  // โŒจ๏ธ Create typing indicator stream
  private createTypingStream(): Observable<TypingIndicator> {
    return this.socket$.pipe(
      filter((data: any) => data.type === 'typing'),
      map((data: any) => ({
        userId: data.userId,
        username: data.username,
        isTyping: data.isTyping
      } as TypingIndicator)),
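      // 🕐 Auto-clear typing after 3 seconds
      switchMap(indicator => 
        indicator.isTyping 
          ? of(indicator, { ...indicator, isTyping: false }).pipe(
              // Emit the indicator immediately, then the cleared copy 3 seconds later
              concatMap((ind, index) =>
                index === 0 ? of(ind) : of(ind).pipe(delay(3000))
              )
            )
          : of(indicator)
      ),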
      distinctUntilChanged((a, b) => 
        a.userId === b.userId && a.isTyping === b.isTyping
      ),
      takeUntil(this.destroy$)
    );
  }
  
  // ๐Ÿ”” Create notification stream
  private createNotificationStream(): Observable<NotificationEvent> {
    return merge(
      // ๐Ÿ’ฌ Message notifications
      this.messages$.pipe(
        map(message => ({
          id: `notif-${message.id}`,
          type: 'message' as const,
          title: `New message from ${message.username}`,
          message: message.content.substring(0, 50) + (message.content.length > 50 ? '...' : ''),
          timestamp: new Date(),
          priority: 'medium' as const
        }))
      ),
      
      // ๐Ÿ‘ค User join/leave notifications
      this.userStatuses$.pipe(
        filter(status => status.status === 'online' || status.status === 'offline'),
        map(status => ({
          id: `notif-${Date.now()}-${status.userId}`,
          type: status.status === 'online' ? ('user_joined' as const) : ('user_left' as const),
          title: status.status === 'online' ? 'User Joined' : 'User Left',
          message: `${status.username} ${status.status === 'online' ? 'joined' : 'left'} the chat`,
          timestamp: new Date(),
          priority: 'low' as const
        }))
      )
    ).pipe(
      tap(notification => console.log(`๐Ÿ”” ${notification.title}: ${notification.message}`)),
      takeUntil(this.destroy$)
    );
  }
  
  // ๐Ÿ‘ฅ Create online users stream
  private createOnlineUsersStream(): Observable<UserStatus[]> {
    return this.userStatuses$.pipe(
      // ๐Ÿ“Š Accumulate user statuses
      scan((users, status) => {
        const updated = users.filter(u => u.userId !== status.userId);
        if (status.status !== 'offline') {
          updated.push(status);
        }
        return updated;
      }, [] as UserStatus[]),
      distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
      tap(users => console.log(`๐Ÿ‘ฅ Online users: ${users.length}`)),
      takeUntil(this.destroy$)
    );
  }
  
  // ๐Ÿ”„ Initialize connection
  private initializeConnection(): void {
    console.log('๐Ÿ”— Connecting to chat service...');
    
    // ๐Ÿ“ก Handle connection events
    this.socket$.subscribe({
      next: (message) => {
        if (message.type === 'connection') {
          console.log('โœ… Connected to chat service');
        }
      },
      error: (error) => {
        console.error('๐Ÿ’ฅ Chat service error:', error);
        this.handleReconnection();
      },
      complete: () => {
        console.log('๐Ÿ”š Chat service connection closed');
      }
    });
  }
  
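  // 🔄 Handle reconnection
  private handleReconnection(): void {
    console.log('🔄 Attempting to reconnect...');
    
    // 🕐 Reconnect after a fixed 1-second delay (no backoff is applied here).
    // Note: messages$ and the other streams built in the constructor stay bound
    // to the previous socket; a full reconnect would need to rebuild them too.
    of(null).pipe(
      delay(1000),
      takeUntil(this.destroy$)
    ).subscribe(() => {
      this.socket$ = webSocket(this.wsUrl);
      this.initializeConnection();
    });
  }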
  
  // ๐Ÿงน Cleanup
  destroy(): void {
    console.log('๐Ÿงน Destroying chat service...');
    this.destroy$.next();
    this.destroy$.complete();
    this.socket$.complete();
  }
}

// ๐ŸŽฎ Advanced reactive game loop
class ReactiveGameEngine {
  private gameLoop$: Observable<number>;
  private input$ = new Subject<GameInput>();
  private gameState$ = new BehaviorSubject<GameState>(this.getInitialState());
  
  // ๐ŸŽฏ Game streams
  readonly entities$: Observable<GameEntity[]>;
  readonly score$: Observable<number>;
  readonly fps$: Observable<number>;
  
  constructor() {
    // ๐Ÿ”„ 60 FPS game loop
    this.gameLoop$ = interval(1000 / 60).pipe(
      map(() => performance.now()),
      share()
    );
    
    this.entities$ = this.createEntityStream();
    this.score$ = this.createScoreStream();
    this.fps$ = this.createFPSStream();
    
    this.initializeGameLoop();
  }
  
  // ๐ŸŽฎ Handle input
  handleInput(input: GameInput): void {
    this.input$.next(input);
  }
  
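  // 🎯 Create entity update stream
  // Note: withLatestFrom emits nothing until gameState$ and input$ have each
  // produced a value, so entities only start updating after the first handleInput() call
  private createEntityStream(): Observable<GameEntity[]> {
    return this.gameLoop$.pipe(
      withLatestFrom(this.gameState$, this.input$),
      map(([timestamp, state, input]) => this.updateEntities(state.entities, input, timestamp)),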
      tap(entities => {
        // ๐Ÿ“Š Update game state
        const currentState = this.gameState$.value;
        this.gameState$.next({
          ...currentState,
          entities,
          lastUpdate: Date.now()
        });
      })
    );
  }
  
  // ๐Ÿ“Š Create score stream
  private createScoreStream(): Observable<number> {
    return this.gameState$.pipe(
      map(state => state.score),
      distinctUntilChanged(),
      tap(score => console.log(`๐Ÿ† Score: ${score}`))
    );
  }
  
  // ๐Ÿ“ˆ Create FPS monitoring stream
  private createFPSStream(): Observable<number> {
    return this.gameLoop$.pipe(
      bufferTime(1000),
      map(timestamps => timestamps.length),
      tap(fps => console.log(`๐Ÿ“ˆ FPS: ${fps}`))
    );
  }
  
  // ๐Ÿ”„ Initialize game loop
  private initializeGameLoop(): void {
    this.entities$.subscribe();
    this.score$.subscribe();
    this.fps$.subscribe();
  }
  
  // ๐ŸŽฏ Update entities (simplified)
  private updateEntities(entities: GameEntity[], input: GameInput, timestamp: number): GameEntity[] {
    return entities.map(entity => ({
      ...entity,
      position: {
        x: entity.position.x + entity.velocity.x,
        y: entity.position.y + entity.velocity.y
      },
      lastUpdate: timestamp
    }));
  }
  
  // ๐ŸŽฎ Get initial game state
  private getInitialState(): GameState {
    return {
      entities: [],
      score: 0,
      level: 1,
      isActive: true,
      lastUpdate: Date.now()
    };
  }
}

// ๐Ÿ—๏ธ Game type definitions
interface GameEntity {
  id: string;
  type: 'player' | 'enemy' | 'projectile' | 'powerup';
  position: { x: number; y: number };
  velocity: { x: number; y: number };
  health: number;
  lastUpdate: number;
}

interface GameInput {
  type: 'move' | 'shoot' | 'pause';
  data: any;
  timestamp: number;
}

interface GameState {
  entities: GameEntity[];
  score: number;
  level: number;
  isActive: boolean;
  lastUpdate: number;
}
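
Several streams in the chat service perform side effects inside `tap` and are subscribed to from more than one place. The sketch below, using illustrative counters, shows why multicasting with `share()` matters in that situation: without it, every subscriber gets its own copy of the pipeline and the side effect runs once per subscriber.

```typescript
import { Subject } from 'rxjs';
import { tap, share } from 'rxjs/operators';

const source$ = new Subject<string>();

// Illustrative counters for how often each side effect runs
let unsharedSideEffects = 0;
let sharedSideEffects = 0;

const unshared$ = source$.pipe(tap(() => { unsharedSideEffects++; }));
const shared$ = source$.pipe(tap(() => { sharedSideEffects++; }), share());

// Two subscribers on each stream
unshared$.subscribe();
unshared$.subscribe();
shared$.subscribe();
shared$.subscribe();

source$.next('hello');

console.log(unsharedSideEffects, sharedSideEffects); // 2 1
```

The same reasoning applies to any stream that writes to a `BehaviorSubject`, logs, or triggers network calls from inside its pipeline.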

๐Ÿ› ๏ธ Advanced RxJS Patterns

Letโ€™s explore sophisticated reactive patterns:

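// 🛠️ Advanced RxJS patterns and utilities
// Production-ready reactive programming techniques

import { Observable, BehaviorSubject, Subject, defer, timer, throwError } from 'rxjs';
import {
  retryWhen, scan, switchMap, bufferTime, filter, map,
  distinctUntilChanged, groupBy, mergeMap, tap, finalize, catchError
} from 'rxjs/operators';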

// 🔄 Retry with exponential backoff
// T is declared on the returned operator so it is inferred from the source inside pipe()
const retryWithBackoff = (maxRetries: number = 3, baseDelay: number = 1000) => {
  return <T>(source: Observable<T>): Observable<T> => source.pipe(
    retryWhen(errors => 
      errors.pipe(
        scan((acc, error) => ({ count: acc.count + 1, error }), { count: 0, error: null }),
        switchMap(({ count, error }) => {
          if (count <= maxRetries) {
            const delayMs = baseDelay * Math.pow(2, count - 1);
            console.log(`🔄 Retry attempt ${count}/${maxRetries} in ${delayMs}ms`);
            return timer(delayMs);
          } else {
            console.error(`💥 Max retries (${maxRetries}) reached`);
            return throwError(error);
          }
        })
      )
    )
  );
};

// ๐Ÿ• Time-based window operations
const createTimeWindowOperator = <T>(windowDuration: number) => {
  return (source: Observable<T>) => source.pipe(
    bufferTime(windowDuration),
    filter(buffer => buffer.length > 0),
    map(buffer => ({
      items: buffer,
      count: buffer.length,
      windowStart: new Date(Date.now() - windowDuration),
      windowEnd: new Date()
    }))
  );
};

// ๐Ÿ“Š Rate limiting operator
const rateLimit = <T>(maxEmissions: number, timeWindow: number) => {
  return (source: Observable<T>) => {
    let emissions: number[] = [];
    
    return source.pipe(
      filter(() => {
        const now = Date.now();
        
        // ๐Ÿงน Remove old emissions outside the time window
        emissions = emissions.filter(time => now - time < timeWindow);
        
        // ๐Ÿšฆ Check if we can emit
        if (emissions.length < maxEmissions) {
          emissions.push(now);
          return true;
        } else {
          console.warn(`๐Ÿšฆ Rate limit exceeded: ${maxEmissions}/${timeWindow}ms`);
          return false;
        }
      })
    );
  };
};

// ๐Ÿ”„ State management with reactive patterns
class ReactiveStateManager<T> {
  private readonly state$: BehaviorSubject<T>;
  private readonly actions$ = new Subject<Action<T>>();
  
  // 📡 Public state observable
  readonly state: Observable<T>;
  
  constructor(private initialState: T, private reducer: Reducer<T>) {
    // Create the state subject here: a field initializer cannot safely read
    // the initialState parameter property under every compiler configuration
    this.state$ = new BehaviorSubject<T>(initialState);
    this.state = this.state$.asObservable();
    
    // 🔄 Process actions and update state
    this.actions$.pipe(
      scan((state, action) => {
        console.log(`🔄 Processing action: ${action.type}`);
        const newState = this.reducer(state, action);
        console.log('📊 State updated:', { 
          action: action.type,
          hasChanges: JSON.stringify(state) !== JSON.stringify(newState)
        });
        return newState;
      }, this.initialState)
    ).subscribe(this.state$);
  }
  
  // ๐Ÿ“ค Dispatch action
  dispatch(action: Action<T>): void {
    console.log(`๐Ÿ“ค Dispatching action: ${action.type}`);
    this.actions$.next(action);
  }
  
  // ๐Ÿ“Š Get current state
  getCurrentState(): T {
    return this.state$.value;
  }
  
  // ๐Ÿ” Select specific state properties
  select<K extends keyof T>(key: K): Observable<T[K]> {
    return this.state$.pipe(
      map(state => state[key]),
      distinctUntilChanged()
    );
  }
}

// ๐Ÿ—๏ธ State management types
interface Action<T> {
  type: string;
  payload?: any;
}

type Reducer<T> = (state: T, action: Action<T>) => T;

// ๐Ÿ“Š Performance monitoring with RxJS
class PerformanceMonitor {
  private metrics$ = new Subject<PerformanceMetric>();
  
  // ๐Ÿ“ˆ Aggregated performance data
  readonly performanceStats$ = this.metrics$.pipe(
    groupBy(metric => metric.type),
    mergeMap(group => 
      group.pipe(
        scan((acc, metric) => ({
          type: metric.type,
          count: acc.count + 1,
          totalDuration: acc.totalDuration + metric.duration,
          averageDuration: (acc.totalDuration + metric.duration) / (acc.count + 1),
          minDuration: Math.min(acc.minDuration, metric.duration),
          maxDuration: Math.max(acc.maxDuration, metric.duration),
          lastUpdate: new Date()
        }), {
          type: group.key,
          count: 0,
          totalDuration: 0,
          averageDuration: 0,
          minDuration: Infinity,
          maxDuration: -Infinity,
          lastUpdate: new Date()
        })
      )
    ),
    tap(stats => console.log(`๐Ÿ“Š Performance stats for ${stats.type}:`, {
      count: stats.count,
      avg: stats.averageDuration.toFixed(2) + 'ms',
      min: stats.minDuration.toFixed(2) + 'ms',
      max: stats.maxDuration.toFixed(2) + 'ms'
    }))
  );
  
  // ๐Ÿ“ Measure operation performance
  measure<T>(operation: () => Observable<T>, type: string): Observable<T> {
    // defer() starts the clock at subscription time, so each subscriber
    // (and each retry) is timed separately
    return defer(() => {
      const startTime = performance.now();
      
      return operation().pipe(
        finalize(() => {
          const duration = performance.now() - startTime;
          this.metrics$.next({
            type,
            duration,
            timestamp: new Date()
          });
        })
      );
    });
  }
  
  // ๐Ÿ“ค Record custom metric
  recordMetric(type: string, duration: number): void {
    this.metrics$.next({
      type,
      duration,
      timestamp: new Date()
    });
  }
}

interface PerformanceMetric {
  type: string;
  duration: number;
  timestamp: Date;
}

// ๐Ÿ”„ Auto-retry observable creator
const createRetryableObservable = <T>(
  operation: () => Observable<T>,
  options: {
    maxRetries?: number;
    retryDelay?: number;
    retryCondition?: (error: any) => boolean;
  } = {}
): Observable<T> => {
  const { maxRetries = 3, retryDelay = 1000, retryCondition = () => true } = options;
  
  return defer(() => operation()).pipe(
    retryWhen(errors =>
      errors.pipe(
        scan((acc, error) => ({ count: acc.count + 1, error }), { count: 0, error: null }),
        switchMap(({ count, error }) => {
          if (count <= maxRetries && retryCondition(error)) {
            console.log(`๐Ÿ”„ Retrying operation (attempt ${count}/${maxRetries})`);
            return timer(retryDelay * count);
          } else {
            console.error('๐Ÿ’ฅ Operation failed after retries');
            return throwError(error);
          }
        })
      )
    ),
    tap(() => console.log('✅ Operation emitted a value')),
    catchError(error => {
      console.error('๐Ÿ’ฅ Operation finally failed:', error.message);
      return throwError(error);
    })
  );
};
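
The two retry helpers above use different delay schedules: `retryWithBackoff` doubles the wait on each attempt, while `createRetryableObservable` grows it linearly. The arithmetic can be checked in isolation with plain functions; `exponentialDelay` and `linearDelay` are illustrative helpers that mirror the two formulas, not part of RxJS.

```typescript
// Mirrors `baseDelay * Math.pow(2, count - 1)` from retryWithBackoff
const exponentialDelay = (attempt: number, baseDelay: number = 1000): number =>
  baseDelay * Math.pow(2, attempt - 1);

// Mirrors `retryDelay * count` from createRetryableObservable
const linearDelay = (attempt: number, retryDelay: number = 1000): number =>
  retryDelay * attempt;

const attempts = [1, 2, 3];
const exponentialSchedule = attempts.map(a => exponentialDelay(a));
const linearSchedule = attempts.map(a => linearDelay(a));

console.log(exponentialSchedule); // [1000, 2000, 4000]
console.log(linearSchedule);      // [1000, 2000, 3000]
```

With the default three retries, the exponential variant waits 7 seconds in total before giving up and the linear variant waits 6 seconds; the gap widens quickly as `maxRetries` grows.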

๐Ÿ’ก Best Practices & Performance

๐ŸŽฏ RxJS Best Practices

Here are essential patterns for production RxJS code:

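// ✅ Best practices for RxJS in production
// Memory management, performance, and maintainability

import { BehaviorSubject, Subject, defer, from, fromEvent, interval, merge, of, timer } from 'rxjs';
import {
  bufferCount, catchError, concatMap, distinctUntilChanged, map, scan,
  shareReplay, switchMap, takeUntil, timeout, toArray
} from 'rxjs/operators';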

// โœ… DO: Always unsubscribe to prevent memory leaks
class ComponentWithSubscriptions {
  private destroy$ = new Subject<void>();
  
  constructor() {
    // โœ… Use takeUntil for automatic cleanup
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(value => {
      console.log('Timer:', value);
    });
    
    // โœ… Multiple subscriptions with same cleanup
    merge(
      fromEvent(window, 'resize'),
      fromEvent(window, 'scroll')
    ).pipe(
      takeUntil(this.destroy$)
    ).subscribe(event => {
      console.log('Window event:', event.type);
    });
  }
  
  // ๐Ÿงน Cleanup method
  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// ✅ DO: Use shareReplay for expensive operations
const expensiveOperation$ = defer(() => {
  console.log('🔄 Performing expensive calculation...');
  return timer(2000).pipe(
    map(() => Math.random() * 1000)
  );
}).pipe(
  shareReplay(1) // ✅ Cache result for multiple subscribers
);

// โŒ DON'T: Create new observables in templates or frequently called functions
const badExample = () => {
  // โŒ Creates new observable on every call
  return interval(1000).pipe(map(x => x * 2));
};

// โœ… DO: Create observables once and reuse
const goodExample$ = interval(1000).pipe(
  map(x => x * 2),
  shareReplay(1)
);

// ✅ DO: Use appropriate operators for performance
// from(array) avoids spreading 10,000 arguments into of(...)
const efficientDataProcessing$ = from(Array.from({ length: 10000 }, (_, i) => i)).pipe(
  // ✅ Use bufferCount for batch processing
  bufferCount(100),

  // ✅ Use concatMap for ordered processing
  concatMap(batch =>
    from(batch).pipe(
      map(x => x * 2),
      toArray()
    )
  ),

  // ✅ Use scan for cumulative operations
  scan((acc, batch) => [...acc, ...batch], [] as number[]),

  // ✅ Use distinctUntilChanged to avoid unnecessary updates
  // (here: skip emissions whose accumulated length did not change)
  distinctUntilChanged((a, b) => a.length === b.length)
);

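// ✅ DO: Handle errors gracefully
const robustApiCall$ = (url: string) => {
  // defer() issues a fresh fetch on every subscription, so retries actually
  // re-send the request instead of replaying the same settled promise
  return defer(() => fetch(url)).pipe(
    switchMap(response => {
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }
      return response.json();
    }),

    // ✅ Timeout to prevent hanging; placed before retry and catchError
    // so a timeout is retried and, if it persists, replaced by the fallback
    timeout(10000),

    // ✅ Retry with backoff (retryWithBackoff is a custom operator, not part of RxJS)
    retryWithBackoff(3, 1000),

    // ✅ Provide fallback value
    catchError(error => {
      console.error('API call failed:', error.message);
      return of({ data: null, error: error.message });
    })
  );
};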

// ✅ DO: Use subjects appropriately
class NotificationService {
  // ✅ Use Subject for events
  private notificationSubject = new Subject<Notification>();

  // ✅ Use BehaviorSubject for state
  private isConnectedSubject = new BehaviorSubject<boolean>(false);

  // ✅ Use ReplaySubject for history
  private messageHistorySubject = new ReplaySubject<Message>(50);
  
  readonly notifications$ = this.notificationSubject.asObservable();
  readonly isConnected$ = this.isConnectedSubject.asObservable();
  readonly messageHistory$ = this.messageHistorySubject.asObservable();
  
  sendNotification(notification: Notification): void {
    this.notificationSubject.next(notification);
  }
  
  updateConnectionStatus(connected: boolean): void {
    this.isConnectedSubject.next(connected);
  }
  
  addMessage(message: Message): void {
    this.messageHistorySubject.next(message);
  }
}

// 🔧 Memory leak detection utility
class SubscriptionTracker {
  private subscriptions = new Map<string, Subscription>();
  private subscriptionCounts = new Map<string, number>();

  track(name: string, subscription: Subscription): Subscription {
    // 🧹 Clean up existing subscription
    if (this.subscriptions.has(name)) {
      this.subscriptions.get(name)!.unsubscribe();
    }

    // 📊 Track subscription count
    const count = (this.subscriptionCounts.get(name) || 0) + 1;
    this.subscriptionCounts.set(name, count);

    console.log(`📊 Tracking subscription: ${name} (count: ${count})`);

    // 🎯 Wrap subscription for automatic cleanup tracking
    const wrappedSubscription = new Subscription(() => {
      subscription.unsubscribe();
      this.subscriptions.delete(name);
      console.log(`🧹 Cleaned up subscription: ${name}`);
    });
    
    this.subscriptions.set(name, wrappedSubscription);
    return wrappedSubscription;
  }
  
  unsubscribeAll(): void {
    console.log(`🧹 Cleaning up ${this.subscriptions.size} subscriptions`);

    for (const [name, subscription] of this.subscriptions) {
      subscription.unsubscribe();
      console.log(`🧹 Unsubscribed: ${name}`);
    }
    
    this.subscriptions.clear();
  }
  
  getReport(): SubscriptionReport {
    return {
      activeSubscriptions: this.subscriptions.size,
      subscriptionHistory: Object.fromEntries(this.subscriptionCounts),
      activeSubscriptionNames: Array.from(this.subscriptions.keys())
    };
  }
}

interface SubscriptionReport {
  activeSubscriptions: number;
  subscriptionHistory: Record<string, number>;
  activeSubscriptionNames: string[];
}

// 📊 Performance testing utility
const benchmarkObservable = <T>(
  name: string,
  observable: Observable<T>,
  expectedItems: number = 1000
): Observable<BenchmarkResult> => {
  return new Observable<BenchmarkResult>(observer => {
    const startTime = performance.now();
    let itemCount = 0;
    let totalSize = 0;

    // Build the result; guard both ratios so an empty stream reports 0 instead of NaN
    const buildResult = (success: boolean, error?: string): BenchmarkResult => {
      const duration = performance.now() - startTime;
      return {
        name,
        duration,
        itemCount,
        totalSize,
        averageItemSize: itemCount > 0 ? totalSize / itemCount : 0,
        itemsPerSecond: duration > 0 ? itemCount / (duration / 1000) : 0,
        success,
        ...(error !== undefined ? { error } : {})
      };
    };

    console.log(`📊 Starting benchmark: ${name}`);

    const subscription = observable.subscribe({
      next: (value) => {
        itemCount++;
        // JSON.stringify returns undefined for values such as undefined or functions
        const serialized: string | undefined = JSON.stringify(value);
        totalSize += serialized ? serialized.length : 0;

        if (itemCount % 100 === 0) {
          console.log(`📈 Processed ${itemCount}/${expectedItems} items`);
        }
      },
      error: (error) => {
        observer.next(buildResult(false, error?.message ?? String(error)));
        observer.complete();
      },
      complete: () => {
        const result = buildResult(true);
        console.log(`✅ Benchmark complete: ${name}`, result);
        observer.next(result);
        observer.complete();
      }
    });
    
    return () => subscription.unsubscribe();
  });
};

interface BenchmarkResult {
  name: string;
  duration: number;
  itemCount: number;
  totalSize: number;
  averageItemSize: number;
  itemsPerSecond: number;
  success: boolean;
  error?: string;
}
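
The two derived fields in `BenchmarkResult` are plain ratios: `averageItemSize` is `totalSize / itemCount`, and `itemsPerSecond` is `itemCount / (duration / 1000)` with `duration` in milliseconds. Here is a small sketch of that arithmetic with worked numbers. It assumes a hypothetical standalone function `summarizeThroughput` and a hypothetical `ThroughputSummary` type, and it returns 0 when there are no items or no elapsed time, so the ratios never become `NaN` or `Infinity`.

```typescript
// Hypothetical helper illustrating the ratio arithmetic behind BenchmarkResult.
interface ThroughputSummary {
  averageItemSize: number; // serialized characters per item
  itemsPerSecond: number;  // items emitted per second of wall-clock time
}

function summarizeThroughput(
  itemCount: number,
  totalSize: number,
  durationMs: number
): ThroughputSummary {
  return {
    // Guard against dividing by zero when the stream emitted nothing
    averageItemSize: itemCount > 0 ? totalSize / itemCount : 0,
    // Convert milliseconds to seconds before dividing
    itemsPerSecond: durationMs > 0 ? itemCount / (durationMs / 1000) : 0
  };
}

// 1000 items, 25000 serialized characters, 500 ms elapsed
console.log(summarizeThroughput(1000, 25000, 500)); // { averageItemSize: 25, itemsPerSecond: 2000 }

// An empty stream yields zeros rather than NaN
console.log(summarizeThroughput(0, 0, 12)); // { averageItemSize: 0, itemsPerSecond: 0 }
```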

๐Ÿ† Summary

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered RxJS observables and reactive programming in TypeScript! Hereโ€™s what youโ€™ve accomplished:

๐ŸŽฏ Key Takeaways

  1. Reactive Programming ๐Ÿ“ก: Build applications that respond elegantly to data changes
  2. Stream Composition ๐ŸŒŠ: Combine and transform data streams with powerful operators
  3. Real-Time Applications โšก: Create live, responsive user experiences
  4. Memory Management ๐Ÿงน: Prevent memory leaks with proper subscription handling
  5. Performance Optimization ๐Ÿš€: Build efficient reactive systems that scale

๐Ÿ› ๏ธ What You Can Build

  • Real-Time Dashboards ๐Ÿ“Š: Live data visualization with streaming updates
  • Chat Applications ๐Ÿ’ฌ: WebSocket-based messaging with reactive patterns
  • Search Interfaces ๐Ÿ”: Debounced, cached search with instant results
  • Game Engines ๐ŸŽฎ: Reactive game loops and state management
  • IoT Monitoring ๐Ÿ“ก: Real-time sensor data processing and alerts

🚀 Next Steps

Ready to take your reactive skills even further? Consider exploring:

  • Node.js Streams 🌊: Server-side streaming data processing
  • Web Workers 👥: Reactive patterns in parallel processing
  • State Management 🗄️: Advanced reactive state patterns with NgRx/Redux
  • Real-Time APIs 📡: WebSocket and Server-Sent Events integration

You're now equipped to build sophisticated reactive applications that handle complex data flows with elegance and performance! 🎯 Keep streaming! 📡✨