Part 104 of 355

🔄 Async Iterators: For-Await-Of Loops Mastery

Master async iterators in TypeScript for streaming data processing, efficient memory usage, and elegant async iteration patterns 🌊

🚀 Intermediate
20 min read

Prerequisites

  • Basic TypeScript syntax and types 📝
  • Understanding of Promises and async/await ⚡
  • Array iteration methods and for-of loops 🔁

What you'll learn

  • Master async iterators for streaming data processing 🌊
  • Build memory-efficient async data pipelines with for-await-of 📊
  • Create custom async iterables for complex data sources 🏗️
  • Handle real-time data streams with elegant async iteration 📡

🎯 Introduction

Welcome to the streaming world of async iterators and for-await-of loops! 🎉 In this guide, we’ll explore how to process asynchronous data streams with iteration patterns that handle large datasets efficiently.

You’ll see how to turn overwhelming data processing tasks into manageable, memory-efficient operations, from API pagination 📄 to real-time data feeds 📡. Whether you’re processing large files 📁, consuming streaming APIs 🌊, or building data pipelines 🏭, async iterators are a core tool for scalable TypeScript applications.

By the end of this tutorial, you’ll be building data processors that stream through information like a flowing river! 🔄 Let’s dive in! 🏊‍♂️

📚 Understanding Async Iterators

🤔 What are Async Iterators?

Async iterators are like a conveyor belt that delivers data one piece at a time, asynchronously 📦. Think of it as the difference between downloading an entire movie before watching versus streaming it - async iterators let you process data as it arrives!

In TypeScript terms, async iterators provide:

  • ✨ Streaming processing - handle data as it becomes available
  • 🚀 Memory efficiency - process large datasets without loading everything at once
  • 🛡️ Type safety - maintain strong typing throughout async iteration
  • 📦 Lazy evaluation - only fetch data when needed
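
To make these properties concrete, here is a minimal sketch of the protocol behind for-await-of. The names countdown, collectManually, and collectWithLoop are illustrative, not from any library. An async iterable exposes a [Symbol.asyncIterator]() method that returns an object whose next() resolves to { value, done }:

```typescript
// Hypothetical async iterable: counts down from `start` to 1,
// waiting `delayMs` before producing each value.
function countdown(start: number, delayMs = 10): AsyncIterable<number> {
  return {
    [Symbol.asyncIterator](): AsyncIterator<number> {
      let current = start;
      return {
        async next(): Promise<IteratorResult<number>> {
          if (current <= 0) {
            return { value: undefined, done: true };
          }
          await new Promise<void>(resolve => setTimeout(resolve, delayMs));
          return { value: current--, done: false };
        }
      };
    }
  };
}

// Roughly what `for await (const n of iterable)` does on your behalf.
async function collectManually<T>(iterable: AsyncIterable<T>): Promise<T[]> {
  const iterator = iterable[Symbol.asyncIterator]();
  const values: T[] = [];
  while (true) {
    const result = await iterator.next();
    if (result.done) break;
    values.push(result.value);
  }
  return values;
}

// The same consumption written with for-await-of.
async function collectWithLoop<T>(iterable: AsyncIterable<T>): Promise<T[]> {
  const values: T[] = [];
  for await (const value of iterable) {
    values.push(value);
  }
  return values;
}
```

Both helpers produce the same values; the loop form simply hides the next() calls and the done check.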

💡 Why Use Async Iterators?

Here’s why async iterators are game-changers:

  1. Memory Efficiency 💪: Process massive datasets without overwhelming memory
  2. Real-Time Processing ⚡: Handle streaming data as it arrives
  3. Better UX 😊: Show progress and partial results immediately
  4. Scalability 📈: Memory use depends on the item you are processing, not on the size of the dataset
  5. Resource Management 🔧: Fetch and compute only what the consumer actually asks for

Real-world example: Processing a 10GB CSV file. Instead of loading it all into memory (💥), async iterators let you process one row at a time while the file streams in! 📊
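
The CSV scenario can be sketched as follows. This is a simplified illustration, not a full CSV parser: parseCsvRows and sampleLines are hypothetical names, and the naive split(',') does not handle quoted commas. The point is that only one line is held at a time, whatever the size of the input:

```typescript
// Hypothetical helper: turns an async stream of text lines into row objects.
// The first non-empty line is treated as the header.
async function* parseCsvRows(
  lines: AsyncIterable<string>
): AsyncGenerator<Record<string, string>, void, undefined> {
  let header: string[] | undefined;
  for await (const line of lines) {
    if (line.trim() === '') continue; // skip blank lines
    const cells = line.split(',').map(cell => cell.trim());
    if (!header) {
      header = cells;
      continue;
    }
    const row: Record<string, string> = {};
    header.forEach((name, i) => {
      row[name] = cells[i] ?? '';
    });
    yield row; // one row at a time; earlier rows can be garbage collected
  }
}

// Stand-in for a real file. In Node.js, a readline interface created over
// fs.createReadStream(path) is also an AsyncIterable<string> of lines.
async function* sampleLines(): AsyncGenerator<string, void, undefined> {
  yield 'id,name';
  yield '1,Ada';
  yield '2,Grace';
}
```

Consuming it looks like any other loop: for await (const row of parseCsvRows(sampleLines())) { ... }.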

🔧 Basic For-Await-Of Patterns

📝 Simple Async Iteration

Let’s start with fundamental for-await-of usage:

// 🎯 Basic for-await-of with async generators
// TypeScript infers the loop variable's type from the generator's yield type

// 📊 Simple async data processor
// (fetchDataFromSource and processRawData are placeholder helpers assumed to be defined elsewhere)
const processDataStream = async function* (): AsyncGenerator<ProcessedData, void, unknown> {
  const dataSources = ['api-1', 'api-2', 'api-3', 'api-4'];
  
  console.log('🚀 Starting data stream processing...');
  
  for (const source of dataSources) {
    try {
      // 🌐 Fetch data asynchronously
      const rawData = await fetchDataFromSource(source);
      
      // 🔄 Process each piece
      const processed = await processRawData(rawData);
      
      console.log(`✅ Processed data from ${source}`);
      yield processed; // 📤 Yield results one at a time
      
    } catch (error) {
      // Catch variables are `unknown` under strict mode, so narrow before reading .message
      const message = error instanceof Error ? error.message : String(error);
      console.error(`💥 Error processing ${source}:`, message);
      // ⚠️ Continue with next source instead of failing entirely
      continue;
    }
  }
  
  console.log('🎉 Stream processing complete!');
};

// 🏗️ Type definitions
interface ProcessedData {
  id: string;
  content: string;
  processedAt: Date;
  source: string;
  metrics: DataMetrics;
}

interface DataMetrics {
  size: number;
  processingTime: number;
  quality: 'high' | 'medium' | 'low';
}

// ✨ Using for-await-of to consume the stream
// The function returns the collected items, so its return type is Promise<ProcessedData[]>
const consumeDataStream = async (): Promise<ProcessedData[]> => {
  const results: ProcessedData[] = [];
  
  console.log('🔄 Starting stream consumption...');
  
  // 🌊 Iterate through async stream
  for await (const data of processDataStream()) {
    // 📦 Process each piece as it arrives
    results.push(data);
    
    // 📊 Show progress
    console.log(`📈 Processed ${results.length} items so far...`);
    
    // 🎯 Optional: Add delay for demonstration
    await new Promise(resolve => setTimeout(resolve, 100));
  }
  
  console.log(`✅ Final results: ${results.length} items processed`);
  return results;
};
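
One detail the loop above does not show is what happens when the consumer stops early. A minimal sketch, using the hypothetical names numbers and takeTwo: leaving a for-await-of loop with break calls the generator's return() method, which runs any finally block in the generator. That makes finally a natural place to close connections or file handles.

```typescript
// Hypothetical endless source that records its lifecycle into `log`.
async function* numbers(log: string[]): AsyncGenerator<number, void, undefined> {
  try {
    let n = 1;
    while (true) {
      log.push(`produce ${n}`);
      yield n++;
    }
  } finally {
    // Runs when the consumer breaks, returns, or throws out of the loop.
    log.push('cleanup');
  }
}

// Takes the first two values, then stops.
async function takeTwo(log: string[]): Promise<number[]> {
  const taken: number[] = [];
  for await (const n of numbers(log)) {
    taken.push(n);
    if (taken.length === 2) break; // triggers the generator's return()
  }
  return taken;
}
```

After takeTwo(log) resolves, log contains 'produce 1', 'produce 2', 'cleanup': the source never produces a third value.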

🔄 Advanced Streaming Patterns

Let’s explore more sophisticated async iteration patterns:

// 🏭 Advanced streaming data pipeline with a typed public API
// (stages are stored as `any` internally; pipe() keeps the input/output types aligned)
class AsyncDataPipeline<TInput, TOutput> {
  private transformers: AsyncTransformer<any, any>[] = [];
  
  // 🔗 Chainable transformer registration
  pipe<TNext>(transformer: AsyncTransformer<TOutput, TNext>): AsyncDataPipeline<TInput, TNext> {
    return new AsyncDataPipeline<TInput, TNext>([...this.transformers, transformer]);
  }
  
  // 🌊 Execute the pipeline with async iteration
  async *process(source: AsyncIterable<TInput>): AsyncGenerator<TOutput, void, unknown> {
    console.log('🚀 Starting pipeline execution...');
    
    for await (const item of source) {
      try {
        let current: any = item;
        
        // 🔄 Apply each transformer in sequence
        for (const transformer of this.transformers) {
          current = await transformer.transform(current);
        }
        
        yield current as TOutput;
        
      } catch (error) {
        // Catch variables are `unknown` under strict mode, so narrow before reading .message
        const message = error instanceof Error ? error.message : String(error);
        console.error('💥 Pipeline error:', message);
        // 🛡️ Continue processing other items
        continue;
      }
    }
    
    console.log('✅ Pipeline execution complete');
  }
  
  // 📊 Collect all results (use with caution for large datasets)
  async collect(source: AsyncIterable<TInput>): Promise<TOutput[]> {
    const results: TOutput[] = [];
    
    for await (const item of this.process(source)) {
      results.push(item);
    }
    
    return results;
  }
  
  constructor(transformers: AsyncTransformer<any, any>[] = []) {
    this.transformers = transformers;
  }
}

// 🛠️ Generic transformer interface
interface AsyncTransformer<TInput, TOutput> {
  transform(input: TInput): Promise<TOutput>;
}

// 🔧 Practical transformer implementations
class DataValidationTransformer implements AsyncTransformer<RawDataItem, ValidatedDataItem> {
  async transform(input: RawDataItem): Promise<ValidatedDataItem> {
    console.log('🔍 Validating data item...');
    
    // 📋 Validation logic
    if (!input.id || !input.content) {
      throw new Error('Invalid data item: missing required fields');
    }
    
    return {
      ...input,
      isValid: true,
      validatedAt: new Date()
    };
  }
}

class DataEnrichmentTransformer implements AsyncTransformer<ValidatedDataItem, EnrichedDataItem> {
  async transform(input: ValidatedDataItem): Promise<EnrichedDataItem> {
    console.log('✨ Enriching data item...');
    
    // 🌐 Fetch additional data
    const metadata = await fetchMetadata(input.id);
    const tags = await generateTags(input.content);
    
    return {
      ...input,
      metadata,
      tags,
      enrichedAt: new Date()
    };
  }
}

class DataCompressionTransformer implements AsyncTransformer<EnrichedDataItem, CompressedDataItem> {
  async transform(input: EnrichedDataItem): Promise<CompressedDataItem> {
    console.log('🗜️ Compressing data item...');
    
    // 📦 Compression logic
    const compressedContent = await compressData(input.content);
    
    return {
      id: input.id,
      compressedContent,
      originalSize: input.content.length,
      compressedSize: compressedContent.length,
      compressionRatio: compressedContent.length / input.content.length,
      processedAt: new Date()
    };
  }
}

// 🏗️ Type definitions for pipeline stages
interface RawDataItem {
  id: string;
  content: string;
  source: string;
}

interface ValidatedDataItem extends RawDataItem {
  isValid: boolean;
  validatedAt: Date;
}

interface EnrichedDataItem extends ValidatedDataItem {
  metadata: DataMetadata;
  tags: string[];
  enrichedAt: Date;
}

interface CompressedDataItem {
  id: string;
  compressedContent: string;
  originalSize: number;
  compressedSize: number;
  compressionRatio: number;
  processedAt: Date;
}

interface DataMetadata {
  category: string;
  priority: 'high' | 'medium' | 'low';
  region: string;
  version: number;
}
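
The class above keeps its stages in an array typed as any. As an alternative sketch (not the pipeline class itself), small async generator functions can be composed directly, and TypeScript then checks each stage's input and output types. The helper names fromArray, mapAsync, and filterAsync are illustrative:

```typescript
// Turns an in-memory array into an async iterable (handy for tests and demos).
async function* fromArray<T>(items: readonly T[]): AsyncGenerator<T, void, undefined> {
  for (const item of items) {
    yield item;
  }
}

// Applies `fn` to each item lazily; `fn` may be sync or async.
async function* mapAsync<T, R>(
  source: AsyncIterable<T>,
  fn: (item: T) => R | Promise<R>
): AsyncGenerator<R, void, undefined> {
  for await (const item of source) {
    yield await fn(item);
  }
}

// Passes through only the items for which `predicate` resolves to true.
async function* filterAsync<T>(
  source: AsyncIterable<T>,
  predicate: (item: T) => boolean | Promise<boolean>
): AsyncGenerator<T, void, undefined> {
  for await (const item of source) {
    if (await predicate(item)) {
      yield item;
    }
  }
}

// Composition reads inside-out: source, then filter, then map.
const evensTimesTen = mapAsync(
  filterAsync(fromArray([1, 2, 3, 4]), n => n % 2 === 0),
  async n => n * 10
);
```

Nothing runs until evensTimesTen is iterated; each item then flows through every stage before the next item is requested.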

🌊 Working with Async Iterables

📡 Real-World Stream Processing

Let’s build a real-time event processing system:

// 🚀 Real-time event stream processor
class RealTimeEventProcessor {
  private eventStreams: Map<string, AsyncIterable<Event>> = new Map();
  private processors: Map<EventType, EventProcessor> = new Map();
  private subscribers: Map<EventType, EventSubscriber[]> = new Map();
  
  // 📡 Register an event stream
  registerStream(streamId: string, stream: AsyncIterable<Event>): void {
    console.log(`📡 Registering event stream: ${streamId}`);
    this.eventStreams.set(streamId, stream);
  }
  
  // 🔄 Register event processor
  registerProcessor(eventType: EventType, processor: EventProcessor): void {
    console.log(`🔄 Registering processor for: ${eventType}`);
    this.processors.set(eventType, processor);
  }
  
  // 👂 Register event subscriber (keyed by EventType, matching notifySubscribers)
  subscribe(eventType: EventType, subscriber: EventSubscriber): void {
    if (!this.subscribers.has(eventType)) {
      this.subscribers.set(eventType, []);
    }
    this.subscribers.get(eventType)!.push(subscriber);
    console.log(`👂 New subscriber for: ${eventType}`);
  }
  
  // 🚀 Start processing all streams
  async startProcessing(): Promise<void> {
    console.log('🚀 Starting real-time event processing...');
    
    const streamPromises = Array.from(this.eventStreams.entries()).map(
      ([streamId, stream]) => this.processStream(streamId, stream)
    );
    
    // 🔄 Process all streams concurrently
    await Promise.all(streamPromises);
  }
  
  // 🌊 Process individual stream
  private async processStream(streamId: string, stream: AsyncIterable<Event>): Promise<void> {
    console.log(`🌊 Processing stream: ${streamId}`);
    
    try {
      // 🔄 Iterate through events as they arrive
      for await (const event of stream) {
        await this.processEvent(streamId, event);
      }
    } catch (error) {
      // Catch variables are `unknown` under strict mode, so narrow before reading .message
      const message = error instanceof Error ? error.message : String(error);
      console.error(`💥 Stream ${streamId} error:`, message);
      // 🔄 Attempt to reconnect or handle gracefully
      await this.handleStreamError(streamId, error);
    }
  }
  
  // ⚡ Process individual event
  private async processEvent(streamId: string, event: Event): Promise<void> {
    try {
      console.log(`⚡ Processing event from ${streamId}:`, event.type);
      
      // 🔄 Find and execute processor
      const processor = this.processors.get(event.type);
      if (processor) {
        const processedEvent = await processor.process(event);
        
        // 📤 Notify subscribers
        await this.notifySubscribers(event.type, processedEvent);
      } else {
        console.warn(`⚠️ No processor found for event type: ${event.type}`);
      }
      
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`💥 Event processing error:`, message);
      // 🛡️ Continue processing other events
    }
  }
  
  // 📤 Notify all subscribers
  private async notifySubscribers(eventType: EventType, event: ProcessedEvent): Promise<void> {
    const subscribers = this.subscribers.get(eventType) || [];
    
    // 📡 Notify all subscribers concurrently
    const notifications = subscribers.map(subscriber => 
      subscriber.onEvent(event).catch(error => 
        console.error(`💥 Subscriber notification error:`, error.message)
      )
    );
    
    await Promise.all(notifications);
  }
  
  // 🛡️ Handle stream errors with recovery
  private async handleStreamError(streamId: string, error: unknown): Promise<void> {
    console.log(`🛡️ Handling stream error for ${streamId}...`);
    
    // 🔄 Implement retry logic or reconnection
    // (this method returns immediately; the timer only schedules the attempt)
    setTimeout(() => {
      console.log(`🔄 Attempting to reconnect stream: ${streamId}`);
      // Reconnection logic would go here
    }, 5000);
  }
}

// 🏗️ Event system type definitions
interface Event {
  id: string;
  type: EventType;
  timestamp: Date;
  data: any;
  metadata: EventMetadata;
}

interface ProcessedEvent extends Event {
  processedAt: Date;
  processingTime: number;
  enrichedData?: any;
}

interface EventMetadata {
  source: string;
  priority: 'critical' | 'high' | 'medium' | 'low';
  correlationId?: string;
  userId?: string;
}

type EventType = 'user_action' | 'system_event' | 'notification' | 'error' | 'metrics';

interface EventProcessor {
  process(event: Event): Promise<ProcessedEvent>;
}

interface EventSubscriber {
  onEvent(event: ProcessedEvent): Promise<void>;
}

// 🔧 Concrete processor implementations
class UserActionProcessor implements EventProcessor {
  async process(event: Event): Promise<ProcessedEvent> {
    const startTime = Date.now();
    console.log('👤 Processing user action...');
    
    // 🔍 Analyze user behavior
    const analysis = await this.analyzeUserBehavior(event.data);
    
    // 📊 Update user metrics
    await this.updateUserMetrics(event.metadata.userId!, analysis);
    
    return {
      ...event,
      processedAt: new Date(),
      processingTime: Date.now() - startTime,
      enrichedData: analysis
    };
  }
  
  private async analyzeUserBehavior(data: any): Promise<UserBehaviorAnalysis> {
    // 🧠 Complex analysis logic
    await new Promise(resolve => setTimeout(resolve, 50)); // Simulate processing
    
    return {
      actionType: data.action,
      frequency: Math.floor(Math.random() * 100),
      pattern: 'normal',
      riskScore: Math.random()
    };
  }
  
  private async updateUserMetrics(userId: string, analysis: UserBehaviorAnalysis): Promise<void> {
    console.log(`📊 Updating metrics for user: ${userId}`);
    // Database update logic would go here
  }
}

interface UserBehaviorAnalysis {
  actionType: string;
  frequency: number;
  pattern: 'normal' | 'suspicious' | 'anomalous';
  riskScore: number;
}

// 📧 Email notification subscriber
class EmailNotificationSubscriber implements EventSubscriber {
  async onEvent(event: ProcessedEvent): Promise<void> {
    if (event.metadata.priority === 'critical') {
      console.log('📧 Sending critical event email notification...');
      
      // 📬 Send email logic
      await this.sendEmail({
        to: '[email protected]',
        subject: `Critical Event: ${event.type}`,
        body: `Event ${event.id} requires immediate attention.`
      });
    }
  }
  
  private async sendEmail(email: EmailData): Promise<void> {
    // 📬 Email sending implementation
    console.log(`📬 Email sent: ${email.subject}`);
  }
}

interface EmailData {
  to: string;
  subject: string;
  body: string;
}
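
registerStream expects an AsyncIterable<Event>, but many real-time sources (WebSocket callbacks, event emitters) are push-based. One way to bridge the two is a small queue that producers push into and consumers iterate over. The AsyncQueue class below is a hypothetical sketch of that idea with an unbounded buffer, not a production implementation:

```typescript
// Hypothetical adapter: producers call push() and close(),
// consumers read with for-await-of.
class AsyncQueue<T> implements AsyncIterable<T> {
  private items: T[] = [];
  private waiters: Array<(result: IteratorResult<T>) => void> = [];
  private closed = false;

  // Deliver to a waiting consumer if there is one, otherwise buffer the item.
  push(item: T): void {
    if (this.closed) return;
    const waiter = this.waiters.shift();
    if (waiter) {
      waiter({ value: item, done: false });
    } else {
      this.items.push(item);
    }
  }

  // Signal the end of the stream and release any waiting consumers.
  close(): void {
    this.closed = true;
    for (const waiter of this.waiters.splice(0)) {
      waiter({ value: undefined, done: true });
    }
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: async (): Promise<IteratorResult<T>> => {
        // Drain buffered items first, even after close().
        if (this.items.length > 0) {
          return { value: this.items.shift() as T, done: false };
        }
        if (this.closed) {
          return { value: undefined, done: true };
        }
        // Nothing buffered yet: wait until push() or close() is called.
        return new Promise<IteratorResult<T>>(resolve => {
          this.waiters.push(resolve);
        });
      }
    };
  }
}
```

A callback-based source would call queue.push(event) in its handler and queue.close() on disconnect, while the processor consumes the queue like any other stream. Because the buffer is unbounded, a fast producer with a slow consumer can still grow memory; a real adapter would need a size limit or a drop policy.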

🛠️ Custom Async Iterables

🏗️ Building Custom Iterables

Let’s create powerful custom async iterables:

// 🔄 Paginated API data fetcher with async iteration
class PaginatedAPIIterator<T> implements AsyncIterable<T> {
  private baseUrl: string;
  private headers: Record<string, string>;
  private pageSize: number;
  private totalItems?: number;
  
  constructor(
    baseUrl: string,
    options: {
      headers?: Record<string, string>;
      pageSize?: number;
    } = {}
  ) {
    this.baseUrl = baseUrl;
    this.headers = options.headers || {};
    this.pageSize = options.pageSize || 50;
  }
  
  // 🔄 Implement AsyncIterable interface
  [Symbol.asyncIterator](): AsyncIterator<T> {
    return this.createIterator();
  }
  
  // 🏗️ Create the actual iterator
  private createIterator(): AsyncIterator<T> {
    // Capture configuration here: inside next(), `this` is the iterator object, not this class
    const { baseUrl, headers, pageSize } = this;
    let currentPage = 1;
    let currentIndex = 0;
    let currentBatch: T[] = [];
    let finished = false;
    
    return {
      async next(): Promise<IteratorResult<T>> {
        // 📦 If we have items in current batch, return next item
        if (currentIndex < currentBatch.length) {
          const value = currentBatch[currentIndex++];
          return { value, done: false };
        }
        
        // 🔚 If we're finished, return done
        if (finished) {
          return { value: undefined, done: true };
        }
        
        try {
          // 🌐 Fetch next page
          console.log(`📄 Fetching page ${currentPage}...`);
          const response = await fetch(`${baseUrl}?page=${currentPage}&limit=${pageSize}`, {
            headers
          });
          
          if (!response.ok) {
            throw new Error(`API request failed: ${response.status}`);
          }
          
          const data: PaginatedResponse<T> = await response.json();
          
          // 📊 Update state
          currentBatch = data.items;
          currentIndex = 0;
          currentPage++;
          
          // 🔚 Check if we're done
          if (currentBatch.length === 0 || data.hasNext === false) {
            finished = true;
          }
          
          // 📦 Return first item from new batch
          if (currentBatch.length > 0) {
            const value = currentBatch[currentIndex++];
            return { value, done: false };
          } else {
            return { value: undefined, done: true };
          }
          
        } catch (error) {
          // Catch variables are `unknown` under strict mode, so narrow before reading .message
          const message = error instanceof Error ? error.message : String(error);
          console.error('💥 API iteration error:', message);
          finished = true;
          throw error;
        }
      }
    };
  }
  
  // 📊 Get estimated total count
  async getEstimatedTotal(): Promise<number | undefined> {
    if (this.totalItems !== undefined) {
      return this.totalItems;
    }
    
    try {
      // 🔍 Fetch metadata
      const response = await fetch(`${this.baseUrl}/count`, {
        headers: this.headers
      });
      
      if (response.ok) {
        const { total } = await response.json();
        this.totalItems = total;
        return total;
      }
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.warn('⚠️ Could not fetch total count:', message);
    }
    
    return undefined;
  }
}

// 🏗️ Paginated response interface
interface PaginatedResponse<T> {
  items: T[];
  page: number;
  limit: number;
  total: number;
  hasNext: boolean;
}

// 📊 Advanced batch processor with progress tracking
class BatchProcessor<TInput, TOutput> {
  private batchSize: number;
  private concurrency: number;
  private processor: (batch: TInput[]) => Promise<TOutput[]>;
  
  constructor(
    processor: (batch: TInput[]) => Promise<TOutput[]>,
    options: {
      batchSize?: number;
      concurrency?: number;
    } = {}
  ) {
    this.processor = processor;
    this.batchSize = options.batchSize || 10;
    this.concurrency = options.concurrency || 3;
  }
  
  // 🔄 Process async iterable in batches
  async *processBatches(
    source: AsyncIterable<TInput>
  ): AsyncGenerator<BatchResult<TOutput>, void, unknown> {
    const activeBatches = new Map<number, Promise<BatchResult<TOutput>>>();
    let batchId = 0;
    let currentBatch: TInput[] = [];
    let totalItems = 0;
    
    console.log('🚀 Starting batch processing...');
    
    try {
      // 🔄 Iterate through source
      for await (const item of source) {
        currentBatch.push(item);
        totalItems++;
        
        // 📦 Process when batch is full
        if (currentBatch.length >= this.batchSize) {
          const batch = [...currentBatch];
          const id = batchId++;
          
          // 🚀 Start batch processing
          activeBatches.set(id, this.processBatch(id, batch));
          currentBatch = [];
          
          // 🔄 Manage concurrency
          if (activeBatches.size >= this.concurrency) {
            const result = await this.getNextCompletedBatch(activeBatches);
            yield result;
          }
        }
      }
      
      // 📦 Process remaining items
      if (currentBatch.length > 0) {
        const id = batchId++;
        activeBatches.set(id, this.processBatch(id, currentBatch));
      }
      
      // 🔚 Wait for all remaining batches
      while (activeBatches.size > 0) {
        const result = await this.getNextCompletedBatch(activeBatches);
        yield result;
      }
      
      console.log(`✅ Batch processing complete: ${totalItems} items processed`);
      
    } catch (error) {
      // Catch variables are `unknown` under strict mode, so narrow before reading .message
      const message = error instanceof Error ? error.message : String(error);
      console.error('💥 Batch processing error:', message);
      throw error;
    }
  }
  
  // 🏗️ Process single batch
  private async processBatch(id: number, batch: TInput[]): Promise<BatchResult<TOutput>> {
    const startTime = Date.now();
    console.log(`📦 Processing batch ${id} with ${batch.length} items...`);
    
    try {
      const results = await this.processor(batch);
      const processingTime = Date.now() - startTime;
      
      console.log(`✅ Batch ${id} completed in ${processingTime}ms`);
      
      return {
        id,
        results,
        inputCount: batch.length,
        outputCount: results.length,
        processingTime,
        success: true
      };
      
    } catch (error) {
      const processingTime = Date.now() - startTime;
      const message = error instanceof Error ? error.message : String(error);
      console.error(`💥 Batch ${id} failed:`, message);
      
      return {
        id,
        results: [],
        inputCount: batch.length,
        outputCount: 0,
        processingTime,
        success: false,
        error: message
      };
    }
  }
  
  // ⏳ Get next completed batch
  private async getNextCompletedBatch(
    activeBatches: Map<number, Promise<BatchResult<TOutput>>>
  ): Promise<BatchResult<TOutput>> {
    // Promise.race settles with whichever batch finishes first.
    // processBatch never rejects, so a failed batch arrives as { success: false }.
    const result = await Promise.race(activeBatches.values());
    activeBatches.delete(result.id);
    return result;
  }
}

// 🏗️ Batch result interface
interface BatchResult<T> {
  id: number;
  results: T[];
  inputCount: number;
  outputCount: number;
  processingTime: number;
  success: boolean;
  error?: string;
}
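
For comparison with the hand-written iterator in PaginatedAPIIterator, the same pagination idea can be sketched as an async generator, which keeps the page state in ordinary local variables. The Page shape and the fetchPage callback are assumptions made for this example; in practice fetchPage might wrap a fetch() call:

```typescript
// Hypothetical minimal page shape for this sketch.
interface Page<T> {
  items: T[];
  hasNext: boolean;
}

// Yields items one by one, requesting the next page only when needed.
async function* paginate<T>(
  fetchPage: (page: number) => Promise<Page<T>>
): AsyncGenerator<T, void, undefined> {
  let page = 1;
  while (true) {
    const { items, hasNext } = await fetchPage(page++);
    yield* items; // delegate to the array: emits each item in order
    if (!hasNext || items.length === 0) {
      return;
    }
  }
}
```

Because generators are lazy, a consumer that breaks out of the loop after the first few items never triggers requests for later pages.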

🎯 Practical Applications

📊 Complete Data Processing Pipeline

Let’s build a more complete, real-world style application:

// 🏭 Complete data ingestion and processing system
class DataIngestionPipeline {
  private sources: Map<string, AsyncIterable<RawData>> = new Map();
  private transformers: DataTransformer[] = [];
  private outputs: Map<string, DataOutput> = new Map();
  private metrics: ProcessingMetrics = {
    totalProcessed: 0,
    successCount: 0,
    errorCount: 0,
    startTime: new Date(),
    averageProcessingTime: 0
  };
  
  // 📡 Register data source
  addSource(name: string, source: AsyncIterable<RawData>): this {
    console.log(`📡 Adding data source: ${name}`);
    this.sources.set(name, source);
    return this;
  }
  
  // 🔄 Add data transformer
  addTransformer(transformer: DataTransformer): this {
    console.log(`🔄 Adding transformer: ${transformer.name}`);
    this.transformers.push(transformer);
    return this;
  }
  
  // 📤 Add output destination
  addOutput(name: string, output: DataOutput): this {
    console.log(`📤 Adding output: ${name}`);
    this.outputs.set(name, output);
    return this;
  }
  
  // 🚀 Start the pipeline
  async run(): Promise<ProcessingMetrics> {
    console.log('🚀 Starting data ingestion pipeline...');
    this.metrics.startTime = new Date();
    
    try {
      // 🔄 Process all sources concurrently
      const sourcePromises = Array.from(this.sources.entries()).map(
        ([name, source]) => this.processSource(name, source)
      );
      
      await Promise.all(sourcePromises);
      
      // 📊 Finalize metrics
      this.metrics.averageProcessingTime = this.calculateAverageProcessingTime();
      
      console.log('✅ Pipeline completed successfully');
      console.log('📊 Final metrics:', this.metrics);
      
      return this.metrics;
      
    } catch (error) {
      // Catch variables are `unknown` under strict mode, so narrow before reading .message
      const message = error instanceof Error ? error.message : String(error);
      console.error('💥 Pipeline error:', message);
      throw error;
    }
  }
  
  // 🌊 Process individual data source
  private async processSource(sourceName: string, source: AsyncIterable<RawData>): Promise<void> {
    console.log(`🌊 Processing source: ${sourceName}`);
    
    const processingStartTime = Date.now();
    let itemCount = 0;
    
    try {
      // 🔄 Process each item from the source
      for await (const rawItem of source) {
        await this.processItem(sourceName, rawItem);
        itemCount++;
        
        // 📊 Update progress periodically
        if (itemCount % 100 === 0) {
          console.log(`📈 Processed ${itemCount} items from ${sourceName}`);
        }
      }
      
      const processingTime = Date.now() - processingStartTime;
      console.log(`✅ Source ${sourceName} complete: ${itemCount} items in ${processingTime}ms`);
      
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`💥 Source ${sourceName} error:`, message);
      this.metrics.errorCount++;
    }
  }
  
  // ⚡ Process individual item
  private async processItem(sourceName: string, rawItem: RawData): Promise<void> {
    try {
      let currentData: any = { ...rawItem, sourceName };
      
      // 🔄 Apply all transformers
      for (const transformer of this.transformers) {
        currentData = await transformer.transform(currentData);
      }
      
      // 📤 Send to all outputs
      const outputPromises = Array.from(this.outputs.values()).map(
        output => output.write(currentData).catch(error => {
          console.error(`💥 Output error:`, error.message);
        })
      );
      
      await Promise.all(outputPromises);
      
      // 📊 Update metrics
      this.metrics.totalProcessed++;
      this.metrics.successCount++;
      
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`💥 Item processing error:`, message);
      this.metrics.errorCount++;
    }
  }
  
  // 📊 Average wall-clock time per processed item since the pipeline started
  private calculateAverageProcessingTime(): number {
    const totalTime = Date.now() - this.metrics.startTime.getTime();
    return this.metrics.totalProcessed > 0 ? totalTime / this.metrics.totalProcessed : 0;
  }
  
  // 📊 Get real-time metrics
  getMetrics(): ProcessingMetrics {
    return { ...this.metrics };
  }
}

// 🏗️ System interfaces
interface RawData {
  id: string;
  content: any;
  timestamp: Date;
  metadata: Record<string, any>;
}

interface DataTransformer {
  name: string;
  transform(data: any): Promise<any>;
}

interface DataOutput {
  write(data: any): Promise<void>;
}

interface ProcessingMetrics {
  totalProcessed: number;
  successCount: number;
  errorCount: number;
  startTime: Date;
  averageProcessingTime: number;
}

// 🔧 Concrete transformer implementations
class ValidationTransformer implements DataTransformer {
  name = 'ValidationTransformer';
  
  async transform(data: any): Promise<any> {
    // 🔍 Validate required fields
    if (!data.id || !data.content) {
      throw new Error('Missing required fields');
    }
    
    return {
      ...data,
      validated: true,
      validatedAt: new Date()
    };
  }
}

class EnrichmentTransformer implements DataTransformer {
  name = 'EnrichmentTransformer';
  
  async transform(data: any): Promise<any> {
    // ✨ Add enriched data
    const enrichment = await this.fetchEnrichmentData(data.id);
    
    return {
      ...data,
      enrichment,
      enrichedAt: new Date()
    };
  }
  
  private async fetchEnrichmentData(id: string): Promise<any> {
    // 🌐 Simulate API call
    await new Promise(resolve => setTimeout(resolve, 10));
    return { category: 'default', priority: 'medium' };
  }
}

// 📊 Database output implementation
class DatabaseOutput implements DataOutput {
  async write(data: any): Promise<void> {
    console.log(`💾 Writing to database: ${data.id}`);
    
    // 🗄️ Simulate database write
    await new Promise(resolve => setTimeout(resolve, 5));
  }
}

// 📁 File output implementation
class FileOutput implements DataOutput {
  private filename: string;
  
  constructor(filename: string) {
    this.filename = filename;
  }
  
  async write(data: any): Promise<void> {
    console.log(`📝 Writing to file ${this.filename}: ${data.id}`);
    
    // 📝 Simulate file write
    await new Promise(resolve => setTimeout(resolve, 3));
  }
}
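
The periodic progress logging inside processSource can also be factored into a reusable pass-through stage. A minimal sketch, with withProgress and range as hypothetical helper names: the wrapper forwards every item unchanged and invokes a callback every `every` items, so progress reporting stays out of the processing logic.

```typescript
// Forwards items unchanged and reports the running count every `every` items.
async function* withProgress<T>(
  source: AsyncIterable<T>,
  every: number,
  onProgress: (count: number) => void
): AsyncGenerator<T, void, undefined> {
  let count = 0;
  for await (const item of source) {
    count++;
    if (count % every === 0) {
      onProgress(count);
    }
    yield item;
  }
}

// Small demo source: yields 1..n.
async function* range(n: number): AsyncGenerator<number, void, undefined> {
  for (let i = 1; i <= n; i++) {
    yield i;
  }
}
```

Usage: for await (const item of withProgress(source, 100, count => console.log(`${count} items so far`))) { ... }.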

💡 Best Practices & Tips

🎯 Performance Optimization

Here are essential tips for efficient async iteration:

// ✅ DO: Use async iterators for large datasets
// (DataItem and processItem are placeholders for your own type and async handler)
const processLargeDataset = async function* (source: AsyncIterable<DataItem>) {
  // 🔄 Process one item at a time, keeping memory usage low
  for await (const item of source) {
    const processed = await processItem(item);
    yield processed; // 📤 Hand each result to the consumer before reading the next item
  }
};

// ❌ DON'T: Load everything into memory first
const processLargeDatasetBad = async (source: AsyncIterable<DataItem>) => {
  const allItems: DataItem[] = [];
  
  // 💥 This holds the entire dataset in memory at once!
  for await (const item of source) {
    allItems.push(item);
  }
  
  // 🚫 Starts every processItem call at once and keeps all results in memory
  return Promise.all(allItems.map(item => processItem(item)));
};

// ✅ DO: Handle errors gracefully without stopping iteration
const resilientProcessing = async function* (source: AsyncIterable<DataItem>) {
  for await (const item of source) {
    try {
      const result = await processItem(item);
      yield { success: true, data: result };
    } catch (error) {
      // Catch variables are `unknown` under strict mode, so narrow before reading .message
      const message = error instanceof Error ? error.message : String(error);
      console.error(`💥 Error processing item:`, message);
      yield { success: false, error: message };
      // ✅ Continue processing instead of failing entirely
    }
  }
};

// ✅ DO: Bound buffering in high-throughput scenarios by grouping items into batches
// for-await-of is pull-based, so the source is only asked for more once the consumer takes a batch
class BackpressureController<T> {
  private buffer: T[] = [];
  private maxBufferSize: number;
  
  constructor(maxBufferSize: number = 100) {
    this.maxBufferSize = maxBufferSize;
  }
  
  async *process(source: AsyncIterable<T>): AsyncGenerator<T[], void, unknown> {
    for await (const item of source) {
      this.buffer.push(item);
      
      // 📦 Yield batches when buffer is full
      if (this.buffer.length >= this.maxBufferSize) {
        const batch = [...this.buffer];
        this.buffer = [];
        yield batch;
      }
    }
    
    // 📦 Yield remaining items and leave the buffer empty for reuse
    if (this.buffer.length > 0) {
      const batch = [...this.buffer];
      this.buffer = [];
      yield batch;
    }
  }
}
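
One property worth seeing directly is that async generators are pull-based: the generator body runs only up to the next yield each time the consumer asks for a value, so a slow consumer automatically slows the producer. A minimal sketch, with producer and slowConsumer as illustrative names:

```typescript
// Records when each value is produced.
async function* producer(log: string[]): AsyncGenerator<number, void, undefined> {
  for (let i = 1; i <= 3; i++) {
    log.push(`produced ${i}`);
    yield i; // suspended here until the consumer requests the next value
  }
}

// Simulates slow work on each value before asking for the next one.
async function slowConsumer(log: string[]): Promise<void> {
  for await (const value of producer(log)) {
    await new Promise<void>(resolve => setTimeout(resolve, 5));
    log.push(`consumed ${value}`);
  }
}
```

The resulting log alternates strictly between produced and consumed entries: the producer never runs ahead of the consumer. Explicit buffering or batching is only needed when the underlying source pushes data regardless of demand.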

🛡️ Error Handling Strategies

Robust error handling for async iteration:

// 🛡️ Comprehensive error handling patterns
class RobustAsyncProcessor<T, R> {
  private retryAttempts: number;
  private retryDelay: number;
  
  constructor(options: { retryAttempts?: number; retryDelay?: number } = {}) {
    // `??` keeps an explicit 0, which `||` would silently replace with the default
    this.retryAttempts = options.retryAttempts ?? 3;
    this.retryDelay = options.retryDelay ?? 1000;
  }
  
  // 🔄 Process with automatic retry and error recovery
  async *processWithRetry(
    source: AsyncIterable<T>,
    processor: (item: T) => Promise<R>
  ): AsyncGenerator<ProcessingResult<R>, void, unknown> {
    for await (const item of source) {
      yield await this.processItemWithRetry(item, processor);
    }
  }
  
  // 🔄 Individual item processing with retry logic
  private async processItemWithRetry(
    item: T,
    processor: (item: T) => Promise<R>
  ): Promise<ProcessingResult<R>> {
    let lastError = 'No attempts were made';
    
    for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
      try {
        console.log(`🔄 Processing attempt ${attempt}/${this.retryAttempts}`);
        
        const result = await processor(item);
        
        return {
          success: true,
          data: result,
          attempts: attempt
        };
        
      } catch (error) {
        // Catch variables are `unknown` under strict mode, so narrow before reading .message
        lastError = error instanceof Error ? error.message : String(error);
        console.error(`💥 Attempt ${attempt} failed:`, lastError);
        
        // 🔄 Wait before retrying (except on last attempt); the delay grows linearly
        if (attempt < this.retryAttempts) {
          await this.delay(this.retryDelay * attempt);
        }
      }
    }
    
    // 🚫 All attempts failed
    return {
      success: false,
      error: lastError,
      attempts: this.retryAttempts
    };
  }
  
  // ⏳ Delay helper
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 🏗️ Processing result interface
interface ProcessingResult<T> {
  success: boolean;
  data?: T;
  error?: string;
  attempts: number;
}

// 🔧 Circuit breaker for failing services
class CircuitBreaker {
  private failures = 0;
  private lastFailureTime = 0;
  private state: 'closed' | 'open' | 'half-open' = 'closed';
  
  constructor(
    private failureThreshold: number = 5,
    private recoveryTimeout: number = 30000
  ) {}
  
  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
        this.state = 'half-open';
        console.log('🔄 Circuit breaker half-open, testing...');
      } else {
        throw new Error('Circuit breaker is open - service unavailable');
      }
    }
    
    try {
      const result = await operation();
      
      // ✅ Success - reset circuit breaker
      if (this.state === 'half-open') {
        this.state = 'closed';
        this.failures = 0;
        console.log('✅ Circuit breaker closed - service recovered');
      }
      
      return result;
      
    } catch (error) {
      this.failures++;
      this.lastFailureTime = Date.now();
      
      if (this.failures >= this.failureThreshold) {
        this.state = 'open';
        console.log('🚫 Circuit breaker opened - service failing');
      }
      
      throw error;
    }
  }
}
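
The patterns above handle errors per item. It also helps to know what happens when the source itself fails: the rejection surfaces at the for-await-of loop, iteration stops, and items already received are unaffected. A minimal sketch, with flaky and consumeSafely as illustrative names:

```typescript
// Hypothetical source that fails after two items.
async function* flaky(): AsyncGenerator<number, void, undefined> {
  yield 1;
  yield 2;
  throw new Error('source failed');
}

// Wraps the whole loop in try/catch so a source failure is reported, not thrown.
async function consumeSafely(
  source: AsyncIterable<number>
): Promise<{ received: number[]; error?: string }> {
  const received: number[] = [];
  try {
    for await (const n of source) {
      received.push(n);
    }
    return { received };
  } catch (error) {
    // Narrow the unknown catch variable before reading .message
    const message = error instanceof Error ? error.message : String(error);
    return { received, error: message };
  }
}
```

Here consumeSafely(flaky()) resolves with received set to [1, 2] and error set to 'source failed'. A generator that has thrown is finished, so resuming requires creating a new iterator, for example by reconnecting to the source.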

πŸ† Summary

Congratulations! πŸŽ‰ You’ve mastered async iterators and for-await-of loops in TypeScript! Here’s what you’ve accomplished:

🎯 Key Takeaways

  1. Streaming Power 🌊: You can now process large datasets efficiently without memory overload
  2. Type Safety πŸ›‘οΈ: Maintain strong typing throughout complex async iteration flows
  3. Performance Optimization πŸš€: Build high-performance data processing pipelines
  4. Error Resilience πŸ’ͺ: Handle failures gracefully with retry and recovery patterns
  5. Real-World Applications 🏭: Create production-ready data ingestion systems

πŸ› οΈ What You Can Build

  • Data Processing Pipelines πŸ“Š: Stream through massive datasets efficiently
  • Real-Time Event Systems ⚑: Handle live data feeds and streaming APIs
  • Batch Processing Systems πŸ“¦: Process large volumes of data in manageable chunks
  • API Pagination Handlers πŸ“„: Seamlessly iterate through paginated API responses
  • File Streaming Processors πŸ“: Process large files without loading them entirely

πŸš€ Next Steps

Ready to take your async skills even further? Consider exploring:

  • Event Emitters πŸ“‘: Type-safe event-driven programming patterns
  • Reactive Streams 🌊: RxJS integration with TypeScript
  • Worker Threads πŸ‘₯: Parallel processing with Web Workers
  • Performance Monitoring πŸ“ˆ: Advanced async operation profiling

You’re now equipped to build scalable, efficient async applications that can handle any data processing challenge! 🎯 Keep iterating and streaming! πŸ”„βœ¨