+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Part 114 of 355

๐ŸŒŠ Async Generators: Yielding Promises in TypeScript

Master async generators for streaming data, lazy evaluation, and memory-efficient processing with practical examples and advanced patterns ๐Ÿš€

๐Ÿš€Intermediate
19 min read

Prerequisites

  • Understanding of async functions and Promises ๐Ÿ“
  • Basic generator function knowledge โšก
  • Experience with iterators and iteration protocols ๐Ÿ’ป

What you'll learn

  • Master async generator functions and yield syntax ๐ŸŽฏ
  • Build streaming data processing systems ๐Ÿ—๏ธ
  • Handle backpressure and memory-efficient operations ๐Ÿ›
  • Create powerful async iteration patterns with TypeScript โœจ

๐ŸŽฏ Introduction

Welcome to the powerful world of async generators! ๐ŸŒŠ If regular generators are like lazy evaluation for synchronous data, then async generators are like lazy evaluation for asynchronous data streams. Theyโ€™re the perfect fusion of async/await and generator functions!

Think of async generators as data faucets ๐Ÿšฐ - they can produce data on-demand, handle massive datasets without consuming all your memory, and process streams of information that might come from APIs, databases, or file systems.

By the end of this tutorial, youโ€™ll be a master of async iteration, able to build memory-efficient streaming applications that handle real-time data with grace and performance. Letโ€™s dive into the flow! ๐Ÿ„โ€โ™‚๏ธ

๐Ÿ“š Understanding Async Generators

๐Ÿค” What Are Async Generators?

Async generators combine the lazy evaluation power of generators with the asynchronous capabilities of async/await. Theyโ€™re functions that can yield multiple values over time, where each yield operation can involve asynchronous work!

// ๐ŸŒŸ Basic async generator example
async function* countWithDelay(): AsyncGenerator<number, void, unknown> {
  let count = 1;
  
  while (count <= 5) {
    console.log(`๐Ÿ“Š Generating number ${count}`);
    await new Promise(resolve => setTimeout(resolve, 1000)); // Async delay
    yield count; // Yield the current count
    count++;
  }
  
  console.log('โœ… Counting complete!');
}

// ๐Ÿ”„ Using the async generator
async function demonstrateBasicUsage(): Promise<void> {
  console.log('๐Ÿš€ Starting async generator demo...');
  
  for await (const number of countWithDelay()) {
    console.log(`๐Ÿ“จ Received: ${number}`);
  }
  
  console.log('๐ŸŽ‰ Demo complete!');
}

๐Ÿ’ก Key Characteristics

  • ๐ŸŽ Lazy Evaluation: Values are generated on-demand, not all at once
  • โšก Async Yields: Each yield can involve asynchronous operations
  • ๐ŸŽฏ Memory Efficient: Only one value in memory at a time
  • ๐Ÿ›ก๏ธ Backpressure Handling: Consumer controls the pace of data generation
// ๐ŸŽจ TypeScript's type inference with async generators
async function* generateMixedData(): AsyncGenerator<string | number, void, unknown> {
  yield "starting..."; // string
  await new Promise(resolve => setTimeout(resolve, 100));
  yield 42; // number
  await new Promise(resolve => setTimeout(resolve, 100));
  yield "ending..."; // string
}

// Type: AsyncGenerator<string | number, void, unknown>

๐Ÿ†š Async Generators vs Other Patterns

// ๐Ÿƒโ€โ™‚๏ธ Regular array - all data loaded at once
async function getAllDataAtOnce(): Promise<number[]> {
  const data: number[] = [];
  
  for (let i = 1; i <= 1000000; i++) {
    // ๐Ÿ’พ All 1 million items in memory!
    data.push(await fetchDataFromAPI(i));
  }
  
  return data; // ๐ŸŒ Slow, memory intensive
}

// ๐Ÿ”ฎ Promise-based streaming - complex callback handling
function streamWithCallbacks(
  onData: (item: number) => void,
  onComplete: () => void
): void {
  let i = 1;
  
  const processNext = async () => {
    if (i <= 1000000) {
      const item = await fetchDataFromAPI(i);
      onData(item);
      i++;
      setImmediate(processNext); // Next iteration
    } else {
      onComplete();
    }
  };
  
  processNext();
}

// ๐ŸŒŠ Async generator - clean and memory efficient
async function* streamData(): AsyncGenerator<number, void, unknown> {
  for (let i = 1; i <= 1000000; i++) {
    // ๐Ÿ’ก Only one item in memory at a time
    yield await fetchDataFromAPI(i);
  }
}

async function fetchDataFromAPI(id: number): Promise<number> {
  await new Promise(resolve => setTimeout(resolve, 10));
  return id * 2;
}

๐Ÿ”ง Core Syntax and Patterns

๐Ÿ“ Basic Async Generator Syntax

// ๐Ÿ—๏ธ Data processing pipeline
interface DataItem {
  id: string;
  value: number;
  timestamp: Date;
  processed: boolean;
}

class DataProcessor {
  
  // ๐ŸŒŠ Generate data items with async processing
  async* processDataStream(source: string[]): AsyncGenerator<DataItem, void, unknown> {
    console.log(`๐Ÿš€ Starting to process ${source.length} items...`);
    
    for (const [index, item] of source.entries()) {
      console.log(`โš™๏ธ Processing item ${index + 1}/${source.length}: ${item}`);
      
      // ๐Ÿ”„ Simulate async processing (API call, database query, etc.)
      await this.delay(100);
      
      // ๐Ÿ—๏ธ Create processed data item
      const dataItem: DataItem = {
        id: `item_${index + 1}`,
        value: item.length * Math.random(),
        timestamp: new Date(),
        processed: true
      };
      
      // ๐ŸŽ Yield the processed item
      yield dataItem;
      
      // ๐Ÿ“Š Progress logging
      if ((index + 1) % 10 === 0) {
        console.log(`๐Ÿ“ˆ Progress: ${index + 1}/${source.length} items processed`);
      }
    }
    
    console.log('โœ… All items processed!');
  }
  
