Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand queue fundamentals
- Apply queues in real projects
- Debug common queue issues
- Write clean, Pythonic queue code
Introduction
Welcome to this tutorial on queue implementation! In this guide, we'll explore the FIFO (First-In-First-Out) data structure that powers everything from print spoolers to web servers.
Queues show up all over Python development. Whether you're building task managers, message systems, or process schedulers, understanding queues helps you write efficient, scalable code.
By the end of this tutorial, you'll feel confident implementing and using queues in your own projects. Let's dive in!
Understanding Queues
What is a Queue?
A queue is like the line at your favorite coffee shop: the first person to arrive is the first person to be served. That's FIFO (First-In-First-Out) in action!
In Python terms, a queue is a linear data structure that follows the FIFO principle. This means you can:
- Add elements to the rear (enqueue)
- Remove elements from the front (dequeue)
- Process items in the order they arrived
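To make the three operations concrete, here is a minimal sketch using `collections.deque` from the standard library. The names in it are made up for illustration.

```python
from collections import deque

line = deque()            # an empty queue
line.append("Ana")        # enqueue at the rear
line.append("Ben")
line.append("Cho")

first = line.popleft()    # dequeue from the front
remaining = list(line)    # items still waiting, in arrival order

print(first)        # Ana
print(remaining)    # ['Ben', 'Cho']
```

The first name added is the first one removed, and the remaining items keep their arrival order.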
Why Use Queues?
Here's why developers love queues:
- Fair Processing: First come, first served
- Order Preservation: Maintains the sequence of operations
- Decoupling: Separates producers from consumers
- Buffering: Absorbs differences in speed between producers and consumers
Real-world example: Imagine building a ticket booking system. With a queue, customers are served in the order they arrived, with no jumping the line!
Basic Syntax and Usage
Simple Queue Implementation
Let's start with a friendly example using Python's built-in list:
# Hello, Queue!
class SimpleQueue:
    def __init__(self):
        self.items = []  # Our queue storage

    def enqueue(self, item):
        """Add item to the rear of the queue."""
        self.items.append(item)
        print(f"Added '{item}' to queue!")

    def dequeue(self):
        """Remove and return the front item, or None if the queue is empty."""
        if not self.is_empty():
            item = self.items.pop(0)
            print(f"Removed '{item}' from queue!")
            return item
        else:
            print("Queue is empty!")
            return None

    def is_empty(self):
        """Check if queue is empty."""
        return len(self.items) == 0

    def size(self):
        """Get queue size."""
        return len(self.items)

    def peek(self):
        """View front item without removing it."""
        if not self.is_empty():
            return self.items[0]
        return None

# Let's try it!
queue = SimpleQueue()
queue.enqueue("First customer")
queue.enqueue("Second customer")
queue.enqueue("Third customer")
queue.dequeue()  # First one out!

Explanation: Notice how we add items to the end but remove them from the beginning. That's FIFO in action!
Using Python's deque
Here's a more efficient implementation using collections.deque:
from collections import deque

# Efficient queue with deque
class EfficientQueue:
    def __init__(self):
        self.items = deque()  # Optimized for both ends

    def enqueue(self, item):
        """Add to rear - O(1) time!"""
        self.items.append(item)

    def dequeue(self):
        """Remove from front - O(1) time!"""
        if self.items:
            return self.items.popleft()
        raise IndexError("Queue is empty!")

    def __len__(self):
        """Pythonic length check."""
        return len(self.items)

    def __bool__(self):
        """Pythonic empty check."""
        return bool(self.items)

# Using our efficient queue
fast_queue = EfficientQueue()
fast_queue.enqueue("Lightning fast!")
fast_queue.enqueue("Super efficient!")
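The `__len__` and `__bool__` methods let a queue object work with `len()` and with plain truthiness checks. The sketch below uses a compact stand-in class, `DequeQueue` (a hypothetical name, not part of the tutorial's code), to show a typical drain loop.

```python
from collections import deque

class DequeQueue:
    """Compact stand-in for a deque-backed queue (hypothetical name)."""

    def __init__(self):
        self._items = deque()

    def enqueue(self, item):
        self._items.append(item)

    def dequeue(self):
        if not self._items:
            raise IndexError("dequeue from an empty queue")
        return self._items.popleft()

    def __len__(self):
        return len(self._items)

    def __bool__(self):
        return bool(self._items)

jobs = DequeQueue()
for name in ("parse", "render", "upload"):
    jobs.enqueue(name)

size_before = len(jobs)   # __len__ makes len() work
drained = []
while jobs:               # __bool__ makes the truthiness check work
    drained.append(jobs.dequeue())

print(size_before, drained)   # 3 ['parse', 'render', 'upload']
```

Because the loop condition is the queue itself, the loop stops before `dequeue()` can raise on an empty queue.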
Practical Examples
Example 1: Customer Service System
Let's build something real: a customer service queue system.
# Customer service queue system
from collections import deque
from datetime import datetime
import time

class CustomerServiceQueue:
    def __init__(self):
        self.queue = deque()
        self.ticket_counter = 0
        self.served_count = 0

    def add_customer(self, name, issue):
        """New customer joins the queue."""
        self.ticket_counter += 1
        customer = {
            'ticket': f"T{self.ticket_counter:04d}",
            'name': name,
            'issue': issue,
            'joined_at': datetime.now(),
            'emoji': self._get_emoji(issue)
        }
        self.queue.append(customer)
        print(f"\n{customer['emoji']} {name} joined queue")
        print(f"   Ticket: {customer['ticket']}")
        print(f"   Position: {len(self.queue)}")
        return customer['ticket']

    def serve_next_customer(self):
        """Serve the next customer in line."""
        if not self.queue:
            print("No customers waiting!")
            return None
        customer = self.queue.popleft()
        # total_seconds() counts the full wait; .seconds would drop whole days
        wait_time = int((datetime.now() - customer['joined_at']).total_seconds())
        self.served_count += 1
        print(f"\nNow serving: {customer['name']}")
        print(f"   Issue: {customer['issue']}")
        print(f"   Wait time: {wait_time} seconds")
        print(f"   Customers served today: {self.served_count}")
        return customer

    def show_queue_status(self):
        """Display current queue status."""
        print("\nQueue Status:")
        print(f"   Waiting: {len(self.queue)} customers")
        print(f"   Served today: {self.served_count}")
        if self.queue:
            print("\nWaiting customers:")
            for i, customer in enumerate(self.queue, 1):
                print(f"   {i}. {customer['emoji']} {customer['name']} - {customer['issue']}")

    def _get_emoji(self, issue):
        """Assign emoji based on issue type."""
        emojis = {
            'billing': '💳',
            'technical': '🔧',
            'complaint': '😤',
            'inquiry': '❓',
            'return': '📦'
        }
        return emojis.get(issue.lower(), '👤')

# Simulate a busy day!
service = CustomerServiceQueue()

# Customers arrive
service.add_customer("Alice", "billing")
service.add_customer("Bob", "technical")
service.add_customer("Charlie", "complaint")
service.add_customer("Diana", "inquiry")

# Show queue
service.show_queue_status()

# Serve customers
time.sleep(1)  # Simulate processing time
service.serve_next_customer()
service.serve_next_customer()

# More customers arrive
service.add_customer("Eve", "return")
service.show_queue_status()

Try it yourself: Add a priority system where certain issues get faster service!
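One possible starting point for that exercise is sketched below, assuming a made-up priority table (`ISSUE_PRIORITY`) and a hypothetical class name, `PriorityServiceQueue`. It stores `(priority, arrival_number, ...)` tuples in a `heapq` heap so that customers with the same priority are still served in arrival order.

```python
import heapq
from itertools import count

# Hypothetical priority levels: a lower number is served sooner.
ISSUE_PRIORITY = {"complaint": 0, "technical": 1, "billing": 2}
DEFAULT_PRIORITY = 3

class PriorityServiceQueue:
    def __init__(self):
        self._heap = []
        self._arrival = count()  # tie-breaker: keeps FIFO order within a priority

    def add_customer(self, name, issue):
        priority = ISSUE_PRIORITY.get(issue.lower(), DEFAULT_PRIORITY)
        heapq.heappush(self._heap, (priority, next(self._arrival), name, issue))

    def serve_next_customer(self):
        if not self._heap:
            return None
        _, _, name, issue = heapq.heappop(self._heap)
        return name, issue

desk = PriorityServiceQueue()
desk.add_customer("Alice", "billing")
desk.add_customer("Bob", "technical")
desk.add_customer("Charlie", "complaint")
desk.add_customer("Diana", "technical")

served = []
while (customer := desk.serve_next_customer()) is not None:
    served.append(customer[0])

print(served)   # ['Charlie', 'Bob', 'Diana', 'Alice']
```

Bob and Diana share a priority, so the arrival counter decides between them and Bob, who arrived first, goes first.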
Example 2: Task Scheduler
Let's make a task scheduling system:
# Task scheduler with queue
from collections import deque
from datetime import datetime
import heapq
import time

class TaskScheduler:
    def __init__(self):
        self.task_queue = deque()
        self.priority_queue = []  # Heap of (priority, task_id, task) for urgent tasks
        self.completed_tasks = []
        self.task_id = 0

    def add_task(self, description, duration_mins=30, urgent=False):
        """Schedule a new task."""
        self.task_id += 1
        task = {
            'id': self.task_id,
            'description': description,
            'duration': duration_mins,
            'scheduled_at': datetime.now(),
            'status': 'pending',
            'emoji': '🔥' if urgent else '📋'
        }
        if urgent:
            # Urgent tasks go to the priority queue. The task id breaks ties,
            # so two urgent tasks never force Python to compare the dicts.
            heapq.heappush(self.priority_queue, (0, self.task_id, task))
            print(f"URGENT task added: {description}")
        else:
            # Normal tasks go to the regular queue
            self.task_queue.append(task)
            print(f"Task added: {description}")
        return task['id']

    def get_next_task(self):
        """Get the next task to work on."""
        # Priority tasks first
        if self.priority_queue:
            _, _, task = heapq.heappop(self.priority_queue)
            return task
        # Then regular queue
        if self.task_queue:
            return self.task_queue.popleft()
        return None

    def complete_task(self, task):
        """Mark task as completed."""
        task['status'] = 'completed'
        task['completed_at'] = datetime.now()
        self.completed_tasks.append(task)
        elapsed = task['completed_at'] - task['scheduled_at']
        duration = int(elapsed.total_seconds()) // 60
        print(f"\nTask completed: {task['description']}")
        print(f"   Time taken: {duration} minutes")

    def show_dashboard(self):
        """Display task dashboard."""
        print("\nTask Dashboard")
        print("=" * 50)
        # Pending tasks
        pending_count = len(self.task_queue) + len(self.priority_queue)
        print(f"Pending tasks: {pending_count}")
        if self.priority_queue:
            print(f"   Urgent: {len(self.priority_queue)}")
        if self.task_queue:
            print(f"   Regular: {len(self.task_queue)}")
        # Completed tasks
        print(f"\nCompleted today: {len(self.completed_tasks)}")
        # Estimated time
        total_time = sum(t['duration'] for t in self.task_queue)
        total_time += sum(entry[2]['duration'] for entry in self.priority_queue)
        print(f"\nEstimated time for pending: {total_time} minutes")
        # Next task preview
        next_task = self.peek_next_task()
        if next_task:
            print(f"\nNext task: {next_task['emoji']} {next_task['description']}")

    def peek_next_task(self):
        """Preview next task without removing it."""
        if self.priority_queue:
            return self.priority_queue[0][2]
        if self.task_queue:
            return self.task_queue[0]
        return None

    def work_on_tasks(self, max_tasks=3):
        """Simulate working on tasks."""
        print("\nStarting work session...")
        for _ in range(max_tasks):
            task = self.get_next_task()
            if not task:
                print("All tasks completed!")
                break
            print(f"\nWorking on: {task['description']}")
            # Simulate work
            time.sleep(1)  # In real life, this would be actual work
            self.complete_task(task)
        self.show_dashboard()

# Let's manage some tasks!
scheduler = TaskScheduler()

# Add various tasks
scheduler.add_task("Review code PR", 45)
scheduler.add_task("Fix critical bug", 60, urgent=True)
scheduler.add_task("Write documentation", 30)
scheduler.add_task("Team meeting", 60)
scheduler.add_task("Deploy hotfix", 20, urgent=True)

# Show initial state
scheduler.show_dashboard()

# Work on tasks
scheduler.work_on_tasks(3)
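One detail is worth knowing when task dicts go onto a heap. `heapq` orders entries by comparing tuples, and when two tuples tie on priority Python moves on to the next element. Dicts cannot be ordered, so a heap of `(priority, task_dict)` entries raises `TypeError` as soon as two priorities match. The sketch below, with made-up task data, shows the failure and the usual fix of putting a sequence number between the priority and the task.

```python
import heapq

first = {"description": "Fix critical bug"}
second = {"description": "Deploy hotfix"}

# Without a tie-breaker, equal priorities force Python to compare the dicts.
naive_heap = []
heapq.heappush(naive_heap, (0, first))
try:
    heapq.heappush(naive_heap, (0, second))
    naive_failed = False
except TypeError:
    naive_failed = True

# With a sequence number in the middle, the dicts are never compared.
safe_heap = []
for seq, task in enumerate([first, second]):
    heapq.heappush(safe_heap, (0, seq, task))
order = [heapq.heappop(safe_heap)[2]["description"] for _ in range(2)]

print(naive_failed)   # True
print(order)          # ['Fix critical bug', 'Deploy hotfix']
```

The sequence number also keeps tasks of equal priority in the order they were added.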
Advanced Concepts
Thread-Safe Queue
When you're ready to level up with concurrent programming:
# Thread-safe queue for concurrent programs
import queue
import threading
import time
import random

class MessageQueue:
    def __init__(self, max_size=10):
        self.queue = queue.Queue(maxsize=max_size)
        self.processed_count = 0
        self.lock = threading.Lock()  # Protects processed_count

    def producer(self, producer_id, num_messages=5):
        """Producer thread that adds messages."""
        for i in range(num_messages):
            message = {
                'id': f"P{producer_id}-M{i}",
                'producer': producer_id,
                'data': f"Message {i} from Producer {producer_id}",
                'emoji': random.choice(['📨', '💌', '📧', '✉️'])
            }
            try:
                # Blocks for up to 5 seconds while the queue is full
                self.queue.put(message, timeout=5)
                print(f"{message['emoji']} Producer {producer_id} sent: {message['id']}")
                time.sleep(random.uniform(0.1, 0.5))
            except queue.Full:
                # The message is not retried, so it is lost
                print(f"Queue full! Producer {producer_id} dropped {message['id']}")

    def consumer(self, consumer_id):
        """Consumer thread that processes messages."""
        while True:
            try:
                message = self.queue.get(timeout=2)
                print(f"   Consumer {consumer_id} processing: {message['id']}")
                # Simulate processing
                time.sleep(random.uniform(0.2, 0.8))
                with self.lock:
                    self.processed_count += 1
                self.queue.task_done()
            except queue.Empty:
                # Nothing arrived for 2 seconds, so assume the producers are done
                print(f"   Consumer {consumer_id} idle...")
                break

    def run_simulation(self):
        """Run multi-threaded queue simulation."""
        print("Starting message queue simulation...\n")
        # Create threads
        producers = []
        consumers = []
        # Start producers
        for i in range(3):
            t = threading.Thread(target=self.producer, args=(i, 4))
            producers.append(t)
            t.start()
        # Start consumers
        for i in range(2):
            t = threading.Thread(target=self.consumer, args=(i,))
            consumers.append(t)
            t.start()
        # Wait for producers
        for t in producers:
            t.join()
        # Wait for queue to be empty
        self.queue.join()
        # Wait for consumers
        for t in consumers:
            t.join()
        print("\nSimulation complete!")
        print(f"Total messages processed: {self.processed_count}")

# Run the concurrent queue
msg_queue = MessageQueue(max_size=5)
msg_queue.run_simulation()
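The simulation stops its consumers with a read timeout, which works but makes each consumer wait out the timeout before exiting. A common alternative is a sentinel value: the producer side puts one "no more work" marker per consumer on the queue. The sketch below is illustrative only; `SENTINEL` and `worker` are hypothetical names, and doubling each number stands in for real processing.

```python
import queue
import threading

SENTINEL = object()  # hypothetical marker meaning "no more work"

def worker(q, results, lock):
    while True:
        item = q.get()            # blocks until an item is available
        if item is SENTINEL:
            q.task_done()
            break
        with lock:
            results.append(item * 2)
        q.task_done()

q = queue.Queue(maxsize=5)
results = []
lock = threading.Lock()
workers = [threading.Thread(target=worker, args=(q, results, lock)) for _ in range(2)]
for t in workers:
    t.start()

for n in range(10):
    q.put(n)                      # blocks while the queue is full
for _ in workers:
    q.put(SENTINEL)               # one sentinel per consumer

q.join()                          # every item has been marked done
for t in workers:
    t.join()

print(sorted(results))   # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Each worker consumes exactly one sentinel and exits right away, so shutdown does not depend on how long the queue happens to stay empty.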
Priority Queue Implementation
For the brave developers, here's a custom priority queue:
# Custom priority queue with heap
import heapq

class PriorityQueue:
    def __init__(self):
        self.heap = []
        self.index = 0  # For stable sorting

    def enqueue(self, item, priority):
        """Add item with priority (lower number = higher priority)."""
        # heapq is a min-heap, so the smallest priority number comes out first.
        # The index keeps items with the same priority in insertion order.
        heapq.heappush(self.heap, (priority, self.index, item))
        self.index += 1
        print(f"Added '{item}' with priority {priority}")

    def dequeue(self):
        """Remove and return the highest priority item."""
        if self.heap:
            priority, _, item = heapq.heappop(self.heap)
            print(f"Removed '{item}' (priority {priority})")
            return item
        raise IndexError("Priority queue is empty!")

    def peek(self):
        """View highest priority item."""
        if self.heap:
            return self.heap[0][2]
        return None

    def __len__(self):
        return len(self.heap)

    def __bool__(self):
        return bool(self.heap)

# Emergency room triage system
class EmergencyRoom:
    def __init__(self):
        self.queue = PriorityQueue()
        self.patient_id = 0

    def admit_patient(self, name, condition):
        """Admit patient based on condition severity."""
        priorities = {
            'critical': 1,  # Highest priority
            'severe': 2,    # High priority
            'moderate': 3,  # Medium priority
            'minor': 4      # Low priority
        }
        self.patient_id += 1
        priority = priorities.get(condition.lower(), 5)
        patient = f"{name} (ID: {self.patient_id}, {condition})"
        self.queue.enqueue(patient, priority)
        emojis = {'critical': '🚨', 'severe': '😰', 'moderate': '😐', 'minor': '🤕'}
        print(f"{emojis.get(condition.lower(), '🏥')} Patient admitted: {patient}")

    def treat_next_patient(self):
        """Treat the most critical patient."""
        if self.queue:
            patient = self.queue.dequeue()
            print(f"\nNow treating: {patient}")
        else:
            print("No patients waiting!")

# Test emergency room
er = EmergencyRoom()
er.admit_patient("Alice", "minor")
er.admit_patient("Bob", "critical")
er.admit_patient("Charlie", "moderate")
er.admit_patient("Diana", "critical")
er.admit_patient("Eve", "severe")
print("\nTreatment order:")
while len(er.queue) > 0:
    er.treat_next_patient()
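If you'd rather not maintain the heap yourself, the standard library's `queue.PriorityQueue` offers the same ordering with thread safety built in. The sketch below reuses the triage idea with made-up patient data. The arrival counter is an assumption added here so that patients with equal priority come out in arrival order.

```python
import queue
from itertools import count

triage = queue.PriorityQueue()
arrival = count()  # tie-breaker for patients with the same priority

patients = [("Alice", 4), ("Bob", 1), ("Charlie", 3), ("Diana", 1), ("Eve", 2)]
for name, priority in patients:
    triage.put((priority, next(arrival), name))

order = []
while not triage.empty():
    _, _, name = triage.get()
    order.append(name)

print(order)   # ['Bob', 'Diana', 'Eve', 'Charlie', 'Alice']
```

Both critical patients are treated first, and Bob comes before Diana because he was admitted earlier.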
Common Pitfalls and Solutions
Pitfall 1: Using list.pop(0) for Large Queues
# Wrong way - O(n) time complexity!
class SlowQueue:
    def __init__(self):
        self.items = []

    def dequeue(self):
        return self.items.pop(0)  # Shifts all elements!

# Correct way - use deque for O(1) operations!
from collections import deque

class FastQueue:
    def __init__(self):
        self.items = deque()

    def dequeue(self):
        return self.items.popleft()  # No shifting needed!
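To see the difference on your own machine, here is a rough timing sketch using `timeit`. The queue size `N` is an arbitrary choice, and the absolute numbers will vary by machine and Python version, so treat the output as indicative only.

```python
from collections import deque
from timeit import timeit

N = 20_000  # arbitrary queue size for the comparison

def drain_list():
    items = list(range(N))
    out = []
    while items:
        out.append(items.pop(0))     # shifts every remaining element
    return out

def drain_deque():
    items = deque(range(N))
    out = []
    while items:
        out.append(items.popleft())  # constant time per removal
    return out

list_seconds = timeit(drain_list, number=3)
deque_seconds = timeit(drain_deque, number=3)

print(f"list.pop(0):     {list_seconds:.4f}s")
print(f"deque.popleft(): {deque_seconds:.4f}s")
```

Both functions return the items in the same order. Only the cost of each removal differs, and the gap widens as `N` grows.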
Pitfall 2: Not Handling Empty Queue
# Dangerous - will crash on an empty queue!
# (process() stands in for whatever handler your program uses)
def process_queue(q):
    while True:
        item = q.dequeue()  # IndexError when empty!
        process(item)

# Safe - always check before dequeuing!
def process_queue_safely(q):
    while not q.is_empty():
        item = q.dequeue()
        process(item)
    print("Queue processing complete!")

# Even better - use exception handling!
def process_with_exceptions(q):
    while True:
        try:
            item = q.dequeue()
        except IndexError:
            print("Queue is empty, stopping...")
            break
        # Kept outside the try so an IndexError raised by process() is not swallowed
        process(item)
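Here is a runnable version of the exception-handling pattern, written directly against `collections.deque`. The `process` function is a hypothetical handler that just upper-cases a string.

```python
from collections import deque

def process(item):
    """Hypothetical handler: here it just upper-cases a string."""
    return item.upper()

def drain(q):
    """Process every item, stopping cleanly once the deque is empty."""
    results = []
    while True:
        try:
            item = q.popleft()   # raises IndexError on an empty deque
        except IndexError:
            break
        results.append(process(item))
    return results

pending = deque(["alpha", "beta"])
first_pass = drain(pending)
second_pass = drain(pending)

print(first_pass)    # ['ALPHA', 'BETA']
print(second_pass)   # []
```

Calling `drain` on an already-empty deque simply returns an empty list instead of crashing.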
Best Practices
- Choose the Right Implementation: Use collections.deque for most cases
- Handle Empty Queue: Always check or use exceptions
- Thread Safety: Use queue.Queue for concurrent programs
- Clear Method Names: Use enqueue/dequeue or put/get
- Implement Magic Methods: Support len(), bool(), and iteration
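As an illustration of the last point, the sketch below adds iteration and a readable `repr` to a small deque-backed class. `IterableQueue` is a hypothetical name, and iterating front to rear without removing items is a design assumption, not the only reasonable choice.

```python
from collections import deque

class IterableQueue:
    """Hypothetical queue showing len(), bool(), and iteration support."""

    def __init__(self, items=()):
        self._items = deque(items)

    def enqueue(self, item):
        self._items.append(item)

    def dequeue(self):
        if not self._items:
            raise IndexError("dequeue from an empty queue")
        return self._items.popleft()

    def __len__(self):
        return len(self._items)

    def __iter__(self):
        # Iterates front to rear without removing anything.
        return iter(self._items)

    def __repr__(self):
        return f"IterableQueue({list(self._items)!r})"

q = IterableQueue(["a", "b"])
q.enqueue("c")

print(list(q))    # ['a', 'b', 'c']
print(len(q))     # 3
print(bool(q))    # True (bool() falls back to __len__ when __bool__ is absent)
print(repr(q))    # IterableQueue(['a', 'b', 'c'])
```

With `__iter__` in place, `for item in q` and `"b" in q` work as well.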
Hands-On Exercise
Challenge: Build a Print Spooler System
Create a print spooler that manages print jobs:
Requirements:
- Add print jobs with document name, pages, and priority
- Support priority printing (urgent documents first)
- Track which user submitted each job
- Show queue statistics and estimated wait time
- Simulate printing with progress updates
Bonus Points:
- Add job cancellation feature
- Implement printer pool (multiple printers)
- Add print history with timestamps
Solution
# Complete print spooler system!
from collections import deque
import heapq
import time
from datetime import datetime

class PrintJob:
    def __init__(self, job_id, user, document, pages, urgent=False):
        self.id = job_id
        self.user = user
        self.document = document
        self.pages = pages
        self.urgent = urgent
        self.submitted_at = datetime.now()
        self.status = 'queued'
        self.priority = 1 if urgent else 2

    def __lt__(self, other):
        # For priority queue comparison
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.submitted_at < other.submitted_at

class PrintSpooler:
    def __init__(self, pages_per_minute=10):
        self.regular_queue = deque()
        self.priority_queue = []
        self.job_counter = 0
        self.pages_per_minute = pages_per_minute
        self.print_history = []
        self.active_job = None

    def add_job(self, user, document, pages, urgent=False):
        """Add a print job to the spooler."""
        self.job_counter += 1
        job = PrintJob(self.job_counter, user, document, pages, urgent)
        if urgent:
            heapq.heappush(self.priority_queue, job)
            print(f"URGENT print job added: {document}")
        else:
            self.regular_queue.append(job)
            print(f"Print job added: {document}")
        print(f"   User: {user}")
        print(f"   Pages: {pages}")
        print(f"   Job ID: #{job.id}")
        self.show_wait_time()
        return job.id

    def get_next_job(self):
        """Get the next job to print."""
        # Priority jobs first
        if self.priority_queue:
            return heapq.heappop(self.priority_queue)
        # Then regular jobs
        if self.regular_queue:
            return self.regular_queue.popleft()
        return None

    def print_job(self, job):
        """Simulate printing a job."""
        self.active_job = job
        job.status = 'printing'
        print(f"\nNow printing: {job.document}")
        print(f"   User: {job.user}")
        print(f"   Pages: {job.pages}")
        # Simulate printing with progress
        for page in range(1, job.pages + 1):
            progress = (page / job.pages) * 100
            bar_length = int(progress / 5)
            bar = '█' * bar_length + '░' * (20 - bar_length)
            print(f"\r   Progress: [{bar}] {progress:.0f}% - Page {page}/{job.pages}", end='')
            time.sleep(60 / self.pages_per_minute / 10)  # Simulate time (10x faster than real)
        print("\n   Printing complete!")
        # Update job status
        job.status = 'completed'
        job.completed_at = datetime.now()
        self.print_history.append(job)
        self.active_job = None

    def show_queue_status(self):
        """Display current queue status."""
        total_jobs = len(self.regular_queue) + len(self.priority_queue)
        print("\nPrint Queue Status")
        print("=" * 50)
        print(f"Jobs in queue: {total_jobs}")
        if self.priority_queue:
            print(f"   Urgent jobs: {len(self.priority_queue)}")
        if self.regular_queue:
            print(f"   Regular jobs: {len(self.regular_queue)}")
        if self.active_job:
            print(f"\nCurrently printing: {self.active_job.document}")
        # Show next few jobs
        if total_jobs > 0:
            print("\nNext in queue:")
            # Work on copies so the real queues are left untouched
            temp_priority = list(self.priority_queue)  # A copy of a heap is still a heap
            temp_regular = deque(self.regular_queue)   # deque, so popleft() is available
            count = 0
            while (temp_priority or temp_regular) and count < 3:
                if temp_priority and (not temp_regular or temp_priority[0] < temp_regular[0]):
                    job = heapq.heappop(temp_priority)
                else:
                    job = temp_regular.popleft()
                emoji = "🔥" if job.urgent else "📄"
                print(f"   {count + 1}. {emoji} {job.document} ({job.pages} pages) - {job.user}")
                count += 1

    def show_wait_time(self):
        """Calculate and show estimated wait time."""
        total_pages = sum(job.pages for job in self.regular_queue)
        total_pages += sum(job.pages for job in self.priority_queue)
        if self.active_job:
            # Estimate remaining pages for current job
            total_pages += self.active_job.pages // 2
        wait_minutes = total_pages / self.pages_per_minute
        if wait_minutes > 0:
            print(f"Estimated wait time: {wait_minutes:.1f} minutes")

    def cancel_job(self, job_id):
        """Cancel a print job."""
        # Check priority queue
        for i, job in enumerate(self.priority_queue):
            if job.id == job_id:
                self.priority_queue.pop(i)
                heapq.heapify(self.priority_queue)  # Restore the heap after removal
                print(f"Cancelled job #{job_id}: {job.document}")
                return True
        # Check regular queue
        for i, job in enumerate(self.regular_queue):
            if job.id == job_id:
                del self.regular_queue[i]
                print(f"Cancelled job #{job_id}: {job.document}")
                return True
        print(f"Job #{job_id} not found in queue")
        return False

    def process_queue(self, max_jobs=None):
        """Process print jobs."""
        processed = 0
        while True:
            if max_jobs and processed >= max_jobs:
                break
            job = self.get_next_job()
            if not job:
                print("\nAll print jobs completed!")
                break
            self.print_job(job)
            processed += 1
            # Show remaining queue
            if self.regular_queue or self.priority_queue:
                self.show_queue_status()

# Test the print spooler!
spooler = PrintSpooler(pages_per_minute=20)

# Add various print jobs
job1 = spooler.add_job("Alice", "Report.pdf", 10)
job2 = spooler.add_job("Bob", "Presentation.pptx", 25, urgent=True)
job3 = spooler.add_job("Charlie", "Invoice.docx", 3)
job4 = spooler.add_job("Diana", "Contract.pdf", 8, urgent=True)
job5 = spooler.add_job("Eve", "Manual.pdf", 50)

# Show queue status
spooler.show_queue_status()

# Cancel a job
spooler.cancel_job(job5)

# Process some jobs
spooler.process_queue(3)
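The solution leaves the printer pool bonus open. One way to approach it is sketched below, under simplifying assumptions: jobs are plain `(document, pages)` tuples, there are no urgent jobs, and each printer has a fixed speed. The function `assign_jobs` and its parameters are hypothetical. It keeps a heap of the times at which each printer becomes free and gives the next job to whichever printer is free first.

```python
import heapq
from collections import deque

def assign_jobs(jobs, printer_speeds):
    """Assign FIFO jobs to whichever printer frees up first.

    jobs: iterable of (document, pages).
    printer_speeds: pages per minute for each printer.
    Returns a list of (document, printer_index, finish_minute).
    """
    pending = deque(jobs)
    # Heap of (minute the printer becomes free, printer index).
    free_at = [(0.0, i) for i in range(len(printer_speeds))]
    heapq.heapify(free_at)
    schedule = []
    while pending:
        document, pages = pending.popleft()
        available, printer = heapq.heappop(free_at)
        finish = available + pages / printer_speeds[printer]
        schedule.append((document, printer, finish))
        heapq.heappush(free_at, (finish, printer))
    return schedule

plan = assign_jobs(
    [("Report.pdf", 10), ("Invoice.docx", 3), ("Manual.pdf", 50)],
    printer_speeds=[10, 20],
)
for document, printer, finish in plan:
    print(f"{document}: printer {printer}, done at minute {finish:.2f}")
```

With these inputs the two short jobs start immediately on separate printers, and the long manual goes to printer 1 because it frees up first.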
Key Takeaways
You've learned so much! Here's what you can now do:
- Implement queues from scratch or using built-in tools
- Choose the right queue type for your needs
- Build real-world applications using queues
- Handle concurrent access with thread-safe queues
- Optimize performance with proper data structures
Remember: Queues are everywhere in programming, from handling web requests to managing background tasks. Master them and you'll write better, more scalable code!
Next Steps
Congratulations! You've mastered queue implementation!
Here's what to do next:
- Practice with the print spooler exercise
- Build a message queue system for a chat application
- Explore circular queues and double-ended queues
- Learn about distributed message queues like RabbitMQ
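As a head start on circular queues, here is a minimal ring-buffer sketch. `CircularQueue` is a hypothetical class, and raising `OverflowError` when the queue is full is one design choice among several (overwriting the oldest item is another).

```python
class CircularQueue:
    """Fixed-capacity FIFO queue backed by a ring buffer (hypothetical sketch)."""

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._buffer = [None] * capacity
        self._head = 0   # index of the front item
        self._size = 0

    def enqueue(self, item):
        if self._size == len(self._buffer):
            raise OverflowError("queue is full")
        tail = (self._head + self._size) % len(self._buffer)
        self._buffer[tail] = item
        self._size += 1

    def dequeue(self):
        if self._size == 0:
            raise IndexError("dequeue from an empty queue")
        item = self._buffer[self._head]
        self._buffer[self._head] = None   # release the reference
        self._head = (self._head + 1) % len(self._buffer)
        self._size -= 1
        return item

    def __len__(self):
        return self._size

ring = CircularQueue(3)
for value in (1, 2, 3):
    ring.enqueue(value)
first = ring.dequeue()   # frees a slot at the front
ring.enqueue(4)          # wraps around into the freed slot
rest = [ring.dequeue() for _ in range(len(ring))]

print(first, rest)   # 1 [2, 3, 4]
```

The modulo arithmetic lets the rear index wrap around to the start of the list, so no elements are ever shifted.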
Remember: Every complex system uses queues somewhere. From operating systems to web servers, queues keep things running smoothly. Keep coding, keep learning, and most importantly, have fun!
Happy coding!