+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Part 465 of 541

๐Ÿš€ Message Queues: RabbitMQ with Python

Master message queues with RabbitMQ in Python with practical examples, best practices, and real-world applications ๐Ÿš€

๐Ÿ’ŽAdvanced
25 min read

Prerequisites

  • Basic understanding of programming concepts ๐Ÿ“
  • Python installation (3.8+) ๐Ÿ
  • VS Code or preferred IDE ๐Ÿ’ป

What you'll learn

  • Understand the concept fundamentals ๐ŸŽฏ
  • Apply the concept in real projects ๐Ÿ—๏ธ
  • Debug common issues ๐Ÿ›
  • Write clean, Pythonic code โœจ

๐ŸŽฏ Introduction

Welcome to this exciting tutorial on Message Queues with RabbitMQ! ๐ŸŽ‰ In this guide, weโ€™ll explore how to build robust, scalable applications using one of the most popular message brokers in the world.

Youโ€™ll discover how message queues can transform your Python applications from tightly coupled systems into loosely coupled, scalable architectures. Whether youโ€™re building microservices ๐Ÿ—๏ธ, processing background tasks โšก, or handling real-time data streams ๐Ÿ“Š, understanding RabbitMQ is essential for modern distributed systems.

By the end of this tutorial, youโ€™ll feel confident implementing message queues in your own projects! Letโ€™s dive in! ๐ŸŠโ€โ™‚๏ธ

๐Ÿ“š Understanding Message Queues and RabbitMQ

๐Ÿค” What are Message Queues?

Message queues are like a postal service for your applications ๐Ÿ“ฎ. Think of it as a reliable delivery system where one part of your application (the sender) can drop off messages, and another part (the receiver) can pick them up when ready - even if theyโ€™re not running at the same time!

In Python terms, message queues enable asynchronous communication between different parts of your system. This means you can:

  • โœจ Decouple application components
  • ๐Ÿš€ Scale different parts independently
  • ๐Ÿ›ก๏ธ Handle failures gracefully
  • โšก Process tasks asynchronously

๐Ÿ’ก Why Use RabbitMQ?

Hereโ€™s why developers love RabbitMQ:

  1. Reliability ๐Ÿ”’: Messages are persisted and guaranteed delivery
  2. Flexibility ๐Ÿ’ป: Multiple messaging patterns (pub/sub, work queues, RPC)
  3. Scalability ๐Ÿ“–: Handle millions of messages per second
  4. Language Support ๐Ÿ”ง: Works with Python, Java, .NET, and more

Real-world example: Imagine an e-commerce site ๐Ÿ›’. When a customer places an order, you need to process payment, update inventory, send emails, and generate invoices. With RabbitMQ, each task becomes a message that different services handle independently!

๐Ÿ”ง Basic Syntax and Usage

๐Ÿ“ Setting Up RabbitMQ

First, letโ€™s install the Python client:

# ๐ŸŽฏ Install pika - the Python RabbitMQ client
# pip install pika

import pika
import json
import datetime

# ๐Ÿ‘‹ Hello, RabbitMQ!
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# ๐ŸŽจ Create a queue
channel.queue_declare(queue='hello_queue')
print("โœ… Connected to RabbitMQ!")

๐Ÿ’ก Explanation: Weโ€™re using pika, the most popular Python client for RabbitMQ. The connection is like opening a phone line ๐Ÿ“ž to RabbitMQ!

๐ŸŽฏ Basic Producer (Sender)

Hereโ€™s how to send messages:

# ๐Ÿš€ Simple message producer
import pika
import json
from datetime import datetime

class MessageProducer:
    def __init__(self):
        # ๐Ÿ”Œ Establish connection
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ“ฆ Declare the queue
        self.channel.queue_declare(queue='task_queue', durable=True)
    
    def send_message(self, message):
        # ๐ŸŽจ Prepare the message
        body = json.dumps({
            'message': message,
            'timestamp': datetime.now().isoformat(),
            'emoji': '๐Ÿ“จ'
        })
        
        # ๐Ÿš€ Send it!
        self.channel.basic_publish(
            exchange='',
            routing_key='task_queue',
            body=body,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Make message persistent
            )
        )
        print(f"โœ… Sent: {message}")
    
    def close(self):
        self.connection.close()
        print("๐Ÿ‘‹ Connection closed!")

