Prerequisites
- Basic TypeScript syntax and types
- Understanding of Promises and async/await
- Array iteration methods and for-of loops
What you'll learn
- Master async iterators for streaming data processing
- Build memory-efficient async data pipelines with for-await-of
- Create custom async iterables for complex data sources
- Handle real-time data streams with elegant async iteration
Introduction
Welcome to the streaming world of async iterators and for-await-of loops! In this guide, we'll explore how to process data streams asynchronously, one item at a time, so that even very large datasets can be handled without loading them into memory all at once.
You'll see how to turn overwhelming data processing tasks into manageable, memory-efficient operations, from API pagination to real-time data feeds. Whether you're processing large files, consuming streaming APIs, or building data pipelines, async iterators are a core tool for building scalable TypeScript applications.
By the end of this tutorial, you'll be able to build data pipelines that stream through information like a flowing river. Let's dive in!
Understanding Async Iterators
What are Async Iterators?
Async iterators are like a conveyor belt that delivers data one piece at a time, asynchronously. Think of the difference between downloading an entire movie before watching it and streaming it: async iterators let you process data as it arrives!
In TypeScript terms, async iterators provide:
- Streaming processing - handle data as it becomes available
- Memory efficiency - process large datasets without loading everything at once
- Type safety - maintain strong typing throughout async iteration
- Lazy evaluation - only fetch data when needed
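To make the list above concrete, here is a minimal sketch of what for-await-of does under the hood: it asks the iterable for an iterator via Symbol.asyncIterator, then awaits next() until done is true. The names countTo, collectManually, and collectWithLoop are hypothetical helpers invented for this illustration, not part of any library.

```typescript
// Hypothetical async generator: yields 1..limit, one value per request.
async function* countTo(limit: number): AsyncGenerator<number, void, undefined> {
  for (let i = 1; i <= limit; i++) {
    // Simulate asynchronous work before each value becomes available.
    await new Promise<void>(resolve => setTimeout(resolve, 5));
    yield i;
  }
}

// Roughly what `for await (const value of source)` does step by step.
async function collectManually<T>(source: AsyncIterable<T>): Promise<T[]> {
  const iterator = source[Symbol.asyncIterator]();
  const values: T[] = [];
  while (true) {
    const result = await iterator.next();
    if (result.done) break;
    values.push(result.value);
  }
  return values;
}

// The same traversal written with for-await-of.
async function collectWithLoop<T>(source: AsyncIterable<T>): Promise<T[]> {
  const values: T[] = [];
  for await (const value of source) {
    values.push(value);
  }
  return values;
}
```

Both helpers produce the same array. Nothing inside countTo runs until the first next() call, which is the lazy evaluation mentioned above.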
Why Use Async Iterators?
Here's why async iterators are game-changers:
- Memory Efficiency: Process massive datasets without overwhelming memory
- Real-Time Processing: Handle streaming data as it arrives
- Better UX: Show progress and partial results immediately
- Scalability: Memory use depends on the item being processed, not on the total size of the dataset
- Resource Management: Fetch and compute only what the consumer actually asks for
Real-world example: Processing a 10GB CSV file. Instead of loading it all into memory, async iterators let you process one row at a time while the file streams in!
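Here is a rough sketch of the row-at-a-time idea, assuming the file arrives as an async iterable of text chunks. The splitLines, fakeFileChunks, and countDataRows names are hypothetical, the CSV parsing is a naive comma split with no quoting support, and only \n line endings are handled.

```typescript
// Re-chunks arbitrary text chunks into complete lines,
// holding at most one partial line in memory at a time.
async function* splitLines(
  chunks: AsyncIterable<string>
): AsyncGenerator<string, void, undefined> {
  let pending = '';
  for await (const chunk of chunks) {
    pending += chunk;
    const lines = pending.split('\n');
    // The last element is an incomplete line ('' if the chunk ended on a newline).
    pending = lines.pop() ?? '';
    for (const line of lines) {
      yield line;
    }
  }
  if (pending.length > 0) {
    yield pending; // final line without a trailing newline
  }
}

// Hypothetical stand-in for a file stream: chunk boundaries fall mid-row on purpose.
async function* fakeFileChunks(): AsyncGenerator<string, void, undefined> {
  yield 'id,name\n1,Al';
  yield 'ice\n2,Bob\n';
  yield '3,Carol';
}

// Counts well-formed data rows (two fields) while skipping the header row.
async function countDataRows(chunks: AsyncIterable<string>): Promise<number> {
  let rows = 0;
  let isHeader = true;
  for await (const line of splitLines(chunks)) {
    if (isHeader) {
      isHeader = false;
      continue;
    }
    if (line.split(',').length === 2) rows++;
  }
  return rows;
}
```

Because each row is handled as soon as its line is complete, memory use is tied to the longest line rather than to the size of the file.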
Basic For-Await-Of Patterns
Simple Async Iteration
Let's start with fundamental for-await-of usage:
// Basic for-await-of with async generators
// TypeScript infers the item type from the AsyncGenerator's type arguments
// Simple async data processor
// (fetchDataFromSource and processRawData are helpers assumed to be defined elsewhere)
const processDataStream = async function* (): AsyncGenerator<ProcessedData, void, unknown> {
const dataSources = ['api-1', 'api-2', 'api-3', 'api-4'];
console.log('Starting data stream processing...');
for (const source of dataSources) {
try {
// Fetch data asynchronously
const rawData = await fetchDataFromSource(source);
// Process each piece
const processed = await processRawData(rawData);
console.log(`Processed data from ${source}`);
yield processed; // Yield results one at a time
} catch (error) {
// Catch variables are `unknown` under strict settings, so narrow before reading .message
const message = error instanceof Error ? error.message : String(error);
console.error(`Error processing ${source}:`, message);
// Continue with the next source instead of failing entirely
}
}
console.log('Stream processing complete!');
};
// Type definitions
interface ProcessedData {
id: string;
content: string;
processedAt: Date;
source: string;
metrics: DataMetrics;
}
interface DataMetrics {
size: number;
processingTime: number;
quality: 'high' | 'medium' | 'low';
}
// Using for-await-of to consume the stream
const consumeDataStream = async (): Promise<ProcessedData[]> => {
const results: ProcessedData[] = [];
console.log('Starting stream consumption...');
// Iterate through the async stream
for await (const data of processDataStream()) {
// Handle each piece as it arrives
results.push(data);
// Show progress
console.log(`Processed ${results.length} items so far...`);
// Optional: add a delay for demonstration
await new Promise(resolve => setTimeout(resolve, 100));
}
console.log(`Final results: ${results.length} items processed`);
return results;
};
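One detail worth knowing when consuming a stream: leaving a for-await-of loop early (with break, return, or a thrown error) calls the generator's return() method, so any finally block inside the generator still runs. The sketch below illustrates this with a hypothetical readings generator and takeFirst consumer; lifecycleLog exists only to make the order of events visible.

```typescript
// Records lifecycle events so the order of operations can be inspected.
const lifecycleLog: string[] = [];

// Hypothetical endless source that holds a resource (say, a connection) while iterating.
async function* readings(): AsyncGenerator<number, void, undefined> {
  lifecycleLog.push('open');
  try {
    for (let i = 1; ; i++) {
      await new Promise<void>(resolve => setTimeout(resolve, 1));
      yield i;
    }
  } finally {
    // Runs when the consumer breaks, returns, or throws inside its loop.
    lifecycleLog.push('close');
  }
}

// Stops after `limit` values; the `break` triggers the generator's cleanup.
async function takeFirst(limit: number): Promise<number[]> {
  const taken: number[] = [];
  for await (const value of readings()) {
    taken.push(value);
    if (taken.length >= limit) break;
  }
  return taken;
}
```

This is why an endless source is safe to consume with for-await-of as long as the consumer eventually exits the loop.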
Advanced Streaming Patterns
Let's explore more sophisticated async iteration patterns:
// Advanced streaming data pipeline with type safety
class AsyncDataPipeline<TInput, TOutput> {
private transformers: AsyncTransformer<any, any>[] = [];
// π Chainable transformer registration
pipe<TNext>(transformer: AsyncTransformer<TOutput, TNext>): AsyncDataPipeline<TInput, TNext> {
return new AsyncDataPipeline<TInput, TNext>([...this.transformers, transformer]);
}
// π Execute the pipeline with async iteration
async *process(source: AsyncIterable<TInput>): AsyncGenerator<TOutput, void, unknown> {
console.log('π Starting pipeline execution...');
for await (const item of source) {
try {
let current: any = item;
// π Apply each transformer in sequence
for (const transformer of this.transformers) {
current = await transformer.transform(current);
}
yield current as TOutput;
} catch (error) {
console.error('Pipeline error:', error instanceof Error ? error.message : error);
// Skip this item and continue with the next one
}
}
console.log('Pipeline execution complete');
}
// π Collect all results (use with caution for large datasets)
async collect(source: AsyncIterable<TInput>): Promise<TOutput[]> {
const results: TOutput[] = [];
for await (const item of this.process(source)) {
results.push(item);
}
return results;
}
constructor(transformers: AsyncTransformer<any, any>[] = []) {
this.transformers = transformers;
}
}
// Generic transformer interface
interface AsyncTransformer<TInput, TOutput> {
transform(input: TInput): Promise<TOutput>;
}
// Practical transformer implementations
class DataValidationTransformer implements AsyncTransformer<RawDataItem, ValidatedDataItem> {
async transform(input: RawDataItem): Promise<ValidatedDataItem> {
console.log('π Validating data item...');
// π Validation logic
if (!input.id || !input.content) {
throw new Error('Invalid data item: missing required fields');
}
return {
...input,
isValid: true,
validatedAt: new Date()
};
}
}
class DataEnrichmentTransformer implements AsyncTransformer<ValidatedDataItem, EnrichedDataItem> {
async transform(input: ValidatedDataItem): Promise<EnrichedDataItem> {
console.log('β¨ Enriching data item...');
// π Fetch additional data
const metadata = await fetchMetadata(input.id);
const tags = await generateTags(input.content);
return {
...input,
metadata,
tags,
enrichedAt: new Date()
};
}
}
class DataCompressionTransformer implements AsyncTransformer<EnrichedDataItem, CompressedDataItem> {
async transform(input: EnrichedDataItem): Promise<CompressedDataItem> {
console.log('ποΈ Compressing data item...');
// π¦ Compression logic
const compressedContent = await compressData(input.content);
return {
id: input.id,
compressedContent,
originalSize: input.content.length,
compressedSize: compressedContent.length,
compressionRatio: compressedContent.length / input.content.length,
processedAt: new Date()
};
}
}
// Type definitions for pipeline stages
interface RawDataItem {
id: string;
content: string;
source: string;
}
interface ValidatedDataItem extends RawDataItem {
isValid: boolean;
validatedAt: Date;
}
interface EnrichedDataItem extends ValidatedDataItem {
metadata: DataMetadata;
tags: string[];
enrichedAt: Date;
}
interface CompressedDataItem {
id: string;
compressedContent: string;
originalSize: number;
compressedSize: number;
compressionRatio: number;
processedAt: Date;
}
interface DataMetadata {
category: string;
priority: 'high' | 'medium' | 'low';
region: string;
version: number;
}
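The class-based pipeline above relies on any internally. As an alternative sketch (not the approach used above), the same idea can be expressed with small generic generator functions, which lets TypeScript track the item type through every stage. mapAsync, filterAsync, sampleItems, and buildPipeline are hypothetical names.

```typescript
// Lazily applies `fn` to each item as it is pulled through.
async function* mapAsync<T, R>(
  source: AsyncIterable<T>,
  fn: (item: T) => R | Promise<R>
): AsyncGenerator<R, void, undefined> {
  for await (const item of source) {
    // In an async generator, `yield` awaits a promise operand before handing it out.
    yield fn(item);
  }
}

// Lazily drops items for which the predicate resolves to false.
async function* filterAsync<T>(
  source: AsyncIterable<T>,
  predicate: (item: T) => boolean | Promise<boolean>
): AsyncGenerator<T, void, undefined> {
  for await (const item of source) {
    if (await predicate(item)) yield item;
  }
}

interface SampleItem {
  id: string;
  content: string;
}

// Hypothetical in-memory source standing in for a real data feed.
async function* sampleItems(): AsyncGenerator<SampleItem, void, undefined> {
  yield { id: 'a', content: 'hello' };
  yield { id: 'b', content: '' };
  yield { id: 'c', content: 'hi' };
}

// Validation stage followed by an enrichment stage; types flow through without `any`.
function buildPipeline() {
  const valid = filterAsync(sampleItems(), item => item.content.length > 0);
  return mapAsync(valid, async item => ({ ...item, contentLength: item.content.length }));
}
```

Nothing runs until the result of buildPipeline() is iterated, and each item passes through both stages before the next one is read from the source.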
Working with Async Iterables
Real-World Stream Processing
Let's build a more complete real-time data processing system:
// Real-time event stream processor
class RealTimeEventProcessor {
private eventStreams: Map<string, AsyncIterable<Event>> = new Map();
private processors: Map<EventType, EventProcessor> = new Map();
private subscribers: Map<string, EventSubscriber[]> = new Map();
// π‘ Register an event stream
registerStream(streamId: string, stream: AsyncIterable<Event>): void {
console.log(`π‘ Registering event stream: ${streamId}`);
this.eventStreams.set(streamId, stream);
}
// π Register event processor
registerProcessor(eventType: EventType, processor: EventProcessor): void {
console.log(`π Registering processor for: ${eventType}`);
this.processors.set(eventType, processor);
}
// π Register event subscriber
subscribe(eventType: string, subscriber: EventSubscriber): void {
if (!this.subscribers.has(eventType)) {
this.subscribers.set(eventType, []);
}
this.subscribers.get(eventType)!.push(subscriber);
console.log(`π New subscriber for: ${eventType}`);
}
// π Start processing all streams
async startProcessing(): Promise<void> {
console.log('π Starting real-time event processing...');
const streamPromises = Array.from(this.eventStreams.entries()).map(
([streamId, stream]) => this.processStream(streamId, stream)
);
// π Process all streams concurrently
await Promise.all(streamPromises);
}
// π Process individual stream
private async processStream(streamId: string, stream: AsyncIterable<Event>): Promise<void> {
console.log(`π Processing stream: ${streamId}`);
try {
// π Iterate through events as they arrive
for await (const event of stream) {
await this.processEvent(streamId, event);
}
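} catch (error) {
// Normalize to an Error so handleStreamError receives the type it declares
const err = error instanceof Error ? error : new Error(String(error));
console.error(`Stream ${streamId} error:`, err.message);
// Attempt to reconnect or handle gracefully
await this.handleStreamError(streamId, err);
}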
}
// β‘ Process individual event
private async processEvent(streamId: string, event: Event): Promise<void> {
try {
console.log(`β‘ Processing event from ${streamId}:`, event.type);
// π Find and execute processor
const processor = this.processors.get(event.type);
if (processor) {
const processedEvent = await processor.process(event);
// π€ Notify subscribers
await this.notifySubscribers(event.type, processedEvent);
} else {
console.warn(`No processor found for event type: ${event.type}`);
}
} catch (error) {
console.error('Event processing error:', error instanceof Error ? error.message : error);
// Continue processing other events
}
}
// π€ Notify all subscribers
private async notifySubscribers(eventType: EventType, event: ProcessedEvent): Promise<void> {
const subscribers = this.subscribers.get(eventType) || [];
// π‘ Notify all subscribers concurrently
const notifications = subscribers.map(subscriber =>
subscriber.onEvent(event).catch(error =>
console.error('Subscriber notification error:', error.message)
)
);
await Promise.all(notifications);
}
// Handle stream errors with recovery
private async handleStreamError(streamId: string, error: Error): Promise<void> {
console.log(`Handling stream error for ${streamId}: ${error.message}`);
// Wait before retrying so the returned promise covers the whole recovery attempt
await new Promise(resolve => setTimeout(resolve, 5000));
console.log(`Attempting to reconnect stream: ${streamId}`);
// Reconnection logic would go here
}
}
// Event system type definitions
interface Event {
id: string;
type: EventType;
timestamp: Date;
data: any;
metadata: EventMetadata;
}
interface ProcessedEvent extends Event {
processedAt: Date;
processingTime: number;
enrichedData?: any;
}
interface EventMetadata {
source: string;
priority: 'critical' | 'high' | 'medium' | 'low';
correlationId?: string;
userId?: string;
}
type EventType = 'user_action' | 'system_event' | 'notification' | 'error' | 'metrics';
interface EventProcessor {
process(event: Event): Promise<ProcessedEvent>;
}
interface EventSubscriber {
onEvent(event: ProcessedEvent): Promise<void>;
}
// Concrete processor implementations
class UserActionProcessor implements EventProcessor {
async process(event: Event): Promise<ProcessedEvent> {
const startTime = Date.now();
console.log('π€ Processing user action...');
// π Analyze user behavior
const analysis = await this.analyzeUserBehavior(event.data);
// π Update user metrics
await this.updateUserMetrics(event.metadata.userId!, analysis);
return {
...event,
processedAt: new Date(),
processingTime: Date.now() - startTime,
enrichedData: analysis
};
}
private async analyzeUserBehavior(data: any): Promise<UserBehaviorAnalysis> {
// π§ Complex analysis logic
await new Promise(resolve => setTimeout(resolve, 50)); // Simulate processing
return {
actionType: data.action,
frequency: Math.floor(Math.random() * 100),
pattern: 'normal',
riskScore: Math.random()
};
}
private async updateUserMetrics(userId: string, analysis: UserBehaviorAnalysis): Promise<void> {
console.log(`π Updating metrics for user: ${userId}`);
// Database update logic would go here
}
}
interface UserBehaviorAnalysis {
actionType: string;
frequency: number;
pattern: 'normal' | 'suspicious' | 'anomalous';
riskScore: number;
}
// Email notification subscriber
class EmailNotificationSubscriber implements EventSubscriber {
async onEvent(event: ProcessedEvent): Promise<void> {
if (event.metadata.priority === 'critical') {
console.log('π§ Sending critical event email notification...');
// π¬ Send email logic
await this.sendEmail({
to: '[email protected]',
subject: `Critical Event: ${event.type}`,
body: `Event ${event.id} requires immediate attention.`
});
}
}
private async sendEmail(email: EmailData): Promise<void> {
// π¬ Email sending implementation
console.log(`π¬ Email sent: ${email.subject}`);
}
}
interface EmailData {
to: string;
subject: string;
body: string;
}
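The processor above expects each stream to be an AsyncIterable, but many real event sources are push-based (callbacks, listeners). One way to bridge the two, sketched under the assumption that unbounded buffering is acceptable, is a small queue that producers push into and a consumer iterates. AsyncQueue and demoQueue are hypothetical names, and the sketch is intended for a single consumer.

```typescript
// Minimal unbounded queue that adapts push-style producers to an AsyncIterable.
class AsyncQueue<T> implements AsyncIterable<T> {
  private items: T[] = [];
  private waiters: Array<(result: IteratorResult<T>) => void> = [];
  private closed = false;

  // Called by the producer, for example from a message or event handler.
  push(item: T): void {
    if (this.closed) return;
    const waiter = this.waiters.shift();
    if (waiter) {
      waiter({ value: item, done: false }); // a consumer is already waiting
    } else {
      this.items.push(item); // buffer until the consumer asks
    }
  }

  // Signals that no more items will arrive and releases waiting consumers.
  close(): void {
    this.closed = true;
    for (const waiter of this.waiters) {
      waiter({ value: undefined, done: true });
    }
    this.waiters = [];
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return {
      next: async (): Promise<IteratorResult<T>> => {
        if (this.items.length > 0) {
          return { value: this.items.shift() as T, done: false };
        }
        if (this.closed) {
          return { value: undefined, done: true };
        }
        // Nothing buffered yet: park until push() or close() is called.
        return new Promise<IteratorResult<T>>(resolve => {
          this.waiters.push(resolve);
        });
      }
    };
  }
}

// Simulated producer and consumer.
async function demoQueue(): Promise<string[]> {
  const queue = new AsyncQueue<string>();
  queue.push('login'); // buffered before anyone is listening
  setTimeout(() => queue.push('click'), 5); // delivered to a waiting consumer
  setTimeout(() => queue.close(), 10);
  const seen: string[] = [];
  for await (const name of queue) {
    seen.push(name);
  }
  return seen;
}
```

An instance of such a queue could be passed wherever an AsyncIterable is expected, with the event source calling push() and close().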
Custom Async Iterables
Building Custom Iterables
Let's create some custom async iterables:
// Paginated API data fetcher with async iteration
class PaginatedAPIIterator<T> implements AsyncIterable<T> {
private baseUrl: string;
private headers: Record<string, string>;
private pageSize: number;
private totalItems?: number;
constructor(
baseUrl: string,
options: {
headers?: Record<string, string>;
pageSize?: number;
} = {}
) {
this.baseUrl = baseUrl;
this.headers = options.headers || {};
this.pageSize = options.pageSize || 50;
}
// π Implement AsyncIterable interface
[Symbol.asyncIterator](): AsyncIterator<T> {
return this.createIterator();
}
// ποΈ Create the actual iterator
private createIterator(): AsyncIterator<T> {
let currentPage = 1;
let currentIndex = 0;
let currentBatch: T[] = [];
let finished = false;
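return {
// Arrow function so that `this` still refers to the PaginatedAPIIterator instance
// (a method here would bind `this` to the iterator object, leaving baseUrl undefined)
next: async (): Promise<IteratorResult<T>> => {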
// π¦ If we have items in current batch, return next item
if (currentIndex < currentBatch.length) {
const value = currentBatch[currentIndex++];
return { value, done: false };
}
// π If we're finished, return done
if (finished) {
return { value: undefined, done: true };
}
try {
// π Fetch next page
console.log(`π Fetching page ${currentPage}...`);
const response = await fetch(`${this.baseUrl}?page=${currentPage}&limit=${this.pageSize}`, {
headers: this.headers
});
if (!response.ok) {
throw new Error(`API request failed: ${response.status}`);
}
const data: PaginatedResponse<T> = await response.json();
// π Update state
currentBatch = data.items;
currentIndex = 0;
currentPage++;
// π Check if we're done
if (currentBatch.length === 0 || data.hasNext === false) {
finished = true;
}
// π¦ Return first item from new batch
if (currentBatch.length > 0) {
const value = currentBatch[currentIndex++];
return { value, done: false };
} else {
return { value: undefined, done: true };
}
} catch (error) {
console.error('API iteration error:', error instanceof Error ? error.message : error);
finished = true;
throw error;
}
}
};
}
// π Get estimated total count
async getEstimatedTotal(): Promise<number | undefined> {
if (this.totalItems !== undefined) {
return this.totalItems;
}
try {
// π Fetch metadata
const response = await fetch(`${this.baseUrl}/count`, {
headers: this.headers
});
if (response.ok) {
const { total } = await response.json();
this.totalItems = total;
return total;
}
} catch (error) {
console.warn('Could not fetch total count:', error instanceof Error ? error.message : error);
}
return undefined;
}
}
// Paginated response interface
interface PaginatedResponse<T> {
items: T[];
page: number;
limit: number;
total: number;
hasNext: boolean;
}
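For comparison, the same pagination idea can be written much more compactly as an async generator. This is a sketch rather than a drop-in replacement: fetchPage is an injected, hypothetical function (so the example runs without a network), and Page is a trimmed-down version of PaginatedResponse.

```typescript
// Only the fields the generator needs from a page of results.
interface Page<T> {
  items: T[];
  hasNext: boolean;
}

// Yields items one by one and requests the next page only when it is needed.
async function* paginate<T>(
  fetchPage: (page: number) => Promise<Page<T>>
): AsyncGenerator<T, void, undefined> {
  let page = 1;
  while (true) {
    const { items, hasNext } = await fetchPage(page);
    yield* items; // hand out this page's items one at a time
    if (!hasNext || items.length === 0) return;
    page++;
  }
}

// In-memory stand-in for an API holding 5 records with a page size of 2.
const records = ['a', 'b', 'c', 'd', 'e'];
const requestedPages: number[] = [];

async function fakeFetchPage(page: number): Promise<Page<string>> {
  requestedPages.push(page);
  const start = (page - 1) * 2;
  return {
    items: records.slice(start, start + 2),
    hasNext: start + 2 < records.length
  };
}
```

Consuming it looks like `for await (const item of paginate(fakeFetchPage)) { ... }`. If the consumer breaks out after the first item, only page 1 is ever requested.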
// Advanced batch processor with progress tracking
class BatchProcessor<TInput, TOutput> {
private batchSize: number;
private concurrency: number;
private processor: (batch: TInput[]) => Promise<TOutput[]>;
constructor(
processor: (batch: TInput[]) => Promise<TOutput[]>,
options: {
batchSize?: number;
concurrency?: number;
} = {}
) {
this.processor = processor;
this.batchSize = options.batchSize || 10;
this.concurrency = options.concurrency || 3;
}
// π Process async iterable in batches
async *processBatches(
source: AsyncIterable<TInput>
): AsyncGenerator<BatchResult<TOutput>, void, unknown> {
const activeBatches = new Map<number, Promise<BatchResult<TOutput>>>();
let batchId = 0;
let currentBatch: TInput[] = [];
let totalItems = 0;
console.log('π Starting batch processing...');
try {
// π Iterate through source
for await (const item of source) {
currentBatch.push(item);
totalItems++;
// π¦ Process when batch is full
if (currentBatch.length >= this.batchSize) {
const batch = [...currentBatch];
const id = batchId++;
// π Start batch processing
activeBatches.set(id, this.processBatch(id, batch));
currentBatch = [];
// π Manage concurrency
if (activeBatches.size >= this.concurrency) {
const result = await this.getNextCompletedBatch(activeBatches);
yield result;
}
}
}
// π¦ Process remaining items
if (currentBatch.length > 0) {
const id = batchId++;
activeBatches.set(id, this.processBatch(id, currentBatch));
}
// π Wait for all remaining batches
while (activeBatches.size > 0) {
const result = await this.getNextCompletedBatch(activeBatches);
yield result;
}
console.log(`Batch processing complete: ${totalItems} items processed`);
} catch (error) {
console.error('Batch processing error:', error instanceof Error ? error.message : error);
throw error;
}
}
// ποΈ Process single batch
private async processBatch(id: number, batch: TInput[]): Promise<BatchResult<TOutput>> {
const startTime = Date.now();
console.log(`π¦ Processing batch ${id} with ${batch.length} items...`);
try {
const results = await this.processor(batch);
const processingTime = Date.now() - startTime;
console.log(`Batch ${id} completed in ${processingTime}ms`);
return {
id,
results,
inputCount: batch.length,
outputCount: results.length,
processingTime,
success: true
};
} catch (error) {
const processingTime = Date.now() - startTime;
const message = error instanceof Error ? error.message : String(error);
console.error(`Batch ${id} failed:`, message);
return {
id,
results: [],
inputCount: batch.length,
outputCount: 0,
processingTime,
success: false,
error: message
};
}
}
// Wait for whichever active batch settles first
private async getNextCompletedBatch(
activeBatches: Map<number, Promise<BatchResult<TOutput>>>
): Promise<BatchResult<TOutput>> {
// processBatch never rejects (failures are returned as results), so racing is safe
const result = await Promise.race(activeBatches.values());
activeBatches.delete(result.id);
return result;
}
}
// Batch result interface
interface BatchResult<T> {
id: number;
results: T[];
inputCount: number;
outputCount: number;
processingTime: number;
success: boolean;
error?: string;
}
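A related pattern is mapping items with bounded concurrency and yielding results as soon as any task finishes. The sketch below assumes the worker never rejects (like processBatch above, which turns failures into results); mapConcurrent is a hypothetical helper name.

```typescript
// Runs up to `limit` workers at once and yields results in completion order.
async function* mapConcurrent<T, R>(
  source: AsyncIterable<T>,
  limit: number,
  worker: (item: T) => Promise<R>
): AsyncGenerator<R, void, undefined> {
  // Each in-flight task resolves to its own key so it can be removed after the race.
  const inFlight = new Map<number, Promise<{ key: number; value: R }>>();
  let nextKey = 0;

  for await (const item of source) {
    const key = nextKey++;
    inFlight.set(key, worker(item).then(value => ({ key, value })));

    // At capacity: wait for the fastest task before reading more input.
    if (inFlight.size >= limit) {
      const settled = await Promise.race(inFlight.values());
      inFlight.delete(settled.key);
      yield settled.value;
    }
  }

  // Input exhausted: drain whatever is still running.
  while (inFlight.size > 0) {
    const settled = await Promise.race(inFlight.values());
    inFlight.delete(settled.key);
    yield settled.value;
  }
}
```

Because the generator only reads the next input after a slot frees up, the source is never consumed faster than the workers can keep up.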
Practical Applications
Complete Data Processing Pipeline
Let's build a more complete real-world application:
// Complete data ingestion and processing system
class DataIngestionPipeline {
private sources: Map<string, AsyncIterable<RawData>> = new Map();
private transformers: DataTransformer[] = [];
private outputs: Map<string, DataOutput> = new Map();
private metrics: ProcessingMetrics = {
totalProcessed: 0,
successCount: 0,
errorCount: 0,
startTime: new Date(),
averageProcessingTime: 0
};
// π‘ Register data source
addSource(name: string, source: AsyncIterable<RawData>): this {
console.log(`π‘ Adding data source: ${name}`);
this.sources.set(name, source);
return this;
}
// π Add data transformer
addTransformer(transformer: DataTransformer): this {
console.log(`π Adding transformer: ${transformer.name}`);
this.transformers.push(transformer);
return this;
}
// π€ Add output destination
addOutput(name: string, output: DataOutput): this {
console.log(`π€ Adding output: ${name}`);
this.outputs.set(name, output);
return this;
}
// π Start the pipeline
async run(): Promise<ProcessingMetrics> {
console.log('π Starting data ingestion pipeline...');
this.metrics.startTime = new Date();
try {
// π Process all sources concurrently
const sourcePromises = Array.from(this.sources.entries()).map(
([name, source]) => this.processSource(name, source)
);
await Promise.all(sourcePromises);
// π Finalize metrics
this.metrics.averageProcessingTime = this.calculateAverageProcessingTime();
console.log('Pipeline completed successfully');
console.log('Final metrics:', this.metrics);
return this.metrics;
} catch (error) {
console.error('Pipeline error:', error instanceof Error ? error.message : error);
throw error;
}
}
// π Process individual data source
private async processSource(sourceName: string, source: AsyncIterable<RawData>): Promise<void> {
console.log(`π Processing source: ${sourceName}`);
const processingStartTime = Date.now();
let itemCount = 0;
try {
// π Process each item from the source
for await (const rawItem of source) {
await this.processItem(sourceName, rawItem);
itemCount++;
// π Update progress periodically
if (itemCount % 100 === 0) {
console.log(`π Processed ${itemCount} items from ${sourceName}`);
}
}
const processingTime = Date.now() - processingStartTime;
console.log(`Source ${sourceName} complete: ${itemCount} items in ${processingTime}ms`);
} catch (error) {
console.error(`Source ${sourceName} error:`, error instanceof Error ? error.message : error);
this.metrics.errorCount++;
}
}
// β‘ Process individual item
private async processItem(sourceName: string, rawItem: RawData): Promise<void> {
const itemStartTime = Date.now();
try {
let currentData: any = { ...rawItem, sourceName };
// π Apply all transformers
for (const transformer of this.transformers) {
currentData = await transformer.transform(currentData);
}
// π€ Send to all outputs
const outputPromises = Array.from(this.outputs.values()).map(
output => output.write(currentData).catch(error => {
console.error('Output error:', error.message);
})
);
await Promise.all(outputPromises);
// π Update metrics
this.metrics.totalProcessed++;
this.metrics.successCount++;
} catch (error) {
console.error('Item processing error:', error instanceof Error ? error.message : error);
this.metrics.errorCount++;
}
}
// Average wall-clock time per successfully processed item (elapsed time / item count)
private calculateAverageProcessingTime(): number {
const totalTime = Date.now() - this.metrics.startTime.getTime();
return this.metrics.totalProcessed > 0 ? totalTime / this.metrics.totalProcessed : 0;
}
// π Get real-time metrics
getMetrics(): ProcessingMetrics {
return { ...this.metrics };
}
}
// System interfaces
interface RawData {
id: string;
content: any;
timestamp: Date;
metadata: Record<string, any>;
}
interface DataTransformer {
name: string;
transform(data: any): Promise<any>;
}
interface DataOutput {
write(data: any): Promise<void>;
}
interface ProcessingMetrics {
totalProcessed: number;
successCount: number;
errorCount: number;
startTime: Date;
averageProcessingTime: number;
}
// Concrete transformer implementations
class ValidationTransformer implements DataTransformer {
name = 'ValidationTransformer';
async transform(data: any): Promise<any> {
// π Validate required fields
if (!data.id || !data.content) {
throw new Error('Missing required fields');
}
return {
...data,
validated: true,
validatedAt: new Date()
};
}
}
class EnrichmentTransformer implements DataTransformer {
name = 'EnrichmentTransformer';
async transform(data: any): Promise<any> {
// β¨ Add enriched data
const enrichment = await this.fetchEnrichmentData(data.id);
return {
...data,
enrichment,
enrichedAt: new Date()
};
}
private async fetchEnrichmentData(id: string): Promise<any> {
// π Simulate API call
await new Promise(resolve => setTimeout(resolve, 10));
return { category: 'default', priority: 'medium' };
}
}
// Database output implementation
class DatabaseOutput implements DataOutput {
async write(data: any): Promise<void> {
console.log(`πΎ Writing to database: ${data.id}`);
// ποΈ Simulate database write
await new Promise(resolve => setTimeout(resolve, 5));
}
}
// File output implementation
class FileOutput implements DataOutput {
private filename: string;
constructor(filename: string) {
this.filename = filename;
}
async write(data: any): Promise<void> {
console.log(`π Writing to file ${this.filename}: ${data.id}`);
// π Simulate file write
await new Promise(resolve => setTimeout(resolve, 3));
}
}
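The pipeline above consumes each source in its own loop. If you would rather combine several sources into a single stream, one possible approach is a merge helper built on Promise.race. This is a simplified sketch: merge and timed are hypothetical names, each source is read one item ahead, and error handling is minimal.

```typescript
// Interleaves several async iterables, yielding each item as soon as it is ready.
async function* merge<T>(
  ...sources: AsyncIterable<T>[]
): AsyncGenerator<T, void, undefined> {
  const iterators = sources.map(source => source[Symbol.asyncIterator]());
  // One pending next() per active iterator, tagged with the iterator's index.
  const pending = new Map<number, Promise<{ index: number; result: IteratorResult<T> }>>();

  const advance = (index: number): void => {
    pending.set(index, iterators[index].next().then(result => ({ index, result })));
  };
  iterators.forEach((_, index) => advance(index));

  try {
    while (pending.size > 0) {
      const { index, result } = await Promise.race(pending.values());
      if (result.done) {
        pending.delete(index); // this source is exhausted
      } else {
        advance(index); // request the source's next item right away
        yield result.value;
      }
    }
  } finally {
    // On early exit, give every source a chance to run its cleanup.
    await Promise.all(iterators.map(iterator => iterator.return?.()));
  }
}

// Hypothetical demo source: waits delays[i] ms before yielding its i-th label.
async function* timed(
  label: string,
  delays: number[]
): AsyncGenerator<string, void, undefined> {
  for (let i = 0; i < delays.length; i++) {
    await new Promise<void>(resolve => setTimeout(resolve, delays[i]));
    yield `${label}${i + 1}`;
  }
}
```

Items from a single source keep their relative order, while items from different sources are interleaved by arrival time.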
Best Practices & Tips
Performance Optimization
Here are essential tips for efficient async iteration:
// DO: Use async iterators for large datasets
const processLargeDataset = async function* (source: AsyncIterable<DataItem>) {
// Process one item at a time, keeping memory usage low
for await (const item of source) {
const processed = await processItem(item);
yield processed; // Hand each result to the consumer before reading the next item
}
};
// DON'T: Load everything into memory first
const processLargeDatasetBad = async (source: AsyncIterable<DataItem>) => {
const allItems: DataItem[] = [];
// This holds the entire dataset in memory at once!
for await (const item of source) {
allItems.push(item);
}
// Starts processing every item simultaneously, with all inputs and results held in memory
return Promise.all(allItems.map(item => processItem(item)));
};
// DO: Handle errors gracefully without stopping iteration
const resilientProcessing = async function* (source: AsyncIterable<DataItem>) {
for await (const item of source) {
try {
const result = await processItem(item);
yield { success: true, data: result };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error('Error processing item:', message);
yield { success: false, error: message };
// Continue with the next item instead of failing entirely
}
}
};
// DO: Implement backpressure for high-throughput scenarios
// Async generators are pull-based: the source is only read when the consumer asks for the next batch,
// so at most maxBufferSize items are buffered here at any time.
class BackpressureController<T> {
private buffer: T[] = [];
private maxBufferSize: number;
constructor(maxBufferSize: number = 100) {
this.maxBufferSize = maxBufferSize;
}
async *process(source: AsyncIterable<T>): AsyncGenerator<T[], void, unknown> {
for await (const item of source) {
this.buffer.push(item);
// Yield a batch when the buffer is full
if (this.buffer.length >= this.maxBufferSize) {
const batch = this.buffer;
this.buffer = [];
yield batch;
}
}
// Yield remaining items and leave the buffer empty for the next run
if (this.buffer.length > 0) {
const rest = this.buffer;
this.buffer = [];
yield rest;
}
}
}
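The reason batching like this does not overrun the consumer is that async generators are pull-based. The following sketch (with hypothetical producer and slowConsumer functions) records the order of events to show that the producer only advances when the consumer asks for the next value.

```typescript
// Shared log of what happened, in order.
const trace: string[] = [];

// Produces three values and records the moment each one is created.
async function* producer(): AsyncGenerator<number, void, undefined> {
  for (let i = 1; i <= 3; i++) {
    trace.push(`produce ${i}`);
    yield i; // execution pauses here until the consumer calls next() again
  }
}

// Takes 5 ms per item, far slower than the producer could run on its own.
async function slowConsumer(): Promise<void> {
  for await (const value of producer()) {
    await new Promise<void>(resolve => setTimeout(resolve, 5));
    trace.push(`consume ${value}`);
  }
}
```

After slowConsumer() finishes, trace alternates strictly between produce and consume entries: the producer never gets ahead of the consumer.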
Error Handling Strategies
Robust error handling for async iteration:
// Comprehensive error handling patterns
class RobustAsyncProcessor<T, R> {
private retryAttempts: number;
private retryDelay: number;
constructor(options: { retryAttempts?: number; retryDelay?: number } = {}) {
this.retryAttempts = options.retryAttempts || 3;
this.retryDelay = options.retryDelay || 1000;
}
// π Process with automatic retry and error recovery
async *processWithRetry(
source: AsyncIterable<T>,
processor: (item: T) => Promise<R>
): AsyncGenerator<ProcessingResult<R>, void, unknown> {
for await (const item of source) {
yield await this.processItemWithRetry(item, processor);
}
}
// π Individual item processing with retry logic
private async processItemWithRetry(
item: T,
processor: (item: T) => Promise<R>
): Promise<ProcessingResult<R>> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= this.retryAttempts; attempt++) {
try {
console.log(`π Processing attempt ${attempt}/${this.retryAttempts}`);
const result = await processor(item);
return {
success: true,
data: result,
attempts: attempt
};
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
console.error(`Attempt ${attempt} failed:`, lastError.message);
// Wait before retrying (except after the last attempt); the delay grows linearly with the attempt number
if (attempt < this.retryAttempts) {
await this.delay(this.retryDelay * attempt);
}
}
}
// π« All attempts failed
return {
success: false,
error: lastError!.message,
attempts: this.retryAttempts
};
}
// β³ Delay helper
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Processing result interface
interface ProcessingResult<T> {
success: boolean;
data?: T;
error?: string;
attempts: number;
}
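The retry loop above waits retryDelay * attempt between tries, which is linear backoff. A common alternative is exponential backoff; here is a small standalone sketch of that variant. retryWithBackoff and RetryOptions are hypothetical names, and the sleep function is injectable purely so the example can be tested without real delays.

```typescript
interface RetryOptions {
  attempts: number; // total tries, including the first one
  baseDelayMs: number; // wait before the second try
  sleep?: (ms: number) => Promise<void>; // injectable for tests
}

// Retries `operation`, doubling the wait after every failure.
async function retryWithBackoff<T>(
  operation: () => Promise<T>,
  options: RetryOptions
): Promise<T> {
  const sleep =
    options.sleep ??
    ((ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms)));
  let lastError: unknown = new Error('retryWithBackoff: attempts must be at least 1');

  for (let attempt = 1; attempt <= options.attempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (attempt < options.attempts) {
        // Waits 1x, 2x, 4x, ... the base delay.
        await sleep(options.baseDelayMs * 2 ** (attempt - 1));
      }
    }
  }
  // Every attempt failed: surface the most recent error to the caller.
  throw lastError;
}
```

Inside a for-await-of loop this could wrap the per-item work, for example `await retryWithBackoff(() => processor(item), { attempts: 3, baseDelayMs: 1000 })`, where processor stands for whatever per-item function you are using.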
// Circuit breaker for failing services
class CircuitBreaker {
private failures = 0;
private lastFailureTime = 0;
private state: 'closed' | 'open' | 'half-open' = 'closed';
constructor(
private failureThreshold: number = 5,
private recoveryTimeout: number = 30000
) {}
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === 'open') {
if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
this.state = 'half-open';
console.log('Circuit breaker half-open, testing...');
} else {
throw new Error('Circuit breaker is open - service unavailable');
}
}
try {
const result = await operation();
// Success: clear the failure count so only consecutive failures open the circuit
this.failures = 0;
if (this.state === 'half-open') {
this.state = 'closed';
console.log('Circuit breaker closed - service recovered');
}
return result;
} catch (error) {
this.failures++;
this.lastFailureTime = Date.now();
if (this.failures >= this.failureThreshold) {
this.state = 'open';
console.log('Circuit breaker opened - service failing');
}
throw error;
}
}
}
Summary
Congratulations! You've mastered async iterators and for-await-of loops in TypeScript! Here's what you've accomplished:
Key Takeaways
- Streaming Power: You can now process large datasets efficiently without memory overload
- Type Safety: Maintain strong typing throughout complex async iteration flows
- Performance Optimization: Build high-performance data processing pipelines
- Error Resilience: Handle failures gracefully with retry and recovery patterns
- Real-World Applications: Create production-ready data ingestion systems
What You Can Build
- Data Processing Pipelines: Stream through massive datasets efficiently
- Real-Time Event Systems: Handle live data feeds and streaming APIs
- Batch Processing Systems: Process large volumes of data in manageable chunks
- API Pagination Handlers: Seamlessly iterate through paginated API responses
- File Streaming Processors: Process large files without loading them entirely
Next Steps
Ready to take your async skills even further? Consider exploring:
- Event Emitters: Type-safe event-driven programming patterns
- Reactive Streams: RxJS integration with TypeScript
- Worker Threads: Parallel processing with Node.js worker threads or browser Web Workers
- Performance Monitoring: Advanced async operation profiling
You're now equipped to build scalable, efficient async applications for a wide range of data processing challenges! Keep iterating and streaming!