
Worker Threads: Parallel Processing

Master parallel processing with worker threads in TypeScript through practical examples, best practices, and real-world applications.

Intermediate
25 min read

Prerequisites

  • Basic understanding of JavaScript
  • TypeScript installation
  • VS Code or preferred IDE

What you'll learn

  • Understand worker thread fundamentals
  • Apply worker threads in real projects
  • Debug common issues
  • Write type-safe code

Introduction

Ever felt like your computer was running a marathon on one leg? That's what happens when JavaScript uses just one thread! Today, we're unleashing the full power of your CPU with worker threads, turning your single-lane road into a multi-lane highway!

Worker threads let you run JavaScript code in parallel, so heavy computations and data processing stop blocking the main thread and can finish sooner on multi-core machines. It's like having multiple chefs in the kitchen instead of just one!

Ready to supercharge your TypeScript applications? Let's dive in!

Understanding Worker Threads

Think of worker threads like having multiple employees in your office instead of doing everything yourself. Each worker can handle their own task independently, without blocking others!

What Are Worker Threads?

Worker threads are separate JavaScript execution contexts that run in parallel with your main thread. Here's what makes them special:

// The Main Thread (The Boss)
// Handles UI, events, and coordinates workers
const mainThread = {
  task: "Manage the application",
  canBlock: false, // Must stay responsive!
  coordinates: ["worker1", "worker2", "worker3"]
};

// Worker Threads (The Employees)
// Handle heavy computations in the background
const workerThread = {
  task: "Process data intensively",
  canBlock: true, // Can work without interrupting others
  communicates: "Through messages"
};
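
To make the boss/employee picture concrete, here is a minimal runnable sketch, assuming Node.js 12 or later. The worker source is passed inline with `eval: true` only so the example fits in one file, and the helper name `askWorkerForThreadId` is made up for illustration; real projects usually point `Worker` at a compiled file.

```typescript
import { Worker, isMainThread, threadId } from 'worker_threads';

// Inline worker source (plain CommonJS JavaScript). It reports its own
// thread id to the parent and then finishes.
const threadIdWorkerSource = `
  const { parentPort, threadId } = require('worker_threads');
  parentPort.postMessage(threadId);
`;

// Hypothetical helper: spawn one worker and resolve with the id it reports.
function askWorkerForThreadId(): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(threadIdWorkerSource, { eval: true });
    worker.once('message', (id: number) => resolve(id));
    worker.once('error', reject);
  });
}

askWorkerForThreadId().then((workerId) => {
  // The main thread has id 0; every worker gets its own non-zero id.
  console.log(`main: isMainThread=${isMainThread}, threadId=${threadId}`);
  console.log(`worker reported threadId=${workerId}`);
});
```

The two threads share nothing by default: the only thing that crosses the boundary here is the number sent through `postMessage`.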

Why Use Worker Threads?

  1. CPU-Intensive Tasks: Process large datasets without freezing your app
  2. Parallel Processing: Use all CPU cores effectively
  3. Better Performance: Reduce processing time significantly
  4. Responsive UI: Keep your application smooth and responsive
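
"Use all CPU cores" raises the question of how many workers to start. A rough sketch, assuming one worker per logical core with one core left for the main thread; `suggestedWorkerCount` is a hypothetical helper, not part of any library, and the right number for your workload may differ.

```typescript
import * as os from 'os';

// Hypothetical helper: derive a worker count from the number of logical
// cores, reserving some for the main thread and never returning less than 1.
function suggestedWorkerCount(reserveForMain = 1): number {
  const logicalCores = os.cpus().length;
  return Math.max(1, logicalCores - reserveForMain);
}

console.log(
  `Logical cores: ${os.cpus().length}, suggested workers: ${suggestedWorkerCount()}`
);
```

Starting far more CPU-bound workers than cores usually adds scheduling and memory overhead without adding throughput.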

Basic Syntax and Usage

Let's create our first worker thread! First, install the Node.js type definitions:

npm install --save-dev @types/node

Creating a Worker Thread

// worker.ts - Our worker file
import { parentPort, workerData } from 'worker_threads';

// Worker is ready to receive tasks!
if (parentPort) {
  // Listen for messages from the main thread
  parentPort.on('message', (task: string) => {
    console.log(`Worker received: ${task}`);
    
    // Do some heavy work
    const result = performHeavyTask(task);
    
    // Send result back
    parentPort.postMessage({ 
      success: true, 
      result 
    });
  });
}

function performHeavyTask(task: string): string {
  // Simulate intensive work
  return `Completed: ${task}`;
}

// Process initial data if provided
if (workerData) {
  console.log('Initial data:', workerData);
}

Using the Worker from Main Thread

// main.ts - Main application file
import { Worker } from 'worker_threads';
import path from 'path';

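// Create a new worker
// Point at the compiled worker.js output: loading worker.ts directly
// only works with a TypeScript-aware runtime or loader
const worker = new Worker(
  path.join(__dirname, 'worker.js'),
  {
    workerData: { message: 'Hello Worker!' }
  }
);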

// Send a task to the worker
worker.postMessage('Process this data');

// Listen for results
worker.on('message', (result) => {
  console.log('Received from worker:', result);
});

// Handle errors
worker.on('error', (error) => {
  console.error('Worker error:', error);
});

// Worker finished
worker.on('exit', (code) => {
  console.log(`Worker stopped with exit code ${code}`);
});
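
The three events above (`message`, `error`, `exit`) map naturally onto a promise. The following is a minimal sketch under that assumption; `runInWorker` and the inline `upperCaseWorkerSource` are illustrative names, and the worker is created with `eval: true` only to keep the example in a single file.

```typescript
import { Worker } from 'worker_threads';

// Inline worker source used only to keep this sketch self-contained.
// It upper-cases the string it receives through workerData.
const upperCaseWorkerSource = `
  const { parentPort, workerData } = require('worker_threads');
  parentPort.postMessage(String(workerData).toUpperCase());
`;

// Hypothetical helper: run one task in a fresh worker and settle a promise
// from the worker's 'message', 'error', and 'exit' events.
function runInWorker<T>(source: string, input: unknown): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const worker = new Worker(source, { eval: true, workerData: input });
    let settled = false;
    worker.once('message', (value: T) => {
      settled = true;
      resolve(value);
    });
    worker.once('error', (err) => {
      settled = true;
      reject(err);
    });
    worker.once('exit', (code) => {
      // Exiting without a message means the task never produced a result.
      if (!settled) {
        reject(new Error(`Worker exited with code ${code} before replying`));
      }
    });
  });
}

runInWorker<string>(upperCaseWorkerSource, 'process this data')
  .then((result) => console.log('Received from worker:', result));
```

Creating a worker per call is fine for a one-off job; for many small tasks, a reusable pool avoids paying the startup cost each time.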

Practical Examples

Example 1: Image Processing Pipeline

Let's build a parallel image processor that can handle multiple images simultaneously!

// imageWorker.ts
import { parentPort, workerData } from 'worker_threads';

interface ImageTask {
  id: string;
  path: string;
  operations: string[];
}

interface ProcessResult {
  id: string;
  success: boolean;
  processedPath?: string;
  error?: string;
}

// Process images in the worker
if (parentPort) {
  parentPort.on('message', async (task: ImageTask) => {
    try {
      console.log(`Processing image: ${task.path}`);
      
      // Simulate image processing
      const processed = await processImage(task);
      
      // Send success result
      parentPort.postMessage({
        id: task.id,
        success: true,
        processedPath: processed
      } as ProcessResult);
      
    } catch (error) {
      // Send error result (catch variables are `unknown` under strict settings)
      parentPort.postMessage({
        id: task.id,
        success: false,
        error: error instanceof Error ? error.message : String(error)
      } as ProcessResult);
    }
  });
}

async function processImage(task: ImageTask): Promise<string> {
  // Apply each operation
  for (const operation of task.operations) {
    console.log(`  Applying ${operation}...`);
    // Simulate processing time
    await new Promise(resolve => setTimeout(resolve, 500));
  }
  
  return `processed_${task.id}.jpg`;
}

// imageProcessor.ts - Main thread controller
// (ImageTask and ProcessResult are the interfaces declared in imageWorker.ts)
import { Worker } from 'worker_threads';
import path from 'path';

class ImageProcessor {
  private workers: Worker[] = [];
  private taskQueue: ImageTask[] = [];
  private busyWorkers = new Set<Worker>();
  
  constructor(private workerCount: number = 4) {
    this.initializeWorkers();
  }
  
  // Create worker pool
  private initializeWorkers(): void {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(
        path.join(__dirname, 'imageWorker.js')
      );
      
      // Set up event handlers
      worker.on('message', (result: ProcessResult) => {
        console.log(`Image ${result.id} processed!`);
        this.busyWorkers.delete(worker);
        this.processNextTask(worker);
      });
      
      worker.on('error', (error) => {
        console.error('Worker error:', error);
      });
      
      this.workers.push(worker);
    }
  }
  
  // Add task to queue
  async processImage(task: ImageTask): Promise<void> {
    this.taskQueue.push(task);
    this.distributeWork();
  }
  
  // Distribute work to available workers
  private distributeWork(): void {
    for (const worker of this.workers) {
      if (!this.busyWorkers.has(worker) && this.taskQueue.length > 0) {
        const task = this.taskQueue.shift()!;
        this.busyWorkers.add(worker);
        worker.postMessage(task);
      }
    }
  }
  
  // Process next task
  private processNextTask(worker: Worker): void {
    if (this.taskQueue.length > 0) {
      const task = this.taskQueue.shift()!;
      this.busyWorkers.add(worker);
      worker.postMessage(task);
    }
  }
  
  // Cleanup workers
  async shutdown(): Promise<void> {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}

// Usage example
async function main() {
  const processor = new ImageProcessor(4); // 4 parallel workers
  
  // Process multiple images
  const images = [
    { id: '1', path: 'photo1.jpg', operations: ['resize', 'blur'] },
    { id: '2', path: 'photo2.jpg', operations: ['crop', 'sharpen'] },
    { id: '3', path: 'photo3.jpg', operations: ['rotate', 'filter'] },
    // ... more images
  ];
  
  // Queue every image; idle workers pick them up in parallel
  for (const image of images) {
    await processor.processImage(image);
  }
  
  // Wait a bit for processing
  setTimeout(() => processor.shutdown(), 5000);
}

Example 2: Data Analysis Engine

Build a parallel data analyzer for processing large datasets!

// dataWorker.ts
import { parentPort, workerData } from 'worker_threads';

interface DataChunk {
  id: number;
  data: number[];
  operation: 'sum' | 'average' | 'max' | 'min';
}

interface AnalysisResult {
  id: number;
  result: number;
  processTime: number;
}

// Analyze data chunks
if (parentPort) {
  parentPort.on('message', (chunk: DataChunk) => {
    const startTime = Date.now();
    
    let result: number;
    switch (chunk.operation) {
      case 'sum':
        result = chunk.data.reduce((a, b) => a + b, 0);
        break;
      case 'average':
        result = chunk.data.reduce((a, b) => a + b, 0) / chunk.data.length;
        break;
      case 'max':
        result = Math.max(...chunk.data);
        break;
      case 'min':
        result = Math.min(...chunk.data);
        break;
    }
    
    // Send analysis result
    parentPort.postMessage({
      id: chunk.id,
      result,
      processTime: Date.now() - startTime
    } as AnalysisResult);
  });
}
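
// dataAnalyzer.ts
// (DataChunk and AnalysisResult are the interfaces declared in dataWorker.ts)
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
import path from 'path'; // needed for path.join in initializeWorkers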

class DataAnalyzer extends EventEmitter {
  private workers: Worker[] = [];
  private results = new Map<number, number>();
  private pendingChunks = 0;
  
  constructor(private workerCount: number = 4) {
    super();
    this.initializeWorkers();
  }
  
  // Analyze large dataset in parallel
  async analyzeDataset(
    data: number[], 
    operation: 'sum' | 'average' | 'max' | 'min',
    chunkSize: number = 10000
  ): Promise<number> {
    // Split data into chunks
    // For 'average', workers compute per-chunk sums; the total is divided
    // by data.length once at the end, so unequal chunks are weighted correctly
    const workerOperation = operation === 'average' ? 'sum' : operation;
    const chunks: DataChunk[] = [];
    for (let i = 0; i < data.length; i += chunkSize) {
      chunks.push({
        id: chunks.length,
        data: data.slice(i, i + chunkSize),
        operation: workerOperation
      });
    }
    
    this.pendingChunks = chunks.length;
    this.results.clear();
    
    // Distribute chunks to workers
    return new Promise((resolve) => {
      let workerIndex = 0;
      
      // Listen for completion
      this.once('analysisComplete', () => {
        const finalResult = this.aggregateResults(operation);
        resolve(operation === 'average' ? finalResult / data.length : finalResult);
      });
      
      // Send chunks to workers
      for (const chunk of chunks) {
        this.workers[workerIndex].postMessage(chunk);
        workerIndex = (workerIndex + 1) % this.workerCount;
      }
    });
  }
  
  // Initialize worker pool
  private initializeWorkers(): void {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(
        path.join(__dirname, 'dataWorker.js')
      );
      
      worker.on('message', (result: AnalysisResult) => {
        console.log(
          `Chunk ${result.id} processed in ${result.processTime}ms`
        );
        
        this.results.set(result.id, result.result);
        this.pendingChunks--;
        
        if (this.pendingChunks === 0) {
          this.emit('analysisComplete');
        }
      });
      
      this.workers.push(worker);
    }
  }
  
  // Aggregate results from all workers
  private aggregateResults(operation: string): number {
    const values = Array.from(this.results.values());
    
    switch (operation) {
      case 'sum':
      case 'average':
        return values.reduce((a, b) => a + b, 0);
      case 'max':
        return Math.max(...values);
      case 'min':
        return Math.min(...values);
      default:
        return 0;
    }
  }
}

// Usage example
async function analyzeMarketData() {
  const analyzer = new DataAnalyzer(8); // 8 parallel workers
  
  // Generate large dataset (1 million data points)
  const marketData = Array.from(
    { length: 1_000_000 }, 
    () => Math.random() * 1000
  );
  
  console.time('Analysis Time');
  
  // Run parallel analysis
  const sum = await analyzer.analyzeDataset(marketData, 'sum');
  const avg = await analyzer.analyzeDataset(marketData, 'average');
  const max = await analyzer.analyzeDataset(marketData, 'max');
  
  console.timeEnd('Analysis Time');
  
  console.log(`
    Market Analysis Results:
    Sum: $${sum.toFixed(2)}
    Average: $${avg.toFixed(2)}
    Maximum: $${max.toFixed(2)}
  `);
}
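
Partial results combine differently depending on the operation: sums add up, and max/min take the extreme of the extremes, but per-chunk averages cannot simply be added or averaged again, because the last chunk is usually shorter than the rest. A small sketch of the split-and-combine step, runnable without any workers; `splitIntoChunks`, `PartialAverage`, and `combineAverages` are illustrative names.

```typescript
// Split an array into consecutive chunks of at most `size` elements.
function splitIntoChunks(data: number[], size: number): number[][] {
  if (size <= 0) throw new RangeError('size must be positive');
  const chunks: number[][] = [];
  for (let i = 0; i < data.length; i += size) {
    chunks.push(data.slice(i, i + size));
  }
  return chunks;
}

interface PartialAverage {
  average: number; // mean of one chunk
  count: number;   // number of values in that chunk
}

// Combine per-chunk means into the overall mean, weighting each by its count.
function combineAverages(parts: PartialAverage[]): number {
  const total = parts.reduce((n, p) => n + p.count, 0);
  if (total === 0) return NaN;
  const weightedSum = parts.reduce((s, p) => s + p.average * p.count, 0);
  return weightedSum / total;
}

const demoChunks = splitIntoChunks([1, 2, 3, 4, 5], 2); // [[1, 2], [3, 4], [5]]
const demoParts = demoChunks.map((chunk) => ({
  average: chunk.reduce((a, b) => a + b, 0) / chunk.length,
  count: chunk.length,
}));
console.log(combineAverages(demoParts)); // 3
```

An unweighted mean of the three chunk averages (1.5, 3.5, 5) would give about 3.33 instead of 3, which is why the chunk sizes have to travel with the partial results.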

Example 3: Crypto Mining Simulator

Let's build a fun crypto mining simulator using worker threads!

// miningWorker.ts
import { parentPort, workerData } from 'worker_threads';
import crypto from 'crypto';

interface MiningTask {
  blockData: string;
  difficulty: number;
  startNonce: number;
  range: number;
}

interface MiningResult {
  found: boolean;
  nonce?: number;
  hash?: string;
  attempts: number;
}

// Mine for valid hash
if (parentPort) {
  parentPort.on('message', (task: MiningTask) => {
    console.log(`Worker mining with nonce range ${task.startNonce} - ${task.startNonce + task.range}`);
    
    const target = '0'.repeat(task.difficulty);
    let attempts = 0;
    
    for (let nonce = task.startNonce; nonce < task.startNonce + task.range; nonce++) {
      attempts++;
      
      // Calculate hash
      const data = task.blockData + nonce;
      const hash = crypto.createHash('sha256').update(data).digest('hex');
      
      // Check if we found a valid hash
      if (hash.startsWith(target)) {
        parentPort.postMessage({
          found: true,
          nonce,
          hash,
          attempts
        } as MiningResult);
        return;
      }
      
      // Progress is reported once per range by the final message below.
      // Posting intermediate { found: false } messages here would make the
      // main thread treat each one as an exhausted range and queue extra work.
    }
    
    // Didn't find in this range
    parentPort.postMessage({
      found: false,
      attempts
    } as MiningResult);
  });
}
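
// cryptoMiner.ts
// (MiningTask and MiningResult are the interfaces declared in miningWorker.ts)
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
import path from 'path'; // needed for path.join in createWorkers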

class CryptoMiner extends EventEmitter {
  private workers: Worker[] = [];
  private mining = false;
  private totalAttempts = 0;
  
  constructor(private workerCount: number = 4) {
    super();
  }
  
  // Start mining a block
  async mineBlock(blockData: string, difficulty: number): Promise<{
    nonce: number;
    hash: string;
    attempts: number;
    timeSeconds: number;
  }> {
    this.mining = true;
    this.totalAttempts = 0;
    const startTime = Date.now();
    
    // Create workers for this mining session
    this.createWorkers();
    
    return new Promise((resolve) => {
      let currentNonce = 0;
      const rangePerWorker = 100000;
      
      // Distribute work
      const distributeWork = () => {
        if (!this.mining) return;
        
        for (const worker of this.workers) {
          worker.postMessage({
            blockData,
            difficulty,
            startNonce: currentNonce,
            range: rangePerWorker
          } as MiningTask);
          
          currentNonce += rangePerWorker;
        }
      };
      
      // Handle worker results
      const handleResult = (worker: Worker) => (result: MiningResult) => {
        this.totalAttempts += result.attempts;
        
        if (result.found) {
          // Found valid hash!
          this.mining = false;
          const timeSeconds = (Date.now() - startTime) / 1000;
          
          resolve({
            nonce: result.nonce!,
            hash: result.hash!,
            attempts: this.totalAttempts,
            timeSeconds
          });
          
          this.stopMining();
        } else if (this.mining) {
          // Keep mining
          worker.postMessage({
            blockData,
            difficulty,
            startNonce: currentNonce,
            range: rangePerWorker
          } as MiningTask);
          
          currentNonce += rangePerWorker;
          
          // Emit progress
          this.emit('progress', {
            attempts: this.totalAttempts,
            hashRate: this.totalAttempts / ((Date.now() - startTime) / 1000)
          });
        }
      };
      
      // Set up workers with handlers
      for (const worker of this.workers) {
        worker.on('message', handleResult(worker));
      }
      
      // Start mining!
      distributeWork();
    });
  }
  
  // Create worker pool
  private createWorkers(): void {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(
        path.join(__dirname, 'miningWorker.js')
      );
      
      worker.on('error', (error) => {
        console.error('Mining worker error:', error);
      });
      
      this.workers.push(worker);
    }
  }
  
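  // Stop all mining
  private async stopMining(): Promise<void> {
    // Detach the current workers first so a mineBlock() call that starts
    // while termination is in flight does not have its new workers cleared
    const workers = this.workers;
    this.workers = [];
    await Promise.all(
      workers.map(worker => worker.terminate())
    );
  }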
}

// Usage example
async function simulateMining() {
  const miner = new CryptoMiner(8); // 8 parallel workers
  
  // Track mining progress
  miner.on('progress', ({ attempts, hashRate }) => {
    console.log(`Mining: ${attempts} attempts | ${hashRate.toFixed(0)} H/s`);
  });
  
  console.log('Starting crypto mining simulation...\n');
  
  // Mine blocks with increasing difficulty
  for (let difficulty = 4; difficulty <= 6; difficulty++) {
    console.log(`\nMining block with difficulty ${difficulty}...`);
    
    const blockData = `Block #${Date.now()} | Previous: 0x1234...`;
    const result = await miner.mineBlock(blockData, difficulty);
    
    console.log(`
Block mined successfully!
   Nonce: ${result.nonce}
   Hash: ${result.hash}
   Time: ${result.timeSeconds.toFixed(2)} seconds
   Attempts: ${result.attempts.toLocaleString()}
    `);
  }
}
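
The heart of the miner is a plain function that can be tried on the main thread before any workers are involved. The sketch below mirrors the worker's loop and adds a hypothetical `planRanges` helper that hands each worker a non-overlapping slice of nonces. Because the target is a prefix of hex zeros, each extra level of difficulty multiplies the expected number of attempts by 16.

```typescript
import { createHash } from 'crypto';

interface RangeResult {
  found: boolean;
  nonce?: number;
  hash?: string;
  attempts: number;
}

// Search nonces in [start, start + range) for a SHA-256 hex digest that
// begins with `difficulty` zero characters.
function searchNonceRange(
  blockData: string,
  difficulty: number,
  start: number,
  range: number
): RangeResult {
  const target = '0'.repeat(difficulty);
  let attempts = 0;
  for (let nonce = start; nonce < start + range; nonce++) {
    attempts++;
    const hash = createHash('sha256').update(blockData + nonce).digest('hex');
    if (hash.startsWith(target)) return { found: true, nonce, hash, attempts };
  }
  return { found: false, attempts };
}

// Hypothetical helper: non-overlapping start nonces for `workerCount` workers.
function planRanges(workerCount: number, rangePerWorker: number, firstNonce = 0): number[] {
  return Array.from({ length: workerCount }, (_, i) => firstNonce + i * rangePerWorker);
}

console.log(planRanges(4, 100000)); // [ 0, 100000, 200000, 300000 ]
const demoSearch = searchNonceRange('demo block', 2, 0, 100000);
console.log(
  demoSearch.found ? `nonce ${demoSearch.nonce} -> ${demoSearch.hash}` : 'no nonce in range'
);
```

Keeping the search pure like this also makes it easy to unit test without spawning threads.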

Advanced Concepts

SharedArrayBuffer for Shared Memory

Share memory between threads so they can read and write the same data without copying it through messages!

// sharedMemoryExample.ts
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

if (isMainThread) {
  // Create shared memory
  const sharedBuffer = new SharedArrayBuffer(1024);
  const sharedArray = new Int32Array(sharedBuffer);
  
  // Initialize shared data
  sharedArray[0] = 0; // Counter
  
  // Create workers with shared memory
  const workers: Worker[] = [];
  for (let i = 0; i < 4; i++) {
    const worker = new Worker(__filename, {
      workerData: { sharedBuffer, workerId: i }
    });
    workers.push(worker);
  }
  
  // Monitor shared counter
  setInterval(() => {
    console.log(`Shared counter value: ${sharedArray[0]}`);
  }, 1000);
  
} else {
  // Worker thread
  const { sharedBuffer, workerId } = workerData;
  const sharedArray = new Int32Array(sharedBuffer);
  
  // Increment shared counter atomically
  setInterval(() => {
    Atomics.add(sharedArray, 0, 1);
    console.log(`Worker ${workerId} incremented counter`);
  }, 100);
}
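
The interval-based example above runs forever, which makes it hard to see that no increments are lost. Here is a bounded variant as a sketch: each worker adds to the same counter a fixed number of times, and the main thread reads the total once every worker has exited. `countInParallel` and the inline `incrementerSource` are illustrative names; `eval: true` is used only to keep everything in one file.

```typescript
import { Worker } from 'worker_threads';

// Inline worker source: add 1 to slot 0 of the shared array `iterations` times.
const incrementerSource = `
  const { workerData } = require('worker_threads');
  const counter = new Int32Array(workerData.sharedBuffer);
  for (let i = 0; i < workerData.iterations; i++) {
    Atomics.add(counter, 0, 1);
  }
`;

// Hypothetical helper: start `workerCount` workers that all increment the
// same counter, wait for every worker to exit, then read the final value.
async function countInParallel(workerCount: number, iterations: number): Promise<number> {
  const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
  const counter = new Int32Array(sharedBuffer);

  await Promise.all(
    Array.from({ length: workerCount }, () =>
      new Promise<void>((resolve, reject) => {
        const worker = new Worker(incrementerSource, {
          eval: true,
          workerData: { sharedBuffer, iterations },
        });
        worker.once('error', reject);
        worker.once('exit', () => resolve());
      })
    )
  );

  return Atomics.load(counter, 0);
}

countInParallel(4, 10000).then((total) => {
  console.log(`Final counter: ${total}`); // 40000 with Atomics.add
});
```

Replacing `Atomics.add(counter, 0, 1)` with a plain `counter[0]++` turns the update into a separate read and write, so concurrent workers can overwrite each other and the total may come up short.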

Worker Pool Pattern

Manage a pool of reusable workers efficiently!

// workerPool.ts
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';

interface PooledWorker {
  worker: Worker;
  busy: boolean;
}

class WorkerPool extends EventEmitter {
  private workers: PooledWorker[] = [];
  private taskQueue: Array<{
    task: any;
    resolve: (value: any) => void;
    reject: (error: any) => void;
  }> = [];
  
  constructor(
    private workerScript: string,
    private poolSize: number = 4
  ) {
    super();
    this.initializePool();
  }
  
  // Initialize worker pool
  private initializePool(): void {
    for (let i = 0; i < this.poolSize; i++) {
      const worker = new Worker(this.workerScript);
      const pooledWorker: PooledWorker = {
        worker,
        busy: false
      };
      
      // Handle worker messages
      worker.on('message', (result) => {
        pooledWorker.busy = false;
        this.emit('taskComplete', result);
        this.processNextTask();
      });
      
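      worker.on('error', (error) => {
        pooledWorker.busy = false;
        // emit('error') throws when nobody listens; the task's promise is
        // rejected separately by the one-time handler in processNextTask
        if (this.listenerCount('error') > 0) {
          this.emit('error', error);
        }
        this.processNextTask();
      });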
      
      this.workers.push(pooledWorker);
    }
  }
  
  // Submit task to pool
  async execute<T>(task: any): Promise<T> {
    return new Promise((resolve, reject) => {
      this.taskQueue.push({ task, resolve, reject });
      this.processNextTask();
    });
  }
  
  // Process next task in queue
  private processNextTask(): void {
    if (this.taskQueue.length === 0) return;
    
    // Find available worker
    const availableWorker = this.workers.find(w => !w.busy);
    if (!availableWorker) return;
    
    // Assign task to worker
    const { task, resolve, reject } = this.taskQueue.shift()!;
    availableWorker.busy = true;
    
    // Set up one-time handlers
    const messageHandler = (result: any) => {
      availableWorker.worker.off('message', messageHandler);
      availableWorker.worker.off('error', errorHandler);
      resolve(result);
    };
    
    const errorHandler = (error: any) => {
      availableWorker.worker.off('message', messageHandler);
      availableWorker.worker.off('error', errorHandler);
      reject(error);
    };
    
    availableWorker.worker.once('message', messageHandler);
    availableWorker.worker.once('error', errorHandler);
    availableWorker.worker.postMessage(task);
  }
  
  // Terminate all workers
  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(w => w.worker.terminate())
    );
  }
}

// Usage
const pool = new WorkerPool('./processor.js', 6);

// Execute multiple tasks
const tasks = Array.from({ length: 20 }, (_, i) => ({
  id: i,
  data: `Task ${i}`
}));

Promise.all(
  tasks.map(task => pool.execute(task))
).then(results => {
  console.log('All tasks completed!', results);
  pool.terminate();
});

Common Pitfalls and Solutions

Wrong: Sending Values That Cannot Be Cloned

// This won't work - functions can't be sent to workers
const worker = new Worker('./worker.js');
worker.postMessage({
  data: [1, 2, 3],
  processor: (x: number) => x * 2 // Functions can't be serialized!
});

Correct: Send Only Serializable Data

// Send data and operation type instead
const worker = new Worker('./worker.js');
worker.postMessage({
  data: [1, 2, 3],
  operation: 'double' // Worker knows what to do
});

// In worker:
parentPort?.on('message', (msg: { data: number[]; operation: string }) => {
  if (msg.operation === 'double') {
    const result = msg.data.map(x => x * 2);
    parentPort?.postMessage(result);
  }
});
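
You can check whether a value will survive `postMessage` without starting a worker at all. The sketch below assumes that a `MessageChannel` port applies the same structured-clone step as a worker's port; `canBePosted` is a made-up helper name.

```typescript
import { MessageChannel } from 'worker_threads';

// Hypothetical helper: report whether a value survives the structured clone
// that postMessage performs. A MessageChannel avoids needing a worker file.
function canBePosted(value: unknown): boolean {
  const { port1, port2 } = new MessageChannel();
  try {
    port1.postMessage(value); // throws a DataCloneError for uncloneable values
    return true;
  } catch {
    return false;
  } finally {
    // Close both ends so the ports do not keep the process alive.
    port1.close();
    port2.close();
  }
}

console.log(canBePosted({ data: [1, 2, 3], operation: 'double' }));             // true
console.log(canBePosted({ data: [1, 2, 3], processor: (x: number) => x * 2 })); // false
```

Plain objects, arrays, numbers, strings, `Date`, `Map`, `Set`, and typed arrays clone fine; functions do not, so send a name for the operation and let the worker look it up.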

Wrong: Creating Too Many Workers

// Don't create a new worker for each task
async function processTasks(tasks: Task[]) {
  for (const task of tasks) {
    const worker = new Worker('./worker.js'); // Memory explosion!
    worker.postMessage(task);
  }
}

Correct: Use a Worker Pool

// Reuse workers efficiently
const workerPool = new WorkerPool('./worker.js', 4);

async function processTasks(tasks: Task[]) {
  const results = await Promise.all(
    tasks.map(task => workerPool.execute(task))
  );
  return results;
}

Best Practices

1. Use TypeScript Types for Messages

// Define clear message types
interface WorkerMessage {
  type: 'process' | 'cancel' | 'status';
  payload: any;
}

interface WorkerResponse {
  type: 'result' | 'error' | 'progress';
  payload: any;
}

// Type-safe message handling
worker.on('message', (msg: WorkerResponse) => {
  switch (msg.type) {
    case 'result':
      handleResult(msg.payload);
      break;
    case 'error':
      handleError(msg.payload);
      break;
    case 'progress':
      updateProgress(msg.payload);
      break;
  }
});
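
With `payload: any`, the `switch` above narrows the tag but not the data. One way to tighten this, sketched here with made-up payload shapes, is a discriminated union where each `type` carries its own payload and a `never` check flags unhandled variants at compile time.

```typescript
// Each variant carries its own payload shape, keyed by the `type` tag.
// The payload fields are illustrative assumptions, not a fixed protocol.
type PoolResponse =
  | { type: 'result'; payload: { taskId: number; value: number } }
  | { type: 'error'; payload: { taskId: number; message: string } }
  | { type: 'progress'; payload: { taskId: number; percent: number } };

// Narrowing on `msg.type` gives each branch a fully typed payload.
function describeResponse(msg: PoolResponse): string {
  switch (msg.type) {
    case 'result':
      return `task ${msg.payload.taskId} = ${msg.payload.value}`;
    case 'error':
      return `task ${msg.payload.taskId} failed: ${msg.payload.message}`;
    case 'progress':
      return `task ${msg.payload.taskId} at ${msg.payload.percent}%`;
    default: {
      // Compile-time exhaustiveness check: adding a variant without a case
      // makes this assignment fail to type-check.
      const unreachable: never = msg;
      throw new Error(`Unhandled message: ${JSON.stringify(unreachable)}`);
    }
  }
}

console.log(describeResponse({ type: 'progress', payload: { taskId: 7, percent: 50 } }));
// task 7 at 50%
```

Types are erased at runtime, so this protects the code you write on both sides of the channel; it does not validate what actually arrives over `postMessage`.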

2. Implement Graceful Shutdown

class GracefulWorkerPool {
  private shuttingDown = false;
  
  async shutdown(): Promise<void> {
    this.shuttingDown = true;
    
    // Wait for current tasks to complete
    await this.waitForPendingTasks();
    
    // Terminate workers
    await Promise.all(
      this.workers.map(async (worker) => {
        worker.postMessage({ type: 'shutdown' });
        await new Promise(resolve => 
          worker.once('exit', resolve)
        );
      })
    );
  }
}
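
The pool above posts `{ type: 'shutdown' }` and waits for `exit`, which only works if the worker cooperates. A minimal sketch of the worker side, assuming the worker has nothing else keeping it alive: closing `parentPort` removes its last open handle, so the thread exits on its own. `politeWorkerSource` and `shutdownWorker` are illustrative names, and the worker is inlined with `eval: true` for brevity.

```typescript
import { Worker } from 'worker_threads';

// Inline worker source: echo 'process' messages and stop listening on
// 'shutdown'. Closing parentPort lets the worker's event loop drain.
const politeWorkerSource = `
  const { parentPort } = require('worker_threads');
  parentPort.on('message', (msg) => {
    if (msg.type === 'shutdown') {
      parentPort.close();
      return;
    }
    parentPort.postMessage({ type: 'result', payload: msg.payload });
  });
`;

// Hypothetical helper: ask the worker to stop and resolve with its exit code.
function shutdownWorker(worker: Worker): Promise<number> {
  return new Promise((resolve) => {
    worker.once('exit', resolve);
    worker.postMessage({ type: 'shutdown' });
  });
}

const politeWorker = new Worker(politeWorkerSource, { eval: true });
politeWorker.once('message', async (reply) => {
  console.log('reply before shutdown:', reply);
  const code = await shutdownWorker(politeWorker);
  console.log(`worker exited with code ${code}`); // 0 on a clean shutdown
});
politeWorker.postMessage({ type: 'process', payload: 42 });
```

Unlike `worker.terminate()`, which stops the thread as soon as possible, this lets the worker finish the message it is currently handling before it exits.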

3. Monitor Worker Health

class HealthyWorkerPool {
  private workerHealth = new Map<Worker, {
    lastHeartbeat: number;
    taskCount: number;
  }>();
  
  private startHealthCheck(): void {
    setInterval(() => {
      const now = Date.now();
      
      for (const [worker, health] of this.workerHealth) {
        if (now - health.lastHeartbeat > 30000) {
          console.warn('Worker unresponsive, restarting...');
          this.restartWorker(worker);
        }
      }
    }, 10000);
  }
}
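
The staleness check at the core of that health monitor can be pulled out into a pure function, which makes the timeout logic testable with fixed timestamps. This sketch assumes each worker periodically posts a heartbeat that the pool records as `lastHeartbeat`; the string ids and the name `findUnresponsive` are illustrative.

```typescript
interface WorkerHealthRecord {
  lastHeartbeat: number; // epoch milliseconds of the most recent heartbeat
  taskCount: number;
}

// Return the ids of workers whose last heartbeat is older than `timeoutMs`.
// `now` is a parameter so the check is easy to test with fixed timestamps.
function findUnresponsive(
  health: Map<string, WorkerHealthRecord>,
  now: number,
  timeoutMs = 30000
): string[] {
  const stale: string[] = [];
  health.forEach((record, id) => {
    if (now - record.lastHeartbeat > timeoutMs) stale.push(id);
  });
  return stale;
}

const healthTable = new Map<string, WorkerHealthRecord>([
  ['worker-1', { lastHeartbeat: 100000, taskCount: 3 }],
  ['worker-2', { lastHeartbeat: 60000, taskCount: 5 }],
]);
console.log(findUnresponsive(healthTable, 100000)); // [ 'worker-2' ]
```

A worker stuck in a long synchronous loop cannot send heartbeats, so choose a timeout longer than your slowest legitimate task or you will restart workers that are merely busy.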

Hands-On Exercise

Ready to put your skills to the test? Let's build a parallel web scraper!

Challenge: Parallel Web Scraper

Create a web scraper that fetches multiple URLs in parallel using worker threads:

Requirements:

  • Create a worker that fetches and parses HTML
  • Use a worker pool to fetch multiple URLs concurrently
  • Extract specific data (title, description, images)
  • Handle errors gracefully
  • Implement progress tracking

Starter Code:

// Your task: Complete this implementation!
interface ScrapingTask {
  url: string;
  selectors: {
    title?: string;
    description?: string;
    images?: string;
  };
}

interface ScrapingResult {
  url: string;
  data: {
    title?: string;
    description?: string;
    images?: string[];
  };
  error?: string;
}

// TODO: Implement the worker
// TODO: Implement the scraper class
// TODO: Add error handling and retries

Solution

// scraperWorker.ts
import { parentPort } from 'worker_threads';
import fetch from 'node-fetch';
import * as cheerio from 'cheerio';

interface ScrapingTask {
  url: string;
  selectors: {
    title?: string;
    description?: string;
    images?: string;
  };
}

if (parentPort) {
  parentPort.on('message', async (task: ScrapingTask) => {
    try {
      // Fetch the webpage
      const response = await fetch(task.url);
      const html = await response.text();
      
      // Parse HTML
      const $ = cheerio.load(html);
      
      // Extract data
      const data = {
        title: task.selectors.title 
          ? $(task.selectors.title).text().trim()
          : $('title').text().trim(),
        
        description: task.selectors.description
          ? $(task.selectors.description).text().trim()
          : $('meta[name="description"]').attr('content'),
        
        images: task.selectors.images
          ? $(task.selectors.images).map((_, el) => $(el).attr('src')).get()
          : $('img').map((_, el) => $(el).attr('src')).get()
      };
      
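      // Send result (no error field on success, matching ScrapingResult)
      parentPort.postMessage({
        url: task.url,
        data
      });
      
    } catch (error) {
      // Send error (an empty data object keeps the ScrapingResult shape)
      parentPort.postMessage({
        url: task.url,
        data: {},
        error: error instanceof Error ? error.message : String(error)
      });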
    }
  });
}

// webScraper.ts
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
import path from 'path';

class WebScraper extends EventEmitter {
  private workers: Worker[] = [];
  private activeJobs = 0;
  private results: ScrapingResult[] = [];
  
  constructor(private workerCount: number = 4) {
    super();
    this.initializeWorkers();
  }
  
  // Initialize worker pool
  private initializeWorkers(): void {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(
        path.join(__dirname, 'scraperWorker.js')
      );
      
      worker.on('message', (result: ScrapingResult) => {
        this.activeJobs--;
        this.results.push(result);
        
        // Emit progress
        this.emit('progress', {
          completed: this.results.length,
          active: this.activeJobs
        });
        
        if (result.error) {
          this.emit('error', result);
        } else {
          this.emit('scraped', result);
        }
      });
      
      this.workers.push(worker);
    }
  }
  
  // Scrape multiple URLs
  async scrapeUrls(tasks: ScrapingTask[]): Promise<ScrapingResult[]> {
    this.results = [];
    this.activeJobs = tasks.length;
    
    return new Promise((resolve) => {
      let taskIndex = 0;
      let workerIndex = 0;
      
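      // Track completion and remove the listeners once every task has reported,
      // so repeated scrapeUrls() calls do not accumulate handlers
      const checkCompletion = () => {
        if (this.results.length === tasks.length) {
          this.off('scraped', checkCompletion);
          this.off('error', checkCompletion);
          resolve(this.results);
        }
      };
      
      this.on('scraped', checkCompletion);
      this.on('error', checkCompletion);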
      
      // Distribute tasks
      while (taskIndex < tasks.length) {
        this.workers[workerIndex].postMessage(tasks[taskIndex]);
        taskIndex++;
        workerIndex = (workerIndex + 1) % this.workerCount;
      }
    });
  }
  
  // Cleanup
  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(w => w.terminate())
    );
  }
}

// Usage example
async function scrapeWebsites() {
  const scraper = new WebScraper(6); // 6 parallel workers
  
  // Monitor progress
  scraper.on('progress', ({ completed, active }) => {
    console.log(`Progress: ${completed} completed, ${active} active`);
  });
  
  scraper.on('scraped', (result) => {
    console.log(`Scraped ${result.url}: ${result.data.title}`);
  });
  
  // Define scraping tasks
  const tasks: ScrapingTask[] = [
    {
      url: 'https://example.com',
      selectors: {
        title: 'h1',
        description: '.description',
        images: '.gallery img'
      }
    },
    // Add more URLs...
  ];
  
  // Start scraping
  console.time('Scraping Time');
  const results = await scraper.scrapeUrls(tasks);
  console.timeEnd('Scraping Time');
  
  // Display results
  console.log(`\nScraping Summary:`);
  console.log(`Total URLs: ${results.length}`);
  console.log(`Successful: ${results.filter(r => !r.error).length}`);
  console.log(`Failed: ${results.filter(r => r.error).length}`);
  
  await scraper.terminate();
}

Excellent work! You've built a powerful parallel web scraper!

Key Takeaways

You've just unlocked the power of parallel processing! Here's what you've mastered:

  1. Worker Threads Basics - Creating and communicating with workers
  2. Worker Pools - Efficiently managing multiple workers
  3. Parallel Processing - Distributing work across CPU cores
  4. Message Passing - Type-safe communication between threads
  5. Error Handling - Graceful error recovery in workers
  6. Performance - Dramatic speedups for CPU-intensive tasks

Next Steps

Ready to explore more parallel processing techniques? Check out these tutorials:

  • Cluster Module: Scale across multiple processes
  • Async Patterns: Master advanced async operations
  • WebAssembly: Near-native performance in TypeScript
  • Microservices: Distributed system patterns

Keep parallelizing, and remember: with great power comes great performance! Your TypeScript apps are now turbocharged!

Happy coding!