  // ๐Ÿ”„ Process with conditional yielding
  async* filterAndProcess(
    source: number[],
    condition: (value: number) => boolean
  ): AsyncGenerator<{ original: number; processed: number }, void, unknown> {
    let processedCount = 0;
    
    for (const value of source) {
      // ๐Ÿ” Check condition first
      if (condition(value)) {
        // ๐ŸŒ Async processing only for items that pass the filter
        await this.delay(50);
        
        const processed = await this.complexProcessing(value);
        processedCount++;
        
        yield {
          original: value,
          processed
        };
        
        console.log(`โœ… Processed item ${processedCount}: ${value} -> ${processed}`);
      } else {
        console.log(`โญ๏ธ Skipped: ${value} (didn't meet condition)`);
      }
    }
    
    console.log(`๐ŸŽฏ Total processed: ${processedCount} items`);
  }
  
  // ๐ŸŽฎ Helper methods
  private async delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
  
  private async complexProcessing(value: number): Promise<number> {
    // Simulate complex async calculation
    await this.delay(Math.random() * 100);
    return value * value + Math.floor(Math.random() * 100);
  }
}

// ๐ŸŽฏ Usage examples
async function demonstrateDataProcessing(): Promise<void> {
  const processor = new DataProcessor();
  
  // ๐Ÿ“‹ Source data
  const sourceItems = ['apple', 'banana', 'cherry', 'date', 'elderberry'];
  
  console.log('๐ŸŒŠ Streaming data processing:');
  for await (const item of processor.processDataStream(sourceItems)) {
    console.log(`๐Ÿ“จ Received processed item:`, {
      id: item.id,
      value: item.value.toFixed(2),
      processed: item.processed
    });
  }
  
  console.log('\n๐Ÿ” Filtered processing:');
  const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  
  for await (const result of processor.filterAndProcess(numbers, n => n % 2 === 0)) {
    console.log(`๐Ÿ“จ Even number processed: ${result.original} -> ${result.processed}`);
  }
}

๐ŸŽจ Advanced Yielding Patterns

// ๐Ÿ—๏ธ Multi-source data aggregation
class DataAggregator {
  
  // ๐ŸŒ Merge multiple async data sources
  async* mergeMultipleSources(
    ...sources: AsyncGenerator<any, void, unknown>[]
  ): AsyncGenerator<{ source: number; data: any }, void, unknown> {
    
    // ๐ŸŽฏ Create iterators for all sources
    const iterators = sources.map((source, index) => ({
      index,
      iterator: source[Symbol.asyncIterator](),
      done: false,
      pending: null as Promise<IteratorResult<any, void>> | null
    }));
    
    // ๐Ÿ”„ Start first iteration for all sources
    for (const iter of iterators) {
      if (!iter.done) {
        iter.pending = iter.iterator.next();
      }
    }
    
    while (iterators.some(iter => !iter.done)) {
      // ๐Ÿ Race all pending operations
      const pendingPromises = iterators
        .filter(iter => iter.pending)
        .map(async (iter) => {
          const result = await iter.pending!;
          return { iter, result };
        });
      
      if (pendingPromises.length === 0) break;
      
      // โšก Get the first result
      const { iter, result } = await Promise.race(pendingPromises);
      
      if (result.done) {
        iter.done = true;
        iter.pending = null;
      } else {
        // ๐ŸŽ Yield the result with source information
        yield {
          source: iter.index,
          data: result.value
        };
        
        // ๐Ÿ”„ Start next iteration for this source
        iter.pending = iter.iterator.next();
      }
    }
    
    console.log('โœ… All sources exhausted!');
  }
  
  // ๐Ÿ“Š Batch yielding for efficiency
  async* batchProcessor<T>(
    source: AsyncGenerator<T, void, unknown>,
    batchSize: number
  ): AsyncGenerator<T[], void, unknown> {
    let batch: T[] = [];
    
    for await (const item of source) {
      batch.push(item);
      
      // ๐Ÿ“ฆ Yield when batch is full
      if (batch.length >= batchSize) {
        console.log(`๐Ÿ“ฆ Yielding batch of ${batch.length} items`);
        yield [...batch]; // Create a copy
        batch = []; // Reset batch
      }
    }
    
    // ๐Ÿ“‹ Yield remaining items if any
    if (batch.length > 0) {
      console.log(`๐Ÿ“ฆ Yielding final batch of ${batch.length} items`);
      yield batch;
    }
  }
  
  // ๐ŸŽฏ Rate-limited processing
  async* rateLimitedProcessor<T>(
    source: AsyncGenerator<T, void, unknown>,
    itemsPerSecond: number
  ): AsyncGenerator<T, void, unknown> {
    const delayMs = 1000 / itemsPerSecond;
    let lastYieldTime = 0;
    
    for await (const item of source) {
      const now = Date.now();
      const timeSinceLastYield = now - lastYieldTime;
      
      // โฑ๏ธ Enforce rate limit
      if (timeSinceLastYield < delayMs) {
        const remainingDelay = delayMs - timeSinceLastYield;
        console.log(`โฐ Rate limiting: waiting ${remainingDelay.toFixed(0)}ms`);
        await new Promise(resolve => setTimeout(resolve, remainingDelay));
      }
      
      lastYieldTime = Date.now();
      yield item;
    }
  }
}

๐Ÿ’ก Real-World Applications

๐Ÿ›’ Example 1: E-commerce Order Processing

// ๐Ÿช E-commerce order processing system
interface Order {
  id: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
  status: 'pending' | 'processing' | 'fulfilled' | 'failed';
  createdAt: Date;
}

interface ProcessedOrder extends Order {
  inventoryChecked: boolean;
  paymentProcessed: boolean;
  shippingCalculated: boolean;
  total: number;
}

class OrderProcessingSystem {
  
