Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
 
What you'll learn
- Understand the actor model fundamentals
- Apply message passing in real projects
- Debug common concurrency issues
- Write clean, scalable concurrent code
 
Introduction
Welcome to the world of concurrent programming with the Actor Model! In this advanced tutorial, we'll explore how message passing can simplify the way you handle concurrency in Python.
Have you ever struggled with threads stepping on each other's toes? Or dealt with the complexity of shared state and locks? The Actor Model offers an elegant alternative: independent "actors" communicate only through messages, which removes many traditional concurrency headaches.
By the end of this tutorial, you'll be orchestrating actors like a maestro conducting a symphony! Let's dive into this powerful paradigm!
Understanding the Actor Model
What is the Actor Model?
The Actor Model is like having a team of specialized workers in separate offices, communicating only through messages. Think of it as a post office system where each actor has its own mailbox and processes messages one at a time.
In Python terms, an actor is an independent unit that:
- Has its own state (private data)
- Processes messages sequentially
- Communicates only through message passing
- Can create new actors
- Makes local decisions based on messages

Why Use the Actor Model?
Here's why developers love the Actor Model:
- No Shared State: Each actor owns its data exclusively
- Fault Isolation: One actor's failure doesn't crash others
- Scalability: Actors can be distributed across cores or machines (in standard CPython builds, thread-based actors share the GIL, so CPU-bound work needs processes to use several cores)
- Simpler Reasoning: Think about one actor at a time

Real-world example: Imagine building a chat server. With the Actor Model, each user connection can be an actor that handles its messages independently, so per-connection state is never exposed to race conditions!
Basic Syntax and Usage
Simple Actor Implementation
Let's start with a friendly example using Python's queue and threading modules:
import queue
import threading
from typing import Any, Callable, Dict
import time
# Hello, Actor Model!
class Actor:
    def __init__(self, name: str):
        self.name = name  # Actor's name
        self.mailbox = queue.Queue()  # Message inbox
        self.running = True  # Loop flag, cleared by stop()
        self.handlers: Dict[str, Callable] = {}  # Message handlers by type
        
        # Start the actor's thread
        self.thread = threading.Thread(target=self._run)
        self.thread.start()
    
    def _run(self):
        """Main actor loop: process messages one at a time"""
        while self.running:
            try:
                # Wait briefly for a message so stop() is noticed promptly
                message = self.mailbox.get(timeout=0.1)
                self._handle_message(message)
            except queue.Empty:
                continue  # No messages, keep waiting
    
    def _handle_message(self, message: Dict[str, Any]):
        """Dispatch an incoming message to its handler"""
        msg_type = message.get('type', 'unknown')
        if msg_type in self.handlers:
            self.handlers[msg_type](message)
        else:
            print(f"Warning: {self.name}: Unknown message type '{msg_type}'")
    
    def send(self, message: Dict[str, Any]):
        """Send a message to this actor"""
        self.mailbox.put(message)
    
    def on(self, msg_type: str, handler: Callable):
        """Register a message handler"""
        self.handlers[msg_type] = handler
    
    def stop(self):
        """Stop the actor and wait for its thread to finish"""
        self.running = False
        self.thread.join()
# Let's create a simple actor!
greeter = Actor("Greeter")
# Define message handler
def handle_greet(message):
    name = message.get('name', 'Friend')
    print(f"Greeter says: Hello, {name}!")
# Register the handler
greeter.on('greet', handle_greet)
# Send a message
greeter.send({'type': 'greet', 'name': 'Python Developer'})
time.sleep(0.1)  # Give it time to process
greeter.stop()
Explanation: Notice how the actor processes messages sequentially in its own thread. Handlers need no locks for the actor's own state, because only that thread ever touches it, and the thread-safe queue.Queue mailbox takes care of the hand-off between threads.
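To see why the handlers need no locks, here is a minimal, self-contained sketch. It is not the tutorial's Actor class: `CounterActor` and `run_demo` are hypothetical names, and stopping the loop with a sentinel object is a design choice assumed for this sketch. Several threads send increments at once, yet only the actor's own thread ever touches `count`.

```python
import queue
import threading


class CounterActor:
    """Minimal actor: one thread owns `count`; other threads only enqueue."""

    _STOP = object()  # Sentinel that ends the loop (assumption of this sketch)

    def __init__(self):
        self.count = 0
        self.mailbox = queue.Queue()
        self.thread = threading.Thread(target=self._run)
        self.thread.start()

    def _run(self):
        while True:
            message = self.mailbox.get()
            if message is self._STOP:
                break
            if message == "increment":
                self.count += 1  # Only this thread mutates count

    def send(self, message):
        self.mailbox.put(message)

    def stop(self):
        # The sentinel is queued behind all earlier messages (FIFO),
        # so every pending increment is applied before the loop exits.
        self.mailbox.put(self._STOP)
        self.thread.join()


def run_demo(senders=4, per_sender=1000):
    """Hammer one actor from several threads and return the final count."""
    actor = CounterActor()

    def hammer():
        for _ in range(per_sender):
            actor.send("increment")

    threads = [threading.Thread(target=hammer) for _ in range(senders)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    actor.stop()
    return actor.count
```

Calling `run_demo(4, 1000)` returns 4000: no update is lost, because every increment passes through the single mailbox thread.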
Common Actor Patterns
Here are patterns you'll use daily:
# Pattern 1: Stateful Actor
class Counter(Actor):
    def __init__(self):
        super().__init__("Counter")
        self.count = 0  # Private state
        
        # Register handlers
        self.on('increment', self._increment)
        self.on('get_count', self._get_count)
    
    def _increment(self, message):
        self.count += 1
        print(f"Count is now: {self.count}")
    
    def _get_count(self, message):
        reply_to = message.get('reply_to')
        if reply_to:
            reply_to.send({'type': 'count_result', 'count': self.count})
# Pattern 2: Actor Supervision
class Supervisor(Actor):
    def __init__(self):
        super().__init__("Supervisor")
        self.workers = []  # Child actors
        self.on('create_worker', self._create_worker)
        self.on('broadcast', self._broadcast)
    
    def _create_worker(self, message):
        worker = Actor(f"Worker-{len(self.workers)}")
        self.workers.append(worker)
        print(f"Created {worker.name}")
    
    def _broadcast(self, message):
        # Send the payload to all workers
        for worker in self.workers:
            worker.send(message.get('payload', {}))
# Pattern 3: Request-Reply
class Calculator(Actor):
    def __init__(self):
        super().__init__("Calculator")
        self.on('calculate', self._calculate)
    
    def _calculate(self, message):
        operation = message.get('operation')
        a, b = message.get('a', 0), message.get('b', 0)
        
        result = None
        if operation == 'add':
            result = a + b
        elif operation == 'multiply':
            result = a * b
        
        # Send the reply to whoever asked
        reply_to = message.get('reply_to')
        if reply_to and result is not None:
            reply_to.send({'type': 'result', 'value': result})
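The request-reply pattern only asks that `reply_to` has a `send()` method, so non-actor code (a main thread or a test) can pass in a small reply mailbox and wait on it. The sketch below is self-contained: `ReplyBox`, `ask`, `calculator_loop`, and `demo` are hypothetical names, and `calculator_loop` is a stand-in for the Calculator actor's thread rather than the class itself.

```python
import queue
import threading


class ReplyBox:
    """Hypothetical one-shot reply target that exposes send(), like an actor."""

    def __init__(self):
        self._queue = queue.Queue(maxsize=1)

    def send(self, message):
        self._queue.put(message)

    def wait(self, timeout=1.0):
        # Raises queue.Empty if no reply arrives in time
        return self._queue.get(timeout=timeout)


def calculator_loop(mailbox):
    """Stand-in for the Calculator actor's thread; None stops the loop."""
    while True:
        message = mailbox.get()
        if message is None:
            break
        a, b = message.get('a', 0), message.get('b', 0)
        operation = message.get('operation')
        result = None
        if operation == 'add':
            result = a + b
        elif operation == 'multiply':
            result = a * b
        reply_to = message.get('reply_to')
        if reply_to is not None and result is not None:
            reply_to.send({'type': 'result', 'value': result})


def ask(mailbox, request, timeout=1.0):
    """Send a request and block the calling thread until the reply arrives."""
    box = ReplyBox()
    mailbox.put({**request, 'reply_to': box})
    return box.wait(timeout)


def demo():
    mailbox = queue.Queue()
    thread = threading.Thread(target=calculator_loop, args=(mailbox,))
    thread.start()
    try:
        reply = ask(mailbox, {'type': 'calculate', 'operation': 'add', 'a': 2, 'b': 3})
    finally:
        mailbox.put(None)
        thread.join()
    return reply['value']
```

Blocking in `ask` is fine from outside the actor system. Inside a message handler it would stall that actor's mailbox, so handlers should reply with messages instead of waiting.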
Practical Examples
Example 1: Order Processing System
Let's build a real-world order processing system:
# Order processing with actors
class OrderProcessor(Actor):
    def __init__(self):
        super().__init__("OrderProcessor")
        self.inventory = Actor("Inventory")
        self.payment = Actor("PaymentGateway")
        self.shipping = Actor("ShippingService")
        
        # Set up handlers
        self.on('process_order', self._process_order)
        # Log service replies so they are not reported as unknown messages
        self.on('stock_result', self._log_reply)
        self.on('payment_result', self._log_reply)
        self._setup_services()
    
    def _log_reply(self, message):
        print(f"{self.name} received {message.get('type')}: {message}")
    
    def _setup_services(self):
        # Inventory service
        inventory_stock = {'pizza': 10, 'burger': 15, 'taco': 20}
        
        def check_stock(msg):
            item = msg.get('item')
            quantity = msg.get('quantity', 1)
            available = inventory_stock.get(item, 0) >= quantity
            
            reply_to = msg.get('reply_to')
            if reply_to:
                reply_to.send({
                    'type': 'stock_result',
                    'available': available,
                    'item': item
                })
                
            if available:
                inventory_stock[item] -= quantity
                print(f"Reserved {quantity} {item}")
        
        self.inventory.on('check_stock', check_stock)
        
        # Payment service
        def process_payment(msg):
            amount = msg.get('amount', 0)
            print(f"Processing payment of ${amount}")
            
            # Simulate payment processing
            time.sleep(0.1)
            success = amount < 100  # Demo logic
            
            reply_to = msg.get('reply_to')
            if reply_to:
                reply_to.send({
                    'type': 'payment_result',
                    'success': success
                })
        
        self.payment.on('process_payment', process_payment)
        
        # Shipping service
        def ship_order(msg):
            items = msg.get('items', [])
            address = msg.get('address', 'Unknown')
            print(f"Shipping {items} to {address}")
            
            reply_to = msg.get('reply_to')
            if reply_to:
                reply_to.send({
                    'type': 'shipping_result',
                    'tracking': f"TRACK-{hash(str(items))}"
                })
        
        self.shipping.on('ship_order', ship_order)
    
    def _process_order(self, message):
        order_id = message.get('order_id')
        items = message.get('items', [])
        total = message.get('total', 0)
        
        print(f"Processing order {order_id}")
        
        # Check inventory for all items
        for item, quantity in items:
            self.inventory.send({
                'type': 'check_stock',
                'item': item,
                'quantity': quantity,
                'reply_to': self
            })
        
        # Process payment
        self.payment.send({
            'type': 'process_payment',
            'amount': total,
            'reply_to': self
        })
        
        # Note: In a real system, you'd wait for the stock and payment
        # replies before proceeding to shipping
# Let's use it!
order_system = OrderProcessor()
# Place an order
order_system.send({
    'type': 'process_order',
    'order_id': 'ORD-001',
    'items': [('pizza', 2), ('burger', 1)],
    'total': 35.99
})
time.sleep(0.5)  # Let it process
# Stop every actor so the program can exit (actor threads are non-daemon)
for actor in (order_system.inventory, order_system.payment,
              order_system.shipping, order_system):
    actor.stop()
Try it yourself: Add a notification actor that sends order updates to customers!
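The order example sends its stock and payment requests but never waits for the answers before shipping. One way to close that gap, sketched here under assumptions, is to keep a small per-order record and update it as replies arrive. `PendingOrder` and its method names are hypothetical, and the sketch assumes each reply can be matched to its order (for instance by carrying an order ID, which the messages above do not include).

```python
from dataclasses import dataclass
from typing import Optional, Set


@dataclass
class PendingOrder:
    """Tracks which replies an order still needs before it can ship."""
    order_id: str
    awaiting_stock: Set[str]            # Items with no stock_result yet
    stock_ok: bool = True               # False once any item is unavailable
    payment_ok: Optional[bool] = None   # None until payment_result arrives

    def on_stock_result(self, item: str, available: bool) -> None:
        self.awaiting_stock.discard(item)
        if not available:
            self.stock_ok = False

    def on_payment_result(self, success: bool) -> None:
        self.payment_ok = success

    def decision(self) -> str:
        """Return 'cancel', 'wait', or 'ship' based on replies so far."""
        if not self.stock_ok or self.payment_ok is False:
            return 'cancel'
        if self.awaiting_stock or self.payment_ok is None:
            return 'wait'
        return 'ship'
```

An order processor could hold these records in a dict keyed by order ID, call the `on_*` methods from its reply handlers, and send `ship_order` only when `decision()` returns `'ship'`. No handler ever blocks, because the waiting is expressed as state rather than as a sleeping thread.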
Example 2: Game Server with Actors
Let's make a multiplayer game server:
# Game server with actor model
class GameRoom(Actor):
    def __init__(self, room_id: str):
        super().__init__(f"GameRoom-{room_id}")
        self.room_id = room_id
        self.players = {}  # Player actors by ID
        self.game_state = {
            'scores': {},
            'round': 1,
            'active': False
        }
        
        # Register handlers
        self.on('join', self._handle_join)
        self.on('leave', self._handle_leave)
        self.on('player_action', self._handle_action)
        self.on('start_game', self._start_game)
    
    def _handle_join(self, message):
        player_id = message.get('player_id')
        player_name = message.get('name', f'Player-{player_id}')
        
        # Create player actor
        player = PlayerActor(player_id, player_name, self)
        self.players[player_id] = player
        self.game_state['scores'][player_id] = 0
        
        print(f"{player_name} joined room {self.room_id}!")
        
        # Notify other players
        self._broadcast({
            'type': 'player_joined',
            'player': player_name
        }, exclude=player_id)
    
    def _handle_leave(self, message):
        player_id = message.get('player_id')
        if player_id in self.players:
            player = self.players[player_id]
            player.stop()
            del self.players[player_id]
            del self.game_state['scores'][player_id]
            
            print(f"Player {player_id} left the room")
    
    def _handle_action(self, message):
        player_id = message.get('player_id')
        action = message.get('action')
        
        # Ignore actions from players who are not (or no longer) in the room
        if action == 'score' and player_id in self.game_state['scores']:
            points = message.get('points', 1)
            self.game_state['scores'][player_id] += points
            
            # Check for winner
            if self.game_state['scores'][player_id] >= 10:
                self._end_game(player_id)
            else:
                self._broadcast({
                    'type': 'score_update',
                    # Send a copy so players never share the room's dict
                    'scores': dict(self.game_state['scores'])
                })
    
    def _start_game(self, message):
        self.game_state['active'] = True
        print(f"Game started in room {self.room_id}!")
        
        self._broadcast({
            'type': 'game_started',
            'round': self.game_state['round']
        })
    
    def _end_game(self, winner_id):
        self.game_state['active'] = False
        winner_name = self.players[winner_id].name
        
        print(f"{winner_name} wins in room {self.room_id}!")
        
        self._broadcast({
            'type': 'game_ended',
            'winner': winner_name,
            'final_scores': dict(self.game_state['scores'])
        })
    
    def _broadcast(self, message, exclude=None):
        """Send message to all players"""
        for player_id, player in self.players.items():
            if player_id != exclude:
                player.send(message)
class PlayerActor(Actor):
    def __init__(self, player_id: str, name: str, room: GameRoom):
        super().__init__(f"Player-{player_id}")
        self.player_id = player_id
        self.name = name
        self.room = room
        self.score = 0
        
        # Register a handler for every update type the room broadcasts
        for update in ('player_joined', 'score_update', 'game_started', 'game_ended'):
            self.on(update, self._handle_update)
        self.on('power_up', self._use_power_up)
    
    def _handle_update(self, message):
        update_type = message.get('type')
        print(f"{self.name} received: {update_type}")
    
    def _use_power_up(self, message):
        power = message.get('power', 'boost')
        print(f"{self.name} used {power}!")
        
        # Notify room
        self.room.send({
            'type': 'player_action',
            'player_id': self.player_id,
            'action': 'score',
            'points': 2 if power == 'double' else 1
        })
Advanced Concepts
Advanced Topic 1: Actor Hierarchies and Supervision
When you're ready to level up, implement sophisticated supervision strategies:
# Advanced supervision with restart policies
class SupervisedActor(Actor):
    def __init__(self, name: str, supervisor=None):
        # Set these first: super().__init__() starts the thread that runs _run()
        self.supervisor = supervisor
        self.restart_count = 0
        self.max_restarts = 3
        super().__init__(name)
        
    def _run(self):
        """Enhanced run loop with error handling"""
        while self.running:
            try:
                message = self.mailbox.get(timeout=0.1)
                self._handle_message(message)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"{self.name} crashed: {e}")
                self._notify_supervisor('crash', {'error': str(e)})
                
                if self.restart_count < self.max_restarts:
                    self.restart_count += 1
                    print(f"Restarting {self.name} (attempt {self.restart_count})")
                    continue
                else:
                    print(f"Warning: {self.name} exceeded restart limit")
                    self.running = False
    
    def _notify_supervisor(self, event_type: str, data: Dict):
        """Notify supervisor of events"""
        if self.supervisor:
            self.supervisor.send({
                'type': 'supervision_event',
                'actor': self.name,
                'event': event_type,
                'data': data
            })
# Supervisor with strategies
class SupervisorActor(Actor):
    def __init__(self):
        super().__init__("Supervisor")
        self.children = {}
        self.strategies = {
            'one_for_one': self._restart_one,
            'one_for_all': self._restart_all,
            'rest_for_one': self._restart_rest
        }
        self.strategy = 'one_for_one'
        
        self.on('supervision_event', self._handle_supervision)
        self.on('create_child', self._create_child)
    
    def _create_child(self, message):
        name = message.get('name')
        child = SupervisedActor(name, supervisor=self)
        self.children[name] = child
        print(f"Supervisor created child: {name}")
    
    def _handle_supervision(self, message):
        actor_name = message.get('actor')
        event = message.get('event')
        
        if event == 'crash':
            print(f"Supervisor handling crash of {actor_name}")
            self.strategies[self.strategy](actor_name)
    
    def _restart_one(self, actor_name):
        """Restart only the failed actor"""
        if actor_name in self.children:
            old_actor = self.children[actor_name]
            old_actor.stop()
            
            new_actor = SupervisedActor(actor_name, supervisor=self)
            self.children[actor_name] = new_actor
            print(f"Restarted {actor_name}")
    
    def _restart_all(self, actor_name):
        """Restart all children"""
        print("Restarting all children...")
        for name in list(self.children.keys()):
            self._restart_one(name)
    
    def _restart_rest(self, actor_name):
        """Restart failed actor and all created after it"""
        # Implementation depends on creation order tracking
        pass
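The three strategies differ only in which children get restarted, so that choice can be isolated in one small function. This is a sketch under assumptions, not the tutorial's implementation: `actors_to_restart` is a hypothetical helper, and it expects the child names as a list in creation order.

```python
from typing import List


def actors_to_restart(creation_order: List[str], failed: str, strategy: str) -> List[str]:
    """Return the child names to restart, oldest first.

    creation_order: child names in the order they were created.
    failed: name of the child that crashed.
    strategy: 'one_for_one', 'one_for_all', or 'rest_for_one'.
    """
    if failed not in creation_order:
        return []  # Unknown child: nothing to restart
    if strategy == 'one_for_one':
        return [failed]
    if strategy == 'one_for_all':
        return list(creation_order)
    if strategy == 'rest_for_one':
        # The failed child plus every child created after it
        return creation_order[creation_order.index(failed):]
    raise ValueError(f"Unknown strategy: {strategy}")
```

Python dicts keep insertion order (guaranteed since 3.7), and assigning to an existing key does not move it, so a supervisor that stores its children in a dict could pass `list(self.children)` as `creation_order`.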
Advanced Topic 2: Distributed Actors
For the brave developers, here's distributed actor communication:
import json
import socket
from typing import Optional
# Network-enabled actor
class NetworkActor(Actor):
    def __init__(self, name: str, host='localhost', port: Optional[int] = None):
        super().__init__(name)
        self.host = host
        self.port = port or self._get_free_port()
        self.socket = None
        self.remote_actors = {}  # Remote actor addresses
        
        self._start_server()
        self.on('remote_message', self._handle_remote)
        self.on('register_remote', self._register_remote)
    
    def _get_free_port(self):
        """Find an available port (another process could claim it before we bind)"""
        with socket.socket() as s:
            s.bind(('', 0))
            return s.getsockname()[1]
    
    def _start_server(self):
        """Start network server"""
        def server_thread():
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((self.host, self.port))
            server.listen(5)
            server.settimeout(0.1)
            
            print(f"{self.name} listening on {self.host}:{self.port}")
            
            while self.running:
                try:
                    client, addr = server.accept()
                    # A single recv() suits small demo messages;
                    # larger ones can arrive in pieces and need framing
                    data = client.recv(4096)
                    if data:
                        message = json.loads(data.decode())
                        self.send(message)
                    client.close()
                except socket.timeout:
                    continue
                except Exception as e:
                    print(f"Network error: {e}")
            
            server.close()
        
        threading.Thread(target=server_thread, daemon=True).start()
    
    def _handle_remote(self, message):
        """Assumed behavior: unwrap a forwarded message and dispatch its payload"""
        payload = message.get('payload')
        if isinstance(payload, dict):
            self._handle_message(payload)
    
    def _register_remote(self, message):
        """Register remote actor address"""
        actor_name = message.get('name')
        host = message.get('host')
        port = message.get('port')
        
        self.remote_actors[actor_name] = (host, port)
        print(f"Registered remote actor: {actor_name} at {host}:{port}")
    
    def send_remote(self, actor_name: str, message: Dict):
        """Send message to remote actor"""
        if actor_name in self.remote_actors:
            host, port = self.remote_actors[actor_name]
            
            try:
                with socket.create_connection((host, port), timeout=1.0) as sock:
                    # sendall() keeps writing until the whole message is sent
                    sock.sendall(json.dumps(message).encode())
                print(f"Sent to {actor_name}: {message['type']}")
            except Exception as e:
                print(f"Failed to send to {actor_name}: {e}")
# Example: Distributed chat system
class ChatNode(NetworkActor):
    def __init__(self, node_id: str):
        super().__init__(f"ChatNode-{node_id}")
        self.messages = []
        
        self.on('chat_message', self._handle_chat)
        self.on('sync_request', self._handle_sync)
    
    def _handle_chat(self, message):
        user = message.get('user')
        text = message.get('text')
        timestamp = message.get('timestamp', time.time())
        
        chat_msg = {
            'user': user,
            'text': text,
            'timestamp': timestamp,
            'node': self.name
        }
        
        self.messages.append(chat_msg)
        print(f"{user}: {text}")
        
        # Broadcast to other nodes, but never re-forward a message that
        # another node already forwarded, or nodes would echo it forever
        if not message.get('forwarded'):
            for remote_node in self.remote_actors:
                self.send_remote(remote_node, {
                    'type': 'chat_message',
                    'user': user,
                    'text': text,
                    'timestamp': timestamp,
                    'forwarded': True
                })
    
    def _handle_sync(self, message):
        """Assumed behavior: replay our history to the node named in 'from'"""
        requester = message.get('from')
        if requester in self.remote_actors:
            for chat_msg in self.messages:
                self.send_remote(requester, {**chat_msg, 'type': 'chat_message', 'forwarded': True})
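The network code above reads each message with one `recv(4096)` call, which works for small demo messages but can return only part of a larger one, because TCP delivers a byte stream rather than whole messages. A common remedy is length-prefixed framing, sketched below. `send_framed`, `recv_framed`, and `_recv_exact` are hypothetical helpers, and the 4-byte big-endian length header is an assumption of this sketch, not a protocol the tutorial defines.

```python
import json
import socket
import struct


def send_framed(sock: socket.socket, message: dict) -> None:
    """Send one JSON message preceded by its length as a 4-byte header."""
    payload = json.dumps(message).encode('utf-8')
    sock.sendall(struct.pack('!I', len(payload)) + payload)


def _recv_exact(sock: socket.socket, size: int) -> bytes:
    """Read exactly `size` bytes, looping because recv() may return fewer."""
    chunks = []
    remaining = size
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed before the message was complete")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def recv_framed(sock: socket.socket) -> dict:
    """Receive one message written by send_framed()."""
    (length,) = struct.unpack('!I', _recv_exact(sock, 4))
    return json.loads(_recv_exact(sock, length).decode('utf-8'))
```

With framing in place, a connection could also stay open and carry many messages in a row, since each reader knows exactly where one message ends and the next begins.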
Common Pitfalls and Solutions
Pitfall 1: Deadlocks in Request-Reply
import uuid
# Wrong way: can cause deadlock!
class BadActor(Actor):
    def __init__(self):
        super().__init__("BadActor")
        self.other_actor = None
        
    def process(self, msg):
        # Blocking wait for reply. DON'T DO THIS!
        reply_queue = queue.Queue()
        self.other_actor.send({
            'type': 'request',
            'reply_to': reply_queue  
        })
        # This blocks the actor's thread, so it cannot process any other
        # message (including one the other actor may be waiting on)
        result = reply_queue.get()  # Deadlock risk!
# Correct way: use callbacks or futures!
class GoodActor(Actor):
    def __init__(self):
        super().__init__("GoodActor")
        self.other_actor = None  # Set to the actor that answers requests
        self.pending_requests = {}
        
    def process(self, msg):
        request_id = str(uuid.uuid4())
        self.pending_requests[request_id] = msg.get('original_sender')
        
        # Non-blocking send
        self.other_actor.send({
            'type': 'request',
            'request_id': request_id,
            'reply_to': self
        })
        # Return right away and continue processing other messages!
    
    def handle_reply(self, msg):
        request_id = msg.get('request_id')
        if request_id in self.pending_requests:
            # Process the reply asynchronously
            original_sender = self.pending_requests.pop(request_id)
            # Do something with the result
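The "futures" half of that advice can be sketched with `concurrent.futures.Future` from the standard library. The idea is to attach a callback that posts the result back into the requester's own mailbox, so the reply is handled as an ordinary message and nothing ever blocks. `worker_loop`, `request_double`, and `demo` are hypothetical names, and plain queues stand in for actor mailboxes.

```python
import queue
import threading
from concurrent.futures import Future


def worker_loop(mailbox):
    """Stand-in for the other actor: resolves each request's Future."""
    while True:
        message = mailbox.get()
        if message is None:  # None stops the loop
            break
        message['future'].set_result(message['value'] * 2)


def request_double(worker_mailbox, own_mailbox, value):
    """Send a request without blocking; the reply arrives as a message."""
    future = Future()
    # The callback runs in whichever thread completes the future. It only
    # enqueues a message, so the requester's state stays single-threaded.
    future.add_done_callback(
        lambda f: own_mailbox.put({'type': 'reply', 'value': f.result()})
    )
    worker_mailbox.put({'type': 'request', 'value': value, 'future': future})
    return future


def demo():
    worker_mailbox, own_mailbox = queue.Queue(), queue.Queue()
    thread = threading.Thread(target=worker_loop, args=(worker_mailbox,))
    thread.start()
    try:
        request_double(worker_mailbox, own_mailbox, 21)
        # In a real actor this get() is just the normal mailbox loop
        return own_mailbox.get(timeout=1.0)
    finally:
        worker_mailbox.put(None)
        thread.join()
```

The key design choice is that the callback does no real work of its own. It turns the completed future into a message, which keeps all state changes inside the requester's message loop.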
Pitfall 2: Message Ordering Assumptions
# Dangerous: assuming message order!
class OrderDependentActor(Actor):
    def process_sequence(self):
        self.send({'type': 'step1'})
        # A queue.Queue mailbox is FIFO for a single sender, but with several
        # senders, intermediate actors, or a network hop, step2 may arrive
        # before step1!
        self.send({'type': 'step2'})
        self.send({'type': 'step3'})
# Safe: explicit sequencing!
class SequencedActor(Actor):
    def __init__(self):
        super().__init__("SequencedActor")
        self.sequence_number = 0
        
    def send_sequenced(self, message):
        # Add sequence number
        message['sequence'] = self.sequence_number
        self.sequence_number += 1
        self.send(message)
    
    def _handle_message(self, message):
        # Read the sequence number; a fuller version would buffer
        # out-of-order messages here before dispatching them
        seq = message.get('sequence', -1)
        super()._handle_message(message)
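Buffering out-of-order messages takes only a little bookkeeping: remember the next expected sequence number and hold back anything that arrives early. The `Resequencer` class below is a hypothetical sketch. It assumes every message carries an integer `sequence` field that starts at 0 and has no gaps, and it silently drops duplicates.

```python
class Resequencer:
    """Releases messages strictly in sequence order, buffering early arrivals."""

    def __init__(self, first=0):
        self.expected = first  # Next sequence number to release
        self.buffer = {}       # Early messages, keyed by sequence number

    def accept(self, message):
        """Take one message; return the list of messages now deliverable."""
        seq = message['sequence']
        if seq < self.expected or seq in self.buffer:
            return []  # Duplicate or already delivered
        self.buffer[seq] = message
        ready = []
        # Release the longest run of consecutive messages we now hold
        while self.expected in self.buffer:
            ready.append(self.buffer.pop(self.expected))
            self.expected += 1
        return ready
```

An actor could call `accept()` at the top of its message handling and dispatch whatever comes back. A production version would also need a bound on the buffer and a policy for messages that never arrive.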
Best Practices
- Keep Actors Focused: One responsibility per actor
- Immutable Messages: Never modify messages after sending
- Fail Fast: Let actors crash and restart cleanly
- Use Supervision: Always have a supervision strategy
- Avoid Blocking: Never block in message handlers
- Monitor Mailboxes: Watch for message queue buildup
- Design for Failure: Assume actors can crash anytime
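The "Monitor Mailboxes" advice is easier to follow when the mailbox has a size limit, since an unbounded `queue.Queue` will quietly grow until memory runs out. The sketch below wraps a bounded queue and counts rejected messages. `BoundedMailbox` and its method names are hypothetical, and rejecting messages when full is only one possible policy (blocking the sender is another).

```python
import queue


class BoundedMailbox:
    """Mailbox with a capacity limit and simple health counters."""

    def __init__(self, capacity):
        self._queue = queue.Queue(maxsize=capacity)
        self.dropped = 0  # Messages rejected because the mailbox was full

    def offer(self, message):
        """Try to enqueue without blocking; return False if the mailbox is full."""
        try:
            self._queue.put_nowait(message)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def take(self, timeout=0.1):
        """Remove the next message; raises queue.Empty if none arrives in time."""
        return self._queue.get(timeout=timeout)

    def depth(self):
        """Approximate number of waiting messages, useful for monitoring."""
        return self._queue.qsize()
```

A sender that gets `False` back can slow down, retry later, or report the overload, which is a simple form of backpressure. Logging `depth()` and `dropped` periodically gives an early warning that an actor is falling behind.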
 
Hands-On Exercise
Challenge: Build a Distributed Task Queue
Create an actor-based task queue system:
Requirements:
- Master actor that distributes tasks
- Worker actors that process tasks
- Result aggregator actor
- Task priorities and deadlines
- Fault tolerance with restart capability

Bonus Points:
- Add work stealing between workers
- Implement backpressure handling
- Create a monitoring dashboard actor

Solution
import heapq
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
# Our distributed task queue system!
# Reuses Actor, time, Any, and Dict from the earlier examples
@dataclass
class Task:
    id: str
    priority: int  # Lower = higher priority
    deadline: datetime
    payload: Dict[str, Any]
    retries: int = 0
    
    def __lt__(self, other):
        return self.priority < other.priority
class TaskMaster(Actor):
    def __init__(self):
        super().__init__("TaskMaster")
        self.task_queue = []  # Priority queue (heap)
        self.workers = []  # Worker actors
        self.results = {}  # Task results
        self.pending_tasks = {}  # Tasks being processed, by worker name
        
        # Register handlers
        self.on('submit_task', self._submit_task)
        self.on('worker_ready', self._worker_ready)
        self.on('task_complete', self._task_complete)
        self.on('task_failed', self._task_failed)
        self.on('create_worker', self._create_worker)
    
    def _submit_task(self, message):
        task = Task(
            id=message.get('id', str(uuid.uuid4())),
            priority=message.get('priority', 5),
            deadline=datetime.now() + timedelta(seconds=message.get('timeout', 60)),
            payload=message.get('payload', {})
        )
        
        heapq.heappush(self.task_queue, task)
        print(f"Task {task.id} submitted with priority {task.priority}")
        
        # Try to assign immediately
        self._try_assign_tasks()
    
    def _create_worker(self, message):
        worker_id = f"Worker-{len(self.workers)}"
        worker = TaskWorker(worker_id, self)
        self.workers.append(worker)
        print(f"Created {worker_id}")
    
    def _worker_ready(self, message):
        worker = message.get('worker')
        # Only hand out work if this worker has nothing in flight
        if worker.name not in self.pending_tasks:
            self._assign_task_to_worker(worker)
    
    def _try_assign_tasks(self):
        """Assign tasks to available workers"""
        for worker in self.workers:
            if not self.task_queue:
                break
                
            # Check if worker is available
            worker_id = worker.name
            if worker_id not in self.pending_tasks:
                self._assign_task_to_worker(worker)
    
    def _assign_task_to_worker(self, worker):
        while self.task_queue:
            task = heapq.heappop(self.task_queue)
            
            # Check deadline; skip expired tasks and try the next one
            if datetime.now() > task.deadline:
                print(f"Warning: Task {task.id} expired!")
                self.results[task.id] = {'status': 'expired'}
                continue
            
            self.pending_tasks[worker.name] = task
            worker.send({
                'type': 'process_task',
                'task': task
            })
            print(f"Assigned task {task.id} to {worker.name}")
            return
    
    def _task_complete(self, message):
        worker_id = message.get('worker_id')
        task_id = message.get('task_id')
        result = message.get('result')
        
        if worker_id in self.pending_tasks:
            del self.pending_tasks[worker_id]
        
        self.results[task_id] = {
            'status': 'complete',
            'result': result,
            'worker': worker_id
        }
        
        print(f"Task {task_id} completed by {worker_id}")
        
        # Worker is ready for next task
        self._worker_ready({'worker': next(w for w in self.workers if w.name == worker_id)})
    
    def _task_failed(self, message):
        worker_id = message.get('worker_id')
        task_id = message.get('task_id')
        error = message.get('error')
        
        task = self.pending_tasks.pop(worker_id, None)
        if task and task.retries < 3:
            # Retry the task
            task.retries += 1
            heapq.heappush(self.task_queue, task)
            print(f"Retrying task {task_id} (attempt {task.retries + 1})")
        else:
            # Task failed permanently
            self.results[task_id] = {
                'status': 'failed',
                'error': error
            }
            print(f"Task {task_id} failed permanently")
        
        # The worker is free again, so hand out any queued work
        self._try_assign_tasks()
class TaskWorker(Actor):
    def __init__(self, worker_id: str, master: TaskMaster):
        super().__init__(worker_id)
        self.master = master
        self.current_task = None
        
        self.on('process_task', self._process_task)
        
        # Tell master we're ready
        self.master.send({
            'type': 'worker_ready',
            'worker': self
        })
    
    def _process_task(self, message):
        task = message.get('task')
        self.current_task = task
        
        try:
            # Simulate task processing
            print(f"{self.name} processing task {task.id}")
            
            # Your actual task processing logic here
            result = self._execute_task(task.payload)
            
            # Report success
            self.master.send({
                'type': 'task_complete',
                'worker_id': self.name,
                'task_id': task.id,
                'result': result
            })
            
        except Exception as e:
            # Report failure
            self.master.send({
                'type': 'task_failed',
                'worker_id': self.name,
                'task_id': task.id,
                'error': str(e)
            })
        
        finally:
            self.current_task = None
    
    def _execute_task(self, payload):
        # Actual task execution
        operation = payload.get('operation', 'default')
        
        if operation == 'compute':
            # Heavy computation simulation
            time.sleep(0.1)
            return {'computed': payload.get('value', 0) * 2}
        elif operation == 'fetch':
            # I/O simulation
            time.sleep(0.05)
            return {'fetched': 'data'}
        else:
            return {'processed': True}
# Result Aggregator
class ResultAggregator(Actor):
    def __init__(self, master: TaskMaster):
        super().__init__("ResultAggregator")
        self.master = master
        self.aggregated_results = {}
        
        self.on('aggregate', self._aggregate)
        self.on('get_report', self._get_report)
    
    def _aggregate(self, message):
        batch_id = message.get('batch_id')
        task_ids = message.get('task_ids', [])
        
        # Note: reading master.results directly is a shortcut for this demo;
        # a stricter design would request the results with a message
        results = []
        for task_id in task_ids:
            if task_id in self.master.results:
                results.append(self.master.results[task_id])
        
        self.aggregated_results[batch_id] = {
            'total': len(task_ids),
            'completed': sum(1 for r in results if r.get('status') == 'complete'),
            'failed': sum(1 for r in results if r.get('status') == 'failed'),
            'results': results
        }
        
        print(f"Aggregated batch {batch_id}: {self.aggregated_results[batch_id]['completed']}/{len(task_ids)} completed")
    
    def _get_report(self, message):
        print("Task Queue Report:")
        for batch_id, stats in self.aggregated_results.items():
            print(f"  Batch {batch_id}: {stats['completed']} completed, {stats['failed']} failed")
# Test it out!
task_queue = TaskMaster()
# Create workers
for i in range(3):
    task_queue.send({'type': 'create_worker'})
# Submit tasks
for i in range(10):
    task_queue.send({
        'type': 'submit_task',
        'id': f'task-{i}',
        'priority': i % 3,  # Different priorities
        'payload': {
            'operation': ['compute', 'fetch'][i % 2],
            'value': i
        }
    })
# Let it process
time.sleep(1)
# Create aggregator and get report
aggregator = ResultAggregator(task_queue)
aggregator.send({
    'type': 'aggregate',
    'batch_id': 'batch-1',
    'task_ids': [f'task-{i}' for i in range(10)]
})
aggregator.send({'type': 'get_report'})
time.sleep(0.2)  # Let the aggregator finish
# Stop every actor so the program can exit (actor threads are non-daemon)
for actor in (aggregator, *task_queue.workers, task_queue):
    actor.stop()
Key Takeaways
You've mastered the Actor Model! Here's what you can now do:
- Design concurrent systems without shared state complexity
- Build fault-tolerant applications with supervision
- Scale across cores and machines with distributed actors
- Debug message-passing systems like a pro
- Create reactive, event-driven architectures with confidence!

Remember: The Actor Model transforms complex concurrency into simple message passing. Let each actor do one thing well!
Next Steps
Congratulations! You've conquered the Actor Model and message passing!
Here's what to do next:
- Build a real-time chat system with actors
- Explore actor frameworks like Pykka or Ray
- Move on to our next tutorial: Async/Await Patterns
- Share your actor-based creations with the community!

Remember: Every distributed system expert started with their first actor. Keep experimenting, keep learning, and most importantly, have fun with concurrent programming!
Happy coding!