Prerequisites
- Understanding of async functions and Promises 📚
- Basic generator function knowledge ⚡
- Experience with iterators and iteration protocols 💻
What you'll learn
- Master async generator functions and yield syntax 🎯
- Build streaming data processing systems 🏗️
- Handle backpressure and memory-efficient operations 🌊
- Create powerful async iteration patterns with TypeScript ✨
🎯 Introduction
Welcome to the powerful world of async generators! 🚀 If regular generators are lazy evaluation for synchronous data, then async generators are lazy evaluation for asynchronous data streams. They're the perfect fusion of async/await and generator functions!
Think of async generators as data faucets 🚰 - they produce data on demand, handle massive datasets without exhausting your memory, and process streams of information coming from APIs, databases, or file systems.
By the end of this tutorial, you'll be a master of async iteration, able to build memory-efficient streaming applications that handle real-time data with grace and performance. Let's dive into the flow! 🏊‍♂️
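To ground the analogy before we go deeper, here is a minimal sketch (the names syncNumbers and asyncNumbers are illustrative) showing the same lazy sequence in both worlds. Only the async version may await between yields, and only it is consumed with for await...of:
// 🔄 The same lazy sequence, sync vs async
function* syncNumbers(): Generator<number, void, unknown> {
  yield 1;
  yield 2; // produced on demand, but synchronously
}

async function* asyncNumbers(): AsyncGenerator<number, void, unknown> {
  yield 1;
  await new Promise(resolve => setTimeout(resolve, 100)); // async work between yields
  yield 2;
}

async function compareBoth(): Promise<void> {
  for (const n of syncNumbers()) console.log(`sync: ${n}`);
  for await (const n of asyncNumbers()) console.log(`async: ${n}`);
}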
📚 Understanding Async Generators
🤔 What Are Async Generators?
Async generators combine the lazy evaluation power of generators with the asynchronous capabilities of async/await. They're functions that can yield multiple values over time, where each yield operation can involve asynchronous work!
// 🌟 Basic async generator example
async function* countWithDelay(): AsyncGenerator<number, void, unknown> {
let count = 1;
while (count <= 5) {
console.log(`🔄 Generating number ${count}`);
await new Promise(resolve => setTimeout(resolve, 1000)); // Async delay
yield count; // Yield the current count
count++;
}
console.log('✅ Counting complete!');
}
// 🚀 Using the async generator
async function demonstrateBasicUsage(): Promise<void> {
console.log('🚀 Starting async generator demo...');
for await (const number of countWithDelay()) {
console.log(`📨 Received: ${number}`);
}
console.log('🎉 Demo complete!');
}
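Under the hood, for await...of simply drives the async iteration protocol for you. Here is a sketch of the manual equivalent (manualConsumption is an illustrative name) - a generator object is its own async iterator, and each next() call returns a Promise of an IteratorResult:
// 🔧 Manual consumption: what for await...of does for you
async function manualConsumption(): Promise<void> {
  const iterator = countWithDelay()[Symbol.asyncIterator]();
  let result = await iterator.next();
  while (!result.done) {
    console.log(`📨 Received manually: ${result.value}`);
    result = await iterator.next(); // Promise<IteratorResult<number, void>>
  }
}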
💡 Key Characteristics
- 🔄 Lazy Evaluation: Values are generated on demand, not all at once
- ⚡ Async Yields: Each yield can involve asynchronous operations
- 🎯 Memory Efficient: Only one value in memory at a time
- 🛡️ Backpressure Handling: The consumer controls the pace of data generation
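That last point is worth seeing in action. In the sketch below (pacedProducer and slowConsumer are illustrative names), the producer body only runs when the consumer asks for the next value, so a slow consumer automatically throttles production:
// 🛡️ Backpressure in one picture: production waits for consumption
async function* pacedProducer(): AsyncGenerator<number, void, unknown> {
  for (let i = 1; i <= 1000; i++) {
    console.log(`⚙️ Producing ${i}`);
    yield i; // suspended here until the consumer calls next()
  }
}

async function slowConsumer(): Promise<void> {
  for await (const value of pacedProducer()) {
    await new Promise(resolve => setTimeout(resolve, 500)); // slow downstream work
    console.log(`📨 Consumed ${value}`);
    if (value >= 3) break; // nothing was produced ahead of us, so nothing is wasted
  }
}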
// 🎨 TypeScript's type inference with async generators
async function* generateMixedData(): AsyncGenerator<string | number, void, unknown> {
yield "starting..."; // string
await new Promise(resolve => setTimeout(resolve, 100));
yield 42; // number
await new Promise(resolve => setTimeout(resolve, 100));
yield "ending..."; // string
}
// Type: AsyncGenerator<string | number, void, unknown>
🔄 Async Generators vs Other Patterns
// 🏃‍♂️ Regular array - all data loaded at once
async function getAllDataAtOnce(): Promise<number[]> {
const data: number[] = [];
for (let i = 1; i <= 1000000; i++) {
// 💾 All 1 million items in memory!
data.push(await fetchDataFromAPI(i));
}
return data; // 🐌 Slow, memory intensive
}
// 🎮 Promise-based streaming - complex callback handling
function streamWithCallbacks(
onData: (item: number) => void,
onComplete: () => void
): void {
let i = 1;
const processNext = async () => {
if (i <= 1000000) {
const item = await fetchDataFromAPI(i);
onData(item);
i++;
setImmediate(processNext); // Next iteration
} else {
onComplete();
}
};
processNext();
}
// 🌊 Async generator - clean and memory efficient
async function* streamData(): AsyncGenerator<number, void, unknown> {
for (let i = 1; i <= 1000000; i++) {
// 💡 Only one item in memory at a time
yield await fetchDataFromAPI(i);
}
}
async function fetchDataFromAPI(id: number): Promise<number> {
await new Promise(resolve => setTimeout(resolve, 10));
return id * 2;
}
🔧 Core Syntax and Patterns
📝 Basic Async Generator Syntax
// 🏗️ Data processing pipeline
interface DataItem {
id: string;
value: number;
timestamp: Date;
processed: boolean;
}
class DataProcessor {
// 🔄 Generate data items with async processing
async* processDataStream(source: string[]): AsyncGenerator<DataItem, void, unknown> {
console.log(`🚀 Starting to process ${source.length} items...`);
for (const [index, item] of source.entries()) {
console.log(`⚙️ Processing item ${index + 1}/${source.length}: ${item}`);
// 🌐 Simulate async processing (API call, database query, etc.)
await this.delay(100);
// 🏗️ Create processed data item
const dataItem: DataItem = {
id: `item_${index + 1}`,
value: item.length * Math.random(),
timestamp: new Date(),
processed: true
};
// 📤 Yield the processed item
yield dataItem;
// 📊 Progress logging
if ((index + 1) % 10 === 0) {
console.log(`📊 Progress: ${index + 1}/${source.length} items processed`);
}
}
console.log('✅ All items processed!');
}
// 🔍 Process with conditional yielding
async* filterAndProcess(
source: number[],
condition: (value: number) => boolean
): AsyncGenerator<{ original: number; processed: number }, void, unknown> {
let processedCount = 0;
for (const value of source) {
// 🔍 Check condition first
if (condition(value)) {
// 🌐 Async processing only for items that pass the filter
await this.delay(50);
const processed = await this.complexProcessing(value);
processedCount++;
yield {
original: value,
processed
};
console.log(`✅ Processed item ${processedCount}: ${value} -> ${processed}`);
} else {
console.log(`⏭️ Skipped: ${value} (didn't meet condition)`);
}
}
console.log(`🎯 Total processed: ${processedCount} items`);
}
// 🎮 Helper methods
private async delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
private async complexProcessing(value: number): Promise<number> {
// Simulate complex async calculation
await this.delay(Math.random() * 100);
return value * value + Math.floor(Math.random() * 100);
}
}
// 🎯 Usage examples
async function demonstrateDataProcessing(): Promise<void> {
const processor = new DataProcessor();
// 📋 Source data
const sourceItems = ['apple', 'banana', 'cherry', 'date', 'elderberry'];
console.log('🌊 Streaming data processing:');
for await (const item of processor.processDataStream(sourceItems)) {
console.log(`📨 Received processed item:`, {
id: item.id,
value: item.value.toFixed(2),
processed: item.processed
});
}
console.log('\n🔍 Filtered processing:');
const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
for await (const result of processor.filterAndProcess(numbers, n => n % 2 === 0)) {
console.log(`📨 Even number processed: ${result.original} -> ${result.processed}`);
}
}
🎨 Advanced Yielding Patterns
// 🏗️ Multi-source data aggregation
class DataAggregator {
// 🔀 Merge multiple async data sources
async* mergeMultipleSources(
...sources: AsyncGenerator<any, void, unknown>[]
): AsyncGenerator<{ source: number; data: any }, void, unknown> {
// 🎯 Create iterators for all sources
const iterators = sources.map((source, index) => ({
index,
iterator: source[Symbol.asyncIterator](),
done: false,
pending: null as Promise<IteratorResult<any, void>> | null
}));
// 🚀 Start first iteration for all sources
for (const iter of iterators) {
if (!iter.done) {
iter.pending = iter.iterator.next();
}
}
while (iterators.some(iter => !iter.done)) {
// 🏁 Race all pending operations
const pendingPromises = iterators
.filter(iter => iter.pending)
.map(async (iter) => {
const result = await iter.pending!;
return { iter, result };
});
if (pendingPromises.length === 0) break;
// ⚡ Get the first result
const { iter, result } = await Promise.race(pendingPromises);
if (result.done) {
iter.done = true;
iter.pending = null;
} else {
// 📤 Yield the result with source information
yield {
source: iter.index,
data: result.value
};
// 🔄 Start next iteration for this source
iter.pending = iter.iterator.next();
}
}
console.log('✅ All sources exhausted!');
}
// 📦 Batch yielding for efficiency
async* batchProcessor<T>(
source: AsyncGenerator<T, void, unknown>,
batchSize: number
): AsyncGenerator<T[], void, unknown> {
let batch: T[] = [];
for await (const item of source) {
batch.push(item);
// 📦 Yield when batch is full
if (batch.length >= batchSize) {
console.log(`📦 Yielding batch of ${batch.length} items`);
yield [...batch]; // Create a copy
batch = []; // Reset batch
}
}
// 📤 Yield remaining items if any
if (batch.length > 0) {
console.log(`📦 Yielding final batch of ${batch.length} items`);
yield batch;
}
}
// 🎯 Rate-limited processing
async* rateLimitedProcessor<T>(
source: AsyncGenerator<T, void, unknown>,
itemsPerSecond: number
): AsyncGenerator<T, void, unknown> {
const delayMs = 1000 / itemsPerSecond;
let lastYieldTime = 0;
for await (const item of source) {
const now = Date.now();
const timeSinceLastYield = now - lastYieldTime;
// ⏱️ Enforce rate limit
if (timeSinceLastYield < delayMs) {
const remainingDelay = delayMs - timeSinceLastYield;
console.log(`⏰ Rate limiting: waiting ${remainingDelay.toFixed(0)}ms`);
await new Promise(resolve => setTimeout(resolve, remainingDelay));
}
lastYieldTime = Date.now();
yield item;
}
}
}
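Here is a hedged usage sketch that chains these combinators together, reusing the streamData generator from earlier. Rate limiting feeds batching, and because everything is pull-based, breaking out of the loop pauses the whole chain:
// 🎯 Composing the aggregator combinators (illustrative sketch)
async function demonstrateAggregation(): Promise<void> {
  const aggregator = new DataAggregator();
  const limited = aggregator.rateLimitedProcessor(streamData(), 100); // at most ~100 items/sec
  for await (const batch of aggregator.batchProcessor(limited, 25)) {
    console.log(`📦 Got a batch of ${batch.length} items, first value: ${batch[0]}`);
    if (batch[0] >= 500) break; // stopping here pauses every upstream generator too
  }
}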
💡 Real-World Applications
🛒 Example 1: E-commerce Order Processing
// 🏪 E-commerce order processing system
interface Order {
id: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
status: 'pending' | 'processing' | 'fulfilled' | 'failed';
createdAt: Date;
}
interface ProcessedOrder extends Order {
inventoryChecked: boolean;
paymentProcessed: boolean;
shippingCalculated: boolean;
total: number;
}
class OrderProcessingSystem {
// 🔄 Process orders as they come in (streaming)
async* processOrderStream(
orderSource: AsyncGenerator<Order, void, unknown>
): AsyncGenerator<ProcessedOrder, void, unknown> {
console.log('🏪 Starting order processing stream...');
let processedCount = 0;
for await (const order of orderSource) {
try {
console.log(`📋 Processing order ${order.id} for customer ${order.customerId}`);
// 📦 Step 1: Check inventory
console.log(`📦 Checking inventory for order ${order.id}...`);
await this.checkInventory(order);
// 💳 Step 2: Process payment
console.log(`💳 Processing payment for order ${order.id}...`);
await this.processPayment(order);
// 🚚 Step 3: Calculate shipping
console.log(`🚚 Calculating shipping for order ${order.id}...`);
const shippingCost = await this.calculateShipping(order);
// 💰 Calculate total
const itemTotal = order.items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
const total = itemTotal + shippingCost;
// ✅ Create processed order
const processedOrder: ProcessedOrder = {
...order,
status: 'fulfilled',
inventoryChecked: true,
paymentProcessed: true,
shippingCalculated: true,
total
};
processedCount++;
console.log(`✅ Order ${order.id} processed successfully! Total: $${total.toFixed(2)}`);
// 📤 Yield the processed order
yield processedOrder;
} catch (error) {
console.error(`❌ Failed to process order ${order.id}:`, error instanceof Error ? error.message : error);
// 🚨 Yield failed order
yield {
...order,
status: 'failed',
inventoryChecked: false,
paymentProcessed: false,
shippingCalculated: false,
total: 0
};
}
}
console.log(`🎯 Processing complete! Processed ${processedCount} orders`);
}
// 📊 Generate analytics from processed orders
async* analyzeOrders(
processedOrders: AsyncGenerator<ProcessedOrder, void, unknown>
): AsyncGenerator<{ metric: string; value: number; timestamp: Date }, void, unknown> {
let totalRevenue = 0;
let orderCount = 0;
let successfulOrders = 0;
let failedOrders = 0;
for await (const order of processedOrders) {
orderCount++;
if (order.status === 'fulfilled') {
successfulOrders++;
totalRevenue += order.total;
} else {
failedOrders++;
}
// 📊 Yield metrics every 10 orders
if (orderCount % 10 === 0) {
yield {
metric: 'total_revenue',
value: totalRevenue,
timestamp: new Date()
};
yield {
metric: 'success_rate',
value: (successfulOrders / orderCount) * 100,
timestamp: new Date()
};
yield {
metric: 'average_order_value',
value: successfulOrders > 0 ? totalRevenue / successfulOrders : 0,
timestamp: new Date()
};
}
}
// 📊 Final metrics
console.log(`📊 Final Analytics:
📋 Total Orders: ${orderCount}
✅ Successful: ${successfulOrders}
❌ Failed: ${failedOrders}
💰 Total Revenue: $${totalRevenue.toFixed(2)}
📈 Success Rate: ${((successfulOrders / orderCount) * 100).toFixed(1)}%
`);
}
// 🛠️ Helper methods
private async checkInventory(order: Order): Promise<void> {
await this.delay(100 + Math.random() * 200); // Simulate inventory check
if (Math.random() < 0.05) { // 5% chance of inventory failure
throw new Error('Insufficient inventory');
}
}
private async processPayment(order: Order): Promise<void> {
await this.delay(200 + Math.random() * 300); // Simulate payment processing
if (Math.random() < 0.03) { // 3% chance of payment failure
throw new Error('Payment processing failed');
}
}
private async calculateShipping(order: Order): Promise<number> {
await this.delay(50 + Math.random() * 100); // Simulate shipping calculation
return 5.99 + (Math.random() * 10); // Base shipping + variable cost
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 🎮 Demo order processing
async function demonstrateOrderProcessing(): Promise<void> {
const orderSystem = new OrderProcessingSystem();
// 🎭 Create a mock order generator
async function* generateOrders(): AsyncGenerator<Order, void, unknown> {
for (let i = 1; i <= 25; i++) {
yield {
id: `ORDER_${i.toString().padStart(3, '0')}`,
customerId: `CUST_${Math.floor(Math.random() * 100) + 1}`,
items: [
{
productId: `PROD_${Math.floor(Math.random() * 50) + 1}`,
quantity: Math.floor(Math.random() * 3) + 1,
price: Math.random() * 100 + 10
}
],
status: 'pending',
createdAt: new Date()
};
// 📅 Simulate orders coming in over time
await new Promise(resolve => setTimeout(resolve, 100));
}
}
// 🚀 Process the stream
const processedStream = orderSystem.processOrderStream(generateOrders());
const analyticsStream = orderSystem.analyzeOrders(processedStream);
// 📊 Consume analytics
for await (const metric of analyticsStream) {
console.log(`📊 Metric: ${metric.metric} = ${metric.value.toFixed(2)} at ${metric.timestamp.toLocaleTimeString()}`);
}
}
🎮 Example 2: Real-Time Log Processing
// 📜 Real-time log processing system
interface LogEntry {
timestamp: Date;
level: 'info' | 'warn' | 'error' | 'debug';
message: string;
source: string;
metadata?: Record<string, any>;
}
interface ProcessedLog extends LogEntry {
processed: true;
severity: number;
category: string;
alertTriggered?: boolean;
}
class LogProcessor {
// 🔍 Process log streams with filtering and alerting
async* processLogStream(
logSource: AsyncGenerator<LogEntry, void, unknown>,
config: {
minLevel: LogEntry['level'];
alertPatterns: RegExp[];
categoryRules: Array<{ pattern: RegExp; category: string }>;
}
): AsyncGenerator<ProcessedLog, void, unknown> {
const levelSeverity = { debug: 1, info: 2, warn: 3, error: 4 };
const minSeverity = levelSeverity[config.minLevel];
console.log(`🚀 Starting log processing with min level: ${config.minLevel}`);
for await (const log of logSource) {
const severity = levelSeverity[log.level];
// 🔍 Filter by minimum level
if (severity < minSeverity) {
continue; // Skip low-severity logs
}
// 🏷️ Categorize the log
let category = 'general';
for (const rule of config.categoryRules) {
if (rule.pattern.test(log.message)) {
category = rule.category;
break;
}
}
// 🚨 Check for alert patterns
let alertTriggered = false;
for (const pattern of config.alertPatterns) {
if (pattern.test(log.message)) {
alertTriggered = true;
console.log(`🚨 ALERT TRIGGERED: ${log.message}`);
break;
}
}
// 🏗️ Create processed log
const processedLog: ProcessedLog = {
...log,
processed: true,
severity,
category,
alertTriggered
};
// 📤 Yield processed log
yield processedLog;
}
}
// 📊 Aggregate log statistics
async* aggregateLogStats(
processedLogs: AsyncGenerator<ProcessedLog, void, unknown>,
windowSizeMs: number = 60000 // 1 minute windows
): AsyncGenerator<{
windowStart: Date;
windowEnd: Date;
stats: {
total: number;
byLevel: Record<string, number>;
byCategory: Record<string, number>;
alertsTriggered: number;
errorRate: number;
};
}, void, unknown> {
let windowStart = new Date();
let windowEnd = new Date(windowStart.getTime() + windowSizeMs);
let currentWindow = {
total: 0,
byLevel: {} as Record<string, number>,
byCategory: {} as Record<string, number>,
alertsTriggered: 0,
errorRate: 0
};
for await (const log of processedLogs) {
// 📅 Check if we need to start a new window
if (log.timestamp >= windowEnd) {
// 📊 Calculate final stats for current window
currentWindow.errorRate = currentWindow.total > 0
? ((currentWindow.byLevel.error || 0) / currentWindow.total) * 100
: 0;
// 📤 Yield current window stats
yield {
windowStart,
windowEnd,
stats: { ...currentWindow }
};
// 🔄 Start new window
windowStart = new Date(windowEnd);
windowEnd = new Date(windowStart.getTime() + windowSizeMs);
currentWindow = {
total: 0,
byLevel: {},
byCategory: {},
alertsTriggered: 0,
errorRate: 0
};
}
// 📊 Update current window stats
currentWindow.total++;
currentWindow.byLevel[log.level] = (currentWindow.byLevel[log.level] || 0) + 1;
currentWindow.byCategory[log.category] = (currentWindow.byCategory[log.category] || 0) + 1;
if (log.alertTriggered) {
currentWindow.alertsTriggered++;
}
}
// 📤 Yield final window if it has data
if (currentWindow.total > 0) {
currentWindow.errorRate = (currentWindow.byLevel.error || 0) / currentWindow.total * 100;
yield {
windowStart,
windowEnd,
stats: currentWindow
};
}
}
}
// 🎮 Demo log processing
async function demonstrateLogProcessing(): Promise<void> {
const logProcessor = new LogProcessor();
// 🎭 Generate sample logs
async function* generateLogs(): AsyncGenerator<LogEntry, void, unknown> {
const sources = ['web-server', 'database', 'auth-service', 'payment-gateway'];
const messages = [
'User login successful',
'Database connection established',
'Payment processed successfully',
'Error: Connection timeout',
'Warning: High memory usage detected',
'Critical: Database connection failed',
'Info: Cache cleared',
'Debug: Request processed in 150ms'
];
for (let i = 0; i < 100; i++) {
const level = ['info', 'warn', 'error', 'debug'][Math.floor(Math.random() * 4)] as LogEntry['level'];
const source = sources[Math.floor(Math.random() * sources.length)];
const message = messages[Math.floor(Math.random() * messages.length)];
yield {
timestamp: new Date(),
level,
message,
source,
metadata: { requestId: `req_${i}`, userId: `user_${Math.floor(Math.random() * 100)}` }
};
// 📅 Simulate logs coming in over time
await new Promise(resolve => setTimeout(resolve, 50));
}
}
// ⚙️ Configure processing
const config = {
minLevel: 'info' as const,
alertPatterns: [/Error:|Critical:/],
categoryRules: [
{ pattern: /login|auth/i, category: 'authentication' },
{ pattern: /payment|billing/i, category: 'financial' },
{ pattern: /database|connection/i, category: 'infrastructure' },
{ pattern: /memory|cpu/i, category: 'performance' }
]
};
// 🚀 Process the log stream
const processedStream = logProcessor.processLogStream(generateLogs(), config);
const statsStream = logProcessor.aggregateLogStats(processedStream, 5000); // 5-second windows for demo
// 📊 Consume statistics
for await (const window of statsStream) {
console.log(`📊 Window ${window.windowStart.toLocaleTimeString()} - ${window.windowEnd.toLocaleTimeString()}:`);
console.log(` 📋 Total logs: ${window.stats.total}`);
console.log(` 📊 By level:`, window.stats.byLevel);
console.log(` 🏷️ By category:`, window.stats.byCategory);
console.log(` 🚨 Alerts: ${window.stats.alertsTriggered}`);
console.log(` ❌ Error rate: ${window.stats.errorRate.toFixed(1)}%\n`);
}
}
🚀 Advanced Patterns
🧙‍♂️ Error Handling in Async Generators
// 🛡️ Robust error handling patterns
class ResilientDataProcessor {
// 🔁 Retry logic with backoff
async* processWithRetry<T, R>(
source: AsyncGenerator<T, void, unknown>,
processor: (item: T) => Promise<R>,
maxRetries: number = 3
): AsyncGenerator<{ success: true; data: R } | { success: false; error: Error; item: T }, void, unknown> {
for await (const item of source) {
let attempts = 0;
let lastError: Error | null = null;
while (attempts <= maxRetries) {
try {
console.log(`🔁 Processing item (attempt ${attempts + 1}/${maxRetries + 1})`);
const result = await processor(item);
// ✅ Success!
yield { success: true, data: result };
break; // Move to next item
} catch (error) {
attempts++;
lastError = error instanceof Error ? error : new Error('Unknown error');
if (attempts <= maxRetries) {
// ⏱️ Exponential backoff
const backoffMs = Math.pow(2, attempts) * 1000;
console.log(`⚠️ Attempt ${attempts} failed, retrying in ${backoffMs}ms...`);
await new Promise(resolve => setTimeout(resolve, backoffMs));
}
}
}
// ❌ All retries exhausted
if (lastError) {
console.log(`❌ Item processing failed after ${maxRetries + 1} attempts`);
yield { success: false, error: lastError, item };
}
}
}
// 🛡️ Circuit breaker pattern
async* processWithCircuitBreaker<T, R>(
source: AsyncGenerator<T, void, unknown>,
processor: (item: T) => Promise<R>,
config: {
failureThreshold: number;
resetTimeoutMs: number;
monitoringWindowMs: number;
}
): AsyncGenerator<{ success: true; data: R } | { success: false; error: Error; circuitOpen: boolean }, void, unknown> {
let failures = 0;
let lastFailureTime = 0;
let circuitOpen = false;
let windowStart = Date.now();
let requestsInWindow = 0;
for await (const item of source) {
const now = Date.now();
// 🔄 Reset monitoring window
if (now - windowStart > config.monitoringWindowMs) {
windowStart = now;
requestsInWindow = 0;
failures = 0;
}
requestsInWindow++;
// 🚪 Check if circuit should be closed (reset)
if (circuitOpen && (now - lastFailureTime) > config.resetTimeoutMs) {
console.log('🔄 Circuit breaker: Attempting to close circuit');
circuitOpen = false;
failures = 0;
}
// ⚡ Circuit breaker is open - fail fast
if (circuitOpen) {
yield {
success: false,
error: new Error('Circuit breaker is open'),
circuitOpen: true
};
continue;
}
try {
const result = await processor(item);
// ✅ Success - reset failure count
if (failures > 0) {
console.log('✅ Success after failures - resetting circuit breaker');
failures = 0;
}
yield { success: true, data: result };
} catch (error) {
failures++;
lastFailureTime = now;
// 🚨 Check if we should open the circuit
if (failures >= config.failureThreshold) {
console.log(`🚨 Circuit breaker: Opening circuit (${failures} failures in window)`);
circuitOpen = true;
}
yield {
success: false,
error: error instanceof Error ? error : new Error('Unknown error'),
circuitOpen
};
}
}
}
}
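A quick usage sketch for the retry combinator (numbers and flakyDouble are illustrative names). Note how the discriminated union return type forces the consumer to handle both outcomes:
// 🎮 Driving processWithRetry with a deliberately flaky step
async function demonstrateRetry(): Promise<void> {
  async function* numbers(): AsyncGenerator<number, void, unknown> {
    yield 1;
    yield 2;
    yield 3;
  }
  const flakyDouble = async (n: number): Promise<number> => {
    if (Math.random() < 0.3) throw new Error('Transient failure'); // fails ~30% of the time
    return n * 2;
  };
  const resilient = new ResilientDataProcessor();
  for await (const outcome of resilient.processWithRetry(numbers(), flakyDouble, 2)) {
    if (outcome.success) {
      console.log(`✅ OK: ${outcome.data}`);
    } else {
      console.log(`❌ Gave up on ${outcome.item}: ${outcome.error.message}`);
    }
  }
}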
🎯 Memory-Efficient Large Dataset Processing
// 💾 Memory-conscious data processing
class LargeDataProcessor {
// 🗂️ Process CSV files without loading entire file into memory
async* processCsvStream(
filePath: string,
chunkSize: number = 1024 * 1024 // 1MB chunks
): AsyncGenerator<{ [key: string]: string }, void, unknown> {
console.log(`📂 Processing CSV file: ${filePath}`);
// 🎯 In a real implementation, you'd use Node.js streams
// This is a simplified example
const fileContent = await this.readFileInChunks(filePath, chunkSize);
let buffer = '';
let headers: string[] = [];
let isFirstRow = true;
for await (const chunk of fileContent) {
buffer += chunk;
// 🔄 Process complete lines
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim() === '') continue;
const values = this.parseCsvLine(line);
if (isFirstRow) {
headers = values;
isFirstRow = false;
console.log(`📋 CSV Headers: ${headers.join(', ')}`);
continue;
}
// 🏗️ Create row object
const row: { [key: string]: string } = {};
for (let i = 0; i < Math.min(headers.length, values.length); i++) {
row[headers[i]] = values[i];
}
yield row;
}
}
// 🔄 Process any remaining data in buffer
if (buffer.trim() && headers.length > 0) {
const values = this.parseCsvLine(buffer);
const row: { [key: string]: string } = {};
for (let i = 0; i < Math.min(headers.length, values.length); i++) {
row[headers[i]] = values[i];
}
yield row;
}
console.log('✅ CSV processing complete');
}
// 🌐 Paginated API data fetching
async* fetchPaginatedData<T>(
baseUrl: string,
pageSize: number = 100,
maxPages?: number
): AsyncGenerator<T, void, unknown> {
let page = 1;
let hasMore = true;
let totalFetched = 0;
while (hasMore && (!maxPages || page <= maxPages)) {
console.log(`🌐 Fetching page ${page} (${pageSize} items per page)`);
try {
// 🌐 Fetch current page
const response = await fetch(`${baseUrl}?page=${page}&limit=${pageSize}`);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data = await response.json();
const items = data.items || [];
// 📤 Yield each item individually
for (const item of items) {
yield item;
totalFetched++;
}
// 🔍 Check if there are more pages
hasMore = data.hasMore || items.length === pageSize;
page++;
console.log(`📊 Progress: ${totalFetched} items fetched so far`);
// 😴 Rate limiting - be nice to the API
await new Promise(resolve => setTimeout(resolve, 100));
} catch (error) {
console.error(`❌ Error fetching page ${page}:`, error instanceof Error ? error.message : error);
break;
}
}
console.log(`✅ Pagination complete. Total items: ${totalFetched}`);
}
// 🛠️ Helper methods
private async* readFileInChunks(filePath: string, chunkSize: number): AsyncGenerator<string, void, unknown> {
// 🎯 Simulate file reading in chunks
const mockFileContent = `name,age,city,occupation
John Doe,30,New York,Engineer
Jane Smith,25,San Francisco,Designer
Bob Johnson,35,Chicago,Manager
Alice Brown,28,Boston,Developer
Charlie Wilson,32,Seattle,Analyst`.repeat(1000); // Large mock file
for (let i = 0; i < mockFileContent.length; i += chunkSize) {
const chunk = mockFileContent.slice(i, i + chunkSize);
yield chunk;
// 📅 Simulate I/O delay
await new Promise(resolve => setTimeout(resolve, 10));
}
}
private parseCsvLine(line: string): string[] {
// 🎯 Simple CSV parser (doesn't handle quotes/escaping)
return line.split(',').map(value => value.trim());
}
}
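A hedged usage sketch (the file path is illustrative, and remember the class above simulates file I/O). Memory stays flat no matter how many rows the file contains, because only one buffered chunk and one row object exist at a time:
// 💾 Consuming the CSV stream row by row
async function demonstrateCsvProcessing(): Promise<void> {
  const processor = new LargeDataProcessor();
  let rowCount = 0;
  for await (const row of processor.processCsvStream('./data/users.csv')) {
    rowCount++;
    if (rowCount <= 3) console.log('📋 Row:', row); // peek at the first few rows only
  }
  console.log(`✅ Processed ${rowCount} rows total`);
}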
⚠️ Common Pitfalls and Solutions
😱 Pitfall 1: Memory Leaks with Infinite Generators
// ❌ Dangerous - infinite generator without proper cleanup
async function* dangerousInfiniteGenerator(): AsyncGenerator<number, never, unknown> {
let count = 0;
const cache = new Map(); // 💥 This keeps growing!
while (true) {
cache.set(count, `value_${count}`); // Memory leak!
yield count++;
await new Promise(resolve => setTimeout(resolve, 100));
}
}
// ✅ Safe - infinite generator with proper memory management
async function* safeInfiniteGenerator(maxCacheSize: number = 1000): AsyncGenerator<number, void, unknown> {
let count = 0;
const cache = new Map();
while (true) {
// 🧹 Clean up cache when it gets too large
if (cache.size >= maxCacheSize) {
const keysToDelete = Array.from(cache.keys()).slice(0, Math.floor(maxCacheSize / 2));
keysToDelete.forEach(key => cache.delete(key));
console.log(`🧹 Cleaned up cache, removed ${keysToDelete.length} entries`);
}
cache.set(count, `value_${count}`);
yield count++;
await new Promise(resolve => setTimeout(resolve, 100));
}
}
// 🎯 Safe consumption with early termination
async function consumeInfiniteGenerator(): Promise<void> {
const generator = safeInfiniteGenerator();
let consumed = 0;
for await (const value of generator) {
console.log(`📨 Received: ${value}`);
consumed++;
// ✅ Always have an exit condition for infinite generators
if (consumed >= 50) {
console.log('🛑 Terminating infinite generator');
await generator.return(undefined); // Clean shutdown
break;
}
}
}
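One detail worth knowing: breaking out of a for await...of loop implicitly calls .return() on the generator, which runs any finally block inside it. That makes try/finally the natural place for cleanup - a minimal sketch (generatorWithCleanup is an illustrative name):
// 🧹 try/finally runs even when the consumer terminates early
async function* generatorWithCleanup(): AsyncGenerator<number, void, unknown> {
  console.log('🔓 Acquiring resource (imagine a file handle or DB connection)');
  try {
    for (let i = 0; ; i++) {
      yield i;
    }
  } finally {
    // Executes when the consumer breaks, calls .return(), or .throw()s
    console.log('🔒 Releasing resource');
  }
}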
🤯 Pitfall 2: Unhandled Async Generator Errors
// ❌ Dangerous - errors can crash the entire stream
async function* dangerousGenerator(): AsyncGenerator<string, void, unknown> {
for (let i = 1; i <= 10; i++) {
if (i === 5) {
throw new Error('Something went wrong!'); // 💥 Crashes everything
}
yield `Item ${i}`;
}
}
// ✅ Safe - proper error handling within generator
async function* resilientGenerator(): AsyncGenerator<{ success: true; data: string } | { success: false; error: string }, void, unknown> {
for (let i = 1; i <= 10; i++) {
try {
// 🎯 Simulate work that might fail
await new Promise(resolve => setTimeout(resolve, 100));
if (i === 5 && Math.random() < 0.7) {
throw new Error(`Processing failed for item ${i}`);
}
yield { success: true, data: `Item ${i}` };
} catch (error) {
console.error(`⚠️ Error processing item ${i}:`, error instanceof Error ? error.message : error);
yield {
success: false,
error: error instanceof Error ? error.message : 'Unknown error'
};
}
}
}
// 🛡️ Safe consumption with error handling
async function safeConsumption(): Promise<void> {
try {
for await (const result of resilientGenerator()) {
if (result.success) {
console.log(`✅ ${result.data}`);
} else {
console.log(`❌ Error: ${result.error}`);
}
}
} catch (error) {
console.error('💥 Generator threw an unhandled error:', error);
}
}
🛠️ Best Practices
🎯 Performance Optimization
// 🚀 Performance monitoring and optimization
class OptimizedAsyncGenerator {
// 📦 Buffered processing for better throughput
async* bufferedProcessor<T, R>(
source: AsyncGenerator<T, void, unknown>,
processor: (items: T[]) => Promise<R[]>,
bufferSize: number = 10
): AsyncGenerator<R, void, unknown> {
let buffer: T[] = [];
for await (const item of source) {
buffer.push(item);
// 📦 Process when buffer is full
if (buffer.length >= bufferSize) {
const results = await processor(buffer);
for (const result of results) {
yield result;
}
buffer = [];
}
}
// 📤 Process remaining items
if (buffer.length > 0) {
const results = await processor(buffer);
for (const result of results) {
yield result;
}
}
}
// ⚡ Parallel processing with controlled concurrency
async* parallelProcessor<T, R>(
source: AsyncGenerator<T, void, unknown>,
processor: (item: T) => Promise<R>,
concurrency: number = 3
): AsyncGenerator<R, void, unknown> {
const semaphore = new Semaphore(concurrency);
const pending = new Map<number, Promise<{ index: number; result: R }>>();
let inputIndex = 0;
let outputIndex = 0;
let sourceExhausted = false;
// 🚀 Start initial batch
const sourceIterator = source[Symbol.asyncIterator]();
while (!sourceExhausted || pending.size > 0) {
// 🔄 Fill up to concurrency limit
while (pending.size < concurrency && !sourceExhausted) {
const { value, done } = await sourceIterator.next();
if (done) {
sourceExhausted = true;
break;
}
const currentIndex = inputIndex++;
const promise = semaphore.acquire().then(async () => {
try {
const result = await processor(value);
return { index: currentIndex, result };
} finally {
semaphore.release();
}
});
pending.set(currentIndex, promise);
}
// 📤 Yield results in order
while (pending.has(outputIndex)) {
const { result } = await pending.get(outputIndex)!;
pending.delete(outputIndex);
yield result;
outputIndex++;
}
// ⏱️ If no results ready, wait for at least one
if (pending.size > 0 && !pending.has(outputIndex)) {
await Promise.race(Array.from(pending.values()));
}
}
}
}
// 🚦 Semaphore for concurrency control
class Semaphore {
private available: number;
private waiters: Array<() => void> = [];
constructor(count: number) {
this.available = count;
}
async acquire(): Promise<void> {
if (this.available > 0) {
this.available--;
return;
}
return new Promise<void>(resolve => {
this.waiters.push(resolve);
});
}
release(): void {
if (this.waiters.length > 0) {
const waiter = this.waiters.shift()!;
waiter();
} else {
this.available++;
}
}
}
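A usage sketch for the parallel processor (ids and slowSquare are illustrative names). Three transforms run concurrently, yet the output order matches the input order because results are buffered until their turn:
// ⚡ Ordered results from concurrent work
async function demonstrateParallel(): Promise<void> {
  async function* ids(): AsyncGenerator<number, void, unknown> {
    for (let i = 1; i <= 10; i++) yield i;
  }
  const slowSquare = async (n: number): Promise<number> => {
    await new Promise(resolve => setTimeout(resolve, Math.random() * 300));
    return n * n;
  };
  const optimizer = new OptimizedAsyncGenerator();
  for await (const result of optimizer.parallelProcessor(ids(), slowSquare, 3)) {
    console.log(`📨 Result: ${result}`); // 1, 4, 9, ... in input order
  }
}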
🧪 Hands-On Exercise
🎯 Challenge: Build a Data Pipeline System
Create a comprehensive data pipeline using async generators that can:
📋 Requirements:
- ✅ Read data from multiple sources (CSV, JSON, API)
- 🏷️ Transform and validate data with customizable rules
- 🤝 Handle errors gracefully without stopping the pipeline
- 📊 Provide real-time progress monitoring
- 🚦 Support backpressure and rate limiting
🏆 Bonus Points:
- Add data deduplication (a starter sketch follows this list)
- Implement pipeline branching (one input, multiple outputs)
- Create a visual progress dashboard
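For the deduplication bonus, a possible starting point is a key-based filter stage - a hedged sketch with a crude memory bound (a production version might prefer an LRU cache):
// 🔍 Deduplication as a reusable async generator stage
async function* deduplicate<T>(
  source: AsyncGenerator<T, void, unknown>,
  keyOf: (item: T) => string,
  maxKeys: number = 10000
): AsyncGenerator<T, void, unknown> {
  const seen = new Set<string>();
  for await (const item of source) {
    const key = keyOf(item);
    if (seen.has(key)) continue; // drop the duplicate
    if (seen.size >= maxKeys) seen.clear(); // crude bound to avoid unbounded growth
    seen.add(key);
    yield item;
  }
}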
💡 Solution
📖 Click to see solution
// 🎯 Comprehensive data pipeline system!
interface DataSource {
name: string;
type: 'csv' | 'json' | 'api';
config: any;
}
interface PipelineStage<TInput, TOutput> {
name: string;
transform: (input: TInput) => Promise<TOutput>;
validate?: (output: TOutput) => boolean;
onError?: (error: Error, input: TInput) => TOutput | null;
}
interface PipelineProgress {
stageName: string;
processed: number;
errors: number;
throughput: number; // items per second
timestamp: Date;
}
class DataPipeline<TInput, TOutput> {
private stages: PipelineStage<any, any>[] = [];
private progressCallbacks: Array<(progress: PipelineProgress) => void> = [];
// ➕ Add pipeline stage
addStage<TStageOutput>(stage: PipelineStage<TInput, TStageOutput>): DataPipeline<TInput, TStageOutput> {
this.stages.push(stage);
return this as any;
}
// 📊 Monitor progress
onProgress(callback: (progress: PipelineProgress) => void): void {
this.progressCallbacks.push(callback);
}
// 🚀 Execute pipeline
async* execute(
sources: AsyncGenerator<TInput, void, unknown>[],
config: {
concurrency?: number;
errorThreshold?: number;
rateLimitPerSecond?: number;
} = {}
): AsyncGenerator<TOutput, void, unknown> {
const { concurrency = 5, errorThreshold = 0.1, rateLimitPerSecond } = config;
// 🔀 Merge all sources
const mergedSource = this.mergeSources(sources);
// 🎯 Apply rate limiting if specified
const rateLimitedSource = rateLimitPerSecond
? this.rateLimitedProcessor(mergedSource, rateLimitPerSecond)
: mergedSource;
// 🔄 Process through all stages
let currentStream: AsyncGenerator<any, void, unknown> = rateLimitedSource;
for (const [index, stage] of this.stages.entries()) {
currentStream = this.processStage(
currentStream,
stage,
concurrency,
errorThreshold,
`Stage ${index + 1}: ${stage.name}`
);
}
// 📤 Yield final results
for await (const result of currentStream) {
yield result;
}
}
// 🔀 Merge multiple sources
private async* mergeSources(
sources: AsyncGenerator<TInput, void, unknown>[]
): AsyncGenerator<TInput, void, unknown> {
const iterators = sources.map(source => ({
iterator: source[Symbol.asyncIterator](),
done: false,
pending: null as Promise<IteratorResult<TInput, void>> | null
}));
// 🚀 Start first iteration for all sources
for (const iter of iterators) {
iter.pending = iter.iterator.next();
}
while (iterators.some(iter => !iter.done)) {
const pendingPromises = iterators
.filter(iter => iter.pending && !iter.done)
.map(async (iter) => {
const result = await iter.pending!;
return { iter, result };
});
if (pendingPromises.length === 0) break;
const { iter, result } = await Promise.race(pendingPromises);
if (result.done) {
iter.done = true;
iter.pending = null;
} else {
yield result.value;
iter.pending = iter.iterator.next();
}
}
}
// ⚙️ Process single stage
private async* processStage<TStageInput, TStageOutput>(
source: AsyncGenerator<TStageInput, void, unknown>,
stage: PipelineStage<TStageInput, TStageOutput>,
concurrency: number,
errorThreshold: number,
stageName: string
): AsyncGenerator<TStageOutput, void, unknown> {
let processed = 0;
let errors = 0;
const startTime = Date.now();
let lastProgressUpdate = startTime;
const processor = new OptimizedAsyncGenerator();
const processItem = async (item: TStageInput): Promise<TStageOutput | null> => {
try {
const result = await stage.transform(item);
// 🔍 Validate if validator provided
if (stage.validate && !stage.validate(result)) {
throw new Error('Validation failed');
}
return result;
} catch (error) {
errors++;
// 📊 Check error threshold
if (processed > 0 && (errors / processed) > errorThreshold) {
throw new Error(`Error threshold exceeded: ${errors}/${processed} = ${((errors/processed)*100).toFixed(1)}%`);
}
// 🛡️ Use error handler if available
if (stage.onError) {
return stage.onError(error instanceof Error ? error : new Error('Unknown error'), item);
}
return null; // Skip this item
}
};
for await (const result of processor.parallelProcessor(source, processItem, concurrency)) {
if (result !== null) {
processed++;
// 📊 Update progress
const now = Date.now();
if (now - lastProgressUpdate >= 1000) { // Update every second
const throughput = processed / ((now - startTime) / 1000);
this.progressCallbacks.forEach(callback => {
callback({
stageName,
processed,
errors,
throughput,
timestamp: new Date()
});
});
lastProgressUpdate = now;
}
yield result;
}
}
// 📊 Final progress update
const finalThroughput = processed / ((Date.now() - startTime) / 1000);
this.progressCallbacks.forEach(callback => {
callback({
stageName: `${stageName} (COMPLETE)`,
processed,
errors,
throughput: finalThroughput,
timestamp: new Date()
});
});
}
// ⏱️ Rate limiting
private async* rateLimitedProcessor<T>(
source: AsyncGenerator<T, void, unknown>,
itemsPerSecond: number
): AsyncGenerator<T, void, unknown> {
const delayMs = 1000 / itemsPerSecond;
let lastYieldTime = 0;
for await (const item of source) {
const now = Date.now();
const timeSinceLastYield = now - lastYieldTime;
if (timeSinceLastYield < delayMs) {
await new Promise(resolve => setTimeout(resolve, delayMs - timeSinceLastYield));
}
lastYieldTime = Date.now();
yield item;
}
}
}
// 🎮 Demo pipeline usage
async function demonstrateDataPipeline(): Promise<void> {
// 🎭 Create data sources
async function* createCsvSource(): AsyncGenerator<{ name: string; age: string; city: string }, void, unknown> {
const data = [
{ name: 'Alice', age: '30', city: 'New York' },
{ name: 'Bob', age: '25', city: 'San Francisco' },
{ name: 'Charlie', age: 'invalid', city: 'Chicago' }, // Invalid age
{ name: 'Diana', age: '35', city: 'Boston' },
{ name: 'Eve', age: '28', city: 'Seattle' }
];
for (const item of data) {
yield item;
await new Promise(resolve => setTimeout(resolve, 100));
}
}
async function* createApiSource(): AsyncGenerator<{ name: string; age: string; city: string }, void, unknown> {
const data = [
{ name: 'Frank', age: '32', city: 'Miami' },
{ name: 'Grace', age: '29', city: 'Denver' },
{ name: 'Henry', age: '40', city: 'Portland' }
];
for (const item of data) {
yield item;
await new Promise(resolve => setTimeout(resolve, 150));
}
}
// 🏗️ Build pipeline
const pipeline = new DataPipeline<{ name: string; age: string; city: string }, { name: string; age: number; city: string; category: string }>()
.addStage({
name: 'Parse Age',
transform: async (item) => {
const age = parseInt(item.age, 10);
if (isNaN(age)) {
throw new Error(`Invalid age: ${item.age}`);
}
return { ...item, age };
},
onError: (error, item) => {
console.log(`⚠️ Skipping invalid age for ${item.name}: ${item.age}`);
return null; // Skip this item
}
})
.addStage({
name: 'Categorize',
transform: async (item) => {
await new Promise(resolve => setTimeout(resolve, 50)); // Simulate processing
let category = 'unknown';
if (item.age < 30) category = 'young';
else if (item.age < 40) category = 'middle';
else category = 'senior';
return { ...item, category };
},
validate: (result) => result.category !== 'unknown'
});
// 📊 Monitor progress
pipeline.onProgress(progress => {
console.log(`📊 ${progress.stageName}: ${progress.processed} processed, ${progress.errors} errors, ${progress.throughput.toFixed(1)} items/sec`);
});
// 🚀 Execute pipeline
console.log('🚀 Starting data pipeline...');
const sources = [createCsvSource(), createApiSource()];
const results: any[] = [];
for await (const result of pipeline.execute(sources, {
concurrency: 3,
errorThreshold: 0.2,
rateLimitPerSecond: 5
})) {
results.push(result);
console.log(`✅ Pipeline output:`, result);
}
console.log(`🎉 Pipeline complete! Processed ${results.length} items total`);
}
🎓 Key Takeaways
You've mastered async generators! Here's what you can now do:
- ✅ Create efficient streaming data processors with memory control 💪
- ✅ Handle large datasets without memory issues like a pro 🛡️
- ✅ Build resilient async iteration patterns with confidence 🎯
- ✅ Implement backpressure and flow control effectively 🌊
- ✅ Combine async generators with other patterns for powerful applications! 🚀
Remember: Async generators are perfect for streaming, lazy evaluation, and processing data that doesn't all fit in memory at once! 🤓
🤝 Next Steps
Congratulations! 🎉 You've mastered async generators and streaming patterns!
Here's what to do next:
- 💻 Practice with the data pipeline exercise above
- 🏗️ Build a streaming application using async generators
- 📖 Explore Node.js streams and how they complement async generators (see the sketch below)
- 🚀 Share your async generator knowledge with others!
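On that Node.js point: Readable streams implement the async iterable protocol, so they plug directly into everything in this tutorial. A hedged sketch (assumes a Node.js runtime; the lines helper is illustrative):
// 🌊 A Node.js Readable stream consumed like any async generator
import { createReadStream } from 'node:fs';

async function* lines(path: string): AsyncGenerator<string, void, unknown> {
  let buffer = '';
  for await (const chunk of createReadStream(path, { encoding: 'utf8' })) {
    buffer += chunk; // chunks arrive as strings thanks to the encoding option
    const parts = buffer.split('\n');
    buffer = parts.pop() ?? ''; // keep the incomplete trailing line
    for (const line of parts) yield line;
  }
  if (buffer) yield buffer;
}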
Remember: The best way to master async generators is to build something that processes real data streams. Start small and iterate! 🚀
Happy streaming! 🌊🚀✨