  // ๐ŸŒŠ Process orders as they come in (streaming)
  async* processOrderStream(
    orderSource: AsyncGenerator<Order, void, unknown>
  ): AsyncGenerator<ProcessedOrder, void, unknown> {
    
    console.log('๐Ÿช Starting order processing stream...');
    let processedCount = 0;
    
    for await (const order of orderSource) {
      try {
        console.log(`๐Ÿ“‹ Processing order ${order.id} for customer ${order.customerId}`);
        
        // ๐Ÿ” Step 1: Check inventory
        console.log(`๐Ÿ“ฆ Checking inventory for order ${order.id}...`);
        await this.checkInventory(order);
        
        // ๐Ÿ’ณ Step 2: Process payment
        console.log(`๐Ÿ’ณ Processing payment for order ${order.id}...`);
        await this.processPayment(order);
        
        // ๐Ÿšš Step 3: Calculate shipping
        console.log(`๐Ÿšš Calculating shipping for order ${order.id}...`);
        const shippingCost = await this.calculateShipping(order);
        
        // ๐Ÿ’ฐ Calculate total
        const itemTotal = order.items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
        const total = itemTotal + shippingCost;
        
        // โœ… Create processed order
        const processedOrder: ProcessedOrder = {
          ...order,
          status: 'fulfilled',
          inventoryChecked: true,
          paymentProcessed: true,
          shippingCalculated: true,
          total
        };
        
        processedCount++;
        console.log(`โœ… Order ${order.id} processed successfully! Total: $${total.toFixed(2)}`);
        
        // ๐ŸŽ Yield the processed order
        yield processedOrder;
        
      } catch (error) {
        console.error(`โŒ Failed to process order ${order.id}:`, error.message);
        
        // ๐Ÿšจ Yield failed order
        yield {
          ...order,
          status: 'failed',
          inventoryChecked: false,
          paymentProcessed: false,
          shippingCalculated: false,
          total: 0
        };
      }
    }
    
    console.log(`๐ŸŽฏ Processing complete! Processed ${processedCount} orders`);
  }
  
  // ๐Ÿ“Š Generate analytics from processed orders
  async* analyzeOrders(
    processedOrders: AsyncGenerator<ProcessedOrder, void, unknown>
  ): AsyncGenerator<{ metric: string; value: number; timestamp: Date }, void, unknown> {
    
    let totalRevenue = 0;
    let orderCount = 0;
    let successfulOrders = 0;
    let failedOrders = 0;
    
    for await (const order of processedOrders) {
      orderCount++;
      
      if (order.status === 'fulfilled') {
        successfulOrders++;
        totalRevenue += order.total;
      } else {
        failedOrders++;
      }
      
      // ๐Ÿ“ˆ Yield metrics every 10 orders
      if (orderCount % 10 === 0) {
        yield {
          metric: 'total_revenue',
          value: totalRevenue,
          timestamp: new Date()
        };
        
        yield {
          metric: 'success_rate',
          value: (successfulOrders / orderCount) * 100,
          timestamp: new Date()
        };
        
        yield {
          metric: 'average_order_value',
          value: successfulOrders > 0 ? totalRevenue / successfulOrders : 0,
          timestamp: new Date()
        };
      }
    }
    
    // ๐Ÿ“Š Final metrics
    console.log(`๐Ÿ“Š Final Analytics:
      ๐Ÿ“‹ Total Orders: ${orderCount}
      โœ… Successful: ${successfulOrders}
      โŒ Failed: ${failedOrders}
      ๐Ÿ’ฐ Total Revenue: $${totalRevenue.toFixed(2)}
      ๐Ÿ“ˆ Success Rate: ${((successfulOrders / orderCount) * 100).toFixed(1)}%
    `);
  }
  
  // ๐Ÿ—๏ธ Helper methods
  private async checkInventory(order: Order): Promise<void> {
    await this.delay(100 + Math.random() * 200); // Simulate inventory check
    if (Math.random() < 0.05) { // 5% chance of inventory failure
      throw new Error('Insufficient inventory');
    }
  }
  
  private async processPayment(order: Order): Promise<void> {
    await this.delay(200 + Math.random() * 300); // Simulate payment processing
    if (Math.random() < 0.03) { // 3% chance of payment failure
      throw new Error('Payment processing failed');
    }
  }
  
