Part 112 of 343

🚀 Queue Implementation: FIFO Data Structure

Master queue implementation in Python with practical examples, best practices, and real-world applications 🚀

🚀 Intermediate
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand queue fundamentals 🎯
  • Apply queues in real projects 🏗️
  • Debug common queue issues 🐛
  • Write clean, Pythonic queue code ✨

🎯 Introduction

Welcome to this exciting tutorial on Queue Implementation! 🎉 In this guide, we'll explore the FIFO (First-In-First-Out) data structure that powers everything from print spoolers to web servers.

You'll discover how queues can transform your Python development experience. Whether you're building task managers 📋, message systems 💬, or process schedulers 🔄, understanding queues is essential for writing efficient, scalable code.

By the end of this tutorial, you'll feel confident implementing and using queues in your own projects! Let's dive in! 🏊‍♂️

📚 Understanding Queues

🤔 What is a Queue?

A queue is like a line at your favorite coffee shop ☕. Think of it as a waiting line where the first person to arrive is the first person to be served - that's FIFO (First-In-First-Out) in action!

In Python terms, a queue is a linear data structure that follows the FIFO principle. This means you can:

  • ✨ Add elements to the rear (enqueue)
  • 🚀 Remove elements from the front (dequeue)
  • 🛡️ Process items in the order they arrived
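
The three operations above can be sketched in a few lines with collections.deque from the standard library. This is only a minimal illustration; the customer names are made up.

```python
from collections import deque

# A deque used as a FIFO queue: append at the rear, popleft from the front.
line = deque()
line.append("Alice")     # enqueue
line.append("Bob")
line.append("Charlie")

first = line.popleft()   # dequeue: the earliest arrival leaves first
second = line.popleft()

print(first, second, list(line))
```

Items come out in exactly the order they went in, which is the whole FIFO contract.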

💡 Why Use Queues?

Here's why developers love queues:

  1. Fair Processing 🔒: First come, first served - perfect fairness!
  2. Order Preservation 💻: Maintains the sequence of operations
  3. Decoupling 📖: Separates producers from consumers
  4. Buffering 🔧: Handles varying processing speeds

Real-world example: Imagine building a ticket booking system 🎫. With queues, you can ensure customers are served in the order they arrived - no jumping the line!

🔧 Basic Syntax and Usage

📝 Simple Queue Implementation

Let's start with a friendly example using Python's built-in list:

# 👋 Hello, Queue!
class SimpleQueue:
    def __init__(self):
        self.items = []  # 📦 Our queue storage
    
    def enqueue(self, item):
        """Add item to the rear of the queue 🎯"""
        self.items.append(item)
        print(f"➕ Added '{item}' to queue!")
    
    def dequeue(self):
        """Remove item from the front of the queue 🚀"""
        if not self.is_empty():
            item = self.items.pop(0)
            print(f"➖ Removed '{item}' from queue!")
            return item
        else:
            print("⚠️ Queue is empty!")
            return None
    
    def is_empty(self):
        """Check if queue is empty 🔍"""
        return len(self.items) == 0
    
    def size(self):
        """Get queue size 📏"""
        return len(self.items)
    
    def peek(self):
        """View front item without removing 👀"""
        if not self.is_empty():
            return self.items[0]
        return None

# 🎮 Let's try it!
queue = SimpleQueue()
queue.enqueue("First customer 🥇")
queue.enqueue("Second customer 🥈")
queue.enqueue("Third customer 🥉")

queue.dequeue()  # First one out!

💡 Explanation: Notice how we add items to the end but remove from the beginning - that's FIFO in action!

🎯 Using Python's deque

Here's a more efficient implementation using collections.deque:

from collections import deque

# 🏗️ Efficient queue with deque
class EfficientQueue:
    def __init__(self):
        self.items = deque()  # ⚡ Optimized for both ends
    
    def enqueue(self, item):
        """Add to rear - O(1) time! 🚀"""
        self.items.append(item)
    
    def dequeue(self):
        """Remove from front - O(1) time! ⚡"""
        if self.items:
            return self.items.popleft()
        raise IndexError("Queue is empty! 😱")
    
    def __len__(self):
        """Pythonic length check 📏"""
        return len(self.items)
    
    def __bool__(self):
        """Pythonic empty check ✨"""
        return bool(self.items)

# 🎨 Using our efficient queue
fast_queue = EfficientQueue()
fast_queue.enqueue("Lightning fast! ⚡")
fast_queue.enqueue("Super efficient! 🚀")

💡 Practical Examples

🛒 Example 1: Customer Service System

Let's build something real - a customer service queue system:

# 🏪 Customer service queue system
from collections import deque
from datetime import datetime
import time

class CustomerServiceQueue:
    def __init__(self):
        self.queue = deque()
        self.ticket_counter = 0
        self.served_count = 0
    
    def add_customer(self, name, issue):
        """New customer joins the queue 🙋‍♂️"""
        self.ticket_counter += 1
        customer = {
            'ticket': f"T{self.ticket_counter:04d}",
            'name': name,
            'issue': issue,
            'joined_at': datetime.now(),
            'emoji': self._get_emoji(issue)
        }
        self.queue.append(customer)
        print(f"\n🎫 {customer['emoji']} {name} joined queue")
        print(f"   Ticket: {customer['ticket']}")
        print(f"   Position: {len(self.queue)}")
        return customer['ticket']
    
    def serve_next_customer(self):
        """Serve the next customer in line 🛎️"""
        if not self.queue:
            print("📭 No customers waiting!")
            return None
        
        customer = self.queue.popleft()
        # total_seconds() covers the whole wait; .seconds alone drops whole days
        wait_time = int((datetime.now() - customer['joined_at']).total_seconds())
        self.served_count += 1
        
        print(f"\n🎯 Now serving: {customer['name']}")
        print(f"   Issue: {customer['issue']}")
        print(f"   Wait time: {wait_time} seconds")
        print(f"   Customers served today: {self.served_count}")
        
        return customer
    
    def show_queue_status(self):
        """Display current queue status 📊"""
        print(f"\n📊 Queue Status:")
        print(f"   Waiting: {len(self.queue)} customers")
        print(f"   Served today: {self.served_count}")
        
        if self.queue:
            print("\n👥 Waiting customers:")
            for i, customer in enumerate(self.queue, 1):
                print(f"   {i}. {customer['emoji']} {customer['name']} - {customer['issue']}")
    
    def _get_emoji(self, issue):
        """Assign emoji based on issue type 🎨"""
        emojis = {
            'billing': '💳',
            'technical': '🔧',
            'complaint': '😤',
            'inquiry': '❓',
            'return': '📦'
        }
        return emojis.get(issue.lower(), '👤')

# 🎮 Simulate a busy day!
service = CustomerServiceQueue()

# Customers arrive
service.add_customer("Alice", "billing")
service.add_customer("Bob", "technical")
service.add_customer("Charlie", "complaint")
service.add_customer("Diana", "inquiry")

# Show queue
service.show_queue_status()

# Serve customers
time.sleep(1)  # Simulate processing time
service.serve_next_customer()
service.serve_next_customer()

# More customers arrive
service.add_customer("Eve", "return")
service.show_queue_status()

🎯 Try it yourself: Add a priority system where certain issues get faster service!

🎮 Example 2: Task Scheduler

Let's make a task scheduling system:

# 🔄 Task scheduler with queue
import heapq
from collections import deque
from datetime import datetime, timedelta

class TaskScheduler:
    def __init__(self):
        self.task_queue = deque()
        self.priority_queue = []  # For urgent tasks
        self.completed_tasks = []
        self.task_id = 0
    
    def add_task(self, description, duration_mins=30, urgent=False):
        """Schedule a new task 📝"""
        self.task_id += 1
        task = {
            'id': self.task_id,
            'description': description,
            'duration': duration_mins,
            'scheduled_at': datetime.now(),
            'status': 'pending',
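            'emoji': '🔥' if urgent else '📋'
        }
        
        if urgent:
            # Urgent tasks go to the priority heap. The unique task id is the
            # sort key, so urgent tasks pop in arrival order and the heap never
            # has to compare two task dicts (which would raise TypeError).
            heapq.heappush(self.priority_queue, (self.task_id, task))
            print(f"🔥 URGENT task added: {description}")
        else:
            # Normal tasks go to regular queue
            self.task_queue.append(task)
            print(f"📋 Task added: {description}")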
        
        return task['id']
    
    def get_next_task(self):
        """Get the next task to work on 🎯"""
        # Priority tasks first
        if self.priority_queue:
            _, task = heapq.heappop(self.priority_queue)
            return task
        
        # Then regular queue
        if self.task_queue:
            return self.task_queue.popleft()
        
        return None
    
    def complete_task(self, task):
        """Mark task as completed ✅"""
        task['status'] = 'completed'
        task['completed_at'] = datetime.now()
        self.completed_tasks.append(task)
        
        duration = int((task['completed_at'] - task['scheduled_at']).total_seconds() // 60)
        print(f"\n✅ Task completed: {task['description']}")
        print(f"   Time taken: {duration} minutes")
    
    def show_dashboard(self):
        """Display task dashboard 📊"""
        print("\n🎯 Task Dashboard")
        print("=" * 50)
        
        # Pending tasks
        pending_count = len(self.task_queue) + len(self.priority_queue)
        print(f"📋 Pending tasks: {pending_count}")
        
        if self.priority_queue:
            print(f"   🔥 Urgent: {len(self.priority_queue)}")
        if self.task_queue:
            print(f"   📋 Regular: {len(self.task_queue)}")
        
        # Completed tasks
        print(f"\n✅ Completed today: {len(self.completed_tasks)}")
        
        # Estimated time
        total_time = sum(t['duration'] for t in self.task_queue)
        total_time += sum(t[1]['duration'] for t in self.priority_queue)
        print(f"\n⏱️ Estimated time for pending: {total_time} minutes")
        
        # Next task preview
        next_task = self.peek_next_task()
        if next_task:
            print(f"\n👀 Next task: {next_task['emoji']} {next_task['description']}")
    
    def peek_next_task(self):
        """Preview next task without removing 👀"""
        if self.priority_queue:
            return self.priority_queue[0][1]
        if self.task_queue:
            return self.task_queue[0]
        return None
    
    def work_on_tasks(self, max_tasks=3):
        """Simulate working on tasks 💪"""
        print("\n🏃 Starting work session...")
        
        for i in range(max_tasks):
            task = self.get_next_task()
            if not task:
                print("🎉 All tasks completed!")
                break
            
            print(f"\n⚡ Working on: {task['description']}")
            # Simulate work
            import time
            time.sleep(1)  # In real life, this would be actual work
            
            self.complete_task(task)
        
        self.show_dashboard()

# 🎮 Let's manage some tasks!
scheduler = TaskScheduler()

# Add various tasks
scheduler.add_task("Review code PR", 45)
scheduler.add_task("Fix critical bug", 60, urgent=True)
scheduler.add_task("Write documentation", 30)
scheduler.add_task("Team meeting", 60)
scheduler.add_task("Deploy hotfix", 20, urgent=True)

# Show initial state
scheduler.show_dashboard()

# Work on tasks
scheduler.work_on_tasks(3)

🚀 Advanced Concepts

🧙‍♂️ Thread-Safe Queue

When you're ready to level up with concurrent programming:

# 🎯 Thread-safe queue for concurrent programs
import queue
import threading
import time
import random

class MessageQueue:
    def __init__(self, max_size=10):
        self.queue = queue.Queue(maxsize=max_size)
        self.processed_count = 0
        self.lock = threading.Lock()
    
    def producer(self, producer_id, num_messages=5):
        """Producer thread that adds messages 📤"""
        for i in range(num_messages):
            message = {
                'id': f"P{producer_id}-M{i}",
                'producer': producer_id,
                'data': f"Message {i} from Producer {producer_id}",
                'emoji': random.choice(['📨', '💌', '📧', '✉️'])
            }
            
            try:
                self.queue.put(message, timeout=5)
                print(f"{message['emoji']} Producer {producer_id} sent: {message['id']}")
                time.sleep(random.uniform(0.1, 0.5))
            except queue.Full:
                # put() gave up after 5 seconds, so this message is dropped
                print(f"⚠️ Queue full! Producer {producer_id} dropped {message['id']}")
    
    def consumer(self, consumer_id):
        """Consumer thread that processes messages 📥"""
        while True:
            try:
                message = self.queue.get(timeout=2)
                print(f"    💫 Consumer {consumer_id} processing: {message['id']}")
                
                # Simulate processing
                time.sleep(random.uniform(0.2, 0.8))
                
                with self.lock:
                    self.processed_count += 1
                
                self.queue.task_done()
                
            except queue.Empty:
                print(f"    😴 Consumer {consumer_id} idle...")
                break
    
    def run_simulation(self):
        """Run multi-threaded queue simulation 🚀"""
        print("🎬 Starting message queue simulation...\n")
        
        # Create threads
        producers = []
        consumers = []
        
        # Start producers
        for i in range(3):
            t = threading.Thread(target=self.producer, args=(i, 4))
            producers.append(t)
            t.start()
        
        # Start consumers
        for i in range(2):
            t = threading.Thread(target=self.consumer, args=(i,))
            consumers.append(t)
            t.start()
        
        # Wait for producers
        for t in producers:
            t.join()
        
        # Wait for queue to be empty
        self.queue.join()
        
        # Wait for consumers
        for t in consumers:
            t.join()
        
        print(f"\n✨ Simulation complete!")
        print(f"📊 Total messages processed: {self.processed_count}")

# 🪄 Run the concurrent queue
msg_queue = MessageQueue(max_size=5)
msg_queue.run_simulation()
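
The simulation above stops its consumers with a 2-second timeout. A common alternative is to send each worker a sentinel value that means "no more work", so shutdown does not depend on timing. Here is a minimal sketch of that idea, assuming made-up names (SENTINEL, worker, results) and a doubling step standing in for real processing:

```python
import queue
import threading

SENTINEL = object()  # hypothetical marker meaning "no more work"

def worker(q, results, lock):
    """Consume items until the sentinel arrives."""
    while True:
        item = q.get()
        try:
            if item is SENTINEL:
                break
            with lock:
                results.append(item * 2)  # stand-in for real processing
        finally:
            q.task_done()  # every get() is matched by one task_done()

q = queue.Queue(maxsize=5)
results = []
lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(q, results, lock)) for _ in range(2)]
for t in threads:
    t.start()

for n in range(10):
    q.put(n)             # blocks while the queue is full
for _ in threads:
    q.put(SENTINEL)      # one sentinel per worker

q.join()                 # returns once every item has been marked done
for t in threads:
    t.join()

print(sorted(results))
```

Because each worker exits only after it receives its own sentinel, no message is left behind and no thread sits idle waiting for a timeout.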

🏗️ Priority Queue Implementation

For the brave developers - a custom priority queue:

# 🚀 Custom priority queue with heap
import heapq
class PriorityQueue:
    def __init__(self):
        self.heap = []
        self.index = 0  # For stable sorting
    
    def enqueue(self, item, priority):
        """Add item with priority (lower number = higher priority) 🎯"""
        # heapq is a min-heap, so the smallest priority number pops first
        # Add index for stable sorting of same priorities
        heapq.heappush(self.heap, (priority, self.index, item))
        self.index += 1
        print(f"➕ Added '{item}' with priority {priority}")
    
    def dequeue(self):
        """Remove highest priority item ⚡"""
        if self.heap:
            priority, _, item = heapq.heappop(self.heap)
            print(f"➖ Removed '{item}' (priority {priority})")
            return item
        raise IndexError("Priority queue is empty! 😱")
    
    def peek(self):
        """View highest priority item 👀"""
        if self.heap:
            return self.heap[0][2]
        return None
    
    def __len__(self):
        return len(self.heap)
    
    def __bool__(self):
        return bool(self.heap)

# 🎮 Emergency room triage system
class EmergencyRoom:
    def __init__(self):
        self.queue = PriorityQueue()
        self.patient_id = 0
    
    def admit_patient(self, name, condition):
        """Admit patient based on condition severity 🚑"""
        priorities = {
            'critical': 1,    # 🚨 Highest priority
            'severe': 2,      # 😰 High priority
            'moderate': 3,    # 😟 Medium priority
            'minor': 4        # 🤕 Low priority
        }
        
        self.patient_id += 1
        priority = priorities.get(condition.lower(), 5)
        patient = f"{name} (ID: {self.patient_id}, {condition})"
        
        self.queue.enqueue(patient, priority)
        
        emojis = {'critical': '🚨', 'severe': '😰', 'moderate': '😟', 'minor': '🤕'}
        print(f"{emojis.get(condition.lower(), '🏥')} Patient admitted: {patient}")
    
    def treat_next_patient(self):
        """Treat the most critical patient 👨‍⚕️"""
        if self.queue:
            patient = self.queue.dequeue()
            print(f"\n👨‍⚕️ Now treating: {patient}")
        else:
            print("✅ No patients waiting!")

# Test emergency room
er = EmergencyRoom()
er.admit_patient("Alice", "minor")
er.admit_patient("Bob", "critical")
er.admit_patient("Charlie", "moderate")
er.admit_patient("Diana", "critical")
er.admit_patient("Eve", "severe")

print("\n🏥 Treatment order:")
while len(er.queue) > 0:
    er.treat_next_patient()
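
The (priority, index, item) tuple above works because the index breaks ties before Python ever compares two items. Another way to get the same effect is a small dataclass whose payload is excluded from comparison. The sketch below assumes made-up names (Entry, push, pop) and is not part of the classes above:

```python
import heapq
from dataclasses import dataclass, field
from itertools import count
from typing import Any

@dataclass(order=True)
class Entry:
    priority: int
    sequence: int                      # tie-breaker: insertion order
    item: Any = field(compare=False)   # payload is never compared

_counter = count()  # hypothetical sequence generator shared by all pushes

def push(heap, item, priority):
    """Add item; lower priority numbers pop first."""
    heapq.heappush(heap, Entry(priority, next(_counter), item))

def pop(heap):
    """Remove and return the item with the lowest priority number."""
    return heapq.heappop(heap).item

heap = []
push(heap, {"name": "Alice"}, 4)
push(heap, {"name": "Bob"}, 1)
push(heap, {"name": "Diana"}, 1)

order = [pop(heap)["name"] for _ in range(3)]
print(order)
```

Bob and Diana share priority 1, so the sequence number decides between them and the earlier arrival is served first. The dict payloads are never compared, so they do not need to be orderable.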

⚠️ Common Pitfalls and Solutions

😱 Pitfall 1: Using list.pop(0) for Large Queues

# ❌ Wrong way - O(n) time complexity!
class SlowQueue:
    def __init__(self):
        self.items = []
    
    def dequeue(self):
        return self.items.pop(0)  # 💥 Shifts all elements!

# ✅ Correct way - Use deque for O(1) operations!
from collections import deque

class FastQueue:
    def __init__(self):
        self.items = deque()
    
    def dequeue(self):
        return self.items.popleft()  # ⚡ No shifting needed!
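
To see the difference on your own machine, here is a rough timing sketch using timeit from the standard library. The queue size n is an arbitrary choice for illustration, and the absolute numbers will vary by machine:

```python
from collections import deque
from timeit import timeit

def drain_list(n):
    """Dequeue n items with list.pop(0); each pop shifts the remaining items."""
    items = list(range(n))
    while items:
        items.pop(0)

def drain_deque(n):
    """Dequeue n items with deque.popleft(); each pop is O(1)."""
    items = deque(range(n))
    while items:
        items.popleft()

n = 20_000  # arbitrary size chosen for illustration
list_seconds = timeit(lambda: drain_list(n), number=1)
deque_seconds = timeit(lambda: drain_deque(n), number=1)

print(f"list.pop(0):     {list_seconds:.4f} s")
print(f"deque.popleft(): {deque_seconds:.4f} s")
```

Draining the list costs O(n) per dequeue and O(n²) overall, so the gap widens quickly as n grows.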

🤯 Pitfall 2: Not Handling Empty Queue

# ❌ Dangerous - Will crash on empty queue!
# (process() stands in for whatever work you do with each item)
def process_queue(q):
    while True:
        item = q.dequeue()  # 💥 IndexError when empty!
        process(item)

# ✅ Safe - Always check before dequeuing!
def process_queue_safely(q):
    while not q.is_empty():
        item = q.dequeue()
        process(item)
    print("✅ Queue processing complete!")

# ✅ Even better - Use exception handling!
def process_with_exceptions(q):
    while True:
        try:
            item = q.dequeue()
            process(item)
        except IndexError:
            print("📭 Queue is empty, stopping...")
            break

🛠️ Best Practices

  1. 🎯 Choose the Right Implementation: Use collections.deque for most cases
  2. 📝 Handle Empty Queue: Always check or use exceptions
  3. 🛡️ Thread Safety: Use queue.Queue for concurrent programs
  4. 🎨 Clear Method Names: Use enqueue/dequeue or put/get
  5. ✨ Implement Magic Methods: Support len(), bool(), and iteration
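
As one way to apply the last practice, here is a minimal sketch of a queue that supports len(), bool(), and iteration. The class name IterableQueue is hypothetical and chosen only for this example:

```python
from collections import deque

class IterableQueue:
    """Hypothetical queue showing len(), bool(), and iteration support."""

    def __init__(self, items=()):
        self._items = deque(items)

    def enqueue(self, item):
        self._items.append(item)

    def dequeue(self):
        if not self._items:
            raise IndexError("dequeue from an empty queue")
        return self._items.popleft()

    def __len__(self):
        return len(self._items)

    def __bool__(self):
        return bool(self._items)

    def __iter__(self):
        # Iterate front to rear without removing anything.
        return iter(self._items)

q = IterableQueue(["a", "b"])
q.enqueue("c")
print(len(q), bool(q), list(q))
```

With these methods in place, callers can write `while q:` or `for item in q:` instead of reaching into the queue's internals.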

🧪 Hands-On Exercise

🎯 Challenge: Build a Print Spooler System

Create a print spooler that manages print jobs:

📋 Requirements:

  • ✅ Add print jobs with document name, pages, and priority
  • 🏷️ Support priority printing (urgent documents first)
  • 👤 Track which user submitted each job
  • 📊 Show queue statistics and estimated wait time
  • 🎨 Simulate printing with progress updates

🚀 Bonus Points:

  • Add job cancellation feature
  • Implement printer pool (multiple printers)
  • Add print history with timestamps

💡 Solution

# 🎯 Complete print spooler system!
from collections import deque
import heapq
import time
from datetime import datetime, timedelta

class PrintJob:
    def __init__(self, job_id, user, document, pages, urgent=False):
        self.id = job_id
        self.user = user
        self.document = document
        self.pages = pages
        self.urgent = urgent
        self.submitted_at = datetime.now()
        self.status = 'queued'
        self.priority = 1 if urgent else 2
    
    def __lt__(self, other):
        # For priority queue comparison
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.submitted_at < other.submitted_at

class PrintSpooler:
    def __init__(self, pages_per_minute=10):
        self.regular_queue = deque()
        self.priority_queue = []
        self.job_counter = 0
        self.pages_per_minute = pages_per_minute
        self.print_history = []
        self.active_job = None
    
    def add_job(self, user, document, pages, urgent=False):
        """Add a print job to the spooler 🖨️"""
        self.job_counter += 1
        job = PrintJob(self.job_counter, user, document, pages, urgent)
        
        if urgent:
            heapq.heappush(self.priority_queue, job)
            print(f"🔥 URGENT print job added: {document}")
        else:
            self.regular_queue.append(job)
            print(f"📄 Print job added: {document}")
        
        print(f"   User: {user}")
        print(f"   Pages: {pages}")
        print(f"   Job ID: #{job.id}")
        
        self.show_wait_time()
        return job.id
    
    def get_next_job(self):
        """Get the next job to print 📋"""
        # Priority jobs first
        if self.priority_queue:
            return heapq.heappop(self.priority_queue)
        
        # Then regular jobs
        if self.regular_queue:
            return self.regular_queue.popleft()
        
        return None
    
    def print_job(self, job):
        """Simulate printing a job 🖨️"""
        self.active_job = job
        job.status = 'printing'
        print(f"\n🖨️ Now printing: {job.document}")
        print(f"   User: {job.user}")
        print(f"   Pages: {job.pages}")
        
        # Simulate printing with progress
        for page in range(1, job.pages + 1):
            progress = (page / job.pages) * 100
            bar_length = int(progress / 5)
            bar = '█' * bar_length + '░' * (20 - bar_length)
            print(f"\r   Progress: [{bar}] {progress:.0f}% - Page {page}/{job.pages}", end='')
            time.sleep(60 / self.pages_per_minute / 10)  # Simulate time
        
        print("\n   ✅ Printing complete!")
        
        # Update job status
        job.status = 'completed'
        job.completed_at = datetime.now()
        self.print_history.append(job)
        self.active_job = None
    
    def show_queue_status(self):
        """Display current queue status 📊"""
        total_jobs = len(self.regular_queue) + len(self.priority_queue)
        
        print(f"\n📊 Print Queue Status")
        print("=" * 50)
        print(f"🖨️ Jobs in queue: {total_jobs}")
        
        if self.priority_queue:
            print(f"   🔥 Urgent jobs: {len(self.priority_queue)}")
        if self.regular_queue:
            print(f"   📄 Regular jobs: {len(self.regular_queue)}")
        
        if self.active_job:
            print(f"\n▶️ Currently printing: {self.active_job.document}")
        
        # Show next few jobs
        if total_jobs > 0:
            print("\n📋 Next in queue:")
            
            # Work on copies so previewing does not remove jobs.
            # The regular copy must be a deque: a plain list has no popleft().
            temp_priority = list(self.priority_queue)
            temp_regular = deque(self.regular_queue)
            
            count = 0
            while (temp_priority or temp_regular) and count < 3:
                if temp_priority and (not temp_regular or temp_priority[0] < temp_regular[0]):
                    job = heapq.heappop(temp_priority)
                else:
                    job = temp_regular.popleft() if temp_regular else None
                
                if job:
                    emoji = "🔥" if job.urgent else "📄"
                    print(f"   {count + 1}. {emoji} {job.document} ({job.pages} pages) - {job.user}")
                    count += 1
    
    def show_wait_time(self):
        """Calculate and show estimated wait time ⏱️"""
        total_pages = sum(job.pages for job in self.regular_queue)
        total_pages += sum(job.pages for job in self.priority_queue)
        
        if self.active_job:
            # Estimate remaining pages for current job
            total_pages += self.active_job.pages // 2
        
        wait_minutes = total_pages / self.pages_per_minute
        
        if wait_minutes > 0:
            print(f"⏱️ Estimated wait time: {wait_minutes:.1f} minutes")
    
    def cancel_job(self, job_id):
        """Cancel a print job 🚫"""
        # Check priority queue
        for i, job in enumerate(self.priority_queue):
            if job.id == job_id:
                self.priority_queue.pop(i)
                heapq.heapify(self.priority_queue)
                print(f"❌ Cancelled job #{job_id}: {job.document}")
                return True
        
        # Check regular queue
        for i, job in enumerate(self.regular_queue):
            if job.id == job_id:
                del self.regular_queue[i]
                print(f"❌ Cancelled job #{job_id}: {job.document}")
                return True
        
        print(f"⚠️ Job #{job_id} not found in queue")
        return False
    
    def process_queue(self, max_jobs=None):
        """Process print jobs 🏃"""
        processed = 0
        
        while True:
            if max_jobs and processed >= max_jobs:
                break
            
            job = self.get_next_job()
            if not job:
                print("\n✅ All print jobs completed!")
                break
            
            self.print_job(job)
            processed += 1
            
            # Show remaining queue
            if self.regular_queue or self.priority_queue:
                self.show_queue_status()

# 🎮 Test the print spooler!
spooler = PrintSpooler(pages_per_minute=20)

# Add various print jobs
job1 = spooler.add_job("Alice", "Report.pdf", 10)
job2 = spooler.add_job("Bob", "Presentation.pptx", 25, urgent=True)
job3 = spooler.add_job("Charlie", "Invoice.docx", 3)
job4 = spooler.add_job("Diana", "Contract.pdf", 8, urgent=True)
job5 = spooler.add_job("Eve", "Manual.pdf", 50)

# Show queue status
spooler.show_queue_status()

# Cancel a job
spooler.cancel_job(job5)

# Process some jobs
spooler.process_queue(3)

🎓 Key Takeaways

You've learned so much! Here's what you can now do:

  • ✅ Implement queues from scratch or using built-in tools 💪
  • ✅ Choose the right queue type for your needs 🛡️
  • ✅ Build real-world applications using queues 🎯
  • ✅ Handle concurrent access with thread-safe queues 🐛
  • ✅ Optimize performance with proper data structures 🚀

Remember: Queues are everywhere in programming - from handling web requests to managing background tasks. Master them and you'll write better, more scalable code! 🤝

🤝 Next Steps

Congratulations! 🎉 You've mastered queue implementation!

Here's what to do next:

  1. 💻 Practice with the print spooler exercise
  2. 🏗️ Build a message queue system for a chat application
  3. 📚 Explore circular queues and double-ended queues
  4. 🌟 Learn about distributed message queues like RabbitMQ

Remember: Most complex systems use queues somewhere. From operating systems to web servers, queues keep things running smoothly. Keep coding, keep learning, and most importantly, have fun! 🚀


Happy coding! 🎉🚀✨