# ๐ŸŽฎ Let's use it!
producer = MessageProducer()
producer.send_message("Hello RabbitMQ! ๐Ÿฐ")
producer.send_message("Process this order ๐Ÿ›’")
producer.close()

๐ŸŽฏ Basic Consumer (Receiver)

Now letโ€™s receive messages:

# ๐Ÿ“จ Simple message consumer
import pika
import json
import time

class MessageConsumer:
    def __init__(self):
        # ๐Ÿ”Œ Connect to RabbitMQ
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ“ฆ Ensure queue exists
        self.channel.queue_declare(queue='task_queue', durable=True)
        
        # ๐ŸŽฏ Set prefetch to process one message at a time
        self.channel.basic_qos(prefetch_count=1)
    
    def process_message(self, ch, method, properties, body):
        # ๐ŸŽจ Decode the message
        message = json.loads(body)
        print(f"๐Ÿ“จ Received: {message['message']}")
        
        # ๐Ÿ’ก Simulate processing
        time.sleep(2)  # Pretend we're doing hard work! ๐Ÿ’ช
        print(f"โœ… Processed at: {message['timestamp']}")
        
        # ๐ŸŽฏ Acknowledge the message
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    def start_consuming(self):
        # ๐ŸŽง Set up the consumer
        self.channel.basic_consume(
            queue='task_queue',
            on_message_callback=self.process_message
        )
        
        print("โณ Waiting for messages. Press CTRL+C to exit...")
        self.channel.start_consuming()

# ๐Ÿš€ Start consuming!
consumer = MessageConsumer()
consumer.start_consuming()

๐Ÿ’ก Practical Examples

๐Ÿ›’ Example 1: E-commerce Order Processing

Letโ€™s build a real order processing system:

# ๐Ÿ›๏ธ E-commerce order processor
import pika
import json
import uuid
from datetime import datetime
from enum import Enum

class OrderStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

class Order:
    def __init__(self, customer_email, items, total):
        self.id = str(uuid.uuid4())
        self.customer_email = customer_email
        self.items = items
        self.total = total
        self.status = OrderStatus.PENDING
        self.created_at = datetime.now()

class OrderProcessor:
    def __init__(self):
        # ๐Ÿ”Œ Setup RabbitMQ connection
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ“ฆ Declare exchanges and queues
        self.channel.exchange_declare(
            exchange='orders', 
            exchange_type='topic'
        )
        
        # ๐ŸŽฏ Different queues for different tasks
        queues = [
            ('payment_queue', 'order.payment'),
            ('inventory_queue', 'order.inventory'),
            ('email_queue', 'order.email'),
            ('shipping_queue', 'order.shipping')
        ]
        
        for queue_name, routing_key in queues:
            self.channel.queue_declare(queue=queue_name, durable=True)
            self.channel.queue_bind(
                exchange='orders',
                queue=queue_name,
                routing_key=routing_key
            )
    
    def process_order(self, order):
        # ๐ŸŽจ Create order message
        order_data = {
            'order_id': order.id,
            'customer_email': order.customer_email,
            'items': order.items,
            'total': order.total,
            'timestamp': order.created_at.isoformat(),
            'emoji': '๐Ÿ›’'
        }
        
        # ๐Ÿ“จ Send to different services
        tasks = [
            ('order.payment', '๐Ÿ’ณ Processing payment'),
            ('order.inventory', '๐Ÿ“ฆ Updating inventory'),
            ('order.email', '๐Ÿ“ง Sending confirmation'),
            ('order.shipping', '๐Ÿšš Preparing shipment')
        ]
        
        for routing_key, description in tasks:
            self.channel.basic_publish(
                exchange='orders',
                routing_key=routing_key,
                body=json.dumps(order_data),
                properties=pika.BasicProperties(
                    delivery_mode=2,
                )
            )
            print(f"โœ… {description} for order {order.id}")

# ๐ŸŽฎ Let's process an order!
processor = OrderProcessor()

# ๐Ÿ›’ Create a sample order
order = Order(
    customer_email="[email protected]",
    items=[
        {"name": "Python Book", "price": 29.99, "emoji": "๐Ÿ“˜"},
        {"name": "Coffee Mug", "price": 12.99, "emoji": "โ˜•"}
    ],
    total=42.98
)

processor.process_order(order)
print(f"๐ŸŽ‰ Order {order.id} sent for processing!")

๐ŸŽฎ Example 2: Real-time Game Event System