  private async calculateShipping(order: Order): Promise<number> {
    await this.delay(50 + Math.random() * 100); // Simulate shipping calculation
    return 5.99 + (Math.random() * 10); // Base shipping + variable cost
  }
  
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// ๐ŸŽฎ Demo order processing
async function demonstrateOrderProcessing(): Promise<void> {
  const orderSystem = new OrderProcessingSystem();
  
  // ๐Ÿญ Create a mock order generator
  async function* generateOrders(): AsyncGenerator<Order, void, unknown> {
    for (let i = 1; i <= 25; i++) {
      yield {
        id: `ORDER_${i.toString().padStart(3, '0')}`,
        customerId: `CUST_${Math.floor(Math.random() * 100) + 1}`,
        items: [
          {
            productId: `PROD_${Math.floor(Math.random() * 50) + 1}`,
            quantity: Math.floor(Math.random() * 3) + 1,
            price: Math.random() * 100 + 10
          }
        ],
        status: 'pending',
        createdAt: new Date()
      };
      
      // ๐Ÿ“… Simulate orders coming in over time
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }
  
  // ๐ŸŒŠ Process the stream
  const processedStream = orderSystem.processOrderStream(generateOrders());
  const analyticsStream = orderSystem.analyzeOrders(processedStream);
  
  // ๐Ÿ“Š Consume analytics
  for await (const metric of analyticsStream) {
    console.log(`๐Ÿ“Š Metric: ${metric.metric} = ${metric.value.toFixed(2)} at ${metric.timestamp.toLocaleTimeString()}`);
  }
}

๐ŸŽฎ Example 2: Real-Time Log Processing

// ๐Ÿ“œ Real-time log processing system
interface LogEntry {
  timestamp: Date;
  level: 'info' | 'warn' | 'error' | 'debug';
  message: string;
  source: string;
  metadata?: Record<string, any>;
}

interface ProcessedLog extends LogEntry {
  processed: true;
  severity: number;
  category: string;
  alertTriggered?: boolean;
}

class LogProcessor {
  
  // ๐Ÿ“Š Process log streams with filtering and alerting
  async* processLogStream(
    logSource: AsyncGenerator<LogEntry, void, unknown>,
    config: {
      minLevel: LogEntry['level'];
      alertPatterns: RegExp[];
      categoryRules: Array<{ pattern: RegExp; category: string }>;
    }
  ): AsyncGenerator<ProcessedLog, void, unknown> {
    
    const levelSeverity = { debug: 1, info: 2, warn: 3, error: 4 };
    const minSeverity = levelSeverity[config.minLevel];
    
    console.log(`๐Ÿ“œ Starting log processing with min level: ${config.minLevel}`);
    
    for await (const log of logSource) {
      const severity = levelSeverity[log.level];
      
      // ๐Ÿ” Filter by minimum level
      if (severity < minSeverity) {
        continue; // Skip low-severity logs
      }
      
      // ๐Ÿท๏ธ Categorize the log
      let category = 'general';
      for (const rule of config.categoryRules) {
        if (rule.pattern.test(log.message)) {
          category = rule.category;
          break;
        }
      }
      
      // ๐Ÿšจ Check for alert patterns
      let alertTriggered = false;
      for (const pattern of config.alertPatterns) {
        if (pattern.test(log.message)) {
          alertTriggered = true;
          console.log(`๐Ÿšจ ALERT TRIGGERED: ${log.message}`);
          break;
        }
      }
      
      // ๐Ÿ—๏ธ Create processed log
      const processedLog: ProcessedLog = {
        ...log,
        processed: true,
        severity,
        category,
        alertTriggered
      };
      
      // ๐ŸŽ Yield processed log
      yield processedLog;
    }
  }
  
  // ๐Ÿ“Š Aggregate log statistics
  async* aggregateLogStats(
    processedLogs: AsyncGenerator<ProcessedLog, void, unknown>,
    windowSizeMs: number = 60000 // 1 minute windows
  ): AsyncGenerator<{
    windowStart: Date;
    windowEnd: Date;
    stats: {
      total: number;
      byLevel: Record<string, number>;
      byCategory: Record<string, number>;
      alertsTriggered: number;
      errorRate: number;
    };
  }, void, unknown> {
    
    let windowStart = new Date();
    let windowEnd = new Date(windowStart.getTime() + windowSizeMs);
    
    let currentWindow = {
      total: 0,
      byLevel: {} as Record<string, number>,
      byCategory: {} as Record<string, number>,
      alertsTriggered: 0,
      errorRate: 0
    };
    
    for await (const log of processedLogs) {
      // ๐Ÿ“… Check if we need to start a new window
      if (log.timestamp >= windowEnd) {
        // ๐Ÿ“Š Calculate final stats for current window
        currentWindow.errorRate = currentWindow.total > 0 
          ? ((currentWindow.byLevel.error || 0) / currentWindow.total) * 100 
          : 0;
        
        // ๐ŸŽ Yield current window stats
        yield {
          windowStart,
          windowEnd,
          stats: { ...currentWindow }
        };
        
        // ๐Ÿ”„ Start new window
        windowStart = new Date(windowEnd);
        windowEnd = new Date(windowStart.getTime() + windowSizeMs);
        currentWindow = {
          total: 0,
          byLevel: {},
          byCategory: {},
          alertsTriggered: 0,
          errorRate: 0
        };
      }
      
      // ๐Ÿ“ˆ Update current window stats
      currentWindow.total++;
      currentWindow.byLevel[log.level] = (currentWindow.byLevel[log.level] || 0) + 1;
      currentWindow.byCategory[log.category] = (currentWindow.byCategory[log.category] || 0) + 1;
      
      if (log.alertTriggered) {
        currentWindow.alertsTriggered++;
      }
    }
    
    // ๐Ÿ“Š Yield final window if it has data
    if (currentWindow.total > 0) {
      currentWindow.errorRate = (currentWindow.byLevel.error || 0) / currentWindow.total * 100;
      yield {
        windowStart,
        windowEnd,
        stats: currentWindow
      };
    }
  }
}

// ๐ŸŽฎ Demo log processing
async function demonstrateLogProcessing(): Promise<void> {
  const logProcessor = new LogProcessor();
  
  // ๐Ÿญ Generate sample logs
  async function* generateLogs(): AsyncGenerator<LogEntry, void, unknown> {
    const sources = ['web-server', 'database', 'auth-service', 'payment-gateway'];
    const messages = [
      'User login successful',
      'Database connection established',
      'Payment processed successfully',
      'Error: Connection timeout',
      'Warning: High memory usage detected',
      'Critical: Database connection failed',
      'Info: Cache cleared',
      'Debug: Request processed in 150ms'
    ];
    
    for (let i = 0; i < 100; i++) {
      const level = ['info', 'warn', 'error', 'debug'][Math.floor(Math.random() * 4)] as LogEntry['level'];
      const source = sources[Math.floor(Math.random() * sources.length)];
      const message = messages[Math.floor(Math.random() * messages.length)];
      
      yield {
        timestamp: new Date(),
        level,
        message,
        source,
        metadata: { requestId: `req_${i}`, userId: `user_${Math.floor(Math.random() * 100)}` }
      };
      
      // ๐Ÿ“… Simulate logs coming in over time
      await new Promise(resolve => setTimeout(resolve, 50));
    }
  }
  
  // โš™๏ธ Configure processing
  const config = {
    minLevel: 'info' as const,
    alertPatterns: [/Error:|Critical:/],
    categoryRules: [
      { pattern: /login|auth/i, category: 'authentication' },
      { pattern: /payment|billing/i, category: 'financial' },
      { pattern: /database|connection/i, category: 'infrastructure' },
      { pattern: /memory|cpu/i, category: 'performance' }
    ]
  };
  
  // ๐ŸŒŠ Process the log stream
  const processedStream = logProcessor.processLogStream(generateLogs(), config);
  const statsStream = logProcessor.aggregateLogStats(processedStream, 5000); // 5-second windows for demo
  
  // ๐Ÿ“Š Consume statistics
  for await (const window of statsStream) {
    console.log(`๐Ÿ“Š Window ${window.windowStart.toLocaleTimeString()} - ${window.windowEnd.toLocaleTimeString()}:`);
    console.log(`   ๐Ÿ“‹ Total logs: ${window.stats.total}`);
    console.log(`   ๐Ÿ“ˆ By level:`, window.stats.byLevel);
    console.log(`   ๐Ÿท๏ธ By category:`, window.stats.byCategory);
    console.log(`   ๐Ÿšจ Alerts: ${window.stats.alertsTriggered}`);
    console.log(`   โŒ Error rate: ${window.stats.errorRate.toFixed(1)}%\n`);
  }
}

๐Ÿš€ Advanced Patterns

๐Ÿง™โ€โ™‚๏ธ Error Handling in Async Generators

// ๐Ÿ›ก๏ธ Robust error handling patterns
class ResilientDataProcessor {
  
  // ๐Ÿ”„ Retry logic with backoff
  async* processWithRetry<T, R>(
    source: AsyncGenerator<T, void, unknown>,
    processor: (item: T) => Promise<R>,
    maxRetries: number = 3
  ): AsyncGenerator<{ success: true; data: R } | { success: false; error: Error; item: T }, void, unknown> {
    
    for await (const item of source) {
      let attempts = 0;
      let lastError: Error | null = null;
      
      while (attempts <= maxRetries) {
        try {
          console.log(`๐Ÿ”„ Processing item (attempt ${attempts + 1}/${maxRetries + 1})`);
          const result = await processor(item);
          
          // โœ… Success!
          yield { success: true, data: result };
          break; // Move to next item
          
        } catch (error) {
          attempts++;
          lastError = error instanceof Error ? error : new Error('Unknown error');
          
          if (attempts <= maxRetries) {
            // โฑ๏ธ Exponential backoff
            const backoffMs = Math.pow(2, attempts) * 1000;
            console.log(`โš ๏ธ Attempt ${attempts} failed, retrying in ${backoffMs}ms...`);
            await new Promise(resolve => setTimeout(resolve, backoffMs));
          }
        }
      }
      
      // โŒ All retries exhausted
      if (lastError) {
        console.log(`โŒ Item processing failed after ${maxRetries + 1} attempts`);
        yield { success: false, error: lastError, item };
      }
    }
  }
  
  // ๐Ÿ›ก๏ธ Circuit breaker pattern
  async* processWithCircuitBreaker<T, R>(
    source: AsyncGenerator<T, void, unknown>,
    processor: (item: T) => Promise<R>,
    config: {
      failureThreshold: number;
      resetTimeoutMs: number;
      monitoringWindowMs: number;
    }
  ): AsyncGenerator<{ success: true; data: R } | { success: false; error: Error; circuitOpen: boolean }, void, unknown> {
    
    let failures = 0;
    let lastFailureTime = 0;
    let circuitOpen = false;
    let windowStart = Date.now();
    let requestsInWindow = 0;
    
    for await (const item of source) {
      const now = Date.now();
      
      // ๐Ÿ”„ Reset monitoring window
      if (now - windowStart > config.monitoringWindowMs) {
        windowStart = now;
        requestsInWindow = 0;
        failures = 0;
      }
      
      requestsInWindow++;
      
      // ๐Ÿšช Check if circuit should be closed (reset)
      if (circuitOpen && (now - lastFailureTime) > config.resetTimeoutMs) {
        console.log('๐Ÿ”„ Circuit breaker: Attempting to close circuit');
        circuitOpen = false;
        failures = 0;
      }
      
      // โšก Circuit breaker is open - fail fast
      if (circuitOpen) {
        yield {
          success: false,
          error: new Error('Circuit breaker is open'),
          circuitOpen: true
        };
        continue;
      }
      
      try {
        const result = await processor(item);
        
        // โœ… Success - reset failure count
        if (failures > 0) {
          console.log('โœ… Success after failures - resetting circuit breaker');
          failures = 0;
        }
        
        yield { success: true, data: result };
        
      } catch (error) {
        failures++;
        lastFailureTime = now;
        
        // ๐Ÿšจ Check if we should open the circuit
        if (failures >= config.failureThreshold) {
          console.log(`๐Ÿšจ Circuit breaker: Opening circuit (${failures} failures in window)`);
          circuitOpen = true;
        }
        
        yield {
          success: false,
          error: error instanceof Error ? error : new Error('Unknown error'),
          circuitOpen
        };
      }
    }
  }
}

๐ŸŽฏ Memory-Efficient Large Dataset Processing

// ๐Ÿ’พ Memory-conscious data processing
class LargeDataProcessor {
  
  // ๐Ÿ—‚๏ธ Process CSV files without loading entire file into memory
  async* processCsvStream(
    filePath: string,
    chunkSize: number = 1024 * 1024 // 1MB chunks
  ): AsyncGenerator<{ [key: string]: string }, void, unknown> {
    
    console.log(`๐Ÿ“„ Processing CSV file: ${filePath}`);
    
    // ๐ŸŽฏ In a real implementation, you'd use Node.js streams
    // This is a simplified example
    const fileContent = await this.readFileInChunks(filePath, chunkSize);
    
    let buffer = '';
    let headers: string[] = [];
    let isFirstRow = true;
    
    for await (const chunk of fileContent) {
      buffer += chunk;
      
      // ๐Ÿ“ Process complete lines
      const lines = buffer.split('\n');
      buffer = lines.pop() || ''; // Keep incomplete line in buffer
      
      for (const line of lines) {
        if (line.trim() === '') continue;
        
        const values = this.parseCsvLine(line);
        
        if (isFirstRow) {
          headers = values;
          isFirstRow = false;
          console.log(`๐Ÿ“‹ CSV Headers: ${headers.join(', ')}`);
          continue;
        }
        
        // ๐Ÿ—๏ธ Create row object
        const row: { [key: string]: string } = {};
        for (let i = 0; i < Math.min(headers.length, values.length); i++) {
          row[headers[i]] = values[i];
        }
        
        yield row;
      }
    }
    
    // ๐Ÿ“ Process any remaining data in buffer
    if (buffer.trim() && headers.length > 0) {
      const values = this.parseCsvLine(buffer);
      const row: { [key: string]: string } = {};
      for (let i = 0; i < Math.min(headers.length, values.length); i++) {
        row[headers[i]] = values[i];
      }
      yield row;
    }
    
    console.log('โœ… CSV processing complete');
  }
  
  // ๐Ÿ”„ Paginated API data fetching
  async* fetchPaginatedData<T>(
    baseUrl: string,
    pageSize: number = 100,
    maxPages?: number
  ): AsyncGenerator<T, void, unknown> {
    
    let page = 1;
    let hasMore = true;
    let totalFetched = 0;
    
    while (hasMore && (!maxPages || page <= maxPages)) {
      console.log(`๐Ÿ“„ Fetching page ${page} (${pageSize} items per page)`);
      
      try {
        // ๐ŸŒ Fetch current page
        const response = await fetch(`${baseUrl}?page=${page}&limit=${pageSize}`);
        
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        
        const data = await response.json();
        const items = data.items || [];
        
        // ๐ŸŽ Yield each item individually
        for (const item of items) {
          yield item;
          totalFetched++;
        }
        
        // ๐Ÿ” Check if there are more pages
        hasMore = data.hasMore || items.length === pageSize;
        page++;
        
        console.log(`๐Ÿ“Š Progress: ${totalFetched} items fetched so far`);
        
        // ๐Ÿ˜ด Rate limiting - be nice to the API
        await new Promise(resolve => setTimeout(resolve, 100));
        
      } catch (error) {
        console.error(`โŒ Error fetching page ${page}:`, error.message);
        break;
      }
    }
    
    console.log(`โœ… Pagination complete. Total items: ${totalFetched}`);
  }
  
  // ๐Ÿ—‚๏ธ Helper methods
  private async* readFileInChunks(filePath: string, chunkSize: number): AsyncGenerator<string, void, unknown> {
    // ๐ŸŽฏ Simulate file reading in chunks
    const mockFileContent = `name,age,city,occupation
John Doe,30,New York,Engineer
Jane Smith,25,San Francisco,Designer
Bob Johnson,35,Chicago,Manager
Alice Brown,28,Boston,Developer
Charlie Wilson,32,Seattle,Analyst`.repeat(1000); // Large mock file
    
    for (let i = 0; i < mockFileContent.length; i += chunkSize) {
      const chunk = mockFileContent.slice(i, i + chunkSize);
      yield chunk;
      
      // ๐Ÿ“… Simulate I/O delay
      await new Promise(resolve => setTimeout(resolve, 10));
    }
  }
  
  private parseCsvLine(line: string): string[] {
    // ๐ŸŽฏ Simple CSV parser (doesn't handle quotes/escaping)
    return line.split(',').map(value => value.trim());
  }
}

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Memory Leaks with Infinite Generators

// โŒ Dangerous - infinite generator without proper cleanup
async function* dangerousInfiniteGenerator(): AsyncGenerator<number, never, unknown> {
  let count = 0;
  const cache = new Map(); // ๐Ÿ’ฅ This keeps growing!
  
  while (true) {
    cache.set(count, `value_${count}`); // Memory leak!
    yield count++;
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}

// โœ… Safe - infinite generator with proper memory management
async function* safeInfiniteGenerator(maxCacheSize: number = 1000): AsyncGenerator<number, never, unknown> {
  let count = 0;
  const cache = new Map();
  
  while (true) {
    // ๐Ÿงน Clean up cache when it gets too large
    if (cache.size >= maxCacheSize) {
      const keysToDelete = Array.from(cache.keys()).slice(0, Math.floor(maxCacheSize / 2));
      keysToDelete.forEach(key => cache.delete(key));
      console.log(`๐Ÿงน Cleaned up cache, removed ${keysToDelete.length} entries`);
    }
    
    cache.set(count, `value_${count}`);
    yield count++;
    await new Promise(resolve => setTimeout(resolve, 100));
  }
}

// ๐ŸŽฏ Safe consumption with early termination
async function consumeInfiniteGenerator(): Promise<void> {
  const generator = safeInfiniteGenerator();
  let consumed = 0;
  
  for await (const value of generator) {
    console.log(`๐Ÿ“จ Received: ${value}`);
    consumed++;
    
    // โœ… Always have an exit condition for infinite generators
    if (consumed >= 50) {
      console.log('๐Ÿ›‘ Terminating infinite generator');
      await generator.return(undefined); // Clean shutdown
      break;
    }
  }
}

๐Ÿคฏ Pitfall 2: Unhandled Async Generator Errors

// โŒ Dangerous - errors can crash the entire stream
async function* dangerousGenerator(): AsyncGenerator<string, void, unknown> {
  for (let i = 1; i <= 10; i++) {
    if (i === 5) {
      throw new Error('Something went wrong!'); // ๐Ÿ’ฅ Crashes everything
    }
    yield `Item ${i}`;
  }
}

// โœ… Safe - proper error handling within generator
async function* resilientGenerator(): AsyncGenerator<{ success: true; data: string } | { success: false; error: string }, void, unknown> {
  for (let i = 1; i <= 10; i++) {
    try {
      // ๐ŸŽฏ Simulate work that might fail
      await new Promise(resolve => setTimeout(resolve, 100));
      
      if (i === 5 && Math.random() < 0.7) {
        throw new Error(`Processing failed for item ${i}`);
      }
      
      yield { success: true, data: `Item ${i}` };
      
    } catch (error) {
      console.error(`โš ๏ธ Error processing item ${i}:`, error.message);
      yield { 
        success: false, 
        error: error instanceof Error ? error.message : 'Unknown error' 
      };
    }
  }
}

// ๐Ÿ›ก๏ธ Safe consumption with error handling
async function safeConsumption(): Promise<void> {
  try {
    for await (const result of resilientGenerator()) {
      if (result.success) {
        console.log(`โœ… ${result.data}`);
      } else {
        console.log(`โŒ Error: ${result.error}`);
      }
    }
  } catch (error) {
    console.error('๐Ÿ’ฅ Generator threw an unhandled error:', error);
  }
}

๐Ÿ› ๏ธ Best Practices

๐ŸŽฏ Performance Optimization

// ๐Ÿ“Š Performance monitoring and optimization
class OptimizedAsyncGenerator {
  
  // ๐Ÿš€ Buffered processing for better throughput
  async* bufferedProcessor<T, R>(
    source: AsyncGenerator<T, void, unknown>,
    processor: (items: T[]) => Promise<R[]>,
    bufferSize: number = 10
  ): AsyncGenerator<R, void, unknown> {
    
    let buffer: T[] = [];
    
    for await (const item of source) {
      buffer.push(item);
      
      // ๐Ÿ“ฆ Process when buffer is full
      if (buffer.length >= bufferSize) {
        const results = await processor(buffer);
        for (const result of results) {
          yield result;
        }
        buffer = [];
      }
    }
    
    // ๐Ÿ“‹ Process remaining items
    if (buffer.length > 0) {
      const results = await processor(buffer);
      for (const result of results) {
        yield result;
      }
    }
  }
  
  // โšก Parallel processing with controlled concurrency
  async* parallelProcessor<T, R>(
    source: AsyncGenerator<T, void, unknown>,
    processor: (item: T) => Promise<R>,
    concurrency: number = 3
  ): AsyncGenerator<R, void, unknown> {
    
    const semaphore = new Semaphore(concurrency);
    const pending = new Map<number, Promise<{ index: number; result: R }>>();
    let inputIndex = 0;
    let outputIndex = 0;
    let sourceExhausted = false;
    
    // ๐Ÿ”„ Start initial batch
    const sourceIterator = source[Symbol.asyncIterator]();
    
    while (!sourceExhausted || pending.size > 0) {
      // ๐Ÿš€ Fill up to concurrency limit
      while (pending.size < concurrency && !sourceExhausted) {
        const { value, done } = await sourceIterator.next();
        
        if (done) {
          sourceExhausted = true;
          break;
        }
        
        const currentIndex = inputIndex++;
        const promise = semaphore.acquire().then(async () => {
          try {
            const result = await processor(value);
            return { index: currentIndex, result };
          } finally {
            semaphore.release();
          }
        });
        
        pending.set(currentIndex, promise);
      }
      
      // ๐ŸŽ Yield results in order
      while (pending.has(outputIndex)) {
        const { result } = await pending.get(outputIndex)!;
        pending.delete(outputIndex);
        yield result;
        outputIndex++;
      }
      
      // โฑ๏ธ If no results ready, wait for at least one
      if (pending.size > 0 && !pending.has(outputIndex)) {
        await Promise.race(Array.from(pending.values()));
      }
    }
  }
}

// ๐Ÿ”’ Semaphore for concurrency control
class Semaphore {
  private available: number;
  private waiters: Array<() => void> = [];
  
  constructor(count: number) {
    this.available = count;
  }
  
  async acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return;
    }
    
    return new Promise<void>(resolve => {
      this.waiters.push(resolve);
    });
  }
  
  release(): void {
    if (this.waiters.length > 0) {
      const waiter = this.waiters.shift()!;
      waiter();
    } else {
      this.available++;
    }
  }
}

๐Ÿงช Hands-On Exercise

๐ŸŽฏ Challenge: Build a Data Pipeline System

Create a comprehensive data pipeline using async generators that can:

๐Ÿ“‹ Requirements:

