Prerequisites
- Understanding of TypeScript interfaces and types
- Basic knowledge of browser APIs
- Experience with async programming patterns
What you'll learn
- Master Web Worker creation and communication patterns
- Build type-safe message passing systems
- Implement parallel processing for performance optimization
- Create robust worker error handling and lifecycle management
Introduction
Welcome to the world of parallel processing in the browser! Web Workers are your secret weapon for keeping the main UI thread responsive while performing heavy computations, data processing, or background tasks.
Think of Web Workers as dedicated assistants who work in parallel - they can crunch numbers, process large datasets, or handle complex calculations without freezing your user interface. With TypeScript, we can make this communication type-safe and robust!
By the end of this tutorial, you'll be comfortable with multi-threaded browser programming and able to build fast, responsive applications that leverage parallel processing while keeping the message passing fully typed. Let's unlock the power of concurrency!
Understanding Web Workers
What Are Web Workers?
Web Workers are background scripts that run in separate threads, completely isolated from the main UI thread. They're like having extra CPU cores for your web application - each worker can process data independently without blocking user interactions!
// Basic Web Worker creation
// main.ts - Main thread
const worker = new Worker('/path/to/worker.js');
// Send data to worker
worker.postMessage({ type: 'PROCESS_DATA', data: [1, 2, 3, 4, 5] });
// Receive results from worker
worker.onmessage = (event) => {
console.log('Result from worker:', event.data);
};
// worker.ts - Worker thread
self.onmessage = (event) => {
const { type, data } = event.data;
if (type === 'PROCESS_DATA') {
// Heavy computation that won't block UI
const result = data.map((num: number) => num * num);
self.postMessage({ type: 'RESULT', result });
}
};
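The example above assumes a plain JavaScript worker file already exists at /path/to/worker.js. If you author the worker in TypeScript, your bundler usually compiles it for you; a common pattern (supported by Vite and webpack 5, though the exact setup depends on your toolchain) is sketched below.
// Sketch: creating a module worker from a TypeScript source with a modern bundler
const typedWorker = new Worker(
  new URL('./worker.ts', import.meta.url), // the bundler resolves and compiles this file
  { type: 'module' }                       // module workers support import/export syntax
);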
Key Characteristics
- Parallel Execution: Run independently from the main thread
- No DOM Access: Workers can't directly manipulate the DOM
- Message-Based Communication: Data exchange via postMessage/onmessage
- Type Safety: TypeScript can enforce message type contracts
// Type-safe worker communication
interface WorkerMessage<T = any> {
type: string;
payload: T;
id?: string;
}
interface MathOperation {
operation: 'add' | 'multiply' | 'fibonacci';
numbers: number[];
}
// Type-safe message sending
function sendToWorker(worker: Worker, message: WorkerMessage<MathOperation>): void {
worker.postMessage(message);
}
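On the worker side, the same interfaces can type the incoming event. Below is a minimal sketch of a worker script consuming WorkerMessage<MathOperation>; the 'MATH_OPERATION' tag, the cast to DedicatedWorkerGlobalScope, and the placeholder fibonacci branch are assumptions for illustration (and assume the worker file is compiled with the webworker lib).
// worker.ts - sketch of a typed handler for WorkerMessage<MathOperation>
const ctx = self as unknown as DedicatedWorkerGlobalScope;

ctx.onmessage = (event: MessageEvent<WorkerMessage<MathOperation>>) => {
  const { type, payload, id } = event.data;
  if (type !== 'MATH_OPERATION') return; // message tag is an assumption for this sketch
  let result = 0;
  switch (payload.operation) {
    case 'add':
      result = payload.numbers.reduce((sum, n) => sum + n, 0);
      break;
    case 'multiply':
      result = payload.numbers.reduce((product, n) => product * n, 1);
      break;
    case 'fibonacci':
      result = payload.numbers.length; // placeholder - compute the real sequence here
      break;
  }
  ctx.postMessage({ type: 'MATH_RESULT', payload: result, id });
};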
Web Workers vs Main Thread
// Blocking the main thread - UI freezes
function heavyComputationOnMainThread(data: number[]): number[] {
const result: number[] = [];
// This blocks the UI for large datasets
for (let i = 0; i < data.length; i++) {
for (let j = 0; j < 1000000; j++) {
// Simulate heavy computation
Math.sqrt(data[i] * j);
}
result.push(data[i] * 2);
}
return result; // UI is frozen until this completes
}
// Non-blocking with Web Worker - UI stays responsive
class ComputationWorker {
private worker: Worker;
constructor() {
this.worker = new Worker('/computation-worker.js');
this.setupMessageHandling();
}
async processData(data: number[]): Promise<number[]> {
return new Promise((resolve, reject) => {
const messageId = `task_${Date.now()}`;
// Set up one-time listener for this specific task
const handleMessage = (event: MessageEvent) => {
if (event.data.id === messageId) {
this.worker.removeEventListener('message', handleMessage);
if (event.data.error) {
reject(new Error(event.data.error));
} else {
resolve(event.data.result);
}
}
};
this.worker.addEventListener('message', handleMessage);
// Send task to worker
this.worker.postMessage({
type: 'HEAVY_COMPUTATION',
data,
id: messageId
});
});
}
private setupMessageHandling(): void {
this.worker.onerror = (error) => {
console.error('Worker error:', error);
};
}
}
Type-Safe Worker Communication
Message Type System
// Comprehensive message type system
interface BaseMessage {
id: string;
timestamp: number;
}
// Messages sent TO worker
type WorkerInput =
| ProcessDataMessage
| CancelTaskMessage
| ConfigUpdateMessage;
interface ProcessDataMessage extends BaseMessage {
type: 'PROCESS_DATA';
payload: {
data: number[];
algorithm: 'quicksort' | 'mergesort' | 'bubblesort';
options?: {
ascending?: boolean;
chunkSize?: number;
};
};
}
interface CancelTaskMessage extends BaseMessage {
type: 'CANCEL_TASK';
payload: {
taskId: string;
};
}
interface ConfigUpdateMessage extends BaseMessage {
type: 'UPDATE_CONFIG';
payload: {
maxMemoryUsage: number;
logLevel: 'debug' | 'info' | 'warn' | 'error';
};
}
// Messages received FROM worker
type WorkerOutput =
| ProcessResultMessage
| ProgressUpdateMessage
| ErrorMessage
| StatusMessage;
interface ProcessResultMessage extends BaseMessage {
type: 'PROCESS_RESULT';
payload: {
result: number[];
processingTime: number;
memoryUsed: number;
};
}
interface ProgressUpdateMessage extends BaseMessage {
type: 'PROGRESS_UPDATE';
payload: {
taskId: string;
progress: number; // 0-100
currentStep: string;
};
}
interface ErrorMessage extends BaseMessage {
type: 'ERROR';
payload: {
error: string;
stack?: string;
recoverable: boolean;
};
}
interface StatusMessage extends BaseMessage {
type: 'STATUS';
payload: {
status: 'idle' | 'processing' | 'error';
activeTasks: number;
memoryUsage: number;
};
}
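Because WorkerInput is a discriminated union, the worker-side dispatcher can get exhaustive checking from the compiler. The sketch below only logs; the assertNever helper and the handler bodies are illustrative, not part of the original tutorial code.
// worker.ts - exhaustive dispatch over the WorkerInput union
const workerScope = self as unknown as DedicatedWorkerGlobalScope;

function assertNever(value: never): never {
  throw new Error(`Unhandled message: ${JSON.stringify(value)}`);
}

workerScope.onmessage = (event: MessageEvent<WorkerInput>) => {
  const message = event.data;
  switch (message.type) {
    case 'PROCESS_DATA':
      // payload is narrowed to the ProcessDataMessage payload here
      console.log(`Sorting ${message.payload.data.length} items with ${message.payload.algorithm}`);
      break;
    case 'CANCEL_TASK':
      console.log(`Cancelling task ${message.payload.taskId}`);
      break;
    case 'UPDATE_CONFIG':
      console.log(`Log level set to ${message.payload.logLevel}`);
      break;
    default:
      // Adding a new WorkerInput variant without handling it makes this line fail to compile
      assertNever(message);
  }
};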
Worker Manager Class
// Type-safe Worker Manager
class TypeSafeWorkerManager {
private worker: Worker;
private pendingTasks = new Map<string, {
resolve: (value: any) => void;
reject: (error: Error) => void;
timeout?: ReturnType<typeof setTimeout>;
}>();
private messageQueue: WorkerInput[] = [];
private isWorkerReady = false;
constructor(workerPath: string) {
this.worker = new Worker(workerPath);
this.setupWorker();
}
private setupWorker(): void {
// Handle messages from worker
this.worker.onmessage = (event: MessageEvent<WorkerOutput>) => {
this.handleWorkerMessage(event.data);
};
// Handle worker errors
this.worker.onerror = (error) => {
console.error('Worker error:', error);
this.rejectAllPendingTasks(new Error('Worker crashed'));
};
// Handle messages that fail to deserialize
this.worker.onmessageerror = (error) => {
console.error('Worker message error:', error);
};
// Worker is ready
this.isWorkerReady = true;
this.flushMessageQueue();
}
// Send typed message to worker
private sendMessage(message: WorkerInput): void {
if (this.isWorkerReady) {
this.worker.postMessage(message);
} else {
this.messageQueue.push(message);
}
}
// Process queued messages when worker is ready
private flushMessageQueue(): void {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift()!;
this.worker.postMessage(message);
}
}
// Handle incoming worker messages
private handleWorkerMessage(message: WorkerOutput): void {
const pending = this.pendingTasks.get(message.id);
switch (message.type) {
case 'PROCESS_RESULT':
if (pending) {
clearTimeout(pending.timeout);
this.pendingTasks.delete(message.id);
pending.resolve(message.payload);
}
break;
case 'ERROR':
if (pending) {
clearTimeout(pending.timeout);
this.pendingTasks.delete(message.id);
pending.reject(new Error(message.payload.error));
}
break;
case 'PROGRESS_UPDATE':
// Emit progress event (could use EventEmitter)
console.log(`Progress: ${message.payload.progress}% - ${message.payload.currentStep}`);
break;
case 'STATUS':
// Update worker status
console.log(`Worker status: ${message.payload.status}, Active tasks: ${message.payload.activeTasks}`);
break;
}
}
// Public API methods
async processData(
data: number[],
algorithm: 'quicksort' | 'mergesort' | 'bubblesort' = 'quicksort',
options?: { ascending?: boolean; chunkSize?: number }
): Promise<{ result: number[]; processingTime: number; memoryUsed: number }> {
const messageId = `process_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const message: ProcessDataMessage = {
id: messageId,
timestamp: Date.now(),
type: 'PROCESS_DATA',
payload: {
data,
algorithm,
options
}
};
return new Promise((resolve, reject) => {
// Set timeout for task
const timeout = setTimeout(() => {
this.pendingTasks.delete(messageId);
reject(new Error('Worker task timeout'));
}, 30000); // 30 second timeout
// Register task
this.pendingTasks.set(messageId, { resolve, reject, timeout });
// Send to worker
this.sendMessage(message);
});
}
// Cancel specific task
async cancelTask(taskId: string): Promise<void> {
const message: CancelTaskMessage = {
id: `cancel_${Date.now()}`,
timestamp: Date.now(),
type: 'CANCEL_TASK',
payload: { taskId }
};
this.sendMessage(message);
// Remove from pending tasks
const pending = this.pendingTasks.get(taskId);
if (pending) {
clearTimeout(pending.timeout);
this.pendingTasks.delete(taskId);
pending.reject(new Error('Task cancelled'));
}
}
// Update worker configuration
async updateConfig(config: { maxMemoryUsage: number; logLevel: 'debug' | 'info' | 'warn' | 'error' }): Promise<void> {
const message: ConfigUpdateMessage = {
id: `config_${Date.now()}`,
timestamp: Date.now(),
type: 'UPDATE_CONFIG',
payload: config
};
this.sendMessage(message);
}
// Cleanup
terminate(): void {
this.rejectAllPendingTasks(new Error('Worker terminated'));
this.worker.terminate();
}
private rejectAllPendingTasks(error: Error): void {
for (const [id, task] of this.pendingTasks) {
clearTimeout(task.timeout);
task.reject(error);
}
this.pendingTasks.clear();
}
}
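A short usage sketch of the manager from application code; '/sorting-worker.js' is a placeholder path, and the result shape comes from ProcessResultMessage above.
// Usage sketch for TypeSafeWorkerManager
async function sortOnWorker(): Promise<void> {
  const manager = new TypeSafeWorkerManager('/sorting-worker.js'); // placeholder worker path
  try {
    const { result, processingTime } = await manager.processData(
      [5, 3, 8, 1, 9, 2],
      'mergesort',
      { ascending: true }
    );
    console.log(`Sorted ${result.length} numbers in ${processingTime}ms`);
  } finally {
    manager.terminate(); // always release the worker thread when done
  }
}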
Real-World Applications
Example 1: Image Processing Worker
// Image processing with Web Workers
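// Note: this local interface mirrors the shape of the DOM's built-in ImageData type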
interface ImageData {
width: number;
height: number;
data: Uint8ClampedArray;
}
interface ImageProcessingTask {
id: string;
type: 'blur' | 'sharpen' | 'brighten' | 'contrast';
imageData: ImageData;
params: {
intensity?: number;
radius?: number;
threshold?: number;
};
}
class ImageProcessingWorker {
private worker: Worker;
private activeTasks = new Map<string, Promise<ImageData>>();
constructor() {
this.worker = new Worker('/image-processing-worker.js');
this.setupWorker();
}
async processImage(
imageData: ImageData,
filter: 'blur' | 'sharpen' | 'brighten' | 'contrast',
params: { intensity?: number; radius?: number; threshold?: number } = {}
): Promise<ImageData> {
const taskId = `img_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const task: ImageProcessingTask = {
id: taskId,
type: filter,
imageData,
params
};
// Create and track the processing promise
const processingPromise = new Promise<ImageData>((resolve, reject) => {
const handleMessage = (event: MessageEvent) => {
const { id, type, result, error } = event.data;
if (id === taskId) {
this.worker.removeEventListener('message', handleMessage);
this.activeTasks.delete(taskId);
if (error) {
reject(new Error(error));
} else if (type === 'IMAGE_PROCESSED') {
resolve(result);
}
}
};
this.worker.addEventListener('message', handleMessage);
// Set up timeout
setTimeout(() => {
this.worker.removeEventListener('message', handleMessage);
this.activeTasks.delete(taskId);
reject(new Error('Image processing timeout'));
}, 60000); // 1 minute timeout for image processing
// Send task to worker
this.worker.postMessage({
type: 'PROCESS_IMAGE',
task
});
});
this.activeTasks.set(taskId, processingPromise);
return processingPromise;
}
// Batch process multiple images
async processBatch(
images: Array<{
imageData: ImageData;
filter: 'blur' | 'sharpen' | 'brighten' | 'contrast';
params?: { intensity?: number; radius?: number; threshold?: number };
}>
): Promise<ImageData[]> {
console.log(`Processing batch of ${images.length} images...`);
// Process all images in parallel
const processingPromises = images.map(({ imageData, filter, params = {} }) =>
this.processImage(imageData, filter, params)
);
try {
const results = await Promise.all(processingPromises);
console.log(`Batch processing complete! Processed ${results.length} images`);
return results;
} catch (error) {
console.error('Batch processing failed:', error);
throw error;
}
}
// Get processing status
getStatus(): {
activeTasks: number;
taskIds: string[];
} {
return {
activeTasks: this.activeTasks.size,
taskIds: Array.from(this.activeTasks.keys())
};
}
private setupWorker(): void {
this.worker.onerror = (error) => {
console.error('Image processing worker error:', error);
};
}
terminate(): void {
// Log any tasks still in flight before terminating the worker
for (const [taskId] of this.activeTasks) {
console.log(`Cancelling image processing task: ${taskId}`);
}
this.worker.terminate();
this.activeTasks.clear();
}
}
// Usage example
async function demonstrateImageProcessing(): Promise<void> {
const imageWorker = new ImageProcessingWorker();
// Mock image data (in real app, get from canvas or img element)
const mockImageData: ImageData = {
width: 800,
height: 600,
data: new Uint8ClampedArray(800 * 600 * 4) // RGBA
};
try {
// Single image processing
console.log('Processing single image...');
const blurredImage = await imageWorker.processImage(
mockImageData,
'blur',
{ radius: 5, intensity: 0.8 }
);
console.log('Blur effect applied!', blurredImage);
// Batch processing
console.log('Starting batch processing...');
const batchImages = [
{ imageData: mockImageData, filter: 'blur' as const, params: { radius: 3 } },
{ imageData: mockImageData, filter: 'sharpen' as const, params: { intensity: 1.2 } },
{ imageData: mockImageData, filter: 'brighten' as const, params: { intensity: 0.3 } }
];
const processedBatch = await imageWorker.processBatch(batchImages);
console.log(`Batch complete! Processed ${processedBatch.length} images`);
// Check status
const status = imageWorker.getStatus();
console.log('Worker status:', status);
} catch (error) {
console.error('Image processing failed:', error);
} finally {
imageWorker.terminate();
}
}
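The tutorial doesn't include the worker script itself, so here is a rough sketch of what /image-processing-worker.js could look like. Only the 'brighten' filter is actually implemented; the message shapes match the ImageProcessingTask posted above.
// image-processing-worker.ts - simplified sketch; only 'brighten' does real work
const imgCtx = self as unknown as DedicatedWorkerGlobalScope;

imgCtx.onmessage = (event: MessageEvent<{ type: string; task: ImageProcessingTask }>) => {
  if (event.data.type !== 'PROCESS_IMAGE') return;
  const { id, type: filter, imageData, params } = event.data.task;
  try {
    const pixels = imageData.data;
    if (filter === 'brighten') {
      const delta = Math.round((params.intensity ?? 0.1) * 255);
      for (let i = 0; i < pixels.length; i += 4) {
        pixels[i] = Math.min(255, pixels[i] + delta);         // R
        pixels[i + 1] = Math.min(255, pixels[i + 1] + delta); // G
        pixels[i + 2] = Math.min(255, pixels[i + 2] + delta); // B
      }
    }
    // Other filters (blur, sharpen, contrast) would transform `pixels` here
    imgCtx.postMessage({ id, type: 'IMAGE_PROCESSED', result: imageData });
  } catch (error) {
    imgCtx.postMessage({ id, error: (error as Error).message });
  }
};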
Example 2: Data Analysis Worker
// Advanced data analysis with Web Workers
interface DataPoint {
id: string;
timestamp: number;
values: Record<string, number>;
metadata?: Record<string, any>;
}
interface AnalysisRequest {
id: string;
data: DataPoint[];
analysis: {
type: 'correlation' | 'regression' | 'clustering' | 'forecast';
parameters: Record<string, any>;
options?: {
sampleSize?: number;
confidenceLevel?: number;
iterations?: number;
};
};
}
interface AnalysisResult {
id: string;
type: string;
result: {
statistics: Record<string, number>;
patterns: Array<{
type: string;
confidence: number;
description: string;
}>;
visualizationData?: Record<string, any>;
};
metadata: {
processingTime: number;
dataPoints: number;
accuracy: number;
};
}
class DataAnalysisWorker {
private worker: Worker;
private analysisQueue = new Map<string, {
resolve: (result: AnalysisResult) => void;
reject: (error: Error) => void;
startTime: number;
}>();
constructor() {
this.worker = new Worker('/data-analysis-worker.js');
this.setupWorker();
}
// Perform correlation analysis
async performCorrelationAnalysis(
data: DataPoint[],
variables: string[],
options: { method?: 'pearson' | 'spearman'; threshold?: number } = {}
): Promise<AnalysisResult> {
const analysisId = `correlation_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const request: AnalysisRequest = {
id: analysisId,
data,
analysis: {
type: 'correlation',
parameters: {
variables,
method: options.method || 'pearson',
threshold: options.threshold || 0.1
}
}
};
return this.executeAnalysis(request);
}
// Perform forecasting analysis
async performForecast(
data: DataPoint[],
targetVariable: string,
options: {
periods?: number;
seasonality?: 'auto' | 'weekly' | 'monthly' | 'yearly';
confidenceInterval?: number;
} = {}
): Promise<AnalysisResult> {
const analysisId = `forecast_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const request: AnalysisRequest = {
id: analysisId,
data,
analysis: {
type: 'forecast',
parameters: {
targetVariable,
periods: options.periods || 12,
seasonality: options.seasonality || 'auto',
confidenceInterval: options.confidenceInterval || 0.95
}
}
};
return this.executeAnalysis(request);
}
// Perform clustering analysis
async performClustering(
data: DataPoint[],
features: string[],
options: {
algorithm?: 'kmeans' | 'dbscan' | 'hierarchical';
clusters?: number;
distance?: 'euclidean' | 'manhattan' | 'cosine';
} = {}
): Promise<AnalysisResult> {
const analysisId = `clustering_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const request: AnalysisRequest = {
id: analysisId,
data,
analysis: {
type: 'clustering',
parameters: {
features,
algorithm: options.algorithm || 'kmeans',
clusters: options.clusters || 3,
distance: options.distance || 'euclidean'
}
}
};
return this.executeAnalysis(request);
}
// Execute analysis request
private async executeAnalysis(request: AnalysisRequest): Promise<AnalysisResult> {
return new Promise((resolve, reject) => {
const startTime = Date.now();
// Register analysis request
this.analysisQueue.set(request.id, {
resolve,
reject,
startTime
});
// Set timeout (longer for complex analysis)
setTimeout(() => {
if (this.analysisQueue.has(request.id)) {
this.analysisQueue.delete(request.id);
reject(new Error(`Analysis timeout: ${request.analysis.type}`));
}
}, 120000); // 2 minutes timeout
// Send to worker
this.worker.postMessage({
type: 'ANALYZE_DATA',
request
});
console.log(`Started ${request.analysis.type} analysis with ${request.data.length} data points`);
});
}
private setupWorker(): void {
this.worker.onmessage = (event: MessageEvent) => {
const { type, result, error, id } = event.data;
if (type === 'ANALYSIS_COMPLETE') {
const pending = this.analysisQueue.get(id);
if (pending) {
this.analysisQueue.delete(id);
if (error) {
pending.reject(new Error(error));
} else {
// Add processing time to result
result.metadata.processingTime = Date.now() - pending.startTime;
pending.resolve(result);
}
}
} else if (type === 'ANALYSIS_PROGRESS') {
// Handle progress updates
console.log(`Analysis ${id} progress: ${event.data.progress}%`);
}
};
this.worker.onerror = (error) => {
console.error('Data analysis worker error:', error);
// Reject all pending analyses
for (const [id, pending] of this.analysisQueue) {
pending.reject(new Error('Worker crashed during analysis'));
}
this.analysisQueue.clear();
};
}
// Get analysis queue status
getQueueStatus(): {
pending: number;
analyses: Array<{ id: string; type: string; duration: number }>;
} {
const now = Date.now();
return {
pending: this.analysisQueue.size,
analyses: Array.from(this.analysisQueue.entries()).map(([id, data]) => ({
id,
type: id.split('_')[0],
duration: now - data.startTime
}))
};
}
terminate(): void {
// Cancel all pending analyses
for (const [id, pending] of this.analysisQueue) {
pending.reject(new Error('Analysis cancelled - worker terminated'));
}
this.worker.terminate();
this.analysisQueue.clear();
}
}
// Demo data analysis
async function demonstrateDataAnalysis(): Promise<void> {
const analysisWorker = new DataAnalysisWorker();
// Generate mock data
const mockData: DataPoint[] = Array.from({ length: 1000 }, (_, i) => ({
id: `point_${i}`,
timestamp: Date.now() + i * 1000,
values: {
temperature: 20 + Math.sin(i * 0.1) * 10 + Math.random() * 5,
humidity: 50 + Math.cos(i * 0.05) * 20 + Math.random() * 10,
pressure: 1013 + Math.sin(i * 0.02) * 15 + Math.random() * 8
},
metadata: { sensor: `sensor_${i % 10}` }
}));
try {
console.log('Starting data analysis demonstrations...');
// Correlation analysis
console.log('Performing correlation analysis...');
const correlationResult = await analysisWorker.performCorrelationAnalysis(
mockData,
['temperature', 'humidity', 'pressure'],
{ method: 'pearson', threshold: 0.1 }
);
console.log('Correlation analysis complete:', correlationResult.result.statistics);
// Forecasting
console.log('Performing forecast analysis...');
const forecastResult = await analysisWorker.performForecast(
mockData,
'temperature',
{ periods: 24, seasonality: 'auto', confidenceInterval: 0.95 }
);
console.log('Forecast analysis complete:', forecastResult.result.patterns);
// Clustering
console.log('Performing clustering analysis...');
const clusteringResult = await analysisWorker.performClustering(
mockData.slice(0, 200), // Smaller dataset for clustering
['temperature', 'humidity'],
{ algorithm: 'kmeans', clusters: 3 }
);
console.log('Clustering analysis complete:', clusteringResult.result.patterns);
// Check queue status
const status = analysisWorker.getQueueStatus();
console.log('Final queue status:', status);
} catch (error) {
console.error('Data analysis failed:', error);
} finally {
analysisWorker.terminate();
}
}
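Again, the analysis worker script is not shown in the tutorial. The sketch below only demonstrates the message protocol that setupWorker() expects (ANALYSIS_PROGRESS and ANALYSIS_COMPLETE); the statistics it returns are placeholders.
// data-analysis-worker.ts - sketch of the progress/completion protocol only
const analysisCtx = self as unknown as DedicatedWorkerGlobalScope;

analysisCtx.onmessage = (event: MessageEvent<{ type: string; request: AnalysisRequest }>) => {
  if (event.data.type !== 'ANALYZE_DATA') return;
  const { id, data, analysis } = event.data.request;
  // Report progress part-way through the (placeholder) computation
  analysisCtx.postMessage({ type: 'ANALYSIS_PROGRESS', id, progress: 50 });
  const result: AnalysisResult = {
    id,
    type: analysis.type,
    result: { statistics: { sampleSize: data.length }, patterns: [] },
    metadata: { processingTime: 0, dataPoints: data.length, accuracy: 1 },
  };
  analysisCtx.postMessage({ type: 'ANALYSIS_COMPLETE', id, result });
};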
Advanced Worker Patterns
Worker Pool Management
// Worker pool for managing multiple workers
class WorkerPool<TInput = any, TOutput = any> {
private workers: Worker[] = [];
private availableWorkers: Worker[] = [];
private taskQueue: Array<{
input: TInput;
resolve: (output: TOutput) => void;
reject: (error: Error) => void;
priority: number;
}> = [];
private workerTasks = new Map<Worker, {
taskId: string;
startTime: number;
resolve: (output: TOutput) => void;
reject: (error: Error) => void;
}>();
constructor(
private workerScriptPath: string,
private poolSize: number = navigator.hardwareConcurrency || 4
) {
this.initializePool();
}
private initializePool(): void {
console.log(`Initializing worker pool with ${this.poolSize} workers`);
for (let i = 0; i < this.poolSize; i++) {
const worker = new Worker(this.workerScriptPath);
// Handle worker messages
worker.onmessage = (event: MessageEvent) => {
this.handleWorkerResult(worker, event.data);
};
// Handle worker errors
worker.onerror = (error) => {
this.handleWorkerError(worker, error);
};
this.workers.push(worker);
this.availableWorkers.push(worker);
}
}
// Execute task with priority
async execute(
input: TInput,
priority: number = 0
): Promise<TOutput> {
return new Promise<TOutput>((resolve, reject) => {
const task = { input, resolve, reject, priority };
// If a worker is available, assign immediately
if (this.availableWorkers.length > 0) {
this.assignTaskToWorker(task);
} else {
// Add to queue with priority sorting
this.taskQueue.push(task);
this.taskQueue.sort((a, b) => b.priority - a.priority); // Higher priority first
}
});
}
private assignTaskToWorker(task: {
input: TInput;
resolve: (output: TOutput) => void;
reject: (error: Error) => void;
priority: number;
}): void {
const worker = this.availableWorkers.pop()!;
const taskId = `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
// Track worker task
this.workerTasks.set(worker, {
taskId,
startTime: Date.now(),
resolve: task.resolve,
reject: task.reject
});
// Send task to worker
worker.postMessage({
taskId,
input: task.input
});
}
private handleWorkerResult(worker: Worker, data: any): void {
const task = this.workerTasks.get(worker);
if (task) {
// Calculate processing time
const processingTime = Date.now() - task.startTime;
console.log(`Task ${task.taskId} completed in ${processingTime}ms`);
// Resolve the task
task.resolve(data.result || data);
// Clean up and make worker available
this.workerTasks.delete(worker);
this.availableWorkers.push(worker);
// Assign next queued task if any
if (this.taskQueue.length > 0) {
const nextTask = this.taskQueue.shift()!;
this.assignTaskToWorker(nextTask);
}
}
}
private handleWorkerError(worker: Worker, error: ErrorEvent): void {
console.error('Worker error:', error);
const task = this.workerTasks.get(worker);
if (task) {
task.reject(new Error(`Worker error: ${error.message}`));
this.workerTasks.delete(worker);
}
// Replace failed worker
this.replaceWorker(worker);
}
private replaceWorker(failedWorker: Worker): void {
console.log('Replacing failed worker...');
// Remove failed worker
const workerIndex = this.workers.indexOf(failedWorker);
if (workerIndex !== -1) {
this.workers.splice(workerIndex, 1);
}
const availableIndex = this.availableWorkers.indexOf(failedWorker);
if (availableIndex !== -1) {
this.availableWorkers.splice(availableIndex, 1);
}
failedWorker.terminate();
// Create replacement worker
const newWorker = new Worker(this.workerScriptPath);
newWorker.onmessage = (event) => this.handleWorkerResult(newWorker, event.data);
newWorker.onerror = (error) => this.handleWorkerError(newWorker, error);
this.workers.push(newWorker);
this.availableWorkers.push(newWorker);
}
// Get pool statistics
getStats(): {
totalWorkers: number;
availableWorkers: number;
activeTasks: number;
queuedTasks: number;
averageTaskTime: number;
} {
return {
totalWorkers: this.workers.length,
availableWorkers: this.availableWorkers.length,
activeTasks: this.workerTasks.size,
queuedTasks: this.taskQueue.length,
averageTaskTime: 0 // Could calculate from historical data
};
}
// Terminate all workers
terminate(): void {
console.log('Terminating worker pool...');
// Reject all pending tasks
for (const task of this.taskQueue) {
task.reject(new Error('Worker pool terminated'));
}
for (const [worker, task] of this.workerTasks) {
task.reject(new Error('Worker pool terminated'));
}
// Terminate all workers
for (const worker of this.workers) {
worker.terminate();
}
this.workers.length = 0;
this.availableWorkers.length = 0;
this.taskQueue.length = 0;
this.workerTasks.clear();
}
}
// Demo worker pool usage
async function demonstrateWorkerPool(): Promise<void> {
const workerPool = new WorkerPool<number[], number[]>('/math-worker.js', 4);
try {
console.log('Testing worker pool with parallel tasks...');
// Submit multiple tasks with different priorities
const tasks = [
workerPool.execute([1, 2, 3, 4, 5], 1), // Low priority
workerPool.execute([10, 20, 30, 40, 50], 3), // High priority
workerPool.execute([100, 200, 300], 2), // Medium priority
workerPool.execute([5, 10, 15, 20, 25, 30], 3), // High priority
workerPool.execute([1, 1, 2, 3, 5, 8, 13], 1) // Low priority
];
// Wait for all tasks to complete
const results = await Promise.all(tasks);
console.log('All worker pool tasks completed:', results);
// Check pool statistics
const stats = workerPool.getStats();
console.log('Worker pool stats:', stats);
} catch (error) {
console.error('Worker pool demo failed:', error);
} finally {
workerPool.terminate();
}
}
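The pool demo assumes a /math-worker.js script that accepts { taskId, input } and replies with { taskId, result }, which is the shape handleWorkerResult() reads. A minimal sketch, with doubling standing in for real work:
// math-worker.ts - minimal counterpart for the WorkerPool<number[], number[]> demo
const mathCtx = self as unknown as DedicatedWorkerGlobalScope;

mathCtx.onmessage = (event: MessageEvent<{ taskId: string; input: number[] }>) => {
  const { taskId, input } = event.data;
  // Example computation: double every number; replace with real work
  const result = input.map((n) => n * 2);
  mathCtx.postMessage({ taskId, result });
};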
Common Pitfalls and Solutions
Pitfall 1: Memory Leaks with Large Data Transfer
// Dangerous - copying large data repeatedly
class BadWorkerManager {
private worker: Worker;
constructor() {
this.worker = new Worker('/worker.js');
}
async processLargeDataset(data: Float64Array): Promise<Float64Array> {
// This copies the entire array to worker memory
this.worker.postMessage({ data });
return new Promise((resolve) => {
this.worker.onmessage = (event) => {
// This copies the result back to the main thread
resolve(event.data.result);
};
});
}
}
// Safe - using Transferable Objects
class EfficientWorkerManager {
private worker: Worker;
constructor() {
this.worker = new Worker('/worker.js');
}
async processLargeDataset(data: Float64Array): Promise<Float64Array> {
console.log('Processing large dataset:', data.length, 'elements');
// Transfer ownership instead of copying
return new Promise((resolve, reject) => {
this.worker.onmessage = (event) => {
if (event.data.error) {
reject(new Error(event.data.error));
} else {
// Data ownership transferred back
resolve(event.data.result);
}
};
// Transfer ArrayBuffer ownership to worker
this.worker.postMessage(
{
type: 'PROCESS_LARGE_DATA',
data
},
[data.buffer] // Transfer list - zero-copy transfer
);
});
}
// Process with SharedArrayBuffer for shared memory (requires a cross-origin isolated page via COOP/COEP headers)
async processWithSharedMemory(size: number): Promise<void> {
// Create shared memory that both threads can access
const sharedBuffer = new SharedArrayBuffer(size * Float64Array.BYTES_PER_ELEMENT);
const sharedArray = new Float64Array(sharedBuffer);
// Fill with random data
for (let i = 0; i < size; i++) {
sharedArray[i] = Math.random() * 100;
}
console.log('Created shared memory:', sharedArray.length, 'elements');
return new Promise((resolve, reject) => {
this.worker.onmessage = (event) => {
if (event.data.type === 'SHARED_PROCESSING_COMPLETE') {
console.log('Shared memory processing complete');
console.log('First 10 processed values:', Array.from(sharedArray.slice(0, 10)));
resolve();
}
};
// Send shared buffer reference (no data copy!)
this.worker.postMessage({
type: 'PROCESS_SHARED_MEMORY',
sharedBuffer,
size
});
});
}
}
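On the worker side, a transferred buffer arrives as an ordinary ArrayBuffer, while a SharedArrayBuffer can be read and written in place. The sketch below handles both messages sent by EfficientWorkerManager; the square roots and doubling are stand-in computations.
// worker-side sketch for the two zero-copy strategies above
const memCtx = self as unknown as DedicatedWorkerGlobalScope;

memCtx.onmessage = (event: MessageEvent<any>) => {
  if (event.data.type === 'PROCESS_LARGE_DATA') {
    // The Float64Array arrived via the transfer list; its buffer now lives in this thread
    const input: Float64Array = event.data.data;
    const output = input.map((value) => Math.sqrt(value));
    // Transfer the result back so the main thread gets it without another copy
    memCtx.postMessage({ result: output }, [output.buffer]);
  } else if (event.data.type === 'PROCESS_SHARED_MEMORY') {
    // Both threads see the same memory; write results in place
    const shared = new Float64Array(event.data.sharedBuffer);
    for (let i = 0; i < event.data.size; i++) {
      shared[i] = shared[i] * 2;
    }
    memCtx.postMessage({ type: 'SHARED_PROCESSING_COMPLETE' });
  }
};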
Pitfall 2: Unhandled Worker Errors and Cleanup
// Dangerous - no error handling or cleanup
class UnreliableWorkerManager {
private worker: Worker;
private pendingTasks = new Map<string, any>();
constructor() {
this.worker = new Worker('/unreliable-worker.js');
// No error handling setup!
}
async processData(data: any): Promise<any> {
const taskId = Math.random().toString();
return new Promise((resolve) => {
this.pendingTasks.set(taskId, resolve);
this.worker.postMessage({ taskId, data });
// No timeout, no error handling!
});
}
}
// Robust - comprehensive error handling and cleanup
class RobustWorkerManager {
private worker: Worker | null = null;
private pendingTasks = new Map<string, {
resolve: (value: any) => void;
reject: (error: Error) => void;
timeout: ReturnType<typeof setTimeout>;
startTime: number;
}>();
private isTerminated = false;
private reconnectAttempts = 0;
private maxReconnectAttempts = 3;
constructor(private workerPath: string) {
this.initializeWorker();
}
private initializeWorker(): void {
if (this.isTerminated) return;
try {
this.worker = new Worker(this.workerPath);
this.setupWorkerHandlers();
console.log('Worker initialized successfully');
} catch (error) {
console.error('Failed to initialize worker:', error);
this.handleWorkerFailure();
}
}
private setupWorkerHandlers(): void {
if (!this.worker) return;
// Handle successful messages
this.worker.onmessage = (event: MessageEvent) => {
this.handleWorkerMessage(event.data);
};
// Handle worker errors
this.worker.onerror = (error: ErrorEvent) => {
console.error('Worker error:', error);
this.handleWorkerFailure();
};
// Handle messages that fail to deserialize
this.worker.onmessageerror = (error: MessageEvent) => {
console.error('Worker message error:', error);
this.handleWorkerFailure();
};
}
private handleWorkerMessage(data: any): void {
const { taskId, result, error } = data;
const task = this.pendingTasks.get(taskId);
if (task) {
clearTimeout(task.timeout);
this.pendingTasks.delete(taskId);
const processingTime = Date.now() - task.startTime;
console.log(`Task ${taskId} completed in ${processingTime}ms`);
if (error) {
task.reject(new Error(error));
} else {
task.resolve(result);
}
}
}
private handleWorkerFailure(): void {
console.error('Worker failure detected');
// Reject all pending tasks
for (const [taskId, task] of this.pendingTasks) {
clearTimeout(task.timeout);
task.reject(new Error('Worker failed'));
}
this.pendingTasks.clear();
// Attempt to reconnect
if (this.reconnectAttempts < this.maxReconnectAttempts && !this.isTerminated) {
this.reconnectAttempts++;
console.log(`Attempting to reconnect worker (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
setTimeout(() => {
this.initializeWorker();
}, 1000 * this.reconnectAttempts); // Linear backoff: wait a little longer after each attempt
} else {
console.error('Max reconnection attempts reached. Worker permanently failed.');
}
}
async processData(data: any, timeoutMs: number = 30000): Promise<any> {
if (this.isTerminated || !this.worker) {
throw new Error('Worker is not available');
}
const taskId = `task_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
return new Promise((resolve, reject) => {
// Set up timeout
const timeout = setTimeout(() => {
this.pendingTasks.delete(taskId);
reject(new Error(`Task ${taskId} timed out after ${timeoutMs}ms`));
}, timeoutMs);
// Register task
this.pendingTasks.set(taskId, {
resolve,
reject,
timeout,
startTime: Date.now()
});
// Send to worker
this.worker!.postMessage({ taskId, data });
});
}
// Health check
isHealthy(): boolean {
return !this.isTerminated &&
this.worker !== null &&
this.reconnectAttempts < this.maxReconnectAttempts;
}
// Get performance metrics
getMetrics(): {
pendingTasks: number;
reconnectAttempts: number;
isHealthy: boolean;
} {
return {
pendingTasks: this.pendingTasks.size,
reconnectAttempts: this.reconnectAttempts,
isHealthy: this.isHealthy()
};
}
// Clean termination
terminate(): void {
console.log('Terminating worker manager...');
this.isTerminated = true;
// Reject all pending tasks
for (const [taskId, task] of this.pendingTasks) {
clearTimeout(task.timeout);
task.reject(new Error('Worker manager terminated'));
}
this.pendingTasks.clear();
// Terminate worker
if (this.worker) {
this.worker.terminate();
this.worker = null;
}
}
}
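A brief usage sketch showing how the health check and metrics might be consulted around a single task; '/stats-worker.js' is a placeholder path.
// Usage sketch for RobustWorkerManager
async function runWithRecovery(): Promise<void> {
  const manager = new RobustWorkerManager('/stats-worker.js'); // placeholder worker path
  try {
    if (!manager.isHealthy()) {
      throw new Error('Worker never became healthy');
    }
    const result = await manager.processData({ values: [1, 2, 3] }, 10000); // 10s timeout
    console.log('Result:', result, 'Metrics:', manager.getMetrics());
  } catch (error) {
    console.error('Task failed even with recovery in place:', error);
  } finally {
    manager.terminate();
  }
}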
Best Practices
Performance Optimization
// Advanced performance optimization strategies
class PerformanceOptimizedWorkerManager {
private workerPool: Worker[] = [];
private taskMetrics = new Map<string, {
startTime: number;
endTime: number;
dataSize: number;
processingTime: number;
}>();
private performanceHistory: Array<{
timestamp: number;
throughput: number; // tasks per second
averageLatency: number;
memoryUsage: number;
}> = [];
constructor(workerCount: number = navigator.hardwareConcurrency || 4) {
this.initializeWorkerPool(workerCount);
this.startPerformanceMonitoring();
}
private initializeWorkerPool(count: number): void {
console.log(`Initializing optimized worker pool with ${count} workers`);
for (let i = 0; i < count; i++) {
const worker = new Worker('/optimized-worker.js');
// Configure worker for optimal performance
worker.postMessage({
type: 'CONFIGURE',
config: {
chunkSize: 10000, // Optimal chunk size for data processing
useTransferableObjects: true,
enableProgressReporting: false, // Disable for better performance
memoryLimit: 256 * 1024 * 1024 // 256MB memory limit
}
});
this.workerPool.push(worker);
}
}
// Intelligent load balancing
private selectOptimalWorker(): Worker {
// Random selection for now (round-robin or least-busy selection would be smarter)
const selectedWorker = this.workerPool[Math.floor(Math.random() * this.workerPool.length)];
return selectedWorker;
}
// Batch processing for better throughput
async processBatch(
items: any[],
batchSize: number = 100
): Promise<any[]> {
console.log(`Processing batch of ${items.length} items (batch size: ${batchSize})`);
const batches: any[][] = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
const startTime = Date.now();
// Process batches in parallel
const batchPromises = batches.map(async (batch, index) => {
const worker = this.selectOptimalWorker();
return this.processBatchInWorker(worker, batch, `batch_${index}`);
});
try {
const results = await Promise.all(batchPromises);
const flatResults = results.flat();
const processingTime = Date.now() - startTime;
const throughput = items.length / (processingTime / 1000);
console.log(`Batch processing complete: ${items.length} items in ${processingTime}ms (${throughput.toFixed(1)} items/sec)`);
return flatResults;
} catch (error) {
console.error('Batch processing failed:', error);
throw error;
}
}
private async processBatchInWorker(
worker: Worker,
batch: any[],
batchId: string
): Promise<any[]> {
const taskId = `${batchId}_${Date.now()}`;
return new Promise((resolve, reject) => {
const handleMessage = (event: MessageEvent) => {
if (event.data.taskId === taskId) {
worker.removeEventListener('message', handleMessage);
if (event.data.error) {
reject(new Error(event.data.error));
} else {
resolve(event.data.result);
}
}
};
worker.addEventListener('message', handleMessage);
// Send batch to worker with transferable objects if applicable
const message = { taskId, type: 'PROCESS_BATCH', batch };
const transferList: Transferable[] = [];
// Add ArrayBuffers to transfer list for zero-copy transfer
batch.forEach(item => {
if (item instanceof ArrayBuffer) {
transferList.push(item);
} else if (item?.buffer instanceof ArrayBuffer) {
transferList.push(item.buffer);
}
});
worker.postMessage(message, transferList);
});
}
// Performance monitoring
private startPerformanceMonitoring(): void {
setInterval(() => {
this.recordPerformanceMetrics();
}, 5000); // Record metrics every 5 seconds
}
private recordPerformanceMetrics(): void {
const now = Date.now();
const recentTasks = Array.from(this.taskMetrics.values())
.filter(metric => now - metric.endTime < 60000); // Last minute
if (recentTasks.length === 0) return;
const throughput = recentTasks.length / 60; // tasks per second over last minute
const averageLatency = recentTasks.reduce((sum, task) => sum + task.processingTime, 0) / recentTasks.length;
// Estimate memory usage
const memoryUsage = (performance as any).memory?.usedJSHeapSize || 0;
this.performanceHistory.push({
timestamp: now,
throughput,
averageLatency,
memoryUsage
});
// Keep only last 100 entries
if (this.performanceHistory.length > 100) {
this.performanceHistory.splice(0, this.performanceHistory.length - 100);
}
// Adaptive optimization based on metrics
this.optimizeBasedOnMetrics(throughput, averageLatency, memoryUsage);
}
private optimizeBasedOnMetrics(throughput: number, latency: number, memoryUsage: number): void {
// Simple adaptive optimizations
// If throughput is low, consider adding more workers (if CPU allows)
if (throughput < 1 && this.workerPool.length < navigator.hardwareConcurrency * 2) {
console.log('Low throughput detected, considering adding a worker...');
}
// If latency is high, optimize chunk sizes
if (latency > 1000) { // More than 1 second average latency
console.log('High latency detected, consider optimizing batch sizes');
}
// If memory usage is high, trigger cleanup
if (memoryUsage > 500 * 1024 * 1024) { // More than 500MB
console.log('High memory usage detected, triggering cleanup...');
this.cleanup();
}
}
// Memory cleanup
private cleanup(): void {
// Clear old task metrics
const cutoffTime = Date.now() - 300000; // 5 minutes ago
for (const [taskId, metric] of this.taskMetrics) {
if (metric.endTime < cutoffTime) {
this.taskMetrics.delete(taskId);
}
}
// Trim performance history
if (this.performanceHistory.length > 50) {
this.performanceHistory.splice(0, this.performanceHistory.length - 50);
}
// Suggest garbage collection if available
if ((window as any).gc) {
(window as any).gc();
}
}
// Get detailed performance report
getPerformanceReport(): {
current: {
throughput: number;
averageLatency: number;
memoryUsage: number;
activeTasks: number;
};
historical: typeof this.performanceHistory;
recommendations: string[];
} {
const latest = this.performanceHistory[this.performanceHistory.length - 1];
const recommendations: string[] = [];
if (latest) {
if (latest.throughput < 1) {
recommendations.push('Consider increasing worker pool size for better throughput');
}
if (latest.averageLatency > 1000) {
recommendations.push('Optimize batch sizes to reduce latency');
}
if (latest.memoryUsage > 400 * 1024 * 1024) {
recommendations.push('Consider implementing more aggressive memory cleanup');
}
}
return {
current: {
throughput: latest?.throughput || 0,
averageLatency: latest?.averageLatency || 0,
memoryUsage: latest?.memoryUsage || 0,
activeTasks: this.taskMetrics.size
},
historical: this.performanceHistory,
recommendations
};
}
terminate(): void {
console.log('Terminating performance-optimized worker manager...');
for (const worker of this.workerPool) {
worker.terminate();
}
this.workerPool.length = 0;
this.taskMetrics.clear();
this.performanceHistory.length = 0;
}
}
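A quick usage sketch: the items here are plain numbers, but any structured-cloneable payload works, and the worker script path baked into the class ('/optimized-worker.js') is assumed to exist.
// Usage sketch for PerformanceOptimizedWorkerManager
async function crunchNumbers(): Promise<void> {
  const manager = new PerformanceOptimizedWorkerManager(); // defaults to hardwareConcurrency workers
  try {
    const items = Array.from({ length: 5000 }, (_, i) => i);
    const processed = await manager.processBatch(items, 500); // ten batches of 500
    console.log('Processed items:', processed.length);
    console.log('Recommendations:', manager.getPerformanceReport().recommendations);
  } finally {
    manager.terminate();
  }
}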
Hands-On Exercise
Challenge: Build a Multi-Worker Image Processing Pipeline
Create a comprehensive image processing system using Web Workers that can:
Requirements:
- Process multiple images in parallel using a worker pool
- Apply different filters (blur, sharpen, brightness, contrast)
- Handle real-time progress reporting for each image
- Implement error recovery and retry mechanisms
- Support batch processing with priority queuing
Bonus Points:
- Add image format conversion (PNG → JPEG → WebP)
- Implement adaptive quality based on file size targets
- Create a visual progress dashboard with live updates
Solution Framework
// Image processing pipeline with Web Workers
interface ImageProcessingTask {
id: string;
imageData: ImageData;
operations: Array<{
type: 'blur' | 'sharpen' | 'brightness' | 'contrast';
params: Record<string, number>;
}>;
priority: number;
outputFormat?: 'png' | 'jpeg' | 'webp';
quality?: number;
}
interface ProcessingProgress {
taskId: string;
stage: string;
progress: number; // 0-100
estimatedTimeRemaining: number;
}
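// ProcessedImage is used by the pipeline below but never defined in the framework;
// this is one assumed shape - adjust it to whatever your worker actually returns
interface ProcessedImage {
id: string;
imageData: ImageData;
format?: 'png' | 'jpeg' | 'webp';
}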
class ImageProcessingPipeline {
private workerPool: WorkerPool<ImageProcessingTask, ProcessedImage>;
private progressCallbacks = new Map<string, (progress: ProcessingProgress) => void>();
private completedTasks = new Map<string, ProcessedImage>();
constructor(workerCount: number = 4) {
this.workerPool = new WorkerPool('/image-worker.js', workerCount);
this.setupProgressHandling();
}
async processImage(task: ImageProcessingTask): Promise<ProcessedImage> {
console.log(`Starting image processing task: ${task.id}`);
try {
const result = await this.workerPool.execute(task, task.priority);
this.completedTasks.set(task.id, result);
return result;
} catch (error) {
console.error(`Image processing failed for task ${task.id}:`, error);
throw error;
}
}
async processBatch(tasks: ImageProcessingTask[]): Promise<ProcessedImage[]> {
console.log(`Processing batch of ${tasks.length} images`);
const promises = tasks.map(task => this.processImage(task));
return Promise.all(promises);
}
onProgress(taskId: string, callback: (progress: ProcessingProgress) => void): void {
this.progressCallbacks.set(taskId, callback);
}
private setupProgressHandling(): void {
// Implementation for handling progress messages from workers
// This would listen to worker messages and call appropriate callbacks
}
getStats(): {
activeJobs: number;
completedJobs: number;
queuedJobs: number;
averageProcessingTime: number;
} {
const stats = this.workerPool.getStats();
return {
activeJobs: stats.activeTasks,
completedJobs: this.completedTasks.size,
queuedJobs: stats.queuedTasks,
averageProcessingTime: stats.averageTaskTime
};
}
terminate(): void {
this.workerPool.terminate();
this.progressCallbacks.clear();
this.completedTasks.clear();
}
}
// Usage example
async function demonstrateImagePipeline(): Promise<void> {
const pipeline = new ImageProcessingPipeline(4);
// Create sample image processing tasks
const tasks: ImageProcessingTask[] = [
{
id: 'image_1',
imageData: createMockImageData(800, 600),
operations: [
{ type: 'blur', params: { radius: 2 } },
{ type: 'brightness', params: { amount: 0.2 } }
],
priority: 2,
outputFormat: 'jpeg',
quality: 85
},
{
id: 'image_2',
imageData: createMockImageData(1200, 900),
operations: [
{ type: 'sharpen', params: { amount: 1.5 } },
{ type: 'contrast', params: { amount: 0.3 } }
],
priority: 1,
outputFormat: 'png'
}
];
// Set up progress monitoring
tasks.forEach(task => {
pipeline.onProgress(task.id, (progress) => {
console.log(`${task.id}: ${progress.progress}% - ${progress.stage}`);
});
});
try {
const results = await pipeline.processBatch(tasks);
console.log(`Batch processing complete! Processed ${results.length} images`);
const stats = pipeline.getStats();
console.log('Final stats:', stats);
} catch (error) {
console.error('Batch processing failed:', error);
} finally {
pipeline.terminate();
}
}
function createMockImageData(width: number, height: number): ImageData {
const data = new Uint8ClampedArray(width * height * 4);
// Fill with random pixel data
for (let i = 0; i < data.length; i += 4) {
data[i] = Math.floor(Math.random() * 256); // R
data[i + 1] = Math.floor(Math.random() * 256); // G
data[i + 2] = Math.floor(Math.random() * 256); // B
data[i + 3] = 255; // A
}
return new ImageData(data, width, height);
}
Key Takeaways
You've mastered Web Workers! Here's what you can now do:
- Create type-safe worker communication with message contracts
- Build efficient worker pools for parallel processing
- Handle complex data transfers with Transferable Objects
- Implement robust error handling and recovery mechanisms
- Optimize performance with monitoring and adaptive strategies
Remember: Web Workers are your secret weapon for keeping the UI responsive while doing heavy lifting in the background!
Next Steps
Congratulations! You've mastered Web Workers and parallel processing!
Here's what to do next:
- Practice with the image processing pipeline exercise above
- Build a real application that leverages Web Workers for performance
- Move on to our next tutorial: "Service Workers: Offline Functionality"
- Share your Web Worker knowledge with others!
Remember: The key to great Web Worker implementations is balancing performance gains with communication overhead. Start simple and optimize based on real metrics!
Happy parallel processing!