Letโ€™s create a game event broadcasting system:

# ๐ŸŽฎ Real-time game event system
import pika
import json
import threading
from datetime import datetime

class GameEventBroadcaster:
    def __init__(self, game_id):
        self.game_id = game_id
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ“ก Create fanout exchange for broadcasting
        self.exchange_name = f'game_{game_id}_events'
        self.channel.exchange_declare(
            exchange=self.exchange_name,
            exchange_type='fanout'
        )
        print(f"๐ŸŽฎ Game {game_id} event broadcaster ready!")
    
    def broadcast_event(self, event_type, player_id, data):
        # ๐ŸŽจ Create event message
        event = {
            'game_id': self.game_id,
            'event_type': event_type,
            'player_id': player_id,
            'data': data,
            'timestamp': datetime.now().isoformat(),
            'emoji': self._get_event_emoji(event_type)
        }
        
        # ๐Ÿ“ก Broadcast to all subscribers
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key='',
            body=json.dumps(event)
        )
        print(f"{event['emoji']} Event: {event_type} by Player {player_id}")
    
    def _get_event_emoji(self, event_type):
        emojis = {
            'player_joined': '๐ŸŽฎ',
            'player_scored': '๐ŸŽฏ',
            'level_complete': '๐Ÿ†',
            'power_up': 'โšก',
            'player_defeated': '๐Ÿ’€',
            'game_over': '๐ŸŽฒ'
        }
        return emojis.get(event_type, '๐ŸŽฎ')

class GameEventListener:
    def __init__(self, game_id, player_name):
        self.game_id = game_id
        self.player_name = player_name
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ“ก Connect to game exchange
        exchange_name = f'game_{game_id}_events'
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type='fanout'
        )
        
        # ๐Ÿ“ฆ Create unique queue for this player
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.queue_name = result.method.queue
        
        # ๐Ÿ”— Bind queue to exchange
        self.channel.queue_bind(
            exchange=exchange_name,
            queue=self.queue_name
        )
        
        print(f"๐Ÿ‘ค {player_name} listening to game events...")
    
    def handle_event(self, ch, method, properties, body):
        event = json.loads(body)
        print(f"{event['emoji']} {self.player_name} received: {event['event_type']}")
        
        # ๐ŸŽฏ React to different events
        if event['event_type'] == 'player_scored':
            print(f"   ๐ŸŽ‰ Player {event['player_id']} scored {event['data']['points']} points!")
        elif event['event_type'] == 'power_up':
            print(f"   โšก Power-up activated: {event['data']['type']}!")
    
    def start_listening(self):
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self.handle_event,
            auto_ack=True
        )
        self.channel.start_consuming()

# ๐ŸŽฎ Demo the game system
if __name__ == "__main__":
    game_id = "epic_battle_123"
    
    # ๐ŸŽฏ Create broadcaster
    broadcaster = GameEventBroadcaster(game_id)
    
    # ๐ŸŽฎ Simulate game events
    broadcaster.broadcast_event('player_joined', 'Alice', {'character': 'Warrior'})
    broadcaster.broadcast_event('player_joined', 'Bob', {'character': 'Mage'})
    broadcaster.broadcast_event('player_scored', 'Alice', {'points': 100, 'combo': 5})
    broadcaster.broadcast_event('power_up', 'Bob', {'type': 'Speed Boost', 'duration': 30})

๐Ÿš€ Advanced Concepts

๐Ÿง™โ€โ™‚๏ธ Advanced Topic 1: RPC (Remote Procedure Call)

When you need to call functions on remote servers:

# ๐ŸŽฏ RPC implementation with RabbitMQ
import pika
import uuid
import json

class RPCClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ“ฆ Create callback queue
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        
        # ๐ŸŽง Setup consumer
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        
        self.response = None
        self.corr_id = None
    
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = json.loads(body)
    
    def call(self, method_name, *args, **kwargs):
        # ๐ŸŽจ Create unique correlation ID
        self.response = None
        self.corr_id = str(uuid.uuid4())
        
        # ๐Ÿ“จ Send RPC request
        request = {
            'method': method_name,
            'args': args,
            'kwargs': kwargs,
            'emoji': '๐Ÿ”ฎ'
        }
        
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=json.dumps(request)
        )
        
        # โณ Wait for response
        while self.response is None:
            self.connection.process_data_events()
        
        return self.response

