Prerequisites
- Basic understanding of JavaScript
- TypeScript installed
- Node.js (worker_threads is a built-in Node.js module)
- VS Code or your preferred IDE
What you'll learn
- Understand how worker threads work
- Apply worker threads in real projects
- Debug common issues
- Write type-safe worker code
Worker Threads: Parallel Processing
Introduction
Ever felt like your computer was running a marathon with one leg? That's what happens when JavaScript uses just one thread! Today, we're unleashing the full power of your CPU with Worker Threads, turning your single-lane road into a multi-lane highway!
Worker threads (Node.js's built-in worker_threads module) let you run JavaScript on several CPU cores at the same time. Heavy computations, data processing, and other CPU-bound tasks can then run in parallel and finish sooner, without freezing the main thread. It's like having multiple chefs in the kitchen instead of just one!
Ready to supercharge your TypeScript applications? Let's dive in!
Understanding Worker Threads
Think of worker threads like having multiple employees in your office instead of doing everything yourself. Each worker handles its own task independently, without blocking the others!
What Are Worker Threads?
Worker threads are separate JavaScript execution contexts that run in parallel with your main thread. Each worker has its own V8 isolate and event loop, and by default workers don't share memory: they communicate by passing messages. Here's what makes them special:
// The Main Thread (The Boss)
// Handles I/O, events, and coordinates workers
const mainThread = {
  task: "Manage the application",
  canBlock: false, // Must stay responsive!
  coordinates: ["worker1", "worker2", "worker3"]
};
// Worker Threads (The Employees)
// Handle heavy computations in the background
const workerThread = {
  task: "Process data intensively",
  canBlock: true, // Can work without interrupting others
  communicates: "Through messages"
};
Why Use Worker Threads?
- CPU-Intensive Tasks: Process large datasets without freezing your app
- Parallel Processing: Spread work across all of your CPU cores
- Better Performance: Cut processing time for CPU-bound work (the ideal speedup is roughly the number of cores; worker startup and message copying eat into it)
- Responsive App: Keep the main event loop free to handle requests, timers, and I/O
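To make the responsiveness point concrete, here is a minimal sketch (not part of the original tutorial) that runs a CPU-bound loop in a worker while a timer keeps ticking on the main thread. It uses the `eval: true` option of `Worker` so the whole example fits in one file; `sumOfSquares`, `sumOfSquaresInWorker`, and the iteration count are illustrative choices.

```typescript
import { Worker } from 'node:worker_threads';

// CPU-bound reference implementation: sums i * i for i in [0, n)
function sumOfSquares(n: number): number {
  let total = 0;
  for (let i = 0; i < n; i++) total += i * i;
  return total;
}

// The same loop inside a worker. With `eval: true` the string runs as
// CommonJS-style JavaScript, so it uses require() instead of import.
function sumOfSquaresInWorker(n: number): Promise<number> {
  const source = `
    const { parentPort, workerData } = require('node:worker_threads');
    let total = 0;
    for (let i = 0; i < workerData; i++) total += i * i;
    parentPort.postMessage(total);
  `;
  return new Promise((resolve, reject) => {
    const worker = new Worker(source, { eval: true, workerData: n });
    worker.once('message', resolve);
    worker.once('error', reject);
    worker.once('exit', (code) => {
      if (code !== 0) reject(new Error(`Worker exited with code ${code}`));
    });
  });
}

async function demo(): Promise<void> {
  let ticks = 0;
  // This timer can only fire while the main event loop is free
  const timer = setInterval(() => ticks++, 10);
  const total = await sumOfSquaresInWorker(50_000_000);
  clearInterval(timer);
  console.log(`total=${total}, timer ticks while the worker was busy: ${ticks}`);
}

demo().catch(console.error);
```

If you call `sumOfSquares(50_000_000)` directly on the main thread instead, the timer cannot tick until the loop has finished.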
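Basic Syntax and Usage
Let's create our first worker thread! The worker_threads module ships with Node.js, so there's nothing extra to install at runtime; TypeScript just needs the Node.js type definitions:
npm install --save-dev @types/node
Creating a Worker Thread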
// worker.ts - Our worker file
import { parentPort, workerData } from 'worker_threads';
// parentPort is null when this file runs on the main thread, so guard it
if (parentPort) {
  // Listen for messages from the main thread
  parentPort.on('message', (task: string) => {
    console.log(`Worker received: ${task}`);
    // Do some heavy work
    const result = performHeavyTask(task);
    // Send the result back
    parentPort.postMessage({
      success: true,
      result
    });
  });
}
function performHeavyTask(task: string): string {
  // Simulate intensive work
  return `Completed: ${task}`;
}
// Process initial data if provided
if (workerData) {
  console.log('Initial data:', workerData);
}
Using the Worker from the Main Thread
// main.ts - Main application file
import { Worker } from 'worker_threads';
import path from 'path';
// Create a new worker. Point it at the compiled worker.js emitted by tsc, not
// worker.ts: running .ts directly needs a loader such as ts-node/tsx or a
// Node.js version with built-in type stripping.
const worker = new Worker(
  path.join(__dirname, 'worker.js'),
  {
    workerData: { message: 'Hello Worker!' }
  }
);
// Send a task to the worker
worker.postMessage('Process this data');
// Listen for results
worker.on('message', (result) => {
  console.log('Received from worker:', result);
  // The worker keeps listening on parentPort, so it never exits on its own;
  // stop it once we have what we need
  void worker.terminate();
});
// Handle errors
worker.on('error', (error) => {
  console.error('Worker error:', error);
});
// Worker finished
worker.on('exit', (code) => {
  console.log(`Worker stopped with exit code ${code}`);
});
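The event-based API above is flexible, but for one-shot jobs it is often handier to wrap a worker in a Promise. Here is a minimal sketch; `runWorker` is a hypothetical helper name, and it assumes the worker posts exactly one result message. The demo uses an inline `eval: true` worker so it runs as a single file.

```typescript
import { Worker, WorkerOptions } from 'node:worker_threads';

// Resolves with the worker's first message; rejects on an uncaught error
// or if the worker exits before replying.
function runWorker<T>(script: string, options: WorkerOptions = {}): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const worker = new Worker(script, options);
    let settled = false;
    worker.once('message', (value: T) => {
      settled = true;
      resolve(value);
      // One-shot helper: stop the worker once it has answered
      void worker.terminate();
    });
    worker.once('error', (err) => {
      settled = true;
      reject(err);
    });
    worker.once('exit', (code) => {
      if (!settled) reject(new Error(`Worker exited with code ${code} before replying`));
    });
  });
}

// Inline worker: doubles every number it receives via workerData
const doublerSource = `
  const { parentPort, workerData } = require('node:worker_threads');
  parentPort.postMessage(workerData.map((x) => x * 2));
`;

runWorker<number[]>(doublerSource, { eval: true, workerData: [1, 2, 3] })
  .then((doubled) => console.log(doubled)) // [ 2, 4, 6 ]
  .catch(console.error);
```

For a file-based worker you would pass the compiled path instead, e.g. `runWorker(path.join(__dirname, 'worker.js'), { workerData })`.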
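Practical Examples
Example 1: Image Processing Pipeline
Let's build a parallel image processor that can handle multiple images simultaneously! The image operations below are simulated with timers so we can focus on how tasks are queued and handed to workers; in a real pipeline each operation would be CPU-heavy pixel work, which is where workers pay off.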
// imageWorker.ts
import { parentPort } from 'worker_threads';
// Exported so the main-thread controller can share the message types
export interface ImageTask {
  id: string;
  path: string;
  operations: string[];
}
export interface ProcessResult {
  id: string;
  success: boolean;
  processedPath?: string;
  error?: string;
}
// Process images in the worker
if (parentPort) {
  parentPort.on('message', async (task: ImageTask) => {
    try {
      console.log(`Processing image: ${task.path}`);
      // Simulate image processing
      const processed = await processImage(task);
      // Send success result
      parentPort.postMessage({
        id: task.id,
        success: true,
        processedPath: processed
      } as ProcessResult);
    } catch (error) {
      // Send error result (catch variables are `unknown` in strict mode)
      parentPort.postMessage({
        id: task.id,
        success: false,
        error: error instanceof Error ? error.message : String(error)
      } as ProcessResult);
    }
  });
}
async function processImage(task: ImageTask): Promise<string> {
  // Apply each operation
  for (const operation of task.operations) {
    console.log(`  Applying ${operation}...`);
    // Simulated processing time; real pixel work here would be CPU-bound
    await new Promise(resolve => setTimeout(resolve, 500));
  }
  return `processed_${task.id}.jpg`;
}
// imageProcessor.ts - Main thread controller
import { Worker } from 'worker_threads';
import path from 'path';
// Type-only import: erased at compile time, so the worker code doesn't run here
import type { ImageTask, ProcessResult } from './imageWorker';
class ImageProcessor {
  private workers: Worker[] = [];
  private taskQueue: ImageTask[] = [];
  private busyWorkers = new Set<Worker>();
  // Resolvers waiting for each task id to finish
  private pending = new Map<string, (result: ProcessResult) => void>();
  constructor(private workerCount: number = 4) {
    this.initializeWorkers();
  }
  // Create worker pool
  private initializeWorkers(): void {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(
        path.join(__dirname, 'imageWorker.js')
      );
      // Set up event handlers
      worker.on('message', (result: ProcessResult) => {
        console.log(`Image ${result.id} processed!`);
        this.pending.get(result.id)?.(result);
        this.pending.delete(result.id);
        this.busyWorkers.delete(worker);
        this.processNextTask(worker);
      });
      worker.on('error', (error) => {
        console.error('Worker error:', error);
      });
      this.workers.push(worker);
    }
  }
  // Queue a task; the promise resolves once a worker has finished it
  processImage(task: ImageTask): Promise<ProcessResult> {
    return new Promise((resolve) => {
      this.pending.set(task.id, resolve);
      this.taskQueue.push(task);
      this.distributeWork();
    });
  }
  // Hand queued tasks to idle workers
  private distributeWork(): void {
    for (const worker of this.workers) {
      if (!this.busyWorkers.has(worker) && this.taskQueue.length > 0) {
        this.processNextTask(worker);
      }
    }
  }
  // Give the next queued task (if any) to this worker
  private processNextTask(worker: Worker): void {
    const task = this.taskQueue.shift();
    if (!task) return;
    this.busyWorkers.add(worker);
    worker.postMessage(task);
  }
  // Cleanup workers
  async shutdown(): Promise<void> {
    await Promise.all(
      this.workers.map(worker => worker.terminate())
    );
  }
}
// Usage example
async function main() {
  const processor = new ImageProcessor(4); // 4 parallel workers
  // Images to process
  const images: ImageTask[] = [
    { id: '1', path: 'photo1.jpg', operations: ['resize', 'blur'] },
    { id: '2', path: 'photo2.jpg', operations: ['crop', 'sharpen'] },
    { id: '3', path: 'photo3.jpg', operations: ['rotate', 'filter'] },
    // ... more images
  ];
  // Queue everything at once and wait for every result
  // (awaiting inside a for loop would process the images one at a time)
  const results = await Promise.all(
    images.map(image => processor.processImage(image))
  );
  console.log(`${results.filter(r => r.success).length}/${results.length} images processed`);
  await processor.shutdown();
}
main().catch(console.error);
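ImageProcessor and the later examples hard-code their worker counts (4, 8, 6). A common alternative, sketched here as an assumption rather than something the tutorial prescribes, is to size the pool from the machine's core count. `os.availableParallelism()` only exists in recent Node.js releases, so the helper falls back to `os.cpus().length`; `defaultWorkerCount` and the "reserve one core for the main thread" rule are illustrative.

```typescript
import * as os from 'node:os';

// Hypothetical helper: one worker per available core, keeping `reserveForMain`
// cores free for the main thread, and never fewer than one worker
function defaultWorkerCount(reserveForMain = 1): number {
  // availableParallelism may be missing on older Node.js versions and typings
  const osWithParallelism = os as typeof os & { availableParallelism?: () => number };
  const cores =
    typeof osWithParallelism.availableParallelism === 'function'
      ? osWithParallelism.availableParallelism()
      : os.cpus().length;
  return Math.max(1, cores - reserveForMain);
}

console.log(`This machine would get ${defaultWorkerCount()} workers`);
```

Usage would then be `new ImageProcessor(defaultWorkerCount())`. Reserving a core is a rule of thumb; for pools that sit idle most of the time you may prefer to use every core.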
Example 2: Data Analysis Engine
Build a parallel data analyzer for processing large datasets!
// dataWorker.ts
import { parentPort } from 'worker_threads';
// Exported so dataAnalyzer.ts can share the message types
export type Operation = 'sum' | 'average' | 'max' | 'min';
export interface DataChunk {
  id: number;
  data: number[];
  operation: Operation;
}
export interface AnalysisResult {
  id: number;
  result: number;
  processTime: number;
}
// Analyze data chunks
if (parentPort) {
  parentPort.on('message', (chunk: DataChunk) => {
    const startTime = Date.now();
    let result: number;
    switch (chunk.operation) {
      case 'sum':
        result = chunk.data.reduce((a, b) => a + b, 0);
        break;
      case 'average':
        result = chunk.data.reduce((a, b) => a + b, 0) / chunk.data.length;
        break;
      case 'max':
        result = Math.max(...chunk.data);
        break;
      case 'min':
        result = Math.min(...chunk.data);
        break;
      default:
        throw new Error(`Unknown operation: ${String(chunk.operation)}`);
    }
    // Send analysis result
    parentPort.postMessage({
      id: chunk.id,
      result,
      processTime: Date.now() - startTime
    } as AnalysisResult);
  });
}
// dataAnalyzer.ts
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
import path from 'path';
// Type-only import: erased at compile time, so the worker code doesn't run here
import type { DataChunk, AnalysisResult, Operation } from './dataWorker';
class DataAnalyzer extends EventEmitter {
  private workers: Worker[] = [];
  private results = new Map<number, number>();
  private pendingChunks = 0;
  constructor(private workerCount: number = 4) {
    super();
    this.initializeWorkers();
  }
  // Analyze a large dataset in parallel.
  // Uses shared state, so await each call before starting the next one.
  async analyzeDataset(
    data: number[],
    operation: Operation,
    chunkSize: number = 10000
  ): Promise<number> {
    if (data.length === 0) throw new Error('Cannot analyze an empty dataset');
    // The mean of per-chunk averages is wrong when chunks differ in size
    // (the last one usually does), so for 'average' the workers return
    // per-chunk sums and we divide by the total count at the end
    const workerOperation: Operation = operation === 'average' ? 'sum' : operation;
    // Split data into chunks
    const chunks: DataChunk[] = [];
    for (let i = 0; i < data.length; i += chunkSize) {
      chunks.push({
        id: chunks.length,
        data: data.slice(i, i + chunkSize),
        operation: workerOperation
      });
    }
    this.pendingChunks = chunks.length;
    this.results.clear();
    return new Promise((resolve) => {
      let workerIndex = 0;
      // Listen for completion
      this.once('analysisComplete', () => {
        const combined = this.aggregateResults(workerOperation);
        resolve(operation === 'average' ? combined / data.length : combined);
      });
      // Send chunks to workers round-robin
      for (const chunk of chunks) {
        this.workers[workerIndex].postMessage(chunk);
        workerIndex = (workerIndex + 1) % this.workerCount;
      }
    });
  }
  // Initialize worker pool
  private initializeWorkers(): void {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(
        path.join(__dirname, 'dataWorker.js')
      );
      worker.on('message', (result: AnalysisResult) => {
        console.log(
          `Chunk ${result.id} processed in ${result.processTime}ms`
        );
        this.results.set(result.id, result.result);
        this.pendingChunks--;
        if (this.pendingChunks === 0) {
          this.emit('analysisComplete');
        }
      });
      worker.on('error', (error) => {
        console.error('Data worker error:', error);
      });
      this.workers.push(worker);
    }
  }
  // Combine the per-chunk results
  private aggregateResults(operation: Operation): number {
    const values = Array.from(this.results.values());
    switch (operation) {
      case 'sum':
      case 'average': // averages are computed from sums, see analyzeDataset
        return values.reduce((a, b) => a + b, 0);
      case 'max':
        return Math.max(...values);
      case 'min':
        return Math.min(...values);
      default:
        return 0;
    }
  }
  // Stop the workers; otherwise they keep the process alive
  async terminate(): Promise<void> {
    await Promise.all(this.workers.map(worker => worker.terminate()));
  }
}
// Usage example
async function analyzeMarketData() {
  const analyzer = new DataAnalyzer(8); // 8 parallel workers
  // Generate a large dataset (1 million data points)
  const marketData = Array.from(
    { length: 1_000_000 },
    () => Math.random() * 1000
  );
  console.time('Analysis Time');
  // Run the parallel analyses one after another.
  // (For cheap operations like these, copying chunks to the workers can cost
  // as much as the math itself; the pattern shines with heavier per-item work.)
  const sum = await analyzer.analyzeDataset(marketData, 'sum');
  const avg = await analyzer.analyzeDataset(marketData, 'average');
  const max = await analyzer.analyzeDataset(marketData, 'max');
  console.timeEnd('Analysis Time');
  console.log(`
Market Analysis Results:
Sum: $${sum.toFixed(2)}
Average: $${avg.toFixed(2)}
Maximum: $${max.toFixed(2)}
`);
  await analyzer.terminate();
}
analyzeMarketData().catch(console.error);
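Averaging needs care when chunks differ in size: the mean of per-chunk averages is not the overall mean. A general way around this is for each chunk to report a small summary (count, sum, min, max) that the main thread can merge exactly. Here is a minimal, worker-free sketch of that merge step; `ChunkSummary`, `summarize`, and `mergeSummaries` are hypothetical names, not part of the tutorial's code.

```typescript
// Summary a worker could send back for one chunk
interface ChunkSummary {
  count: number;
  sum: number;
  min: number;
  max: number;
}

// What a worker would compute for its chunk: a single pass with no spread
// operator, so it also works for very large arrays
function summarize(data: number[]): ChunkSummary {
  let sum = 0;
  let min = Infinity;
  let max = -Infinity;
  for (const x of data) {
    sum += x;
    if (x < min) min = x;
    if (x > max) max = x;
  }
  return { count: data.length, sum, min, max };
}

// Merging is associative, so chunk order and chunk size don't matter
function mergeSummaries(parts: ChunkSummary[]): ChunkSummary {
  return parts.reduce(
    (acc, p) => ({
      count: acc.count + p.count,
      sum: acc.sum + p.sum,
      min: Math.min(acc.min, p.min),
      max: Math.max(acc.max, p.max),
    }),
    { count: 0, sum: 0, min: Infinity, max: -Infinity }
  );
}

// Two unequal chunks: [1, 2, 3] and [10]
const merged = mergeSummaries([summarize([1, 2, 3]), summarize([10])]);
console.log(merged.sum / merged.count); // 4 (averaging the chunk averages would give (2 + 10) / 2 = 6)
```

One message per chunk then answers sum, average, min, and max at once, instead of one full pass over the data per operation.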
Example 3: Crypto Mining Simulator
Let's build a fun crypto mining simulator using worker threads! Hashing is pure CPU work, so it's a natural fit for parallel workers.
// miningWorker.ts
import { parentPort } from 'worker_threads';
import crypto from 'crypto';
// Exported so cryptoMiner.ts can share the message types
export interface MiningTask {
  blockData: string;
  difficulty: number;
  startNonce: number;
  range: number;
}
export interface MiningResult {
  // 'progress': still searching, 'done': range exhausted, 'found': valid hash
  type: 'progress' | 'done' | 'found';
  nonce?: number;
  hash?: string;
  attempts: number; // attempts since this worker's previous message
}
// Mine for a valid hash
if (parentPort) {
  parentPort.on('message', (task: MiningTask) => {
    console.log(`Worker mining with nonce range ${task.startNonce} - ${task.startNonce + task.range}`);
    const target = '0'.repeat(task.difficulty);
    let attempts = 0;
    for (let nonce = task.startNonce; nonce < task.startNonce + task.range; nonce++) {
      attempts++;
      // Calculate hash
      const data = task.blockData + nonce;
      const hash = crypto.createHash('sha256').update(data).digest('hex');
      // Check if we found a valid hash
      if (hash.startsWith(target)) {
        parentPort.postMessage({ type: 'found', nonce, hash, attempts } as MiningResult);
        return;
      }
      // Report progress every 10000 attempts, then reset the counter so
      // the main thread can simply add up what it receives
      if (attempts === 10000) {
        parentPort.postMessage({ type: 'progress', attempts } as MiningResult);
        attempts = 0;
      }
    }
    // Didn't find one in this range: ask for more work
    parentPort.postMessage({ type: 'done', attempts } as MiningResult);
  });
}
// cryptoMiner.ts
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
import path from 'path';
// Type-only import: erased at compile time, so the worker code doesn't run here
import type { MiningTask, MiningResult } from './miningWorker';
class CryptoMiner extends EventEmitter {
  private workers: Worker[] = [];
  private mining = false;
  private totalAttempts = 0;
  constructor(private workerCount: number = 4) {
    super();
  }
  // Start mining a block
  mineBlock(blockData: string, difficulty: number): Promise<{
    nonce: number;
    hash: string;
    attempts: number;
    timeSeconds: number;
  }> {
    this.mining = true;
    this.totalAttempts = 0;
    const startTime = Date.now();
    // Create fresh workers for this mining session
    this.createWorkers();
    return new Promise((resolve, reject) => {
      let currentNonce = 0;
      const rangePerWorker = 100000;
      // Give a worker the next unsearched nonce range
      const assignRange = (worker: Worker) => {
        worker.postMessage({
          blockData,
          difficulty,
          startNonce: currentNonce,
          range: rangePerWorker
        } as MiningTask);
        currentNonce += rangePerWorker;
      };
      // Handle worker results
      const handleResult = (worker: Worker) => (result: MiningResult) => {
        // Ignore stragglers that arrive after another worker has won
        if (!this.mining) return;
        this.totalAttempts += result.attempts;
        const elapsedSeconds = Math.max(Date.now() - startTime, 1) / 1000;
        if (result.type === 'found') {
          // Found a valid hash!
          this.mining = false;
          const block = {
            nonce: result.nonce!,
            hash: result.hash!,
            attempts: this.totalAttempts,
            timeSeconds: elapsedSeconds
          };
          // Wait for the workers to stop before resolving, so the next
          // mineBlock() call starts from a clean worker list
          this.stopMining().then(() => resolve(block), reject);
        } else {
          // Emit progress
          this.emit('progress', {
            attempts: this.totalAttempts,
            hashRate: this.totalAttempts / elapsedSeconds
          });
          // Only hand out new work once the previous range is exhausted
          if (result.type === 'done') assignRange(worker);
        }
      };
      // Set up handlers, then start mining
      for (const worker of this.workers) {
        worker.on('message', handleResult(worker));
        assignRange(worker);
      }
    });
  }
  // Create worker pool
  private createWorkers(): void {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(
        path.join(__dirname, 'miningWorker.js')
      );
      worker.on('error', (error) => {
        console.error('Mining worker error:', error);
      });
      this.workers.push(worker);
    }
  }
  // Stop all mining (terminate() also interrupts a worker mid-loop)
  private async stopMining(): Promise<void> {
    const workers = this.workers;
    this.workers = [];
    await Promise.all(workers.map(worker => worker.terminate()));
  }
}
// Usage example
async function simulateMining() {
  const miner = new CryptoMiner(8); // 8 parallel workers
  // Track mining progress
  miner.on('progress', ({ attempts, hashRate }) => {
    console.log(`Mining: ${attempts} attempts | ${hashRate.toFixed(0)} H/s`);
  });
  console.log('Starting crypto mining simulation...\n');
  // Mine blocks with increasing difficulty
  // (each extra leading hex zero means about 16x more work on average)
  for (let difficulty = 4; difficulty <= 6; difficulty++) {
    console.log(`\nMining block with difficulty ${difficulty}...`);
    const blockData = `Block #${Date.now()} | Previous: 0x1234...`;
    const result = await miner.mineBlock(blockData, difficulty);
    console.log(`
Block mined successfully!
Nonce: ${result.nonce}
Hash: ${result.hash}
Time: ${result.timeSeconds.toFixed(2)} seconds
Attempts: ${result.attempts.toLocaleString()}
`);
  }
}
simulateMining().catch(console.error);
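Before spreading the search across workers, it helps to have a single-threaded reference version of one worker's job to test against. This sketch mirrors the worker's loop as a pure function using the same SHA-256-over-`blockData + nonce` scheme; `meetsDifficulty` and `searchRange` are hypothetical names.

```typescript
import { createHash } from 'node:crypto';

interface RangeResult {
  nonce: number;
  hash: string;
}

// True if the hex digest starts with `difficulty` zero characters
function meetsDifficulty(hash: string, difficulty: number): boolean {
  return hash.startsWith('0'.repeat(difficulty));
}

// Searches [startNonce, startNonce + range) the way miningWorker.ts does;
// returns null when no nonce in the range qualifies
function searchRange(
  blockData: string,
  difficulty: number,
  startNonce: number,
  range: number
): RangeResult | null {
  for (let nonce = startNonce; nonce < startNonce + range; nonce++) {
    const hash = createHash('sha256').update(blockData + nonce).digest('hex');
    if (meetsDifficulty(hash, difficulty)) return { nonce, hash };
  }
  return null;
}

// Difficulty 2 needs about 16^2 = 256 attempts on average, so this is quick
const found = searchRange('demo block', 2, 0, 100_000);
console.log(found);
```

Because each range is searched independently, the worker version only changes who runs `searchRange` on which range, so the two can be checked against each other on small difficulties.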
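Advanced Concepts
SharedArrayBuffer for Shared Memory
Share memory between threads so they can read and write the same data without copying it through messages! Use Atomics for any value that several threads update at once.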
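// sharedMemoryExample.ts: one file that runs as both the main thread and the workers
import { Worker, isMainThread, workerData } from 'worker_threads';
if (isMainThread) {
  // Create shared memory (1024 bytes = 256 Int32 slots)
  const sharedBuffer = new SharedArrayBuffer(1024);
  const sharedArray = new Int32Array(sharedBuffer);
  // Initialize shared data
  sharedArray[0] = 0; // Counter
  // Create workers; a SharedArrayBuffer in workerData is shared, not copied.
  // __filename must be JavaScript here (the compiled .js when using plain node).
  const workers: Worker[] = [];
  for (let i = 0; i < 4; i++) {
    const worker = new Worker(__filename, {
      workerData: { sharedBuffer, workerId: i }
    });
    workers.push(worker);
  }
  // Monitor the shared counter
  const monitor = setInterval(() => {
    console.log(`Shared counter value: ${Atomics.load(sharedArray, 0)}`);
  }, 1000);
  // Stop after 3 seconds; the workers' intervals would otherwise run forever
  setTimeout(async () => {
    clearInterval(monitor);
    await Promise.all(workers.map(worker => worker.terminate()));
    console.log(`Final counter value: ${Atomics.load(sharedArray, 0)}`);
  }, 3000);
} else {
  // Worker thread
  const { sharedBuffer, workerId } = workerData;
  const sharedArray = new Int32Array(sharedBuffer);
  // Increment the shared counter atomically
  setInterval(() => {
    Atomics.add(sharedArray, 0, 1);
    console.log(`Worker ${workerId} incremented counter`);
  }, 100);
}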
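The counter above uses `Atomics.add` because a plain `sharedArray[0]++` is a separate read and write, so concurrent workers can overwrite each other's increments. This sketch (an illustration, not from the original) has several inline `eval: true` workers hammer one shared slot with `Atomics.add` and then checks that no updates were lost; `countInParallel` and the counts are arbitrary choices.

```typescript
import { Worker } from 'node:worker_threads';

// Each worker performs `increments` atomic additions on slot 0 of the shared buffer
const incrementerSource = `
  const { workerData } = require('node:worker_threads');
  const counter = new Int32Array(workerData.buffer);
  for (let i = 0; i < workerData.increments; i++) {
    Atomics.add(counter, 0, 1);
  }
`;

async function countInParallel(workerCount: number, increments: number): Promise<number> {
  // 4 bytes: room for a single Int32 counter
  const buffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT);
  const counter = new Int32Array(buffer);
  const runs = Array.from({ length: workerCount }, () =>
    new Promise<void>((resolve, reject) => {
      const worker = new Worker(incrementerSource, {
        eval: true,
        workerData: { buffer, increments },
      });
      worker.once('error', reject);
      // Each worker exits on its own once its loop finishes
      worker.once('exit', (code) =>
        code === 0 ? resolve() : reject(new Error(`exit code ${code}`))
      );
    })
  );
  await Promise.all(runs);
  return Atomics.load(counter, 0);
}

countInParallel(4, 100_000).then((total) => console.log(`Counter: ${total}`)); // Counter: 400000
```

Swapping `Atomics.add(counter, 0, 1)` for `counter[0]++` in the worker source will typically make the total come out lower, and differently on each run.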
Worker Pool Pattern
Manage a pool of reusable workers efficiently! The pool starts a fixed number of workers once, queues incoming tasks, and hands each task to the next idle worker.
// workerPool.ts
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
interface PooledWorker {
  worker: Worker;
  busy: boolean;
}
interface QueuedTask {
  task: unknown;
  resolve: (value: any) => void;
  reject: (error: unknown) => void;
}
class WorkerPool extends EventEmitter {
  private workers: PooledWorker[] = [];
  private taskQueue: QueuedTask[] = [];
  constructor(
    private workerScript: string,
    private poolSize: number = 4
  ) {
    super();
    this.initializePool();
  }
  // Initialize worker pool
  private initializePool(): void {
    for (let i = 0; i < this.poolSize; i++) {
      this.addWorker();
    }
  }
  // Start one worker; assumes the script replies with exactly one message per task
  private addWorker(): void {
    const worker = new Worker(this.workerScript);
    const pooledWorker: PooledWorker = { worker, busy: false };
    // Handle worker messages
    worker.on('message', (result) => {
      pooledWorker.busy = false;
      this.emit('taskComplete', result);
      this.processNextTask();
    });
    worker.on('error', (error) => {
      // An uncaught error stops the worker, so replace it with a fresh one.
      // Re-emitted as 'workerError' because EventEmitter throws on an
      // 'error' event that has no listener.
      this.workers = this.workers.filter(w => w !== pooledWorker);
      this.emit('workerError', error);
      this.addWorker();
      this.processNextTask();
    });
    this.workers.push(pooledWorker);
  }
  // Submit task to pool
  execute<T>(task: unknown): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.taskQueue.push({ task, resolve, reject });
      this.processNextTask();
    });
  }
  // Process next task in queue
  private processNextTask(): void {
    if (this.taskQueue.length === 0) return;
    // Find available worker
    const availableWorker = this.workers.find(w => !w.busy);
    if (!availableWorker) return;
    // Assign task to worker
    const { task, resolve, reject } = this.taskQueue.shift()!;
    availableWorker.busy = true;
    // One-time handlers for this task; whichever fires first removes the other
    const messageHandler = (result: unknown) => {
      availableWorker.worker.off('error', errorHandler);
      resolve(result);
    };
    const errorHandler = (error: unknown) => {
      availableWorker.worker.off('message', messageHandler);
      reject(error);
    };
    availableWorker.worker.once('message', messageHandler);
    availableWorker.worker.once('error', errorHandler);
    availableWorker.worker.postMessage(task);
  }
  // Terminate all workers
  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(w => w.worker.terminate())
    );
  }
}
// Usage (processor.js is assumed to reply with one message per task)
const pool = new WorkerPool('./processor.js', 6);
// Execute multiple tasks
const tasks = Array.from({ length: 20 }, (_, i) => ({
  id: i,
  data: `Task ${i}`
}));
Promise.all(
  tasks.map(task => pool.execute(task))
)
  .then(results => {
    console.log('All tasks completed!', results);
  })
  .catch(error => console.error('A task failed:', error))
  .finally(() => pool.terminate());
Common Pitfalls and Solutions
Wrong: Sending Functions (or Other Non-Cloneable Values)
// This won't work: postMessage copies data with the structured clone algorithm,
// which can't copy functions, so this call throws a DataCloneError
const worker = new Worker('./worker.js');
worker.postMessage({
  data: [1, 2, 3],
  processor: (x: number) => x * 2 // Functions can't be cloned!
});
Correct: Send Only Serializable Data
// Send data plus the name of an operation instead
const worker = new Worker('./worker.js');
worker.postMessage({
  data: [1, 2, 3],
  operation: 'double' // Worker knows what to do
});
// In the worker (with import { parentPort } from 'worker_threads'):
parentPort?.on('message', (msg: { data: number[]; operation: string }) => {
  if (msg.operation === 'double') {
    const result = msg.data.map(x => x * 2);
    parentPort?.postMessage(result);
  }
});
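A quick way to check whether a payload will survive `postMessage`, without starting a worker, is the global `structuredClone()` (Node.js 17+), which applies the same structured clone algorithm. This is a sketch for experimentation rather than an exact replica of every `postMessage` rule; `canSendToWorker` is a hypothetical helper name.

```typescript
// Returns true if `value` survives structured cloning, a good proxy for
// whether it can be posted to a worker
function canSendToWorker(value: unknown): boolean {
  try {
    structuredClone(value);
    return true;
  } catch {
    return false;
  }
}

console.log(canSendToWorker({ data: [1, 2, 3], operation: 'double' })); // true
console.log(canSendToWorker({ processor: (x: number) => x * 2 }));        // false

// Many built-in types do survive the copy, unlike with JSON.stringify
const copy = structuredClone({ when: new Date(0), tags: new Map([['a', 1]]) });
console.log(copy.when instanceof Date, copy.tags.get('a')); // true 1
```

Keep in mind that the worker receives a copy: mutating it in the worker does not change the original object on the main thread.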
Wrong: Creating Too Many Workers
// Don't create a new worker for each task
async function processTasks(tasks: Task[]) {
  for (const task of tasks) {
    // Every Worker starts its own V8 isolate and event loop, which costs startup
    // time and memory, and none of these workers is ever terminated
    const worker = new Worker('./worker.js');
    worker.postMessage(task);
  }
}
Correct: Use a Worker Pool
// Reuse a fixed set of workers (the WorkerPool class from the Worker Pool Pattern section)
const workerPool = new WorkerPool('./worker.js', 4);
async function processTasks(tasks: Task[]) {
  const results = await Promise.all(
    tasks.map(task => workerPool.execute(task))
  );
  return results;
}
Best Practices
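1. Use TypeScript Types for Messages
// Define message types as discriminated unions: each `type` gets its own payload shape
// (the payload shapes and the handleResult/handleError/updateProgress functions
// are placeholders for your own)
type WorkerMessage =
  | { type: 'process'; payload: number[] }
  | { type: 'cancel' }
  | { type: 'status' };
type WorkerResponse =
  | { type: 'result'; payload: number[] }
  | { type: 'error'; payload: string }
  | { type: 'progress'; payload: number };
// Type-safe message handling: inside each case, msg.payload has the matching type
worker.on('message', (msg: WorkerResponse) => {
  switch (msg.type) {
    case 'result':
      handleResult(msg.payload);
      break;
    case 'error':
      handleError(msg.payload);
      break;
    case 'progress':
      updateProgress(msg.payload);
      break;
  }
});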
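To make the compiler flag a response type you forgot to handle, a common TypeScript idiom is an exhaustive switch that ends in a `never` check. A self-contained sketch, assuming example payload shapes; `describeResponse` and `assertNever` are illustrative names.

```typescript
type WorkerResponse =
  | { type: 'result'; payload: number[] }
  | { type: 'error'; payload: string }
  | { type: 'progress'; payload: number };

// Compile-time guard: only callable with `never`, i.e. when every case is handled
function assertNever(value: never): never {
  throw new Error(`Unhandled message: ${JSON.stringify(value)}`);
}

function describeResponse(msg: WorkerResponse): string {
  switch (msg.type) {
    case 'result':
      return `result with ${msg.payload.length} items`; // payload: number[]
    case 'error':
      return `error: ${msg.payload}`; // payload: string
    case 'progress':
      return `${Math.round(msg.payload * 100)}% done`; // payload: fraction 0..1
    default:
      // Adding a new `type` to WorkerResponse without a case makes this line a compile error
      return assertNever(msg);
  }
}

console.log(describeResponse({ type: 'progress', payload: 0.25 })); // 25% done
```

Messages still arrive untyped at runtime, so if the worker is not under your control, validate the shape before trusting the annotation.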
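2. Implement Graceful Shutdown
// Partial sketch: the concrete pool supplies `workers` and `waitForPendingTasks()`
import { Worker } from 'worker_threads';
abstract class GracefulWorkerPool {
  protected shuttingDown = false;
  protected abstract workers: Worker[];
  protected abstract waitForPendingTasks(): Promise<void>;
  async shutdown(): Promise<void> {
    // Stop accepting new tasks (execute() should check this flag)
    this.shuttingDown = true;
    // Wait for current tasks to complete
    await this.waitForPendingTasks();
    // Ask each worker to exit and wait for its 'exit' event.
    // Assumes the worker handles { type: 'shutdown' } by cleaning up and exiting.
    await Promise.all(
      this.workers.map(async (worker) => {
        const exited = new Promise(resolve => worker.once('exit', resolve));
        worker.postMessage({ type: 'shutdown' });
        await exited;
      })
    );
  }
}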
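A graceful shutdown only works if the worker cooperates. Here is a sketch of the worker side, assuming the `{ type: 'shutdown' }` message used above: the worker does its cleanup and calls `process.exit(0)`, which inside a worker ends only that thread, not the whole process. The inline `eval: true` worker, its `{ type: 'process' }` task message, and the `shutdownWorker` helper name are illustrative.

```typescript
import { Worker } from 'node:worker_threads';

// Worker side: handles normal tasks until it is told to shut down
const cooperativeWorkerSource = `
  const { parentPort } = require('node:worker_threads');
  parentPort.on('message', (msg) => {
    if (msg.type === 'shutdown') {
      // Flush logs, close files, etc. here, then end this thread only
      process.exit(0);
      return;
    }
    parentPort.postMessage({ type: 'result', payload: msg.payload * 2 });
  });
`;

// Main side: send the shutdown message and resolve with the exit code
function shutdownWorker(worker: Worker): Promise<number> {
  const exited = new Promise<number>((resolve) => worker.once('exit', resolve));
  worker.postMessage({ type: 'shutdown' });
  return exited;
}

const worker = new Worker(cooperativeWorkerSource, { eval: true });
worker.once('message', (msg) => {
  console.log('Got', msg); // Got { type: 'result', payload: 42 }
  shutdownWorker(worker).then((code) => console.log(`Exited with code ${code}`)); // Exited with code 0
});
worker.postMessage({ type: 'process', payload: 21 });
```

Compared with `worker.terminate()`, which stops the thread wherever it is (and reports exit code 1), this lets the worker finish its cleanup first.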
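3. Monitor Worker Health
// Partial sketch: the concrete pool supplies restartWorker(), and workers are
// assumed to post periodic heartbeat messages that get passed to recordHeartbeat()
import { Worker } from 'worker_threads';
abstract class HealthyWorkerPool {
  protected workerHealth = new Map<Worker, {
    lastHeartbeat: number;
    taskCount: number;
  }>();
  protected abstract restartWorker(worker: Worker): void;
  // Call this whenever a worker's heartbeat message arrives
  protected recordHeartbeat(worker: Worker): void {
    const health = this.workerHealth.get(worker);
    if (health) health.lastHeartbeat = Date.now();
  }
  protected startHealthCheck(): void {
    // Every 10 s, restart any worker that hasn't sent a heartbeat for 30 s
    setInterval(() => {
      const now = Date.now();
      for (const [worker, health] of this.workerHealth) {
        if (now - health.lastHeartbeat > 30000) {
          console.warn('Worker unresponsive, restarting...');
          this.restartWorker(worker);
        }
      }
    }, 10000);
  }
}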
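The health check needs workers that actually send heartbeats and a clear rule for "unresponsive". Below is a minimal sketch, assuming a hypothetical `{ type: 'heartbeat' }` message: the worker posts one on a timer, and a pure `findStaleWorkers` function (illustrative name) picks out the workers that have gone quiet. A worker stuck in a long synchronous loop can't run its timer either, so the timeout must be longer than your longest task.

```typescript
// Worker side (hypothetical message shape), inside the worker script:
//   setInterval(() => parentPort?.postMessage({ type: 'heartbeat' }), 5_000).unref();
// unref() stops the heartbeat timer from keeping an otherwise idle worker alive.

interface Health {
  lastHeartbeat: number; // ms timestamp of the last heartbeat
  taskCount: number;
}

// Pure helper: returns the keys whose last heartbeat is older than `timeoutMs`
function findStaleWorkers<K>(health: Map<K, Health>, now: number, timeoutMs: number): K[] {
  const stale: K[] = [];
  for (const [id, h] of health) {
    if (now - h.lastHeartbeat > timeoutMs) stale.push(id);
  }
  return stale;
}

const health = new Map<string, Health>([
  ['worker-a', { lastHeartbeat: 100_000, taskCount: 3 }],
  ['worker-b', { lastHeartbeat: 60_000, taskCount: 7 }],
]);
console.log(findStaleWorkers(health, 120_000, 30_000)); // [ 'worker-b' ]
```

Keeping the staleness rule in a pure function makes it easy to unit-test with fixed timestamps, separately from the timers that drive it.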
Hands-On Exercise
Ready to put your skills to the test? Let's build a parallel web scraper!
Challenge: Parallel Web Scraper
Create a web scraper that fetches multiple URLs in parallel using worker threads. (Downloading pages is I/O, which Node's async fetch already handles concurrently; the workers earn their keep on the CPU-heavy HTML parsing.)
Requirements:
- Create a worker that fetches and parses HTML
- Use a worker pool to fetch multiple URLs concurrently
- Extract specific data (title, description, images)
- Handle errors gracefully
- Implement progress tracking
Starter Code:
// Your task: Complete this implementation!
interface ScrapingTask {
url: string;
selectors: {
title?: string;
description?: string;
images?: string;
};
}
interface ScrapingResult {
url: string;
data: {
title?: string;
description?: string;
images?: string[];
};
error?: string;
}
// TODO: Implement the worker
// TODO: Implement the scraper class
// TODO: Add error handling and retries
Solution
// scraperWorker.ts
import { parentPort } from 'worker_threads';
import fetch from 'node-fetch'; // Node.js 18+ also provides a built-in global fetch
import * as cheerio from 'cheerio';
// Exported so webScraper.ts can share the message types
export interface ScrapingTask {
  url: string;
  selectors: {
    title?: string;
    description?: string;
    images?: string;
  };
}
export interface ScrapingResult {
  url: string;
  data: {
    title?: string;
    description?: string;
    images?: string[];
  };
  error?: string;
}
if (parentPort) {
  parentPort.on('message', async (task: ScrapingTask) => {
    try {
      // Fetch the webpage; treat HTTP errors (404, 500, ...) as failures too
      const response = await fetch(task.url);
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      const html = await response.text();
      // Parse HTML
      const $ = cheerio.load(html);
      // Extract data, falling back to common defaults when no selector is given
      const data = {
        title: task.selectors.title
          ? $(task.selectors.title).text().trim()
          : $('title').text().trim(),
        description: task.selectors.description
          ? $(task.selectors.description).text().trim()
          : $('meta[name="description"]').attr('content'),
        images: task.selectors.images
          ? $(task.selectors.images).map((_, el) => $(el).attr('src')).get()
          : $('img').map((_, el) => $(el).attr('src')).get()
      };
      // Send result
      parentPort.postMessage({
        url: task.url,
        data
      } as ScrapingResult);
    } catch (error) {
      // Send error (an empty data object keeps the result shape consistent)
      parentPort.postMessage({
        url: task.url,
        data: {},
        error: error instanceof Error ? error.message : String(error)
      } as ScrapingResult);
    }
  });
}
// webScraper.ts
import { Worker } from 'worker_threads';
import { EventEmitter } from 'events';
import path from 'path';
// Type-only import: erased at compile time, so the worker code doesn't run here
import type { ScrapingTask, ScrapingResult } from './scraperWorker';
class WebScraper extends EventEmitter {
  private workers: Worker[] = [];
  private activeJobs = 0;
  private results: ScrapingResult[] = [];
  constructor(private workerCount: number = 4) {
    super();
    this.initializeWorkers();
  }
  // Initialize worker pool
  private initializeWorkers(): void {
    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(
        path.join(__dirname, 'scraperWorker.js')
      );
      worker.on('message', (result: ScrapingResult) => {
        this.activeJobs--;
        this.results.push(result);
        // Emit progress
        this.emit('progress', {
          completed: this.results.length,
          active: this.activeJobs
        });
        // 'failed' rather than 'error': EventEmitter throws when an 'error'
        // event has no listener
        this.emit(result.error ? 'failed' : 'scraped', result);
      });
      // Without this, a crashed worker would throw in the main thread
      worker.on('error', (error) => {
        console.error('Scraper worker crashed:', error);
      });
      this.workers.push(worker);
    }
  }
  // Scrape multiple URLs (one call at a time: results are shared state)
  scrapeUrls(tasks: ScrapingTask[]): Promise<ScrapingResult[]> {
    this.results = [];
    this.activeJobs = tasks.length;
    if (tasks.length === 0) return Promise.resolve([]);
    return new Promise((resolve) => {
      let taskIndex = 0;
      let workerIndex = 0;
      // Track completion, then detach so repeated calls don't stack listeners
      const checkCompletion = () => {
        if (this.results.length === tasks.length) {
          this.off('scraped', checkCompletion);
          this.off('failed', checkCompletion);
          resolve(this.results);
        }
      };
      this.on('scraped', checkCompletion);
      this.on('failed', checkCompletion);
      // Distribute tasks round-robin
      while (taskIndex < tasks.length) {
        this.workers[workerIndex].postMessage(tasks[taskIndex]);
        taskIndex++;
        workerIndex = (workerIndex + 1) % this.workerCount;
      }
    });
  }
  // Cleanup
  async terminate(): Promise<void> {
    await Promise.all(
      this.workers.map(w => w.terminate())
    );
  }
}
// Usage example
async function scrapeWebsites() {
  const scraper = new WebScraper(6); // 6 parallel workers
  // Monitor progress
  scraper.on('progress', ({ completed, active }) => {
    console.log(`Progress: ${completed} completed, ${active} remaining`);
  });
  scraper.on('scraped', (result: ScrapingResult) => {
    console.log(`Scraped ${result.url}: ${result.data.title}`);
  });
  scraper.on('failed', (result: ScrapingResult) => {
    console.warn(`Failed ${result.url}: ${result.error}`);
  });
  // Define scraping tasks
  const tasks: ScrapingTask[] = [
    {
      url: 'https://example.com',
      selectors: {
        title: 'h1',
        description: '.description',
        images: '.gallery img'
      }
    },
    // Add more URLs...
  ];
  // Start scraping
  console.time('Scraping Time');
  const results = await scraper.scrapeUrls(tasks);
  console.timeEnd('Scraping Time');
  // Display results
  console.log(`\nScraping Summary:`);
  console.log(`Total URLs: ${results.length}`);
  console.log(`Successful: ${results.filter(r => !r.error).length}`);
  console.log(`Failed: ${results.filter(r => r.error).length}`);
  await scraper.terminate();
}
scrapeWebsites().catch(console.error);
Excellent work! You've built a powerful parallel web scraper!
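The starter code's TODO also asks for retries, which the solution leaves out. One way to add them, sketched here as an assumption rather than the tutorial's own answer, is a generic `withRetry` helper (hypothetical name) that re-runs a failing async operation with exponential backoff. Inside scraperWorker.ts you could wrap the `fetch` call with it, so transient network errors are retried before a failure is reported.

```typescript
// Retries `operation` up to `maxAttempts` times, waiting baseDelayMs, then 2x, 4x, ...
// between tries; rethrows the last error if every attempt fails
async function withRetry<T>(
  operation: (attempt: number) => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 200
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        const delay = baseDelayMs * 2 ** (attempt - 1);
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  throw lastError;
}

// Demo: an operation that fails twice, then succeeds
withRetry(async (attempt) => {
  if (attempt < 3) throw new Error(`attempt ${attempt} failed`);
  return `succeeded on attempt ${attempt}`;
}, 3, 10).then(console.log); // succeeded on attempt 3
```

Retrying every error, including a 404, wastes requests; in practice you might only retry network errors and 5xx responses.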
Key Takeaways
You've just unlocked the power of parallel processing! Here's what you've mastered:
- Worker Threads Basics - Creating and communicating with workers
- Worker Pools - Efficiently managing and reusing multiple workers
- Parallel Processing - Distributing work across CPU cores
- Message Passing - Type-safe communication between threads
- Error Handling - Graceful error recovery in workers
- Performance - Big speedups for CPU-intensive tasks (at best roughly proportional to the number of cores)
Next Steps
Ready to explore more parallel processing techniques? Check out these tutorials:
- Cluster Module: Scale across multiple processes
- Async Patterns: Master advanced async operations
- WebAssembly: Near-native performance from TypeScript
- Microservices: Distributed system patterns
Keep parallelizing, and remember: with great power comes great performance! Your TypeScript apps are now turbocharged!
Happy coding!