  • โœ… Read data from multiple sources (CSV, JSON, API)
  • ๐Ÿท๏ธ Transform and validate data with customizable rules
  • ๐Ÿ‘ค Handle errors gracefully without stopping the pipeline
  • ๐Ÿ“… Provide real-time progress monitoring
  • ๐ŸŽจ Support backpressure and rate limiting

๐Ÿš€ Bonus Points:

  • Add data deduplication
  • Implement pipeline branching (one input, multiple outputs)
  • Create a visual progress dashboard

๐Ÿ’ก Solution

๐Ÿ” Click to see solution
// ๐ŸŽฏ Comprehensive data pipeline system!

interface DataSource {
  name: string;
  type: 'csv' | 'json' | 'api';
  config: any;
}

interface PipelineStage<TInput, TOutput> {
  name: string;
  transform: (input: TInput) => Promise<TOutput>;
  validate?: (output: TOutput) => boolean;
  onError?: (error: Error, input: TInput) => TOutput | null;
}

interface PipelineProgress {
  stageName: string;
  processed: number;
  errors: number;
  throughput: number; // items per second
  timestamp: Date;
}

class DataPipeline<TInput, TOutput> {
  private stages: PipelineStage<any, any>[] = [];
  private progressCallbacks: Array<(progress: PipelineProgress) => void> = [];
  