# ๐Ÿš€ RPC Server
class RPCServer:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='rpc_queue')
        
        # ๐Ÿงฎ Available methods
        self.methods = {
            'calculate': self.calculate,
            'process_data': self.process_data,
            'get_status': self.get_status
        }
    
    def calculate(self, operation, a, b):
        operations = {
            'add': a + b,
            'multiply': a * b,
            'power': a ** b
        }
        return {
            'result': operations.get(operation, 0),
            'emoji': '๐Ÿงฎ'
        }
    
    def process_data(self, data):
        return {
            'processed': len(data),
            'summary': f"Processed {len(data)} items",
            'emoji': 'โšก'
        }
    
    def get_status(self):
        return {
            'status': 'operational',
            'uptime': '99.9%',
            'emoji': 'โœ…'
        }
    
    def on_request(self, ch, method, props, body):
        request = json.loads(body)
        
        # ๐ŸŽฏ Execute requested method
        method_name = request['method']
        args = request.get('args', [])
        kwargs = request.get('kwargs', {})
        
        if method_name in self.methods:
            response = self.methods[method_name](*args, **kwargs)
        else:
            response = {'error': 'Method not found', 'emoji': 'โŒ'}
        
        # ๐Ÿ“จ Send response
        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            properties=pika.BasicProperties(
                correlation_id=props.correlation_id
            ),
            body=json.dumps(response)
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    def start(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='rpc_queue',
            on_message_callback=self.on_request
        )
        print("๐Ÿš€ RPC Server ready!")
        self.channel.start_consuming()

๐Ÿ—๏ธ Advanced Topic 2: Priority Queues and Dead Letter Exchanges

For handling message priorities and failures:

# ๐Ÿš€ Advanced queue features
import pika
import json
import random

class AdvancedQueueManager:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ’€ Setup Dead Letter Exchange
        self.channel.exchange_declare(
            exchange='dlx',
            exchange_type='fanout'
        )
        self.channel.queue_declare(
            queue='dead_letter_queue',
            durable=True
        )
        self.channel.queue_bind(
            exchange='dlx',
            queue='dead_letter_queue'
        )
        
        # ๐ŸŽฏ Priority queue with DLX
        self.channel.queue_declare(
            queue='priority_tasks',
            durable=True,
            arguments={
                'x-max-priority': 10,
                'x-dead-letter-exchange': 'dlx',
                'x-message-ttl': 60000  # 60 seconds TTL
            }
        )
    
    def send_priority_task(self, task, priority=5):
        # ๐ŸŽจ Create task message
        message = {
            'task': task,
            'priority': priority,
            'timestamp': datetime.now().isoformat(),
            'emoji': self._get_priority_emoji(priority)
        }
        
        # ๐Ÿ“จ Send with priority
        self.channel.basic_publish(
            exchange='',
            routing_key='priority_tasks',
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,
                priority=priority
            )
        )
        print(f"{message['emoji']} Sent task with priority {priority}: {task}")
    
    def _get_priority_emoji(self, priority):
        if priority >= 8:
            return '๐Ÿ”ด'  # Critical
        elif priority >= 5:
            return '๐ŸŸก'  # Normal
        else:
            return '๐ŸŸข'  # Low

# ๐ŸŽฎ Demo priority system
manager = AdvancedQueueManager()

# Send tasks with different priorities
tasks = [
    ("Process payment", 9),
    ("Send welcome email", 3),
    ("Generate report", 5),
    ("Critical security update", 10),
    ("Update user profile", 4)
]

for task, priority in tasks:
    manager.send_priority_task(task, priority)

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Not Handling Connection Failures

# โŒ Wrong way - no error handling!
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # ๐Ÿ’ฅ Crashes if RabbitMQ is down!

# โœ… Correct way - with retry logic!
import time

class ResilientConnection:
    def __init__(self, max_retries=5):
        self.max_retries = max_retries
        self.connection = None
        self.channel = None
        self.connect()
    
    def connect(self):
        for attempt in range(self.max_retries):
            try:
                self.connection = pika.BlockingConnection(
                    pika.ConnectionParameters('localhost')
                )
                self.channel = self.connection.channel()
                print("โœ… Connected to RabbitMQ!")
                return
            except pika.exceptions.AMQPConnectionError:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"โš ๏ธ Connection failed. Retrying in {wait_time}s...")
                time.sleep(wait_time)
        
        raise Exception("โŒ Could not connect to RabbitMQ after max retries!")

