Part 108 of 355

๐ŸŒŠ Streams in TypeScript: Node.js Streams Mastery

Master Node.js streams in TypeScript for efficient data processing, file handling, and high-performance server applications ๐Ÿš€

๐Ÿš€ Intermediate
20 min read

Prerequisites

  • Basic TypeScript syntax and types ๐Ÿ“
  • Understanding of Node.js fundamentals ๐Ÿ”ง
  • Async/await and Promise concepts โšก

What you'll learn

  • Master Node.js streams for efficient data processing and memory management ๐ŸŒŠ
  • Build scalable streaming applications with transform and pipeline patterns ๐Ÿ”„
  • Handle large datasets and files without memory overflow ๐Ÿ’พ
  • Create production-ready streaming APIs and data processors ๐Ÿญ

๐ŸŽฏ Introduction

Welcome to the powerful world of Node.js streams with TypeScript! ๐ŸŽ‰ In this guide, weโ€™ll explore how to process massive amounts of data efficiently, handle large files gracefully, and build high-performance applications that scale beautifully.

Youโ€™ll discover how to transform data processing from memory-hungry operations into elegant, efficient streams that can handle gigabytes of data with minimal memory usage. Whether youโ€™re building file processors ๐Ÿ“, real-time data pipelines ๐Ÿ”„, or high-throughput APIs ๐ŸŒ, mastering Node.js streams is essential for creating scalable TypeScript applications.

By the end of this tutorial, youโ€™ll be streaming data like a performance optimization wizard! ๐ŸŒŠ Letโ€™s dive in! ๐ŸŠโ€โ™‚๏ธ

๐Ÿ“š Understanding Node.js Streams

๐Ÿค” What are Streams?

Streams are like having a smart assembly line for your data ๐Ÿญ. Think of them as pipes where data flows through in chunks, allowing you to process massive amounts of information without loading everything into memory at once!

In TypeScript with Node.js, streams provide:

  • โœจ Memory efficiency - process large files without memory overflow
  • ๐Ÿš€ Performance - handle data as it arrives, not after everything loads
  • ๐Ÿ›ก๏ธ Backpressure handling - automatic flow control when processing is slow
  • ๐Ÿ“ฆ Composability - chain operations like building blocks (see the sketch below)
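
To make these properties concrete, here is a minimal sketch that chains three built-in streams with pipeline(), which wires the stages together and manages backpressure and cleanup for you (the file names are placeholders):

// ๐Ÿ“ฆ Composability in action: read โžก compress โžก write
import { pipeline } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

pipeline(
  createReadStream('app.log'),       // hypothetical source file
  createGzip(),                      // transform: gzip compression
  createWriteStream('app.log.gz'),   // destination
  (error) => {
    if (error) {
      console.error('๐Ÿ’ฅ Pipeline failed:', error);
    } else {
      console.log('โœ… File compressed without buffering it all in memory');
    }
  }
);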

๐Ÿ’ก Why Use Streams?

Hereโ€™s why streams are a game-changer for application architecture:

  1. Memory Efficiency ๐Ÿ’พ: Process 10GB files with roughly 64KB of buffer memory (the default highWaterMark for fs streams)
  2. Real-Time Processing โšก: Start working on data before the full download completes
  3. Scalability ๐Ÿ“ˆ: Handle thousands of concurrent file operations
  4. Composability ๐Ÿ”—: Chain transformations like functional programming
  5. Backpressure ๐Ÿšฐ: Automatic flow control prevents memory overflow

Real-world example: extracting specific records from a 5GB CSV file. Streams let you process it line by line without ever loading the whole file into memory! ๐Ÿ“Š
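
A minimal sketch of that line-by-line approach, using Nodeโ€™s built-in readline module on top of a read stream (the file name and filter condition are placeholders):

// ๐Ÿ“Š Extract matching records from a huge CSV without loading it whole
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

const extractRecords = async (filePath: string): Promise<void> => {
  const rl = createInterface({
    input: createReadStream(filePath),
    crlfDelay: Infinity // treat \r\n as a single line break
  });

  let matches = 0;
  for await (const line of rl) {
    if (line.includes('premium')) { // placeholder filter condition
      matches++;
    }
  }

  console.log(`โœ… Scan complete: ${matches} matching records`);
};

extractRecords('customers.csv').catch(console.error);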

๐Ÿ”ง Basic Stream Types and Patterns

๐Ÿ“ Readable Streams - Data Sources

Letโ€™s start with fundamental stream patterns:

// ๐ŸŽฏ Basic readable stream patterns
// TypeScript provides excellent type safety for stream operations

import { Readable } from 'stream';
import { createReadStream } from 'fs';

// ๐Ÿ”„ Custom readable stream with type safety
class NumberGenerator extends Readable {
  private current: number = 1;
  private max: number;
  
  constructor(max: number) {
    super({ objectMode: true });
    this.max = max;
    console.log(`๐Ÿ”ข Number generator created (1 to ${max})`);
  }
  
  _read(): void {
    if (this.current <= this.max) {
      console.log(`๐Ÿ“ค Generating number: ${this.current}`);
      this.push(this.current++);
    } else {
      console.log('๐Ÿ Number generation complete');
      this.push(null); // Signal end of stream
    }
  }
}

// ๐Ÿ“ File reading stream with error handling
const createFileReader = (filePath: string): Readable => {
  console.log(`๐Ÿ“– Creating file reader for: ${filePath}`);
  
  const stream = createReadStream(filePath, {
    encoding: 'utf8',
    highWaterMark: 16 * 1024 // 16KB chunks for memory efficiency
  });
  
  stream.on('open', () => {
    console.log(`โœ… File opened successfully: ${filePath}`);
  });
  
  stream.on('error', (error) => {
    console.error(`๐Ÿ’ฅ File read error: ${error.message}`);
  });
  
  return stream;
};

// ๐ŸŒ HTTP response as readable stream
import { IncomingMessage } from 'http';

const processHttpStream = (response: IncomingMessage): void => {
  console.log('๐ŸŒ Processing HTTP response stream');
  
  let totalBytes = 0;
  
  response.on('data', (chunk: Buffer) => {
    totalBytes += chunk.length;
    console.log(`๐Ÿ“ฅ Received chunk: ${chunk.length} bytes (total: ${totalBytes})`);
  });
  
  response.on('end', () => {
    console.log(`๐Ÿ HTTP stream complete: ${totalBytes} total bytes`);
  });
  
  response.on('error', (error) => {
    console.error(`๐Ÿ’ฅ HTTP stream error: ${error.message}`);
  });
};

// ๐Ÿ“Š Example usage
const numberStream = new NumberGenerator(5);
numberStream.on('data', (number) => {
  console.log(`๐Ÿ”ข Received number: ${number}`);
});

numberStream.on('end', () => {
  console.log('โœ… Number stream ended');
});
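
Readable streams are also async iterables, so you can consume NumberGenerator with for await...of instead of event listeners. A small sketch using the class above:

// ๐Ÿ” Consuming a readable stream with async iteration
const consumeNumbers = async (): Promise<void> => {
  const source = new NumberGenerator(3);

  for await (const value of source) {
    console.log(`๐Ÿ”ข Iterated value: ${value}`);
  }

  console.log('โœ… Async iteration complete');
};

consumeNumbers().catch(console.error);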

๐Ÿ”„ Transform Streams - Data Processing

Transform streams are where the magic happens:

// ๐ŸŽฏ Advanced transform stream patterns
// Process data as it flows through the pipeline

import { Transform, TransformCallback } from 'stream';

// ๐Ÿ”„ JSON line processor with type safety
interface LogEntry {
  timestamp: string;
  level: 'info' | 'warn' | 'error';
  message: string;
  userId?: string;
  requestId?: string;
}

class JSONLineProcessor extends Transform {
  private remainder = ''; // holds a partial line that spans chunk boundaries
  
  constructor() {
    super({
      writableObjectMode: false,
      readableObjectMode: true,
      highWaterMark: 1000 // Buffer up to 1000 objects
    });
    console.log('๐Ÿ”ง JSON line processor initialized');
  }
  
  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    // Prepend the leftover from the previous chunk; the last split element
    // may be an incomplete line, so save it for the next chunk
    const lines = (this.remainder + chunk.toString()).split('\n');
    this.remainder = lines.pop() ?? '';
    
    for (const line of lines) {
      if (line.trim()) {
        this.processLine(line);
      }
    }
    
    callback();
  }
  
  _flush(callback: TransformCallback): void {
    // Handle the final line if the input didn't end with a newline
    if (this.remainder.trim()) {
      this.processLine(this.remainder);
    }
    callback();
  }
  
  private processLine(line: string): void {
    try {
      const logEntry: LogEntry = JSON.parse(line);
      
      // ๐Ÿ” Enhanced log entry with metadata
      const processedEntry = {
        ...logEntry,
        processed: true,
        processingTime: new Date().toISOString(),
        severity: this.calculateSeverity(logEntry.level)
      };
      
      console.log(`โœจ Processed log: ${logEntry.level} - ${logEntry.message.substring(0, 50)}...`);
      this.push(processedEntry);
      
    } catch {
      console.error(`๐Ÿ’ฅ Invalid JSON line: ${line.substring(0, 100)}...`);
      // Continue processing other lines instead of failing the stream
    }
  }
  
  private calculateSeverity(level: LogEntry['level']): number {
    const severityMap = { info: 1, warn: 2, error: 3 };
    return severityMap[level] ?? 0;
  }
}

// ๐Ÿงฎ Data aggregation transform
class DataAggregator extends Transform {
  private stats = {
    totalRecords: 0,
    errorCount: 0,
    warningCount: 0,
    infoCount: 0,
    userActions: new Map<string, number>()
  };
  
  constructor() {
    super({ objectMode: true });
    console.log('๐Ÿ“Š Data aggregator initialized');
  }
  
  _transform(chunk: LogEntry & { processed: boolean }, encoding: BufferEncoding, callback: TransformCallback): void {
    this.stats.totalRecords++;
    
    // ๐Ÿ“ˆ Update statistics
    switch (chunk.level) {
      case 'error':
        this.stats.errorCount++;
        break;
      case 'warn':
        this.stats.warningCount++;
        break;
      case 'info':
        this.stats.infoCount++;
        break;
    }
    
    // ๐Ÿ‘ค Track user activity
    if (chunk.userId) {
      const current = this.stats.userActions.get(chunk.userId) || 0;
      this.stats.userActions.set(chunk.userId, current + 1);
    }
    
    // ๐Ÿ”„ Pass through the data
    this.push(chunk);
    
    // ๐Ÿ“Š Periodic stats reporting
    if (this.stats.totalRecords % 1000 === 0) {
      console.log(`๐Ÿ“Š Processed ${this.stats.totalRecords} records`);
      console.log(`   Errors: ${this.stats.errorCount}, Warnings: ${this.stats.warningCount}, Info: ${this.stats.infoCount}`);
      console.log(`   Active users: ${this.stats.userActions.size}`);
    }
    
    callback();
  }
  
  _flush(callback: TransformCallback): void {
    // ๐Ÿ“‹ Final statistics report
    const report = {
      summary: {
        totalRecords: this.stats.totalRecords,
        errorRate: this.stats.totalRecords > 0
          ? (this.stats.errorCount / this.stats.totalRecords * 100).toFixed(2) + '%'
          : '0%',
        topUsers: Array.from(this.stats.userActions.entries())
          .sort((a, b) => b[1] - a[1])
          .slice(0, 5)
      },
      timestamp: new Date().toISOString()
    };
    
    console.log('๐Ÿ“‹ Final Report:', JSON.stringify(report, null, 2));
    this.push(report);
    callback();
  }
}

// ๐ŸŽจ CSV to JSON transform with validation
class CSVTransform extends Transform {
  private headers: string[] = [];
  private isFirstRow = true;
  private remainder = ''; // holds a partial row that spans chunk boundaries
  
  constructor() {
    super({
      writableObjectMode: false,
      readableObjectMode: true
    });
    console.log('๐Ÿ“„ CSV transformer initialized');
  }
  
  _transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
    // Re-join with any leftover text so rows split across chunks stay intact
    const lines = (this.remainder + chunk.toString()).split('\n');
    this.remainder = lines.pop() ?? '';
    
    for (const line of lines) {
      if (line.trim()) {
        this.processRow(line);
      }
    }
    
    callback();
  }
  
  _flush(callback: TransformCallback): void {
    if (this.remainder.trim()) {
      this.processRow(this.remainder);
    }
    callback();
  }
  
  private processRow(line: string): void {
    if (this.isFirstRow) {
      this.headers = line.split(',').map(h => h.trim());
      this.isFirstRow = false;
      console.log(`๐Ÿ“‹ CSV headers: ${this.headers.join(', ')}`);
      return;
    }
    
    const values = line.split(',').map(v => v.trim());
    
    if (values.length === this.headers.length) {
      const record: Record<string, string> = {};
      this.headers.forEach((header, index) => {
        record[header] = values[index];
      });
      
      console.log(`โœ… CSV record processed: ${record[this.headers[0]]}`);
      this.push(record);
    } else {
      console.error(`โŒ Invalid CSV row: expected ${this.headers.length} columns, got ${values.length}`);
    }
  }
}
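
Wiring the CSV transformer into a pipeline might look like this sketch (the input file is hypothetical, and the sink just logs each record):

// ๐Ÿ”— CSV file โžก CSVTransform โžก object-mode sink
import { pipeline, Writable } from 'stream';
import { createReadStream } from 'fs';

const recordSink = new Writable({
  objectMode: true,
  write(record, encoding, callback) {
    console.log('๐Ÿ“„ Record:', record);
    callback();
  }
});

pipeline(
  createReadStream('users.csv'),
  new CSVTransform(),
  recordSink,
  (error) => {
    if (error) console.error('๐Ÿ’ฅ CSV pipeline failed:', error);
    else console.log('โœ… CSV pipeline complete');
  }
);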

๐Ÿš€ Advanced Stream Patterns

๐Ÿ”„ Pipeline Operations and Error Handling

Letโ€™s explore sophisticated stream patterns:

// ๐ŸŽฏ Advanced pipeline patterns with comprehensive error handling
// Building production-ready streaming applications

import { pipeline, Readable, Transform, Writable } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { promisify } from 'util';
import { createGzip } from 'zlib';

const pipelineAsync = promisify(pipeline);

// ๐Ÿญ Production data processing pipeline
class DataProcessingPipeline {
  private processedCount = 0;
  private errorCount = 0;
  private startTime = Date.now();
  
  async processLargeDataset(
    inputPath: string,
    outputPath: string,
    options: ProcessingOptions = {}
  ): Promise<ProcessingResult> {
    console.log('๐Ÿญ Starting data processing pipeline');
    
    const source = this.createSource(inputPath);
    const validator = this.createValidator();
    const transformer = this.createTransformer(options);
    const aggregator = this.createAggregator();
    const compressor = createGzip();
    const destination = this.createDestination(outputPath);
    
    try {
      await pipelineAsync(
        source,
        validator,
        transformer,
        aggregator,
        compressor,
        destination
      );
      
      return this.generateReport();
      
    } catch (error) {
      console.error('๐Ÿ’ฅ Pipeline failed:', error);
      throw new PipelineError('Data processing failed', error as Error);
    }
  }
  
  private createSource(inputPath: string): Readable {
    console.log(`๐Ÿ“– Creating data source: ${inputPath}`);
    
    const stream = createReadStream(inputPath, {
      encoding: 'utf8',
      highWaterMark: 64 * 1024 // 64KB chunks for optimal performance
    });
    
    stream.on('error', (error) => {
      console.error(`๐Ÿ’ฅ Source error: ${error.message}`);
    });
    
    return stream;
  }
  
  private createValidator(): Transform {
    return new Transform({
      objectMode: true,
      transform: (chunk, encoding, callback) => {
        try {
          // ๐Ÿ” Data validation logic (assumes an upstream stage delivers one
          // JSON record per chunk, e.g. a line splitter like JSONLineProcessor)
          const data = typeof chunk === 'string' ? JSON.parse(chunk) : chunk;
          
          if (this.isValidRecord(data)) {
            this.processedCount++;
            callback(null, data);
          } else {
            this.errorCount++;
            console.warn(`โš ๏ธ Invalid record skipped: ${JSON.stringify(data).substring(0, 100)}`);
            callback(); // Skip invalid records
          }
        } catch (error) {
          this.errorCount++;
          console.error(`๐Ÿ’ฅ Validation error: ${error}`);
          callback(); // Skip malformed records
        }
      }
    });
  }
  
  private createTransformer(options: ProcessingOptions): Transform {
    return new Transform({
      objectMode: true,
      transform: (chunk, encoding, callback) => {
        try {
          // ๐Ÿ”„ Data transformation with business logic
          const transformed = {
            ...chunk,
            processed: true,
            processedAt: new Date().toISOString(),
            pipeline: 'v2.0',
            metadata: {
              processingNode: process.env.NODE_ID || 'unknown',
              batchId: options.batchId || 'default'
            }
          };
          
          // ๐ŸŽฏ Apply custom transformations
          if (options.enrichment) {
            transformed.enriched = this.enrichData(chunk);
          }
          
          if (options.normalization) {
            this.normalizeData(transformed);
          }
          
          callback(null, transformed);
          
        } catch (error) {
          console.error(`๐Ÿ’ฅ Transformation error: ${error}`);
          callback(error as Error); // catch variables are 'unknown' in strict TS
        }
      }
    });
  }
  
  private createAggregator(): Transform {
    const stats = new Map<string, number>();
    
    return new Transform({
      objectMode: true,
      transform: (chunk, encoding, callback) => {
        // ๐Ÿ“Š Real-time aggregation
        const category = chunk.category || 'unknown';
        stats.set(category, (stats.get(category) || 0) + 1);
        
        // ๐Ÿ“ˆ Periodic reporting
        if (this.processedCount % 10000 === 0) {
          console.log(`๐Ÿ“Š Progress: ${this.processedCount} records processed`);
          console.log('๐Ÿ“ˆ Category distribution:', Object.fromEntries(stats));
        }
        
        callback(null, JSON.stringify(chunk) + '\n');
      }
    });
  }
  
  private createDestination(outputPath: string): Writable {
    console.log(`๐Ÿ’พ Creating destination: ${outputPath}`);
    
    const stream = createWriteStream(outputPath);
    
    stream.on('error', (error) => {
      console.error(`๐Ÿ’ฅ Destination error: ${error.message}`);
    });
    
    stream.on('finish', () => {
      console.log(`โœ… Data written successfully: ${outputPath}`);
    });
    
    return stream;
  }
  
  private isValidRecord(data: any): boolean {
    return data && 
           typeof data === 'object' && 
           data.id && 
           data.timestamp &&
           new Date(data.timestamp).getTime() > 0;
  }
  
  private enrichData(data: any): EnrichmentData {
    return {
      geoLocation: this.getGeoLocation(data.ip),
      userAgent: this.parseUserAgent(data.userAgent),
      sessionData: this.getSessionData(data.sessionId)
    };
  }
  
  private normalizeData(data: any): void {
    if (data.email) data.email = data.email.toLowerCase();
    if (data.phone) data.phone = data.phone.replace(/\D/g, '');
    if (data.name) data.name = this.titleCase(data.name);
  }
  
  private generateReport(): ProcessingResult {
    const duration = Date.now() - this.startTime;
    
    return {
      processed: this.processedCount,
      errors: this.errorCount,
      duration,
      throughput: Math.round(this.processedCount / (duration / 1000)),
      successRate: ((this.processedCount / (this.processedCount + this.errorCount)) * 100).toFixed(2) + '%'
    };
  }
  
  private getGeoLocation(ip: string): any { return { country: 'US', city: 'Unknown' }; }
  private parseUserAgent(ua: string): any { return { browser: 'Unknown', os: 'Unknown' }; }
  private getSessionData(sessionId: string): any { return { duration: 0, pageViews: 0 }; }
  private titleCase(str: string): string { return str.replace(/\w\S*/g, txt => txt.charAt(0).toUpperCase() + txt.slice(1).toLowerCase()); }
}

// ๐Ÿ”ง Supporting types and interfaces
interface ProcessingOptions {
  batchId?: string;
  enrichment?: boolean;
  normalization?: boolean;
  compression?: boolean;
}

interface ProcessingResult {
  processed: number;
  errors: number;
  duration: number;
  throughput: number;
  successRate: string;
}

interface EnrichmentData {
  geoLocation: any;
  userAgent: any;
  sessionData: any;
}

class PipelineError extends Error {
  constructor(message: string, public cause: Error) {
    super(message);
    this.name = 'PipelineError';
  }
}
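
A usage sketch for the pipeline class (the paths, batch id, and the assumption that the input is newline-delimited JSON are all placeholders):

// ๐Ÿญ Running the full processing pipeline
const processor = new DataProcessingPipeline();

processor.processLargeDataset('events.ndjson', 'events-processed.json.gz', {
  batchId: 'batch-2024-01',
  enrichment: true,
  normalization: true
})
  .then(result => console.log('๐ŸŽ‰ Processing complete:', result))
  .catch(error => console.error('๐Ÿ’ฅ Processing failed:', error));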

๐ŸŒ Real-Time Streaming Applications

Letโ€™s build complete streaming applications:

// ๐ŸŽฏ Real-time streaming server with WebSocket integration
// Complete production-ready streaming architecture

import { Readable, Transform, Writable } from 'stream';
import { WebSocketServer, WebSocket } from 'ws';
import { EventEmitter } from 'events';

// ๐Ÿ“ก Real-time data streaming server
class StreamingServer extends EventEmitter {
  private wss: WebSocketServer;
  private activeStreams = new Map<string, ActiveStream>();
  private metrics = new ServerMetrics();
  
  constructor(port: number) {
    super();
    this.wss = new WebSocketServer({ port });
    console.log(`๐Ÿš€ Streaming server started on port ${port}`);
    
    this.setupWebSocketHandlers();
    this.startMetricsReporting();
  }
  
  private setupWebSocketHandlers(): void {
    this.wss.on('connection', (ws: WebSocket, request) => {
      const streamId = this.generateStreamId();
      console.log(`๐Ÿ”Œ New client connected: ${streamId}`);
      this.metrics.incrementActiveConnections();
      
      // ๐ŸŽฏ Create dedicated stream for this client
      const clientStream = this.createClientStream(streamId, ws);
      this.activeStreams.set(streamId, clientStream);
      
      ws.on('message', (data) => {
        this.handleClientMessage(streamId, data as Buffer); // ws RawData cast for the Buffer-typed handler
      });
      
      ws.on('close', () => {
        console.log(`๐Ÿ”Œ Client disconnected: ${streamId}`);
        this.cleanupClientStream(streamId);
      });
      
      ws.on('error', (error) => {
        console.error(`๐Ÿ’ฅ WebSocket error for ${streamId}: ${error.message}`);
        this.cleanupClientStream(streamId);
      });
    });
  }
  
  private createClientStream(streamId: string, ws: WebSocket): ActiveStream {
    // ๐Ÿ“Š Real-time data source (simulated)
    const dataSource = this.createRealTimeDataSource();
    
    // ๐Ÿ”„ Transform stream for client-specific processing
    const transformer = new Transform({
      objectMode: true,
      transform: (chunk, encoding, callback) => {
        try {
          const processed = {
            ...chunk,
            streamId,
            clientTimestamp: Date.now(),
            sequence: this.metrics.getTotalMessages()
          };
          
          callback(null, processed);
        } catch (error) {
          console.error(`๐Ÿ’ฅ Transform error for ${streamId}: ${error}`);
          callback(error as Error);
        }
      }
    });
    
    // ๐Ÿ“ค WebSocket sink
    const webSocketSink = new Writable({
      objectMode: true,
      write: (chunk, encoding, callback) => {
        try {
          if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify(chunk));
            this.metrics.incrementMessagesSent();
          }
          callback();
        } catch (error) {
          console.error(`๐Ÿ’ฅ WebSocket send error: ${error}`);
          callback(error);
        }
      }
    });
    
    // ๐Ÿ”— Connect the pipeline
    dataSource.pipe(transformer).pipe(webSocketSink);
    
    return {
      id: streamId,
      source: dataSource,
      transformer,
      sink: webSocketSink,
      ws,
      startTime: Date.now()
    };
  }
  
  private createRealTimeDataSource(): Readable {
    let messageCount = 0;
    
    const stream = new Readable({
      objectMode: true,
      read() {} // Will be pushed to externally
    });
    
    // ๐ŸŽฒ Simulate real-time data
    const interval = setInterval(() => {
      const data = this.generateSampleData(++messageCount);
      stream.push(data);
      
      // ๐Ÿ“Š Update metrics
      this.metrics.incrementMessagesGenerated();
    }, 100); // 10 messages per second
    
    stream.on('close', () => {
      clearInterval(interval);
    });
    
    return stream;
  }
  
  private generateSampleData(sequence: number): StreamingData {
    const types: DataType[] = ['user_action', 'system_event', 'performance_metric'];
    const type = types[Math.floor(Math.random() * types.length)];
    
    const baseData: StreamingData = {
      id: `msg_${sequence}_${Date.now()}`,
      type,
      timestamp: new Date().toISOString(),
      sequence,
      payload: {}
    };
    
    // ๐ŸŽฏ Type-specific data generation
    switch (type) {
      case 'user_action':
        baseData.payload = {
          userId: `user_${Math.floor(Math.random() * 1000)}`,
          action: ['click', 'scroll', 'type', 'navigate'][Math.floor(Math.random() * 4)],
          page: `/page/${Math.floor(Math.random() * 10)}`,
          duration: Math.floor(Math.random() * 5000)
        };
        break;
        
      case 'system_event':
        baseData.payload = {
          service: ['api', 'database', 'cache', 'queue'][Math.floor(Math.random() * 4)],
          level: ['info', 'warn', 'error'][Math.floor(Math.random() * 3)],
          message: `System event ${sequence}`,
          metadata: { server: `server_${Math.floor(Math.random() * 5)}` }
        };
        break;
        
      case 'performance_metric':
        baseData.payload = {
          metric: ['cpu', 'memory', 'disk', 'network'][Math.floor(Math.random() * 4)],
          value: Math.random() * 100,
          unit: '%',
          threshold: 80,
          status: Math.random() > 0.8 ? 'alert' : 'normal'
        };
        break;
    }
    
    return baseData;
  }
  
  private handleClientMessage(streamId: string, data: Buffer): void {
    try {
      const message = JSON.parse(data.toString());
      console.log(`๐Ÿ“จ Received from ${streamId}: ${message.type}`);
      
      // ๐ŸŽฏ Handle different message types
      switch (message.type) {
        case 'subscribe':
          this.handleSubscription(streamId, message.payload);
          break;
        case 'filter':
          this.handleFilterUpdate(streamId, message.payload);
          break;
        case 'pause':
          this.handleStreamPause(streamId);
          break;
        case 'resume':
          this.handleStreamResume(streamId);
          break;
      }
    } catch (error) {
      console.error(`๐Ÿ’ฅ Message parsing error: ${error}`);
    }
  }
  
  private handleSubscription(streamId: string, payload: any): void {
    console.log(`๐Ÿ“ก Client ${streamId} subscribed to: ${payload.topic}`);
    // Implementation for topic-based subscriptions
  }
  
  private handleFilterUpdate(streamId: string, payload: any): void {
    console.log(`๐Ÿ” Filter updated for ${streamId}: ${JSON.stringify(payload)}`);
    // Implementation for dynamic filtering
  }
  
  private handleStreamPause(streamId: string): void {
    const stream = this.activeStreams.get(streamId);
    if (stream) {
      stream.source.pause();
      console.log(`โธ๏ธ Stream paused: ${streamId}`);
    }
  }
  
  private handleStreamResume(streamId: string): void {
    const stream = this.activeStreams.get(streamId);
    if (stream) {
      stream.source.resume();
      console.log(`โ–ถ๏ธ Stream resumed: ${streamId}`);
    }
  }
  
  private cleanupClientStream(streamId: string): void {
    const stream = this.activeStreams.get(streamId);
    if (stream) {
      stream.source.destroy();
      stream.transformer.destroy();
      stream.sink.destroy();
      this.activeStreams.delete(streamId);
      this.metrics.decrementActiveConnections();
    }
  }
  
  private generateStreamId(): string {
    return `stream_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
  
  private startMetricsReporting(): void {
    setInterval(() => {
      const report = this.metrics.generateReport();
      console.log('๐Ÿ“Š Server Metrics:', JSON.stringify(report, null, 2));
      
      // ๐Ÿ“ก Broadcast metrics to all connected clients
      this.broadcastToAll({
        type: 'server_metrics',
        data: report,
        timestamp: new Date().toISOString()
      });
    }, 10000); // Every 10 seconds
  }
  
  private broadcastToAll(message: any): void {
    this.activeStreams.forEach((stream) => {
      if (stream.ws.readyState === WebSocket.OPEN) {
        stream.ws.send(JSON.stringify(message));
      }
    });
  }
}

// ๐Ÿ”ง Supporting types and classes
interface ActiveStream {
  id: string;
  source: Readable;
  transformer: Transform;
  sink: Writable;
  ws: WebSocket;
  startTime: number;
}

interface StreamingData {
  id: string;
  type: DataType;
  timestamp: string;
  sequence: number;
  payload: any;
}

type DataType = 'user_action' | 'system_event' | 'performance_metric';

class ServerMetrics {
  private messagesGenerated = 0;
  private messagesSent = 0;
  private activeConnections = 0;
  private startTime = Date.now();
  
  incrementMessagesGenerated(): void { this.messagesGenerated++; }
  incrementMessagesSent(): void { this.messagesSent++; }
  getTotalMessages(): number { return this.messagesGenerated; }
  incrementActiveConnections(): void { this.activeConnections++; }
  decrementActiveConnections(): void { this.activeConnections--; }
  
  generateReport(): MetricsReport {
    const uptime = Date.now() - this.startTime;
    
    return {
      uptime: Math.floor(uptime / 1000),
      activeConnections: this.activeConnections,
      totalMessagesGenerated: this.messagesGenerated,
      totalMessagesSent: this.messagesSent,
      averageMessagesPerSecond: Math.round(this.messagesGenerated / (uptime / 1000)),
      memoryUsage: process.memoryUsage(),
      timestamp: new Date().toISOString()
    };
  }
}

interface MetricsReport {
  uptime: number;
  activeConnections: number;
  totalMessagesGenerated: number;
  totalMessagesSent: number;
  averageMessagesPerSecond: number;
  memoryUsage: NodeJS.MemoryUsage;
  timestamp: string;
}

// ๐Ÿš€ Usage example
const server = new StreamingServer(8080);

console.log('๐ŸŒŠ Node.js Streaming Server Ready!');
console.log('Connect with WebSocket clients to ws://localhost:8080');
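
A minimal client sketch to try the server with (it uses the same ws package; the subscribe payload shape matches the handler above but is otherwise an assumption):

// ๐Ÿ”Œ Minimal WebSocket client for the streaming server
import WebSocket from 'ws';

const client = new WebSocket('ws://localhost:8080');

client.on('open', () => {
  console.log('๐Ÿ”Œ Connected to streaming server');
  client.send(JSON.stringify({ type: 'subscribe', payload: { topic: 'performance_metric' } }));
});

client.on('message', (data) => {
  const message = JSON.parse(data.toString());
  console.log(`๐Ÿ“ฅ ${message.type}:`, message.payload ?? message.data);
});

client.on('close', () => console.log('๐Ÿ‘‹ Disconnected'));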

๐Ÿ›ก๏ธ Error Handling and Performance Optimization

๐Ÿ”ง Production-Ready Stream Management

Hereโ€™s how to handle errors and optimize performance:

// ๐ŸŽฏ Production stream management with comprehensive error handling
// Enterprise-grade streaming patterns

import { Readable, Transform, Writable, pipeline } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { promisify } from 'util';

const pipelineAsync = promisify(pipeline);

// ๐Ÿ›ก๏ธ Resilient stream manager
class StreamManager {
  private retryAttempts = 3;
  private backoffDelay = 1000;
  private concurrencyLimit = 10;
  private activeStreams = new Set<string>();
  
  async processWithRetry<T>(
    streamFactory: () => Promise<T>,
    options: RetryOptions = {}
  ): Promise<T> {
    const maxAttempts = options.maxAttempts || this.retryAttempts;
    const delay = options.backoffDelay || this.backoffDelay;
    
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        console.log(`๐Ÿ”„ Processing attempt ${attempt}/${maxAttempts}`);
        return await streamFactory();
        
      } catch (error) {
        console.error(`๐Ÿ’ฅ Attempt ${attempt} failed: ${error}`);
        
        if (attempt === maxAttempts) {
          throw new StreamProcessingError(`Failed after ${maxAttempts} attempts`, error as Error);
        }
        
        // ๐Ÿ• Exponential backoff
        const backoffTime = delay * Math.pow(2, attempt - 1);
        console.log(`โณ Waiting ${backoffTime}ms before retry...`);
        await new Promise(resolve => setTimeout(resolve, backoffTime));
      }
    }
    
    throw new Error('Unexpected retry loop exit');
  }
  
  async createRobustPipeline(config: PipelineConfig): Promise<ProcessingStats> {
    const streamId = this.generateStreamId();
    this.activeStreams.add(streamId);
    
    try {
      console.log(`๐Ÿš€ Starting robust pipeline: ${streamId}`);
      
      // ๐Ÿ“Š Performance monitoring
      const stats = new ProcessingStats();
      
      // ๐Ÿ”ง Create pipeline components with error handling
      const source = this.createMonitoredSource(config.input, stats);
      const transforms = config.transforms.map(t => this.wrapWithMonitoring(t, stats));
      const destination = this.createMonitoredDestination(config.output, stats);
      
      // ๐Ÿ”— Build pipeline with error boundaries
      const components = [source, ...transforms, destination];
      
      await pipelineAsync(...components);
      
      console.log(`โœ… Pipeline completed successfully: ${streamId}`);
      return stats;
      
    } catch (error) {
      console.error(`๐Ÿ’ฅ Pipeline failed: ${streamId}`, error);
      throw error;
    } finally {
      this.activeStreams.delete(streamId);
    }
  }
  
  private createMonitoredSource(input: InputConfig, stats: ProcessingStats): Readable {
    let stream: Readable;
    
    // Non-null assertions: the config fields are assumed valid for each input.type
    switch (input.type) {
      case 'file':
        stream = createReadStream(input.path!, {
          highWaterMark: input.bufferSize || 64 * 1024
        });
        break;
      case 'http':
        stream = this.createHttpSource(input.url!);
        break;
      case 'generator':
        stream = this.createGeneratorSource(input.generator!);
        break;
      default:
        throw new Error(`Unsupported input type: ${input.type}`);
    }
    
    // ๐Ÿ“Š Add monitoring
    stream.on('data', (chunk) => {
      stats.addBytesRead(chunk.length);
    });
    
    stream.on('error', (error) => {
      console.error(`๐Ÿ’ฅ Source error: ${error.message}`);
      stats.addError(error);
    });
    
    stream.on('end', () => {
      console.log('๐Ÿ“– Source stream ended');
      stats.markSourceComplete();
    });
    
    return stream;
  }
  
  private wrapWithMonitoring(transform: Transform, stats: ProcessingStats): Transform {
    const wrapper = new Transform({
      objectMode: transform.readableObjectMode, // public getter, not internal _readableState
      transform: (chunk, encoding, callback) => {
        try {
          const startTime = process.hrtime.bigint();
          
          // ๐Ÿ”„ Execute the original transformation
          transform._transform(chunk, encoding, (error, result) => {
            const endTime = process.hrtime.bigint();
            const duration = Number(endTime - startTime) / 1_000_000; // ns to ms
            
            if (error) {
              stats.addError(error);
              callback(error);
            } else {
              stats.addProcessingTime(duration);
              stats.incrementProcessedCount();
              callback(null, result);
            }
          });
          
        } catch (error) {
          console.error(`๐Ÿ’ฅ Transform error: ${error}`);
          stats.addError(error as Error);
          callback(error as Error);
        }
      }
    });
    
    return wrapper;
  }
  
  private createMonitoredDestination(output: OutputConfig, stats: ProcessingStats): Writable {
    let stream: Writable;
    
    // Non-null assertions: the config fields are assumed valid for each output.type
    switch (output.type) {
      case 'file':
        stream = createWriteStream(output.path!);
        break;
      case 'http':
        stream = this.createHttpDestination(output.url!);
        break;
      case 'memory':
        stream = this.createMemoryDestination();
        break;
      default:
        throw new Error(`Unsupported output type: ${output.type}`);
    }
    
    // ๐Ÿ“Š Add monitoring - Writable emits no 'write' event, so intercept write() itself
    const originalWrite = stream.write.bind(stream);
    stream.write = ((chunk: any, ...args: any[]): boolean => {
      if (typeof chunk === 'string' || Buffer.isBuffer(chunk)) {
        stats.addBytesWritten(Buffer.byteLength(chunk));
      }
      return originalWrite(chunk, ...args);
    }) as typeof stream.write;
    
    stream.on('error', (error) => {
      console.error(`๐Ÿ’ฅ Destination error: ${error.message}`);
      stats.addError(error);
    });
    
    stream.on('finish', () => {
      console.log('๐Ÿ’พ Destination stream finished');
      stats.markDestinationComplete();
    });
    
    return stream;
  }
  
  // ๐ŸŽฏ Memory-efficient batch processor
  async processBatch<T, R>(
    items: T[],
    processor: (item: T) => Promise<R>,
    batchSize: number = 100
  ): Promise<R[]> {
    const results: R[] = [];
    
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize);
      console.log(`๐Ÿ“ฆ Processing batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(items.length / batchSize)}`);
      
      // ๐Ÿ”„ Process batch with concurrency control
      const batchPromises = batch.map(async (item, index) => {
        try {
          return await processor(item);
        } catch (error) {
          console.error(`๐Ÿ’ฅ Item ${i + index} failed: ${error}`);
          throw error;
        }
      });
      
      const batchResults = await Promise.allSettled(batchPromises);
      
      // ๐Ÿ“Š Collect successful results and log failures
      batchResults.forEach((result, index) => {
        if (result.status === 'fulfilled') {
          results.push(result.value);
        } else {
          console.error(`๐Ÿ’ฅ Batch item ${i + index} failed: ${result.reason}`);
        }
      });
    }
    
    return results;
  }
  
  private generateStreamId(): string {
    return `stream_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
  
  private createHttpSource(url: string): Readable {
    // Implementation for HTTP source
    throw new Error('HTTP source not implemented');
  }
  
  private createGeneratorSource(generator: () => Generator<any>): Readable {
    // Implementation for generator source
    throw new Error('Generator source not implemented');
  }
  
  private createHttpDestination(url: string): Writable {
    // Implementation for HTTP destination
    throw new Error('HTTP destination not implemented');
  }
  
  private createMemoryDestination(): Writable {
    const chunks: Buffer[] = [];
    
    return new Writable({
      write(chunk, encoding, callback) {
        chunks.push(Buffer.from(chunk));
        callback();
      },
      final(callback) {
        console.log(`๐Ÿ’พ Memory destination complete: ${chunks.length} chunks`);
        callback();
      }
    });
  }
}

// ๐Ÿ“Š Performance tracking
class ProcessingStats {
  private startTime = Date.now();
  private bytesRead = 0;
  private bytesWritten = 0;
  private processedCount = 0;
  private errors: Error[] = [];
  private processingTimes: number[] = [];
  private sourceComplete = false;
  private destinationComplete = false;
  
  addBytesRead(bytes: number): void { this.bytesRead += bytes; }
  addBytesWritten(bytes: number): void { this.bytesWritten += bytes; }
  incrementProcessedCount(): void { this.processedCount++; }
  addError(error: Error): void { this.errors.push(error); }
  addProcessingTime(ms: number): void { this.processingTimes.push(ms); }
  markSourceComplete(): void { this.sourceComplete = true; }
  markDestinationComplete(): void { this.destinationComplete = true; }
  
  generateReport(): StatsReport {
    const duration = Date.now() - this.startTime;
    const avgProcessingTime = this.processingTimes.length > 0 
      ? this.processingTimes.reduce((a, b) => a + b, 0) / this.processingTimes.length 
      : 0;
    
    return {
      duration,
      throughput: Math.round(this.processedCount / (duration / 1000)),
      bytesRead: this.bytesRead,
      bytesWritten: this.bytesWritten,
      processedCount: this.processedCount,
      errorCount: this.errors.length,
      averageProcessingTime: Math.round(avgProcessingTime * 100) / 100,
      memoryUsage: process.memoryUsage(),
      complete: this.sourceComplete && this.destinationComplete
    };
  }
}

// ๐Ÿ”ง Supporting types
interface RetryOptions {
  maxAttempts?: number;
  backoffDelay?: number;
}

interface PipelineConfig {
  input: InputConfig;
  transforms: Transform[];
  output: OutputConfig;
}

interface InputConfig {
  type: 'file' | 'http' | 'generator';
  path?: string;
  url?: string;
  generator?: () => Generator<any>;
  bufferSize?: number;
}

interface OutputConfig {
  type: 'file' | 'http' | 'memory';
  path?: string;
  url?: string;
}

interface StatsReport {
  duration: number;
  throughput: number;
  bytesRead: number;
  bytesWritten: number;
  processedCount: number;
  errorCount: number;
  averageProcessingTime: number;
  memoryUsage: NodeJS.MemoryUsage;
  complete: boolean;
}

class StreamProcessingError extends Error {
  constructor(message: string, public cause: Error) {
    super(message);
    this.name = 'StreamProcessingError';
  }
}

// ๐Ÿš€ Usage example
const streamManager = new StreamManager();

// Example: Process large dataset with retry logic
streamManager.processWithRetry(async () => {
  return await streamManager.createRobustPipeline({
    input: { type: 'file', path: 'large-dataset.json' },
    transforms: [
      new JSONLineProcessor(),
      new DataAggregator()
    ],
    output: { type: 'file', path: 'processed-output.json' }
  });
}).then(stats => {
  console.log('๐ŸŽ‰ Processing complete!', stats.generateReport());
}).catch(error => {
  console.error('๐Ÿ’ฅ Processing failed:', error);
});
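
One more optimization fundamental deserves a sketch: respecting backpressure when you write to a stream manually. write() returns false once the internal buffer is full, and you should wait for the 'drain' event before writing more:

// ๐Ÿšฐ Manual backpressure handling with write() and 'drain'
import { createWriteStream } from 'fs';
import { once } from 'events';

const writeManyLines = async (path: string): Promise<void> => {
  const out = createWriteStream(path);

  for (let i = 0; i < 1_000_000; i++) {
    const ok = out.write(`line ${i}\n`);
    if (!ok) {
      await once(out, 'drain'); // buffer full - pause until it empties
    }
  }

  out.end();
  await once(out, 'finish');
  console.log('โœ… All lines written without unbounded buffering');
};

writeManyLines('output.txt').catch(console.error);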

๐ŸŽฏ Summary and Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered Node.js streams with TypeScript and learned how to build efficient, scalable data processing applications. You now have the skills to:

๐Ÿ† What Youโ€™ve Accomplished

โœ… Stream Fundamentals - Understanding readable, writable, and transform streams
โœ… Type Safety - Using TypeScript for robust stream implementations
โœ… Data Processing - Building efficient data transformation pipelines
โœ… Real-Time Applications - Creating WebSocket-based streaming servers
โœ… Error Handling - Implementing resilient stream management
โœ… Performance Optimization - Memory-efficient processing patterns

๐Ÿš€ Key Takeaways

  1. Memory Efficiency ๐Ÿ’พ: Streams process data in chunks, not all at once
  2. Composability ๐Ÿ”—: Chain transforms like building blocks
  3. Backpressure ๐Ÿšฐ: Automatic flow control prevents memory overflow
  4. Error Resilience ๐Ÿ›ก๏ธ: Proper error handling keeps streams running
  5. Type Safety ๐Ÿ“: TypeScript ensures correct stream operations

๐ŸŽฏ Next Steps

Ready to expand your streaming expertise? Consider exploring:

  • Advanced Patterns ๐Ÿ—๏ธ: Custom duplex streams and more complex pipelines
  • Microservices ๐ŸŒ: Stream-based inter-service communication
  • Real-Time Analytics ๐Ÿ“Š: Building live dashboards and monitoring
  • File Processing ๐Ÿ“: Large file handling and cloud storage integration
  • Message Queues ๐Ÿ“ฎ: Streaming with RabbitMQ, Kafka, and Redis

Youโ€™re now equipped to build production-grade streaming applications that handle massive datasets with grace and efficiency! ๐ŸŒŠ

Keep streaming, keep building! ๐Ÿš€โœจ