  // โž• Add pipeline stage
  addStage<TStageOutput>(stage: PipelineStage<TInput, TStageOutput>): DataPipeline<TInput, TStageOutput> {
    this.stages.push(stage);
    return this as any;
  }
  
  // ๐Ÿ“Š Monitor progress
  onProgress(callback: (progress: PipelineProgress) => void): void {
    this.progressCallbacks.push(callback);
  }
  
  // ๐ŸŒŠ Execute pipeline
  async* execute(
    sources: AsyncGenerator<TInput, void, unknown>[],
    config: {
      concurrency?: number;
      errorThreshold?: number;
      rateLimitPerSecond?: number;
    } = {}
  ): AsyncGenerator<TOutput, void, unknown> {
    
    const { concurrency = 5, errorThreshold = 0.1, rateLimitPerSecond } = config;
    
    // ๐Ÿ”€ Merge all sources
    const mergedSource = this.mergeSources(sources);
    
    // ๐ŸŽฏ Apply rate limiting if specified
    const rateLimitedSource = rateLimitPerSecond 
      ? this.rateLimitedProcessor(mergedSource, rateLimitPerSecond)
      : mergedSource;
    
    // ๐Ÿ”„ Process through all stages
    let currentStream: AsyncGenerator<any, void, unknown> = rateLimitedSource;
    
    for (const [index, stage] of this.stages.entries()) {
      currentStream = this.processStage(
        currentStream,
        stage,
        concurrency,
        errorThreshold,
        `Stage ${index + 1}: ${stage.name}`
      );
    }
    
    // ๐ŸŽ Yield final results
    for await (const result of currentStream) {
      yield result;
    }
  }
  