๐Ÿคฏ Pitfall 2: Not Acknowledging Messages

# โŒ Dangerous - messages lost if consumer crashes!
def bad_callback(ch, method, properties, body):
    process_message(body)  # ๐Ÿ’ฅ If this fails, message is lost!
    # No acknowledgment!

# โœ… Safe - message redelivered on failure!
def good_callback(ch, method, properties, body):
    try:
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # โœ… Acknowledge on success
        print("โœ… Message processed successfully!")
    except Exception as e:
        print(f"โŒ Error: {e}")
        # Message will be redelivered
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True  # ๐Ÿ”„ Put back in queue
        )

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Use Durable Queues: Survive RabbitMQ restarts
  2. ๐Ÿ“ Set Message TTL: Prevent queue overflow
  3. ๐Ÿ›ก๏ธ Implement Circuit Breakers: Handle downstream failures
  4. ๐ŸŽจ Use Dead Letter Exchanges: Handle failed messages
  5. โœจ Monitor Queue Lengths: Prevent memory issues

๐Ÿงช Hands-On Exercise

๐ŸŽฏ Challenge: Build a Task Distribution System

Create a distributed task processing system:

๐Ÿ“‹ Requirements:

  • โœ… Multiple worker types (CPU intensive, I/O intensive)
  • ๐Ÿท๏ธ Task priorities and categories
  • ๐Ÿ‘ค Worker health monitoring
  • ๐Ÿ“… Task scheduling with delays
  • ๐ŸŽจ Progress tracking and reporting

๐Ÿš€ Bonus Points:

  • Add task retry logic with exponential backoff
  • Implement worker auto-scaling
  • Create a dashboard for monitoring

๐Ÿ’ก Solution

๐Ÿ” Click to see solution
# ๐ŸŽฏ Distributed task processing system!
import pika
import json
import uuid
import time
import threading
from datetime import datetime, timedelta
from enum import Enum

class TaskType(Enum):
    CPU_INTENSIVE = "cpu"
    IO_INTENSIVE = "io"
    SCHEDULED = "scheduled"

class TaskStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

class Task:
    def __init__(self, name, task_type, priority=5, delay_seconds=0):
        self.id = str(uuid.uuid4())
        self.name = name
        self.task_type = task_type
        self.priority = priority
        self.status = TaskStatus.PENDING
        self.created_at = datetime.now()
        self.execute_at = datetime.now() + timedelta(seconds=delay_seconds)
        self.attempts = 0
        self.max_attempts = 3

class TaskDistributor:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ“ฆ Setup exchanges and queues
        self.channel.exchange_declare(
            exchange='tasks',
            exchange_type='direct'
        )
        
        # ๐ŸŽฏ Different queues for different task types
        for task_type in TaskType:
            queue_name = f"{task_type.value}_tasks"
            self.channel.queue_declare(
                queue=queue_name,
                durable=True,
                arguments={'x-max-priority': 10}
            )
            self.channel.queue_bind(
                exchange='tasks',
                queue=queue_name,
                routing_key=task_type.value
            )
        
        # ๐Ÿ“Š Progress tracking
        self.task_progress = {}
    
    def submit_task(self, task):
        # ๐ŸŽจ Prepare task message
        task_data = {
            'id': task.id,
            'name': task.name,
            'type': task.task_type.value,
            'priority': task.priority,
            'status': task.status.value,
            'execute_at': task.execute_at.isoformat(),
            'attempts': task.attempts,
            'emoji': '๐Ÿš€'
        }
        
        # ๐Ÿ“จ Route to appropriate queue
        self.channel.basic_publish(
            exchange='tasks',
            routing_key=task.task_type.value,
            body=json.dumps(task_data),
            properties=pika.BasicProperties(
                delivery_mode=2,
                priority=task.priority
            )
        )
        
        self.task_progress[task.id] = task.status
        print(f"โœ… Task {task.id} submitted: {task.name}")
    
    def get_task_progress(self, task_id):
        return self.task_progress.get(task_id, TaskStatus.PENDING)

