Prerequisites
- Basic TypeScript syntax and types
- Understanding of Node.js fundamentals
- Async/await and Promise concepts
What you'll learn
- Master Node.js streams for efficient data processing and memory management
- Build scalable streaming applications with transform and pipeline patterns
- Handle large datasets and files without memory overflow
- Create production-ready streaming APIs and data processors
Introduction
Welcome to the powerful world of Node.js streams with TypeScript! In this guide, we'll explore how to process massive amounts of data efficiently, handle large files gracefully, and build high-performance applications that scale.
You'll discover how to transform data processing from memory-hungry operations into elegant, efficient streams that can handle gigabytes of data with minimal memory usage. Whether you're building file processors, real-time data pipelines, or high-throughput APIs, mastering Node.js streams is essential for creating scalable TypeScript applications.
By the end of this tutorial, you'll be streaming data like a performance optimization wizard. Let's dive in!
Understanding Node.js Streams
What are Streams?
Streams are like a smart assembly line for your data. Think of them as pipes where data flows through in chunks, allowing you to process massive amounts of information without loading everything into memory at once!
In TypeScript with Node.js, streams provide:
- Memory efficiency - process large files without memory overflow
- Performance - handle data as it arrives, not after everything loads
- Backpressure handling - automatic flow control when processing is slow (see the sketch after this list)
- Composability - chain operations like building blocks
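To make the backpressure bullet concrete, here is a minimal sketch of handling it by hand with pause()/resume() and the 'drain' event. The file names input.bin and output.bin are hypothetical, and in practice pipe() or pipeline() does this bookkeeping for you:
import { createReadStream, createWriteStream } from 'fs';
const source = createReadStream('input.bin');
const sink = createWriteStream('output.bin');
source.on('data', (chunk) => {
// write() returns false once the sink's internal buffer is full
if (!sink.write(chunk)) {
source.pause(); // stop reading until the sink drains
sink.once('drain', () => source.resume());
}
});
source.on('end', () => sink.end());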
Why Use Streams?
Here's why streams are architectural game-changers:
- Memory Efficiency: process multi-gigabyte files with a small, fixed buffer (fs read streams default to 64KB chunks)
- Real-Time Processing: start working on data before the full download completes
- Scalability: handle thousands of concurrent file operations
- Composability: chain transformations like functional programming
- Backpressure: automatic flow control prevents memory overflow
Real-world example: extracting specific records from a 5GB CSV file - streams let you process it line by line without ever loading the entire file into memory, as the sketch below shows.
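As a sketch of that CSV scenario, Node's built-in readline module can walk a file stream one line at a time with bounded memory; sales.csv and the matching logic here are hypothetical:
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
async function countMatchingRows(path: string, match: string): Promise<number> {
const rl = createInterface({
input: createReadStream(path),
crlfDelay: Infinity // treat \r\n as a single line break
});
let count = 0;
// Only one line is held in memory at a time
for await (const line of rl) {
if (line.includes(match)) count++;
}
return count;
}
countMatchingRows('sales.csv', 'US').then(n => console.log(`${n} matching rows`));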
Basic Stream Types and Patterns
Readable Streams - Data Sources
Let's start with fundamental stream patterns:
// Basic readable stream patterns
// TypeScript provides excellent type safety for stream operations
import { Readable, Writable, Transform, pipeline } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { promisify } from 'util';
// Custom readable stream with type safety
class NumberGenerator extends Readable {
private current: number = 1;
private max: number;
constructor(max: number) {
super({ objectMode: true });
this.max = max;
console.log(`Number generator created (1 to ${max})`);
}
_read(): void {
if (this.current <= this.max) {
console.log(`Generating number: ${this.current}`);
this.push(this.current++);
} else {
console.log('Number generation complete');
this.push(null); // Signal end of stream
}
}
}
// File reading stream with error handling
const createFileReader = (filePath: string): Readable => {
console.log(`Creating file reader for: ${filePath}`);
const stream = createReadStream(filePath, {
encoding: 'utf8',
highWaterMark: 16 * 1024 // 16KB chunks for memory efficiency
});
stream.on('open', () => {
console.log(`File opened successfully: ${filePath}`);
});
stream.on('error', (error) => {
console.error(`File read error: ${error.message}`);
});
return stream;
};
// HTTP response as readable stream
import { IncomingMessage } from 'http';
const processHttpStream = (response: IncomingMessage): void => {
console.log('Processing HTTP response stream');
let totalBytes = 0;
response.on('data', (chunk: Buffer) => {
totalBytes += chunk.length;
console.log(`Received chunk: ${chunk.length} bytes (total: ${totalBytes})`);
});
response.on('end', () => {
console.log(`HTTP stream complete: ${totalBytes} total bytes`);
});
response.on('error', (error) => {
console.error(`HTTP stream error: ${error.message}`);
});
};
// Example usage
const numberStream = new NumberGenerator(5);
numberStream.on('data', (number) => {
console.log(`Received number: ${number}`);
});
numberStream.on('end', () => {
console.log('Number stream ended');
});
Transform Streams - Data Processing
Transform streams are where the magic happens:
// Advanced transform stream patterns
// Process data as it flows through the pipeline
import { Transform, TransformCallback } from 'stream';
// JSON line processor with type safety
interface LogEntry {
timestamp: string;
level: 'info' | 'warn' | 'error';
message: string;
userId?: string;
requestId?: string;
}
class JSONLineProcessor extends Transform {
constructor() {
super({
writableObjectMode: false,
readableObjectMode: true,
highWaterMark: 1000 // Buffer up to 1000 objects
});
console.log('JSON line processor initialized');
}
private leftover = '';
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
// A chunk can end mid-line, so carry the trailing partial line over to the next call
const lines = (this.leftover + chunk.toString()).split('\n');
this.leftover = lines.pop() ?? '';
for (const line of lines) {
if (line.trim()) this.processLine(line);
}
callback();
}
_flush(callback: TransformCallback): void {
// Handle a final line that had no trailing newline
if (this.leftover.trim()) this.processLine(this.leftover);
callback();
}
private processLine(line: string): void {
try {
const logEntry: LogEntry = JSON.parse(line);
// Enhanced log entry with metadata
const processedEntry = {
...logEntry,
processed: true,
processingTime: new Date().toISOString(),
severity: this.calculateSeverity(logEntry.level)
};
console.log(`Processed log: ${logEntry.level} - ${logEntry.message.substring(0, 50)}...`);
this.push(processedEntry);
} catch {
console.error(`Invalid JSON line: ${line.substring(0, 100)}...`);
// Continue processing other lines instead of failing
}
}
private calculateSeverity(level: LogEntry['level']): number {
const severityMap = { info: 1, warn: 2, error: 3 };
return severityMap[level] || 0;
}
}
// Data aggregation transform
class DataAggregator extends Transform {
private stats = {
totalRecords: 0,
errorCount: 0,
warningCount: 0,
infoCount: 0,
userActions: new Map<string, number>()
};
constructor() {
super({ objectMode: true });
console.log('Data aggregator initialized');
}
_transform(chunk: LogEntry & { processed: boolean }, encoding: BufferEncoding, callback: TransformCallback): void {
this.stats.totalRecords++;
// Update level statistics
switch (chunk.level) {
case 'error':
this.stats.errorCount++;
break;
case 'warn':
this.stats.warningCount++;
break;
case 'info':
this.stats.infoCount++;
break;
}
// Track per-user activity
if (chunk.userId) {
const current = this.stats.userActions.get(chunk.userId) || 0;
this.stats.userActions.set(chunk.userId, current + 1);
}
// Pass the record through unchanged
this.push(chunk);
// Periodic stats reporting
if (this.stats.totalRecords % 1000 === 0) {
console.log(`Processed ${this.stats.totalRecords} records`);
console.log(` Errors: ${this.stats.errorCount}, Warnings: ${this.stats.warningCount}, Info: ${this.stats.infoCount}`);
console.log(` Active users: ${this.stats.userActions.size}`);
}
callback();
}
_flush(callback: TransformCallback): void {
// Final statistics report
const report = {
summary: {
totalRecords: this.stats.totalRecords,
errorRate: this.stats.totalRecords > 0 ? (this.stats.errorCount / this.stats.totalRecords * 100).toFixed(2) + '%' : 'n/a',
topUsers: Array.from(this.stats.userActions.entries())
.sort((a, b) => b[1] - a[1])
.slice(0, 5)
},
timestamp: new Date().toISOString()
};
console.log('Final Report:', JSON.stringify(report, null, 2));
this.push(report);
callback();
}
}
// CSV to JSON transform with validation
class CSVTransform extends Transform {
private headers: string[] = [];
private isFirstRow = true;
constructor() {
super({
writableObjectMode: false,
readableObjectMode: true
});
console.log('CSV transformer initialized');
}
private leftover = '';
_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
// Carry partial lines across chunk boundaries, as in the JSON processor above
const lines = (this.leftover + chunk.toString()).split('\n');
this.leftover = lines.pop() ?? '';
for (const line of lines) {
if (line.trim()) this.processRow(line);
}
callback();
}
_flush(callback: TransformCallback): void {
if (this.leftover.trim()) this.processRow(this.leftover);
callback();
}
private processRow(line: string): void {
if (this.isFirstRow) {
this.headers = line.split(',').map(h => h.trim());
this.isFirstRow = false;
console.log(`CSV headers: ${this.headers.join(', ')}`);
return;
}
const values = line.split(',').map(v => v.trim());
if (values.length === this.headers.length) {
const record: Record<string, string> = {};
this.headers.forEach((header, index) => {
record[header] = values[index];
});
console.log(`CSV record processed: ${record[this.headers[0]]}`);
this.push(record);
} else {
console.error(`Invalid CSV row: expected ${this.headers.length} columns, got ${values.length}`);
}
}
}
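To see these transforms in context, here is a sketch that wires CSVTransform into a pipeline; users.csv is a hypothetical input file, and pipeline() handles error propagation and cleanup across all stages:
import { pipeline, Writable } from 'stream';
import { createReadStream } from 'fs';
const collector = new Writable({
objectMode: true,
write(record, _encoding, callback) {
console.log('Collected record:', record);
callback();
}
});
pipeline(
createReadStream('users.csv'),
new CSVTransform(),
collector,
(error) => {
if (error) console.error(`CSV pipeline failed: ${error.message}`);
else console.log('CSV pipeline finished');
}
);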
Advanced Stream Patterns
Pipeline Operations and Error Handling
Let's explore sophisticated stream patterns:
// Advanced pipeline patterns with comprehensive error handling
// Building production-ready streaming applications
import { pipeline, Readable, Transform, Writable } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { promisify } from 'util';
import { createGzip, createGunzip } from 'zlib';
const pipelineAsync = promisify(pipeline);
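// Note: newer Node.js versions (15+) also ship a promise-based pipeline,
// which makes the promisify() step above unnecessary:
// import { pipeline } from 'stream/promises';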
// Production data processing pipeline
class DataProcessingPipeline {
private processedCount = 0;
private errorCount = 0;
private startTime = Date.now();
async processLargeDataset(
inputPath: string,
outputPath: string,
options: ProcessingOptions = {}
): Promise<ProcessingResult> {
console.log('Starting data processing pipeline');
const source = this.createSource(inputPath);
const validator = this.createValidator();
const transformer = this.createTransformer(options);
const aggregator = this.createAggregator();
const compressor = createGzip();
const destination = this.createDestination(outputPath);
try {
await pipelineAsync(
source,
validator,
transformer,
aggregator,
compressor,
destination
);
return this.generateReport();
} catch (error) {
console.error('๐ฅ Pipeline failed:', error);
throw new PipelineError('Data processing failed', error as Error);
}
}
private createSource(inputPath: string): Readable {
console.log(`Creating data source: ${inputPath}`);
const stream = createReadStream(inputPath, {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks for optimal performance
});
stream.on('error', (error) => {
console.error(`Source error: ${error.message}`);
});
return stream;
}
private createValidator(): Transform {
return new Transform({
objectMode: true,
transform: (chunk, encoding, callback) => {
try {
// Data validation logic
const data = typeof chunk === 'string' ? JSON.parse(chunk) : chunk;
if (this.isValidRecord(data)) {
this.processedCount++;
callback(null, data);
} else {
this.errorCount++;
console.warn(`Invalid record skipped: ${JSON.stringify(data).substring(0, 100)}`);
callback(); // Skip invalid records
}
} catch (error) {
this.errorCount++;
console.error(`Validation error: ${error}`);
callback(); // Skip malformed records
}
}
});
}
private createTransformer(options: ProcessingOptions): Transform {
return new Transform({
objectMode: true,
transform: (chunk, encoding, callback) => {
try {
// Data transformation with business logic
const transformed = {
...chunk,
processed: true,
processedAt: new Date().toISOString(),
pipeline: 'v2.0',
metadata: {
processingNode: process.env.NODE_ID || 'unknown',
batchId: options.batchId || 'default'
}
};
// Apply custom transformations
if (options.enrichment) {
transformed.enriched = this.enrichData(chunk);
}
if (options.normalization) {
this.normalizeData(transformed);
}
callback(null, transformed);
} catch (error) {
console.error(`Transformation error: ${error}`);
callback(error as Error);
}
}
});
}
private createAggregator(): Transform {
const stats = new Map<string, number>();
return new Transform({
objectMode: true,
transform: (chunk, encoding, callback) => {
// Real-time aggregation
const category = chunk.category || 'unknown';
stats.set(category, (stats.get(category) || 0) + 1);
// Periodic reporting
if (this.processedCount % 10000 === 0) {
console.log(`Progress: ${this.processedCount} records processed`);
console.log('Category distribution:', Object.fromEntries(stats));
}
callback(null, JSON.stringify(chunk) + '\n');
}
});
}
private createDestination(outputPath: string): Writable {
console.log(`Creating destination: ${outputPath}`);
const stream = createWriteStream(outputPath);
stream.on('error', (error) => {
console.error(`Destination error: ${error.message}`);
});
stream.on('finish', () => {
console.log(`Data written successfully: ${outputPath}`);
});
return stream;
}
private isValidRecord(data: any): boolean {
return data &&
typeof data === 'object' &&
data.id &&
data.timestamp &&
new Date(data.timestamp).getTime() > 0;
}
private enrichData(data: any): EnrichmentData {
return {
geoLocation: this.getGeoLocation(data.ip),
userAgent: this.parseUserAgent(data.userAgent),
sessionData: this.getSessionData(data.sessionId)
};
}
private normalizeData(data: any): void {
if (data.email) data.email = data.email.toLowerCase();
if (data.phone) data.phone = data.phone.replace(/\D/g, '');
if (data.name) data.name = this.titleCase(data.name);
}
private generateReport(): ProcessingResult {
const duration = Date.now() - this.startTime;
return {
processed: this.processedCount,
errors: this.errorCount,
duration,
throughput: Math.round(this.processedCount / (duration / 1000)),
successRate: ((this.processedCount / (this.processedCount + this.errorCount)) * 100).toFixed(2) + '%'
};
}
private getGeoLocation(ip: string): any { return { country: 'US', city: 'Unknown' }; }
private parseUserAgent(ua: string): any { return { browser: 'Unknown', os: 'Unknown' }; }
private getSessionData(sessionId: string): any { return { duration: 0, pageViews: 0 }; }
private titleCase(str: string): string { return str.replace(/\w\S*/g, txt => txt.charAt(0).toUpperCase() + txt.slice(1).toLowerCase()); }
}
// Supporting types and interfaces
interface ProcessingOptions {
batchId?: string;
enrichment?: boolean;
normalization?: boolean;
compression?: boolean;
}
interface ProcessingResult {
processed: number;
errors: number;
duration: number;
throughput: number;
successRate: string;
}
interface EnrichmentData {
geoLocation: any;
userAgent: any;
sessionData: any;
}
class PipelineError extends Error {
constructor(message: string, public cause: Error) {
super(message);
this.name = 'PipelineError';
}
}
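Here is a sketch of running the pipeline above; events.ndjson is a hypothetical newline-delimited JSON input. In practice you would add a line-splitting stage such as JSONLineProcessor between the source and the validator, since raw file chunks are not guaranteed to align with record boundaries:
const dataPipeline = new DataProcessingPipeline();
dataPipeline
.processLargeDataset('events.ndjson', 'events-processed.ndjson.gz', {
batchId: 'batch-001',
enrichment: true,
normalization: true
})
.then(result => console.log('Pipeline result:', result))
.catch(error => console.error('Pipeline error:', error.message));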
Real-Time Streaming Applications
Let's build complete streaming applications:
// Real-time streaming server with WebSocket integration
// Complete production-ready streaming architecture
import { Readable, Transform, Writable } from 'stream';
import { WebSocketServer, WebSocket } from 'ws';
import { EventEmitter } from 'events';
// Real-time data streaming server
class StreamingServer extends EventEmitter {
private wss: WebSocketServer;
private activeStreams = new Map<string, ActiveStream>();
private metrics = new ServerMetrics();
constructor(port: number) {
super();
this.wss = new WebSocketServer({ port });
console.log(`Streaming server started on port ${port}`);
this.setupWebSocketHandlers();
this.startMetricsReporting();
}
private setupWebSocketHandlers(): void {
this.wss.on('connection', (ws: WebSocket, request) => {
const streamId = this.generateStreamId();
console.log(`New client connected: ${streamId}`);
// Create a dedicated stream for this client
const clientStream = this.createClientStream(streamId, ws);
this.activeStreams.set(streamId, clientStream);
this.metrics.incrementActiveConnections();
ws.on('message', (data) => {
this.handleClientMessage(streamId, data);
});
ws.on('close', () => {
console.log(`Client disconnected: ${streamId}`);
this.cleanupClientStream(streamId);
});
ws.on('error', (error) => {
console.error(`WebSocket error for ${streamId}: ${error.message}`);
this.cleanupClientStream(streamId);
});
});
}
private createClientStream(streamId: string, ws: WebSocket): ActiveStream {
// Real-time data source (simulated)
const dataSource = this.createRealTimeDataSource();
// Transform stream for client-specific processing
const transformer = new Transform({
objectMode: true,
transform: (chunk, encoding, callback) => {
try {
const processed = {
...chunk,
streamId,
clientTimestamp: Date.now(),
sequence: this.metrics.getTotalMessages()
};
callback(null, processed);
} catch (error) {
console.error(`Transform error for ${streamId}: ${error}`);
callback(error as Error);
}
}
});
// WebSocket sink
const webSocketSink = new Writable({
objectMode: true,
write: (chunk, encoding, callback) => {
try {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(chunk));
this.metrics.incrementMessagesSent();
}
callback();
} catch (error) {
console.error(`WebSocket send error: ${error}`);
callback(error as Error);
}
}
});
// Connect the pipeline
dataSource.pipe(transformer).pipe(webSocketSink);
return {
id: streamId,
source: dataSource,
transformer,
sink: webSocketSink,
ws,
startTime: Date.now()
};
}
private createRealTimeDataSource(): Readable {
let messageCount = 0;
const stream = new Readable({
objectMode: true,
read() {} // Will be pushed to externally
});
// Simulate real-time data
const interval = setInterval(() => {
const data = this.generateSampleData(++messageCount);
stream.push(data);
// Update metrics
this.metrics.incrementMessagesGenerated();
}, 100); // 10 messages per second
stream.on('close', () => {
clearInterval(interval);
});
return stream;
}
private generateSampleData(sequence: number): StreamingData {
const types: DataType[] = ['user_action', 'system_event', 'performance_metric'];
const type = types[Math.floor(Math.random() * types.length)];
const baseData: StreamingData = {
id: `msg_${sequence}_${Date.now()}`,
type,
timestamp: new Date().toISOString(),
sequence,
payload: {}
};
// Type-specific data generation
switch (type) {
case 'user_action':
baseData.payload = {
userId: `user_${Math.floor(Math.random() * 1000)}`,
action: ['click', 'scroll', 'type', 'navigate'][Math.floor(Math.random() * 4)],
page: `/page/${Math.floor(Math.random() * 10)}`,
duration: Math.floor(Math.random() * 5000)
};
break;
case 'system_event':
baseData.payload = {
service: ['api', 'database', 'cache', 'queue'][Math.floor(Math.random() * 4)],
level: ['info', 'warn', 'error'][Math.floor(Math.random() * 3)],
message: `System event ${sequence}`,
metadata: { server: `server_${Math.floor(Math.random() * 5)}` }
};
break;
case 'performance_metric':
baseData.payload = {
metric: ['cpu', 'memory', 'disk', 'network'][Math.floor(Math.random() * 4)],
value: Math.random() * 100,
unit: '%',
threshold: 80,
status: Math.random() > 0.8 ? 'alert' : 'normal'
};
break;
}
return baseData;
}
private handleClientMessage(streamId: string, data: Buffer): void {
try {
const message = JSON.parse(data.toString());
console.log(`Received from ${streamId}: ${message.type}`);
// Handle different message types
switch (message.type) {
case 'subscribe':
this.handleSubscription(streamId, message.payload);
break;
case 'filter':
this.handleFilterUpdate(streamId, message.payload);
break;
case 'pause':
this.handleStreamPause(streamId);
break;
case 'resume':
this.handleStreamResume(streamId);
break;
}
} catch (error) {
console.error(`Message parsing error: ${error}`);
}
}
private handleSubscription(streamId: string, payload: any): void {
console.log(`Client ${streamId} subscribed to: ${payload.topic}`);
// Implementation for topic-based subscriptions
}
private handleFilterUpdate(streamId: string, payload: any): void {
console.log(`Filter updated for ${streamId}: ${JSON.stringify(payload)}`);
// Implementation for dynamic filtering
}
private handleStreamPause(streamId: string): void {
const stream = this.activeStreams.get(streamId);
if (stream) {
stream.source.pause();
console.log(`Stream paused: ${streamId}`);
}
}
private handleStreamResume(streamId: string): void {
const stream = this.activeStreams.get(streamId);
if (stream) {
stream.source.resume();
console.log(`Stream resumed: ${streamId}`);
}
}
private cleanupClientStream(streamId: string): void {
const stream = this.activeStreams.get(streamId);
if (stream) {
stream.source.destroy();
stream.transformer.destroy();
stream.sink.destroy();
this.activeStreams.delete(streamId);
this.metrics.decrementActiveConnections();
}
}
private generateStreamId(): string {
return `stream_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
private startMetricsReporting(): void {
setInterval(() => {
const report = this.metrics.generateReport();
console.log('Server Metrics:', JSON.stringify(report, null, 2));
// Broadcast metrics to all connected clients
this.broadcastToAll({
type: 'server_metrics',
data: report,
timestamp: new Date().toISOString()
});
}, 10000); // Every 10 seconds
}
private broadcastToAll(message: any): void {
this.activeStreams.forEach((stream) => {
if (stream.ws.readyState === WebSocket.OPEN) {
stream.ws.send(JSON.stringify(message));
}
});
}
}
// Supporting types and classes
interface ActiveStream {
id: string;
source: Readable;
transformer: Transform;
sink: Writable;
ws: WebSocket;
startTime: number;
}
interface StreamingData {
id: string;
type: DataType;
timestamp: string;
sequence: number;
payload: any;
}
type DataType = 'user_action' | 'system_event' | 'performance_metric';
class ServerMetrics {
private messagesGenerated = 0;
private messagesSent = 0;
private activeConnections = 0;
private startTime = Date.now();
incrementMessagesGenerated(): void { this.messagesGenerated++; }
incrementMessagesSent(): void { this.messagesSent++; }
getTotalMessages(): number { return this.messagesGenerated; }
incrementActiveConnections(): void { this.activeConnections++; }
decrementActiveConnections(): void { this.activeConnections--; }
generateReport(): MetricsReport {
const uptime = Date.now() - this.startTime;
return {
uptime: Math.floor(uptime / 1000),
activeConnections: this.activeConnections,
totalMessagesGenerated: this.messagesGenerated,
totalMessagesSent: this.messagesSent,
averageMessagesPerSecond: Math.round(this.messagesGenerated / (uptime / 1000)),
memoryUsage: process.memoryUsage(),
timestamp: new Date().toISOString()
};
}
}
interface MetricsReport {
uptime: number;
activeConnections: number;
totalMessagesGenerated: number;
totalMessagesSent: number;
averageMessagesPerSecond: number;
memoryUsage: NodeJS.MemoryUsage;
timestamp: string;
}
// Usage example
const server = new StreamingServer(8080);
console.log('Node.js Streaming Server Ready!');
console.log('Connect with WebSocket clients to ws://localhost:8080');
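A minimal client sketch for exercising the server, using the same ws package; the subscribe topic here is hypothetical and simply follows the message protocol the server handles above:
import WebSocket from 'ws';
const client = new WebSocket('ws://localhost:8080');
client.on('open', () => {
console.log('Connected to streaming server');
client.send(JSON.stringify({ type: 'subscribe', payload: { topic: 'user_action' } }));
});
client.on('message', (data) => {
const message = JSON.parse(data.toString());
console.log(`Received message: ${message.type ?? 'unknown'}`);
});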
Error Handling and Performance Optimization
Production-Ready Stream Management
Here's how to handle errors and optimize performance:
// Production stream management with comprehensive error handling
// Enterprise-grade streaming patterns
import { Readable, Transform, Writable, pipeline } from 'stream';
import { createReadStream, createWriteStream } from 'fs';
import { promisify } from 'util';
import { createBrotliCompress, createBrotliDecompress } from 'zlib';
const pipelineAsync = promisify(pipeline);
// Resilient stream manager
class StreamManager {
private retryAttempts = 3;
private backoffDelay = 1000;
private concurrencyLimit = 10;
private activeStreams = new Set<string>();
async processWithRetry<T>(
streamFactory: () => Promise<T>,
options: RetryOptions = {}
): Promise<T> {
const maxAttempts = options.maxAttempts || this.retryAttempts;
const delay = options.backoffDelay || this.backoffDelay;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
console.log(`Processing attempt ${attempt}/${maxAttempts}`);
return await streamFactory();
} catch (error) {
console.error(`Attempt ${attempt} failed: ${error}`);
if (attempt === maxAttempts) {
throw new StreamProcessingError(`Failed after ${maxAttempts} attempts`, error as Error);
}
// Exponential backoff
const backoffTime = delay * Math.pow(2, attempt - 1);
console.log(`Waiting ${backoffTime}ms before retry...`);
await new Promise(resolve => setTimeout(resolve, backoffTime));
}
}
throw new Error('Unexpected retry loop exit');
}
async createRobustPipeline(config: PipelineConfig): Promise<ProcessingStats> {
const streamId = this.generateStreamId();
this.activeStreams.add(streamId);
try {
console.log(`Starting robust pipeline: ${streamId}`);
// Performance monitoring
const stats = new ProcessingStats();
// Create pipeline components with error handling
const source = this.createMonitoredSource(config.input, stats);
const transforms = config.transforms.map(t => this.wrapWithMonitoring(t, stats));
const destination = this.createMonitoredDestination(config.output, stats);
// Build pipeline with error boundaries
const components = [source, ...transforms, destination];
await pipelineAsync(...components);
console.log(`Pipeline completed successfully: ${streamId}`);
return stats;
} catch (error) {
console.error(`Pipeline failed: ${streamId}`, error);
throw error;
} finally {
this.activeStreams.delete(streamId);
}
}
private createMonitoredSource(input: InputConfig, stats: ProcessingStats): Readable {
let stream: Readable;
switch (input.type) {
case 'file':
stream = createReadStream(input.path, {
highWaterMark: input.bufferSize || 64 * 1024
});
break;
case 'http':
stream = this.createHttpSource(input.url);
break;
case 'generator':
stream = this.createGeneratorSource(input.generator);
break;
default:
throw new Error(`Unsupported input type: ${input.type}`);
}
// Add monitoring
stream.on('data', (chunk) => {
stats.addBytesRead(chunk.length);
});
stream.on('error', (error) => {
console.error(`Source error: ${error.message}`);
stats.addError(error);
});
stream.on('end', () => {
console.log('Source stream ended');
stats.markSourceComplete();
});
return stream;
}
private wrapWithMonitoring(transform: Transform, stats: ProcessingStats): Transform {
const wrapper = new Transform({
// readableObjectMode is the public accessor; _readableState is internal
objectMode: transform.readableObjectMode,
transform: (chunk, encoding, callback) => {
try {
const startTime = process.hrtime.bigint();
// Execute original transformation
transform._transform(chunk, encoding, (error, result) => {
const endTime = process.hrtime.bigint();
const duration = Number(endTime - startTime) / 1000000; // Convert to ms
if (error) {
stats.addError(error);
callback(error);
} else {
stats.addProcessingTime(duration);
stats.incrementProcessedCount();
callback(null, result);
}
});
} catch (error) {
console.error(`Transform error: ${error}`);
stats.addError(error as Error);
callback(error as Error);
}
}
});
return wrapper;
}
private createMonitoredDestination(output: OutputConfig, stats: ProcessingStats): Writable {
let stream: Writable;
switch (output.type) {
case 'file':
stream = createWriteStream(output.path);
break;
case 'http':
stream = this.createHttpDestination(output.url);
break;
case 'memory':
stream = this.createMemoryDestination();
break;
default:
throw new Error(`Unsupported output type: ${output.type}`);
}
// Add monitoring: Writable streams emit no per-write event,
// so wrap write() to count outgoing bytes
const originalWrite = stream.write.bind(stream);
stream.write = ((chunk: any, ...args: any[]) => {
stats.addBytesWritten(chunk?.length ?? 0);
return originalWrite(chunk, ...args);
}) as typeof stream.write;
stream.on('error', (error) => {
console.error(`Destination error: ${error.message}`);
stats.addError(error);
});
stream.on('finish', () => {
console.log('Destination stream finished');
stats.markDestinationComplete();
});
return stream;
}
// Memory-efficient batch processor
async processBatch<T, R>(
items: T[],
processor: (item: T) => Promise<R>,
batchSize: number = 100
): Promise<R[]> {
const results: R[] = [];
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
console.log(`Processing batch ${Math.floor(i / batchSize) + 1}/${Math.ceil(items.length / batchSize)}`);
// Process batch with concurrency control
const batchPromises = batch.map(async (item, index) => {
try {
return await processor(item);
} catch (error) {
console.error(`Item ${i + index} failed: ${error}`);
throw error;
}
});
const batchResults = await Promise.allSettled(batchPromises);
// Collect successful results and log failures
batchResults.forEach((result, index) => {
if (result.status === 'fulfilled') {
results.push(result.value);
} else {
console.error(`Batch item ${i + index} failed: ${result.reason}`);
}
});
}
return results;
}
private generateStreamId(): string {
return `stream_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}
private createHttpSource(url: string): Readable {
// Implementation for HTTP source
throw new Error('HTTP source not implemented');
}
private createGeneratorSource(generator: () => Generator<any>): Readable {
// Implementation for generator source
throw new Error('Generator source not implemented');
}
private createHttpDestination(url: string): Writable {
// Implementation for HTTP destination
throw new Error('HTTP destination not implemented');
}
private createMemoryDestination(): Writable {
const chunks: Buffer[] = [];
return new Writable({
write(chunk, encoding, callback) {
chunks.push(Buffer.from(chunk));
callback();
},
final(callback) {
console.log(`Memory destination complete: ${chunks.length} chunks`);
callback();
}
});
}
}
// Performance tracking
class ProcessingStats {
private startTime = Date.now();
private bytesRead = 0;
private bytesWritten = 0;
private processedCount = 0;
private errors: Error[] = [];
private processingTimes: number[] = [];
private sourceComplete = false;
private destinationComplete = false;
addBytesRead(bytes: number): void { this.bytesRead += bytes; }
addBytesWritten(bytes: number): void { this.bytesWritten += bytes; }
incrementProcessedCount(): void { this.processedCount++; }
addError(error: Error): void { this.errors.push(error); }
addProcessingTime(ms: number): void { this.processingTimes.push(ms); }
markSourceComplete(): void { this.sourceComplete = true; }
markDestinationComplete(): void { this.destinationComplete = true; }
generateReport(): StatsReport {
const duration = Date.now() - this.startTime;
const avgProcessingTime = this.processingTimes.length > 0
? this.processingTimes.reduce((a, b) => a + b, 0) / this.processingTimes.length
: 0;
return {
duration,
throughput: Math.round(this.processedCount / (duration / 1000)),
bytesRead: this.bytesRead,
bytesWritten: this.bytesWritten,
processedCount: this.processedCount,
errorCount: this.errors.length,
averageProcessingTime: Math.round(avgProcessingTime * 100) / 100,
memoryUsage: process.memoryUsage(),
complete: this.sourceComplete && this.destinationComplete
};
}
}
// Supporting types
interface RetryOptions {
maxAttempts?: number;
backoffDelay?: number;
}
interface PipelineConfig {
input: InputConfig;
transforms: Transform[];
output: OutputConfig;
}
interface InputConfig {
type: 'file' | 'http' | 'generator';
path?: string;
url?: string;
generator?: () => Generator<any>;
bufferSize?: number;
}
interface OutputConfig {
type: 'file' | 'http' | 'memory';
path?: string;
url?: string;
}
interface StatsReport {
duration: number;
throughput: number;
bytesRead: number;
bytesWritten: number;
processedCount: number;
errorCount: number;
averageProcessingTime: number;
memoryUsage: NodeJS.MemoryUsage;
complete: boolean;
}
class StreamProcessingError extends Error {
constructor(message: string, public cause: Error) {
super(message);
this.name = 'StreamProcessingError';
}
}
// Usage example
const streamManager = new StreamManager();
// Example: Process large dataset with retry logic
streamManager.processWithRetry(async () => {
return await streamManager.createRobustPipeline({
input: { type: 'file', path: 'large-dataset.json' },
transforms: [
new JSONLineProcessor(),
new DataAggregator()
],
output: { type: 'file', path: 'processed-output.json' }
});
}).then(stats => {
console.log('Processing complete!', stats.generateReport());
}).catch(error => {
console.error('Processing failed:', error);
});
Summary and Next Steps
Congratulations! You've mastered Node.js streams with TypeScript and learned how to build efficient, scalable data processing applications.
What You've Accomplished
- Stream Fundamentals - understanding readable, writable, and transform streams
- Type Safety - using TypeScript for robust stream implementations
- Data Processing - building efficient data transformation pipelines
- Real-Time Applications - creating WebSocket-based streaming servers
- Error Handling - implementing resilient stream management
- Performance Optimization - memory-efficient processing patterns
Key Takeaways
- Memory Efficiency: streams process data in chunks, not all at once
- Composability: chain transforms like building blocks
- Backpressure: automatic flow control prevents memory overflow
- Error Resilience: proper error handling keeps streams running
- Type Safety: TypeScript ensures correct stream operations
Next Steps
Ready to expand your streaming expertise? Consider exploring:
- Advanced Patterns: custom duplex streams and more complex pipelines
- Microservices: stream-based inter-service communication
- Real-Time Analytics: building live dashboards and monitoring
- File Processing: large file handling and cloud storage integration
- Message Queues: streaming with RabbitMQ, Kafka, and Redis
You're now equipped to build production-grade streaming applications that handle massive datasets with grace and efficiency.
Keep streaming, keep building!