  // ๐Ÿ”€ Merge multiple sources
  private async* mergeSources(
    sources: AsyncGenerator<TInput, void, unknown>[]
  ): AsyncGenerator<TInput, void, unknown> {
    
    const iterators = sources.map(source => ({
      iterator: source[Symbol.asyncIterator](),
      done: false,
      pending: null as Promise<IteratorResult<TInput, void>> | null
    }));
    
    // ๐Ÿš€ Start first iteration for all sources
    for (const iter of iterators) {
      iter.pending = iter.iterator.next();
    }
    
    while (iterators.some(iter => !iter.done)) {
      const pendingPromises = iterators
        .filter(iter => iter.pending && !iter.done)
        .map(async (iter) => {
          const result = await iter.pending!;
          return { iter, result };
        });
      
      if (pendingPromises.length === 0) break;
      
      const { iter, result } = await Promise.race(pendingPromises);
      
      if (result.done) {
        iter.done = true;
        iter.pending = null;
      } else {
        yield result.value;
        iter.pending = iter.iterator.next();
      }
    }
  }
  
  // โš™๏ธ Process single stage
  private async* processStage<TStageInput, TStageOutput>(
    source: AsyncGenerator<TStageInput, void, unknown>,
    stage: PipelineStage<TStageInput, TStageOutput>,
    concurrency: number,
    errorThreshold: number,
    stageName: string
  ): AsyncGenerator<TStageOutput, void, unknown> {
    
    let processed = 0;
    let errors = 0;
    const startTime = Date.now();
    let lastProgressUpdate = startTime;
    
    const processor = new OptimizedAsyncGenerator();
    
    const processItem = async (item: TStageInput): Promise<TStageOutput | null> => {
      try {
        const result = await stage.transform(item);
        
        // ๐Ÿ” Validate if validator provided
        if (stage.validate && !stage.validate(result)) {
          throw new Error('Validation failed');
        }
        
        return result;
        
      } catch (error) {
        errors++;
        
        // ๐Ÿ“Š Check error threshold
        if (processed > 0 && (errors / processed) > errorThreshold) {
          throw new Error(`Error threshold exceeded: ${errors}/${processed} = ${((errors/processed)*100).toFixed(1)}%`);
        }
        
        // ๐Ÿ›ก๏ธ Use error handler if available
        if (stage.onError) {
          return stage.onError(error instanceof Error ? error : new Error('Unknown error'), item);
        }
        
        return null; // Skip this item
      }
    };
    
    for await (const result of processor.parallelProcessor(source, processItem, concurrency)) {
      if (result !== null) {
        processed++;
        
        // ๐Ÿ“Š Update progress
        const now = Date.now();
        if (now - lastProgressUpdate >= 1000) { // Update every second
          const throughput = processed / ((now - startTime) / 1000);
          
          this.progressCallbacks.forEach(callback => {
            callback({
              stageName,
              processed,
              errors,
              throughput,
              timestamp: new Date()
            });
          });
          
          lastProgressUpdate = now;
        }
        
        yield result;
      }
    }
    
    // ๐Ÿ“Š Final progress update
    const finalThroughput = processed / ((Date.now() - startTime) / 1000);
    this.progressCallbacks.forEach(callback => {
      callback({
        stageName: `${stageName} (COMPLETE)`,
        processed,
        errors,
        throughput: finalThroughput,
        timestamp: new Date()
      });
    });
  }
  