class TaskWorker:
    def __init__(self, worker_id, task_type):
        self.worker_id = worker_id
        self.task_type = task_type
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        
        # ๐Ÿ“ฆ Setup queue
        self.queue_name = f"{task_type.value}_tasks"
        self.channel.queue_declare(
            queue=self.queue_name,
            durable=True,
            arguments={'x-max-priority': 10}
        )
        
        # ๐ŸŽฏ Process one task at a time
        self.channel.basic_qos(prefetch_count=1)
        
        # ๐Ÿ’ช Worker health
        self.is_healthy = True
        self.tasks_processed = 0
        
        print(f"๐Ÿค– Worker {worker_id} ready for {task_type.value} tasks!")
    
    def process_task(self, ch, method, properties, body):
        task_data = json.loads(body)
        task_id = task_data['id']
        
        print(f"โš™๏ธ Worker {self.worker_id} processing: {task_data['name']}")
        
        try:
            # ๐Ÿ• Check if scheduled for later
            execute_at = datetime.fromisoformat(task_data['execute_at'])
            if execute_at > datetime.now():
                delay = (execute_at - datetime.now()).total_seconds()
                print(f"โฐ Task scheduled for later, waiting {delay:.0f}s...")
                time.sleep(delay)
            
            # ๐Ÿ’ก Simulate processing
            if self.task_type == TaskType.CPU_INTENSIVE:
                time.sleep(3)  # Heavy computation
                result = "Computed complex result ๐Ÿงฎ"
            else:
                time.sleep(1)  # I/O operation
                result = "Fetched data successfully ๐Ÿ“Š"
            
            # โœ… Task completed
            self.tasks_processed += 1
            print(f"โœ… Worker {self.worker_id} completed: {result}")
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        except Exception as e:
            print(f"โŒ Task failed: {e}")
            task_data['attempts'] += 1
            
            # ๐Ÿ”„ Retry logic
            if task_data['attempts'] < 3:
                print(f"๐Ÿ”„ Retrying task (attempt {task_data['attempts']})")
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=True
                )
            else:
                print(f"๐Ÿ’€ Task failed permanently after 3 attempts")
                ch.basic_ack(delivery_tag=method.delivery_tag)
    
    def start_working(self):
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self.process_task
        )
        
        print(f"๐ŸŽง Worker {self.worker_id} listening...")
        self.channel.start_consuming()
    
    def get_health(self):
        return {
            'worker_id': self.worker_id,
            'is_healthy': self.is_healthy,
            'tasks_processed': self.tasks_processed,
            'emoji': '๐Ÿ’š' if self.is_healthy else '๐Ÿ’”'
        }

# ๐ŸŽฎ Test the system!
if __name__ == "__main__":
    # Create distributor
    distributor = TaskDistributor()
    
    # Submit various tasks
    tasks = [
        Task("Calculate prime numbers", TaskType.CPU_INTENSIVE, priority=8),
        Task("Fetch API data", TaskType.IO_INTENSIVE, priority=5),
        Task("Generate report", TaskType.CPU_INTENSIVE, priority=3, delay_seconds=5),
        Task("Send email batch", TaskType.IO_INTENSIVE, priority=7),
        Task("Process video", TaskType.CPU_INTENSIVE, priority=10)
    ]
    
    for task in tasks:
        distributor.submit_task(task)
    
    print("๐ŸŽ‰ All tasks submitted!")

๐ŸŽ“ Key Takeaways

Youโ€™ve learned so much! Hereโ€™s what you can now do:

  • โœ… Implement message queues with RabbitMQ and Python ๐Ÿ’ช
  • โœ… Build scalable architectures with decoupled components ๐Ÿ›ก๏ธ
  • โœ… Handle various messaging patterns (work queues, pub/sub, RPC) ๐ŸŽฏ
  • โœ… Debug common issues with connections and acknowledgments ๐Ÿ›
  • โœ… Create distributed systems with Python and RabbitMQ! ๐Ÿš€

Remember: Message queues are the backbone of modern distributed systems. Theyโ€™re here to help you build reliable, scalable applications! ๐Ÿค

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered message queues with RabbitMQ!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the exercises above
  2. ๐Ÿ—๏ธ Build a microservices architecture using RabbitMQ
  3. ๐Ÿ“š Explore advanced patterns like sagas and event sourcing
  4. ๐ŸŒŸ Share your distributed system journey with others!

Remember: Every distributed systems expert started with their first message queue. Keep building, keep learning, and most importantly, have fun! ๐Ÿš€


Happy message queuing! ๐ŸŽ‰๐Ÿš€โœจ