  // โฑ๏ธ Rate limiting
  private async* rateLimitedProcessor<T>(
    source: AsyncGenerator<T, void, unknown>,
    itemsPerSecond: number
  ): AsyncGenerator<T, void, unknown> {
    const delayMs = 1000 / itemsPerSecond;
    let lastYieldTime = 0;
    
    for await (const item of source) {
      const now = Date.now();
      const timeSinceLastYield = now - lastYieldTime;
      
      if (timeSinceLastYield < delayMs) {
        await new Promise(resolve => setTimeout(resolve, delayMs - timeSinceLastYield));
      }
      
      lastYieldTime = Date.now();
      yield item;
    }
  }
}

// ๐ŸŽฎ Demo pipeline usage
async function demonstrateDataPipeline(): Promise<void> {
  // ๐Ÿญ Create data sources
  async function* createCsvSource(): AsyncGenerator<{ name: string; age: string; city: string }, void, unknown> {
    const data = [
      { name: 'Alice', age: '30', city: 'New York' },
      { name: 'Bob', age: '25', city: 'San Francisco' },
      { name: 'Charlie', age: 'invalid', city: 'Chicago' }, // Invalid age
      { name: 'Diana', age: '35', city: 'Boston' },
      { name: 'Eve', age: '28', city: 'Seattle' }
    ];
    
    for (const item of data) {
      yield item;
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }
  
  async function* createApiSource(): AsyncGenerator<{ name: string; age: string; city: string }, void, unknown> {
    const data = [
      { name: 'Frank', age: '32', city: 'Miami' },
      { name: 'Grace', age: '29', city: 'Denver' },
      { name: 'Henry', age: '40', city: 'Portland' }
    ];
    
    for (const item of data) {
      yield item;
      await new Promise(resolve => setTimeout(resolve, 150));
    }
  }
  
  // ๐Ÿ—๏ธ Build pipeline
  const pipeline = new DataPipeline<{ name: string; age: string; city: string }, { name: string; age: number; city: string; category: string }>()
    .addStage({
      name: 'Parse Age',
      transform: async (item) => {
        const age = parseInt(item.age, 10);
        if (isNaN(age)) {
          throw new Error(`Invalid age: ${item.age}`);
        }
        return { ...item, age };
      },
      onError: (error, item) => {
        console.log(`โš ๏ธ Skipping invalid age for ${item.name}: ${item.age}`);
        return null; // Skip this item
      }
    })
    .addStage({
      name: 'Categorize',
      transform: async (item) => {
        await new Promise(resolve => setTimeout(resolve, 50)); // Simulate processing
        
        let category = 'unknown';
        if (item.age < 30) category = 'young';
        else if (item.age < 40) category = 'middle';
        else category = 'senior';
        
        return { ...item, category };
      },
      validate: (result) => result.category !== 'unknown'
    });
  
  // ๐Ÿ“Š Monitor progress
  pipeline.onProgress(progress => {
    console.log(`๐Ÿ“Š ${progress.stageName}: ${progress.processed} processed, ${progress.errors} errors, ${progress.throughput.toFixed(1)} items/sec`);
  });
  
  // ๐ŸŒŠ Execute pipeline
  console.log('๐Ÿš€ Starting data pipeline...');
  const sources = [createCsvSource(), createApiSource()];
  
  const results: any[] = [];
  for await (const result of pipeline.execute(sources, { 
    concurrency: 3, 
    errorThreshold: 0.2, 
    rateLimitPerSecond: 5 
  })) {
    results.push(result);
    console.log(`โœ… Pipeline output:`, result);
  }
  
  console.log(`๐ŸŽ‰ Pipeline complete! Processed ${results.length} items total`);
}

๐ŸŽ“ Key Takeaways

Youโ€™ve mastered async generators! Hereโ€™s what you can now do:

  • โœ… Create efficient streaming data processors with memory control ๐Ÿ’ช
  • โœ… Handle large datasets without memory issues like a pro ๐Ÿ›ก๏ธ
  • โœ… Build resilient async iteration patterns with confidence ๐ŸŽฏ
  • โœ… Implement backpressure and flow control effectively ๐Ÿ›
  • โœ… Combine async generators with other patterns for powerful applications! ๐Ÿš€

Remember: Async generators are perfect for streaming, lazy evaluation, and processing data that doesnโ€™t all fit in memory at once! ๐Ÿค

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered async generators and streaming patterns!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the data pipeline exercise above
  2. ๐Ÿ—๏ธ Build a streaming application using async generators
  3. ๐Ÿ“š Explore Node.js streams and how they complement async generators
  4. ๐ŸŒŸ Share your async generator knowledge with others!

Remember: The best way to master async generators is to build something that processes real data streams. Start small and iterate! ๐Ÿš€


Happy streaming! ๐ŸŽ‰๐ŸŒŠโœจ