Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the fundamentals of advanced design patterns
- Apply these patterns in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to this deep dive into advanced design patterns in Python! In this guide, we'll explore patterns that go beyond the classic Gang of Four catalog, taking your Python skills to the next level.
You'll discover how advanced design patterns can make your code architecture more flexible, maintainable, and elegant. Whether you're building microservices, event-driven systems, or complex applications, these patterns are worth knowing for professional-grade Python code.
By the end of this tutorial, you'll feel confident implementing advanced patterns like Repository, Unit of Work, CQRS, Event Sourcing, Specification, and Saga in your own projects! Let's dive in!
Understanding Advanced Design Patterns
What are Advanced Design Patterns?
Advanced design patterns are like sophisticated blueprints for solving complex architectural problems. Think of them as specialized tools in a master craftsman's workshop, each designed for a specific, intricate task that basic tools can't handle elegantly.
In Python terms, these patterns help you build systems that can scale, adapt to change, and handle complex business logic with grace. This means you can:
- Build systems that scale
- Separate concerns for cleaner architecture
- Handle complex domain logic elegantly
- Create event-driven architectures
- Implement sophisticated data access patterns
Why Use Advanced Patterns?
Here's why senior developers love advanced patterns:
- Scalability: Design systems that grow with your needs
- Maintainability: Keep complex systems manageable
- Testability: Write unit tests with ease
- Flexibility: Adapt to changing requirements
- Performance: Optimize critical paths effectively
Real-world example: Imagine building an e-commerce platform. With patterns like Repository and CQRS, you can separate your read and write operations, add caching on the read side, and scale each side independently as transaction volume grows!
Basic Syntax and Usage
Repository Pattern
Let's start with the Repository pattern, your data access superhero:
# Hello, Repository Pattern!
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any


# Define our domain model
class Product:
    def __init__(self, id: str, name: str, price: float, stock: int):
        self.id = id
        self.name = name
        self.price = price
        self.stock = stock
        self.emoji = "📦"  # Every product needs an emoji!

    def __repr__(self):
        return f"{self.emoji} Product({self.name}, ${self.price})"


# Abstract repository interface
class Repository(ABC):
    @abstractmethod
    def add(self, entity: Any) -> None:
        pass

    @abstractmethod
    def get(self, id: str) -> Optional[Any]:
        pass

    @abstractmethod
    def get_all(self) -> List[Any]:
        pass

    @abstractmethod
    def update(self, entity: Any) -> None:
        pass

    @abstractmethod
    def delete(self, id: str) -> None:
        pass
# Concrete implementation (in-memory storage)
class ProductRepository(Repository):
    def __init__(self):
        self._storage: Dict[str, Product] = {}
        print("Product repository initialized!")

    def add(self, product: Product) -> None:
        self._storage[product.id] = product
        print(f"Added {product.emoji} {product.name} to repository!")

    def get(self, id: str) -> Optional[Product]:
        return self._storage.get(id)

    def get_all(self) -> List[Product]:
        return list(self._storage.values())

    def update(self, product: Product) -> None:
        # Silently ignores products that were never added
        if product.id in self._storage:
            self._storage[product.id] = product
            print(f"Updated {product.emoji} {product.name}")

    def delete(self, id: str) -> None:
        if id in self._storage:
            product = self._storage.pop(id)
            print(f"Deleted {product.emoji} {product.name}")
Explanation: The Repository pattern abstracts data access, making it easy to switch between different storage mechanisms (database, file, memory) without changing your business logic!
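To make the storage-swap idea concrete, here is a minimal sketch, not the tutorial's own code. It assumes a trimmed-down `Product` and three hypothetical classes (`ProductRepositoryBase`, `InMemoryProductRepository`, `SqliteProductRepository`); the table layout and the `price_with_tax` function are invented for illustration. The point is that the business function only sees the interface, so either backend can be passed in.

```python
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Product:
    id: str
    name: str
    price: float


class ProductRepositoryBase(ABC):
    """Hypothetical interface that the business logic depends on."""

    @abstractmethod
    def add(self, product: Product) -> None: ...

    @abstractmethod
    def get(self, id: str) -> Optional[Product]: ...


class InMemoryProductRepository(ProductRepositoryBase):
    def __init__(self) -> None:
        self._storage: Dict[str, Product] = {}

    def add(self, product: Product) -> None:
        self._storage[product.id] = product

    def get(self, id: str) -> Optional[Product]:
        return self._storage.get(id)


class SqliteProductRepository(ProductRepositoryBase):
    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS products "
            "(id TEXT PRIMARY KEY, name TEXT, price REAL)"
        )

    def add(self, product: Product) -> None:
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                "INSERT OR REPLACE INTO products VALUES (?, ?, ?)",
                (product.id, product.name, product.price),
            )

    def get(self, id: str) -> Optional[Product]:
        row = self._conn.execute(
            "SELECT id, name, price FROM products WHERE id = ?", (id,)
        ).fetchone()
        return Product(*row) if row else None


def price_with_tax(repo: ProductRepositoryBase, product_id: str, rate: float = 0.2) -> float:
    """Business logic: knows the interface, not the storage backend."""
    product = repo.get(product_id)
    if product is None:
        raise KeyError(product_id)
    return round(product.price * (1 + rate), 2)


results = []
for repo in (InMemoryProductRepository(), SqliteProductRepository()):
    repo.add(Product("p1", "Python Book", 30.0))
    results.append(price_with_tax(repo, "p1"))
print(results)
```

Both backends produce the same result for `price_with_tax`, which is also what makes the in-memory version handy as a test double.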
Unit of Work Pattern
Now let's combine it with Unit of Work for transaction management:
# Unit of Work pattern for transaction management
class UnitOfWork:
    def __init__(self):
        self._products = ProductRepository()
        self._new: List[Product] = []
        self._dirty: List[Product] = []
        self._removed: List[str] = []
        print("Unit of Work initialized!")

    # Track new entities
    def register_new(self, product: Product):
        self._new.append(product)
        print(f"Registered new: {product.emoji} {product.name}")

    # Track modified entities
    def register_dirty(self, product: Product):
        if product not in self._dirty:
            self._dirty.append(product)
            print(f"Registered dirty: {product.emoji} {product.name}")

    # Track removed entities
    def register_removed(self, id: str):
        self._removed.append(id)
        print(f"Registered for removal: {id}")

    # Commit all changes
    def commit(self):
        print("Committing changes...")
        # Add new products
        for product in self._new:
            self._products.add(product)
        # Update modified products
        for product in self._dirty:
            self._products.update(product)
        # Remove deleted products
        for id in self._removed:
            self._products.delete(id)
        # Clear tracking lists
        self._new.clear()
        self._dirty.clear()
        self._removed.clear()
        print("All changes committed!")

    # Rollback: discard everything that has not been committed yet
    def rollback(self):
        self._new.clear()
        self._dirty.clear()
        self._removed.clear()
        print("Changes rolled back!")
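A commit/rollback pair like this is often driven through a `with` block, so the caller cannot forget either call. The sketch below is one possible shape, assuming a hypothetical `StagedUnitOfWork` over a plain dictionary instead of the repository used above; the class and its `register` method are illustrative names only.

```python
from typing import Dict, List, Tuple


class StagedUnitOfWork:
    """Hypothetical, trimmed-down unit of work over a plain dict store."""

    def __init__(self, store: Dict[str, float]) -> None:
        self._store = store
        self._pending: List[Tuple[str, float]] = []

    def register(self, key: str, value: float) -> None:
        # Changes are only staged here; the store is untouched until commit
        self._pending.append((key, value))

    def commit(self) -> None:
        for key, value in self._pending:
            self._store[key] = value
        self._pending.clear()

    def rollback(self) -> None:
        self._pending.clear()

    def __enter__(self) -> "StagedUnitOfWork":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Commit when the block finished normally, otherwise discard
        if exc_type is None:
            self.commit()
        else:
            self.rollback()
        return False  # let any exception propagate to the caller


prices: Dict[str, float] = {"book": 29.99}

# Normal exit: the staged change is committed
with StagedUnitOfWork(prices) as uow:
    uow.register("coffee", 4.99)

# Exception inside the block: the staged change is discarded
try:
    with StagedUnitOfWork(prices) as uow:
        uow.register("book", 0.0)
        raise ValueError("validation failed")
except ValueError:
    pass

print(prices)
```

The second block never touches the stored price of the book, because the exception triggers `rollback()` before anything is written.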
Practical Examples
Example 1: E-commerce with CQRS Pattern
Let's build a CQRS (Command Query Responsibility Segregation) system:
# CQRS Pattern - Separate read and write models
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List
import uuid


# Commands (write operations)
@dataclass
class Command:
    # init=False keeps this defaulted field out of __init__, so subclasses
    # can still declare required fields after it without a TypeError
    timestamp: datetime = field(default_factory=datetime.now, init=False)


@dataclass
class CreateOrderCommand(Command):
    customer_id: str
    items: List[Dict[str, Any]]
    emoji: str = "🛒"


@dataclass
class UpdateOrderStatusCommand(Command):
    order_id: str
    status: str
    emoji: str = "📦"


# Queries (read operations)
@dataclass
class Query:
    pass


@dataclass
class GetOrderByIdQuery(Query):
    order_id: str


@dataclass
class GetOrdersByCustomerQuery(Query):
    customer_id: str
# Command handlers
class CommandHandler:
    def __init__(self):
        self.events = []  # Event store
        print("Command handler ready!")

    def handle(self, command: Command):
        if isinstance(command, CreateOrderCommand):
            order_id = str(uuid.uuid4())
            event = {
                'type': 'OrderCreated',
                'order_id': order_id,
                'customer_id': command.customer_id,
                'items': command.items,
                'timestamp': command.timestamp,
                'emoji': command.emoji
            }
            self.events.append(event)
            print(f"{command.emoji} Order {order_id} created!")
            return order_id
        elif isinstance(command, UpdateOrderStatusCommand):
            event = {
                'type': 'OrderStatusUpdated',
                'order_id': command.order_id,
                'status': command.status,
                'timestamp': command.timestamp,
                'emoji': command.emoji
            }
            self.events.append(event)
            print(f"{command.emoji} Order {command.order_id} status: {command.status}")
# Query handlers with read model
class QueryHandler:
    def __init__(self, events):
        self.events = events
        self._orders = {}  # Read model
        self._rebuild_read_model()
        print("Query handler ready!")

    def _rebuild_read_model(self):
        # Rebuild read model from events
        for event in self.events:
            if event['type'] == 'OrderCreated':
                self._orders[event['order_id']] = {
                    'id': event['order_id'],
                    'customer_id': event['customer_id'],
                    'items': event['items'],
                    'status': 'created',
                    'created_at': event['timestamp']
                }
            elif event['type'] == 'OrderStatusUpdated':
                if event['order_id'] in self._orders:
                    self._orders[event['order_id']]['status'] = event['status']

    def handle(self, query: Query):
        if isinstance(query, GetOrderByIdQuery):
            return self._orders.get(query.order_id)
        elif isinstance(query, GetOrdersByCustomerQuery):
            return [
                order for order in self._orders.values()
                if order['customer_id'] == query.customer_id
            ]
# Let's use CQRS!
command_handler = CommandHandler()
query_handler = QueryHandler(command_handler.events)

# Create an order
create_cmd = CreateOrderCommand(
    customer_id="cust-123",
    items=[
        {"name": "Python Book", "price": 29.99, "emoji": "📚"},
        {"name": "Coffee", "price": 4.99, "emoji": "☕"}
    ]
)
order_id = command_handler.handle(create_cmd)

# Update order status
update_cmd = UpdateOrderStatusCommand(order_id=order_id, status="shipped")
command_handler.handle(update_cmd)

# Query the order
# The read model is only built in __init__, so create a fresh handler
# to pick up the events recorded since the first one was constructed
query_handler = QueryHandler(command_handler.events)
order = query_handler.handle(GetOrderByIdQuery(order_id=order_id))
print(f"Order details: {order}")
Try it yourself: Add a cancel order command and implement event replay functionality!
Example 2: Event Sourcing Pattern
Let's implement event sourcing for game state:
# Event Sourcing for a game
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Dict, Any


# Base event class
class Event(ABC):
    def __init__(self, aggregate_id: str):
        self.aggregate_id = aggregate_id
        self.timestamp = datetime.now()
        self.version = 1  # Overwritten by the event store on save

    @abstractmethod
    def apply(self, state: Dict[str, Any]) -> Dict[str, Any]:
        pass


# Game events
class GameStartedEvent(Event):
    def __init__(self, player_id: str, player_name: str):
        super().__init__(player_id)
        self.player_name = player_name
        self.emoji = "🎮"

    def apply(self, state: Dict[str, Any]) -> Dict[str, Any]:
        return {
            **state,
            'player_id': self.aggregate_id,
            'player_name': self.player_name,
            'score': 0,
            'level': 1,
            'achievements': ["First Steps"],
            'status': 'playing'
        }


class PointsEarnedEvent(Event):
    def __init__(self, player_id: str, points: int):
        super().__init__(player_id)
        self.points = points
        self.emoji = "✨"

    def apply(self, state: Dict[str, Any]) -> Dict[str, Any]:
        new_score = state['score'] + self.points
        new_level = (new_score // 100) + 1  # One level per 100 points
        new_state = {
            **state,
            'score': new_score,
            'level': new_level
        }
        # Level up achievement
        if new_level > state['level']:
            new_state['achievements'] = state['achievements'] + [f"Level {new_level} Master"]
        return new_state


class PowerUpCollectedEvent(Event):
    def __init__(self, player_id: str, power_up: str):
        super().__init__(player_id)
        self.power_up = power_up
        self.emoji = "💪"

    def apply(self, state: Dict[str, Any]) -> Dict[str, Any]:
        power_ups = state.get('power_ups', [])
        return {
            **state,
            'power_ups': power_ups + [self.power_up]
        }
# Event store
class EventStore:
    def __init__(self):
        self._events: Dict[str, List[Event]] = {}
        print("Event store initialized!")

    def save_event(self, event: Event):
        if event.aggregate_id not in self._events:
            self._events[event.aggregate_id] = []
        # Set version number (1-based position in the aggregate's stream)
        event.version = len(self._events[event.aggregate_id]) + 1
        self._events[event.aggregate_id].append(event)
        print(f"{getattr(event, 'emoji', '')} Event saved: {event.__class__.__name__} v{event.version}")

    def get_events(self, aggregate_id: str) -> List[Event]:
        return self._events.get(aggregate_id, [])
# Game aggregate
class GamePlayer:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self._state: Dict[str, Any] = {}

    def start_game(self, player_id: str, player_name: str):
        event = GameStartedEvent(player_id, player_name)
        self.event_store.save_event(event)
        print(f"{player_name} started playing!")

    def earn_points(self, player_id: str, points: int):
        event = PointsEarnedEvent(player_id, points)
        self.event_store.save_event(event)
        # Rebuild the current state to report the new score
        state = self.get_player_state(player_id)
        print(f"Earned {points} points! Score: {state['score']}")

    def collect_power_up(self, player_id: str, power_up: str):
        event = PowerUpCollectedEvent(player_id, power_up)
        self.event_store.save_event(event)
        print(f"Collected power-up: {power_up}")

    def get_player_state(self, player_id: str) -> Dict[str, Any]:
        # Rebuild state from events
        events = self.event_store.get_events(player_id)
        state = {}
        for event in events:
            state = event.apply(state)
        return state

    def replay_game_history(self, player_id: str):
        print(f"\nReplaying game history for player {player_id}:")
        events = self.event_store.get_events(player_id)
        state = {}
        for i, event in enumerate(events):
            state = event.apply(state)
            print(f"  Event {i+1}: {event.__class__.__name__} - Score: {state.get('score', 0)}")
# Let's play!
event_store = EventStore()
game = GamePlayer(event_store)

# Start a new game
game.start_game("player-1", "Alice")

# Play the game
game.earn_points("player-1", 50)
game.collect_power_up("player-1", "🚀 Speed Boost")
game.earn_points("player-1", 75)
game.collect_power_up("player-1", "🛡️ Shield")
game.earn_points("player-1", 100)

# Check final state
final_state = game.get_player_state("player-1")
print(f"\nFinal state: Level {final_state['level']}, Score: {final_state['score']}")
print(f"Achievements: {', '.join(final_state['achievements'])}")
print(f"Power-ups: {', '.join(final_state.get('power_ups', []))}")

# Replay history
game.replay_game_history("player-1")
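Because state is always derived by folding events, you can also ask what the state looked like at any earlier version simply by replaying fewer events. The following is a minimal sketch of that idea, assuming events are reduced to hypothetical `(kind, value)` tuples and a free function `apply_event` instead of the event classes above; the scoring rule (one level per 100 points) mirrors the example.

```python
from functools import reduce
from typing import Any, Dict, List, Tuple

GameEvent = Tuple[str, int]  # hypothetical (event kind, payload) pairs


def apply_event(state: Dict[str, Any], event: GameEvent) -> Dict[str, Any]:
    """Pure function: returns a new state, never mutates the old one."""
    kind, value = event
    if kind == "started":
        return {"score": 0, "level": 1}
    if kind == "points":
        score = state["score"] + value
        return {**state, "score": score, "level": score // 100 + 1}
    return state  # unknown events leave the state untouched


def state_at(events: List[GameEvent], version: int) -> Dict[str, Any]:
    """Fold only the first `version` events into a state."""
    return reduce(apply_event, events[:version], {})


log: List[GameEvent] = [
    ("started", 0),
    ("points", 50),
    ("points", 75),
    ("points", 100),
]

early = state_at(log, 2)          # after the first points event
latest = state_at(log, len(log))  # after everything
print(early)
print(latest)
```

Here `early` is the state after 50 points and `latest` reflects all 225 points. The event log itself is never modified, which is what makes this kind of replay safe.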
Advanced Concepts
Specification Pattern
When you're ready to level up, try the Specification pattern for complex business rules:
# Specification pattern for complex business rules
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class Specification(ABC):
    @abstractmethod
    def is_satisfied_by(self, candidate: Any) -> bool:
        pass

    def and_(self, other: 'Specification') -> 'AndSpecification':
        return AndSpecification(self, other)

    def or_(self, other: 'Specification') -> 'OrSpecification':
        return OrSpecification(self, other)

    def not_(self) -> 'NotSpecification':
        return NotSpecification(self)


# Composite specifications
class AndSpecification(Specification):
    def __init__(self, left: Specification, right: Specification):
        self.left = left
        self.right = right

    def is_satisfied_by(self, candidate: Any) -> bool:
        return self.left.is_satisfied_by(candidate) and self.right.is_satisfied_by(candidate)


class OrSpecification(Specification):
    def __init__(self, left: Specification, right: Specification):
        self.left = left
        self.right = right

    def is_satisfied_by(self, candidate: Any) -> bool:
        return self.left.is_satisfied_by(candidate) or self.right.is_satisfied_by(candidate)


class NotSpecification(Specification):
    def __init__(self, spec: Specification):
        self.spec = spec

    def is_satisfied_by(self, candidate: Any) -> bool:
        return not self.spec.is_satisfied_by(candidate)


# Business rule specifications
# All three are evaluated against the same customer record, so they can be
# combined freely with and_/or_/not_
class PremiumCustomerSpec(Specification):
    def is_satisfied_by(self, customer: Dict[str, Any]) -> bool:
        return customer.get('tier') == 'premium'


class HighValueOrderSpec(Specification):
    def __init__(self, threshold: float):
        self.threshold = threshold

    def is_satisfied_by(self, customer: Dict[str, Any]) -> bool:
        # Reads the order total carried on the customer record
        return customer.get('order_total', 0) >= self.threshold


class RecentCustomerSpec(Specification):
    def __init__(self, days: int):
        self.days = days

    def is_satisfied_by(self, customer: Dict[str, Any]) -> bool:
        # Simplified: check if registered within days
        return customer.get('days_since_registration', float('inf')) <= self.days


# Using specifications
premium_spec = PremiumCustomerSpec()
high_value_spec = HighValueOrderSpec(100.0)
recent_spec = RecentCustomerSpec(30)

# Complex business rule: Premium OR (High value AND Recent)
eligible_for_discount = premium_spec.or_(high_value_spec.and_(recent_spec))

# Test customers
customers = [
    {"name": "Alice", "tier": "premium", "order_total": 50, "days_since_registration": 60},
    {"name": "Bob", "tier": "regular", "order_total": 150, "days_since_registration": 15},
    {"name": "Charlie", "tier": "regular", "order_total": 30, "days_since_registration": 5},
]

for customer in customers:
    # The composite spec already encodes the whole rule
    if eligible_for_discount.is_satisfied_by(customer):
        print(f"{customer['name']} is eligible for discount!")
    else:
        print(f"{customer['name']} is not eligible")
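If you prefer operators to `and_`/`or_`/`not_`, Python's `__and__`, `__or__`, and `__invert__` hooks let specifications compose with `&`, `|`, and `~`. This is a minimal sketch under the assumption of a hypothetical predicate-backed `Spec` class, not a drop-in replacement for the classes above; it encodes the same rule, Premium OR (High value AND Recent).

```python
from typing import Any, Callable, Dict, List


class Spec:
    """Hypothetical predicate-backed specification with operator sugar."""

    def __init__(self, predicate: Callable[[Any], bool]) -> None:
        self._predicate = predicate

    def is_satisfied_by(self, candidate: Any) -> bool:
        return self._predicate(candidate)

    def __and__(self, other: "Spec") -> "Spec":
        return Spec(lambda c: self.is_satisfied_by(c) and other.is_satisfied_by(c))

    def __or__(self, other: "Spec") -> "Spec":
        return Spec(lambda c: self.is_satisfied_by(c) or other.is_satisfied_by(c))

    def __invert__(self) -> "Spec":
        return Spec(lambda c: not self.is_satisfied_by(c))


premium = Spec(lambda c: c.get("tier") == "premium")
high_value = Spec(lambda c: c.get("order_total", 0) >= 100)
recent = Spec(lambda c: c.get("days_since_registration", float("inf")) <= 30)

# Parentheses make the grouping explicit even though & binds tighter than |
eligible = premium | (high_value & recent)

customers: List[Dict[str, Any]] = [
    {"name": "Alice", "tier": "premium", "order_total": 50, "days_since_registration": 60},
    {"name": "Bob", "tier": "regular", "order_total": 150, "days_since_registration": 15},
    {"name": "Charlie", "tier": "regular", "order_total": 30, "days_since_registration": 5},
]

eligible_names = [c["name"] for c in customers if eligible.is_satisfied_by(c)]
not_eligible_names = [c["name"] for c in customers if (~eligible).is_satisfied_by(c)]
print(eligible_names, not_eligible_names)
```

Alice qualifies through her tier, Bob through a large order from a recent account, and Charlie through neither. The trade-off is that lambdas are harder to introspect than named classes, so the class-based form may suit rules that need logging or serialization.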
Saga Pattern for Distributed Transactions
For the brave developers, implement distributed transactions:
# Saga pattern for distributed transactions
from enum import Enum
from typing import List, Callable, Any, Optional
import uuid


class SagaStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    FAILED = "failed"


class SagaStep:
    def __init__(self, name: str, action: Callable, compensation: Callable):
        self.name = name
        self.action = action
        self.compensation = compensation
        self.status = SagaStatus.PENDING
        self.result = None
        self.error = None

    def execute(self) -> bool:
        try:
            print(f"Executing: {self.name}")
            self.status = SagaStatus.RUNNING
            self.result = self.action()
            self.status = SagaStatus.COMPLETED
            print(f"Completed: {self.name}")
            return True
        except Exception as e:
            self.status = SagaStatus.FAILED
            self.error = str(e)
            print(f"Failed: {self.name} - {e}")
            return False

    def compensate(self) -> bool:
        if self.status != SagaStatus.COMPLETED:
            return True  # Nothing to compensate
        try:
            print(f"Compensating: {self.name}")
            self.status = SagaStatus.COMPENSATING
            # The compensation receives whatever the action returned
            self.compensation(self.result)
            print(f"Compensated: {self.name}")
            return True
        except Exception as e:
            print(f"Compensation failed: {self.name} - {e}")
            return False
class Saga:
    def __init__(self, name: str):
        self.id = str(uuid.uuid4())
        self.name = name
        self.steps: List[SagaStep] = []
        self.status = SagaStatus.PENDING
        print(f"Saga '{name}' created with ID: {self.id}")

    def add_step(self, step: SagaStep):
        self.steps.append(step)
        print(f"Added step: {step.name}")

    def execute(self):
        print(f"\nStarting saga: {self.name}")
        self.status = SagaStatus.RUNNING
        completed_steps = []
        for step in self.steps:
            if step.execute():
                completed_steps.append(step)
            else:
                # Step failed, compensate completed steps
                print(f"\nSaga failed at step: {step.name}")
                self.status = SagaStatus.COMPENSATING
                # Compensate in reverse order
                for completed_step in reversed(completed_steps):
                    completed_step.compensate()
                self.status = SagaStatus.FAILED
                print(f"Saga '{self.name}' failed and compensated")
                return False
        self.status = SagaStatus.COMPLETED
        print(f"\nSaga '{self.name}' completed successfully!")
        return True
# Example: Order processing saga
class OrderService:
    def __init__(self):
        self.orders = {}

    def create_order(self, order_id: str, items: List[str]) -> str:
        self.orders[order_id] = {"id": order_id, "items": items, "status": "created"}
        print(f"  Order {order_id} created")
        return order_id

    def cancel_order(self, order_id: str):
        if order_id in self.orders:
            self.orders[order_id]["status"] = "cancelled"
            print(f"  Order {order_id} cancelled")


class PaymentService:
    def __init__(self):
        self.payments = {}
        self.balance = 1000  # Starting balance

    def process_payment(self, payment_id: str, amount: float) -> str:
        if amount > self.balance:
            raise Exception("Insufficient funds!")
        self.balance -= amount
        self.payments[payment_id] = {"id": payment_id, "amount": amount, "status": "processed"}
        print(f"  Payment {payment_id} processed: ${amount}")
        return payment_id

    def refund_payment(self, payment_id: str):
        if payment_id in self.payments:
            amount = self.payments[payment_id]["amount"]
            self.balance += amount
            self.payments[payment_id]["status"] = "refunded"
            print(f"  Payment {payment_id} refunded: ${amount}")


class InventoryService:
    def __init__(self):
        self.inventory = {"item1": 10, "item2": 5, "item3": 0}
        self.reservations = {}  # reservation_id -> reserved items

    def reserve_items(self, reservation_id: str, items: List[str]) -> str:
        # Check availability first so a failed reservation leaves stock untouched
        for item in items:
            if self.inventory.get(item, 0) <= 0:
                raise Exception(f"Item {item} out of stock!")
        for item in items:
            self.inventory[item] -= 1
        self.reservations[reservation_id] = items
        print(f"  Items reserved: {items}")
        return reservation_id

    def release_items(self, reservation_id: str):
        # Put the reserved items back into stock
        for item in self.reservations.pop(reservation_id, []):
            self.inventory[item] += 1
        print(f"  Items released for reservation {reservation_id}")
# Create and execute saga
order_service = OrderService()
payment_service = PaymentService()
inventory_service = InventoryService()

# Create order processing saga
saga = Saga("Process Online Order")

# Add saga steps
saga.add_step(SagaStep(
    "Create Order",
    lambda: order_service.create_order("order-123", ["item1", "item2"]),
    lambda order_id: order_service.cancel_order(order_id)
))
saga.add_step(SagaStep(
    "Reserve Inventory",
    lambda: inventory_service.reserve_items("reservation-123", ["item1", "item2"]),
    lambda reservation_id: inventory_service.release_items(reservation_id)
))
saga.add_step(SagaStep(
    "Process Payment",
    lambda: payment_service.process_payment("payment-123", 99.99),
    lambda payment_id: payment_service.refund_payment(payment_id)
))

# Execute the saga
saga.execute()

# Try a failing saga
print("\n" + "="*50 + "\n")
failing_saga = Saga("Process Large Order")
failing_saga.add_step(SagaStep(
    "Create Large Order",
    lambda: order_service.create_order("order-456", ["item3"]),  # Out of stock!
    lambda order_id: order_service.cancel_order(order_id)
))
failing_saga.add_step(SagaStep(
    "Reserve Out-of-Stock Items",
    lambda: inventory_service.reserve_items("reservation-456", ["item3"]),  # Will fail!
    lambda reservation_id: inventory_service.release_items(reservation_id)
))
failing_saga.add_step(SagaStep(
    "Process Large Payment",
    lambda: payment_service.process_payment("payment-456", 5000),  # Never reached: the saga stops at the failed reservation
    lambda payment_id: payment_service.refund_payment(payment_id)
))
failing_saga.execute()
Common Pitfalls and Solutions
Pitfall 1: Over-Engineering
# Wrong way - too complex for simple needs!
class OverEngineeredUserService:
    def __init__(self,
                 repository_factory,
                 unit_of_work_factory,
                 event_store,
                 command_bus,
                 query_bus,
                 saga_orchestrator,
                 specification_engine):
        # This is overkill for basic CRUD!
        pass


# Correct way - start simple!
class SimpleUserService:
    def __init__(self, db_connection):
        self.db = db_connection
        print("Simple and effective!")

    def create_user(self, name: str, email: str):
        # Direct and clean
        return self.db.insert("users", {"name": name, "email": email})
Pitfall 2: Ignoring Transaction Boundaries
# Dangerous - no transaction management!
# (debit_account and credit_account stand in for your own data-access helpers)
def transfer_money_unsafe(from_account: str, to_account: str, amount: float):
    # What if this fails after debit?
    debit_account(from_account, amount)
    credit_account(to_account, amount)  # System crash here = money lost!


# Safe - use Unit of Work or Saga!
# (assumes a unit of work that exposes an `accounts` repository)
def transfer_money_safe(uow: UnitOfWork, from_account: str, to_account: str, amount: float):
    try:
        uow.accounts.debit(from_account, amount)
        uow.accounts.credit(to_account, amount)
        uow.commit()  # All or nothing!
        print("Transfer successful!")
    except Exception as e:
        uow.rollback()  # Safe rollback
        print(f"Transfer failed: {e}")
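For a runnable version of the all-or-nothing idea, here is a small sketch using the standard library's `sqlite3` module, whose connection object acts as a context manager that commits on success and rolls back when the block raises. The `accounts` table, its `CHECK` constraint, and the `transfer` helper are assumptions made for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (id TEXT PRIMARY KEY, balance REAL CHECK (balance >= 0))"
)
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("alice", 100.0), ("bob", 20.0)])
conn.commit()


def transfer(conn: sqlite3.Connection, from_account: str, to_account: str, amount: float) -> bool:
    """Move money between accounts; both updates apply or neither does."""
    try:
        with conn:  # commit on success, rollback if the block raises
            # Credit first on purpose, so a failing debit shows the rollback
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                (amount, to_account),
            )
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE id = ?",
                (amount, from_account),
            )
        return True
    except sqlite3.IntegrityError:
        # The CHECK constraint rejected a negative balance
        return False


def balances(conn: sqlite3.Connection) -> dict:
    return dict(conn.execute("SELECT id, balance FROM accounts"))


failed = transfer(conn, "alice", "bob", 500.0)  # would overdraw alice
after_failure = balances(conn)
succeeded = transfer(conn, "alice", "bob", 30.0)
after_success = balances(conn)
print(failed, after_failure, succeeded, after_success)
```

The failed transfer leaves both balances exactly as they were, even though the credit to `bob` had already been executed inside the transaction.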
Best Practices
- Choose Wisely: Not every problem needs an advanced pattern!
- Document Intent: Explain WHY you chose a pattern
- Test Thoroughly: Advanced patterns need comprehensive tests
- Keep It Pythonic: Don't fight the language
- Iterate: Start simple, evolve as needed
- Monitor: Track pattern effectiveness in production
Hands-On Exercise
Challenge: Build a Microservices Communication System
Create a complete system using multiple advanced patterns:
Requirements:
- Implement message bus for service communication
- Use Repository pattern for data access
- Apply CQRS for read/write separation
- Add event sourcing for audit trail
- Include saga for distributed transactions
- Use specification pattern for business rules
Bonus Points:
- Add circuit breaker pattern for resilience
- Implement bulkhead pattern for isolation
- Create a service mesh simulation
Solution
# Microservices communication system!
from abc import ABC
from datetime import datetime
from typing import Dict, List, Callable, Any
import uuid
import time


# Message bus for inter-service communication
class Message(ABC):
    def __init__(self):
        self.id = str(uuid.uuid4())
        self.timestamp = datetime.now()
        self.emoji = "📨"


# Plain class with a hand-written __init__; no @dataclass needed
class OrderPlacedMessage(Message):
    order_id: str
    customer_id: str
    total: float

    def __init__(self, order_id: str, customer_id: str, total: float):
        super().__init__()
        self.order_id = order_id
        self.customer_id = customer_id
        self.total = total
        self.emoji = "🛒"


class MessageBus:
    def __init__(self):
        self._handlers: Dict[type, List[Callable]] = {}
        print("Message bus initialized!")

    def subscribe(self, message_type: type, handler: Callable):
        if message_type not in self._handlers:
            self._handlers[message_type] = []
        self._handlers[message_type].append(handler)
        print(f"Subscribed to {message_type.__name__}")

    def publish(self, message: Message):
        message_type = type(message)
        handlers = self._handlers.get(message_type, [])
        print(f"{message.emoji} Publishing {message_type.__name__}")
        # Handlers run synchronously; one failing handler does not stop the rest
        for handler in handlers:
            try:
                handler(message)
            except Exception as e:
                print(f"Handler error: {e}")
# Service base with repository and CQRS
class ServiceBase:
    def __init__(self, name: str, message_bus: MessageBus):
        self.name = name
        self.message_bus = message_bus
        self.event_store = []
        self.read_model = {}
        print(f"{name} service initialized!")

    def record_event(self, event: Dict[str, Any]):
        event['service'] = self.name
        event['timestamp'] = datetime.now()
        self.event_store.append(event)
        self._update_read_model(event)

    def _update_read_model(self, event: Dict[str, Any]):
        # Override in subclasses
        pass
# Order service
class OrderService(ServiceBase):
    def __init__(self, message_bus: MessageBus):
        super().__init__("OrderService", message_bus)
        self.orders = {}  # Repository

    def place_order(self, customer_id: str, items: List[Dict[str, Any]]) -> str:
        order_id = f"order-{uuid.uuid4().hex[:8]}"
        total = sum(item['price'] * item['quantity'] for item in items)
        # Command: Create order
        order = {
            'id': order_id,
            'customer_id': customer_id,
            'items': items,
            'total': total,
            'status': 'pending'
        }
        self.orders[order_id] = order
        # Record event
        self.record_event({
            'type': 'OrderPlaced',
            'order': order
        })
        # Publish message
        self.message_bus.publish(
            OrderPlacedMessage(order_id, customer_id, total)
        )
        # Format to two decimals to hide floating-point noise in the sum
        print(f"Order {order_id} placed! Total: ${total:.2f}")
        return order_id

    def _update_read_model(self, event: Dict[str, Any]):
        if event['type'] == 'OrderPlaced':
            order = event['order']
            self.read_model[order['id']] = {
                'id': order['id'],
                'customer_id': order['customer_id'],
                'total': order['total'],
                'item_count': len(order['items'])
            }
# Payment service, reacting to order messages
class PaymentService(ServiceBase):
    def __init__(self, message_bus: MessageBus):
        super().__init__("PaymentService", message_bus)
        self.payments = {}
        self.customer_balances = {
            'cust-123': 500.0,
            'cust-456': 100.0
        }
        # Subscribe to order events
        message_bus.subscribe(OrderPlacedMessage, self.handle_order_placed)

    def handle_order_placed(self, message: OrderPlacedMessage):
        print(f"Processing payment for order {message.order_id}")
        # Check if customer can afford it
        balance = self.customer_balances.get(message.customer_id, 0)
        if balance >= message.total:
            # Process payment
            payment_id = f"pay-{uuid.uuid4().hex[:8]}"
            self.payments[payment_id] = {
                'id': payment_id,
                'order_id': message.order_id,
                'amount': message.total,
                'status': 'completed'
            }
            self.customer_balances[message.customer_id] -= message.total
            self.record_event({
                'type': 'PaymentCompleted',
                'payment_id': payment_id,
                'order_id': message.order_id,
                'amount': message.total
            })
            print(f"Payment {payment_id} completed!")
        else:
            print(f"Insufficient balance for customer {message.customer_id}")
            self.record_event({
                'type': 'PaymentFailed',
                'order_id': message.order_id,
                'reason': 'Insufficient balance'
            })
# Query service for reporting
class QueryService:
    def __init__(self, services: List[ServiceBase]):
        self.services = services
        print("Query service ready!")

    def get_customer_orders(self, customer_id: str) -> List[Dict[str, Any]]:
        orders = []
        for service in self.services:
            if isinstance(service, OrderService):
                for order_id, order_data in service.read_model.items():
                    if order_data['customer_id'] == customer_id:
                        orders.append(order_data)
        return orders

    def get_service_events(self, service_name: str) -> List[Dict[str, Any]]:
        for service in self.services:
            if service.name == service_name:
                return service.event_store
        return []


# Specification for business rules
# Reuses the Specification base class from the Specification pattern section
class CustomerCreditSpec(Specification):
    def __init__(self, min_balance: float):
        self.min_balance = min_balance

    def is_satisfied_by(self, customer: Dict[str, Any]) -> bool:
        return customer.get('balance', 0) >= self.min_balance
# Run the microservices system!
message_bus = MessageBus()
order_service = OrderService(message_bus)
payment_service = PaymentService(message_bus)
query_service = QueryService([order_service, payment_service])

# Place some orders
print("\nStarting microservices demo...\n")

# Customer with sufficient balance
order1 = order_service.place_order('cust-123', [
    {'name': 'Microservices Book', 'price': 49.99, 'quantity': 1, 'emoji': '📚'},
    {'name': 'Coffee', 'price': 4.99, 'quantity': 2, 'emoji': '☕'}
])
time.sleep(0.1)  # Short pause between orders; the bus itself is synchronous

# Customer with insufficient balance
order2 = order_service.place_order('cust-456', [
    {'name': 'Cloud Architecture Course', 'price': 299.99, 'quantity': 1, 'emoji': '☁️'}
])

# Query customer orders
print("\nCustomer cust-123 orders:")
for order in query_service.get_customer_orders('cust-123'):
    print(f"  Order {order['id']}: ${order['total']:.2f} ({order['item_count']} items)")

# Show event sourcing
print("\nPayment service events:")
for event in query_service.get_service_events("PaymentService"):
    print(f"  {event['type']} at {event['timestamp'].strftime('%H:%M:%S')}")
Key Takeaways
You've mastered advanced design patterns! Here's what you can now do:
- Implement Repository pattern for clean data access
- Apply CQRS to separate read and write concerns
- Use Event Sourcing for complete audit trails
- Build Sagas for distributed transactions
- Create flexible systems with advanced patterns!
Remember: Advanced patterns are powerful tools, but use them wisely! Start simple and evolve your architecture as needed.
Next Steps
Congratulations! You've mastered advanced design patterns!
Here's what to do next:
- Practice implementing these patterns in your projects
- Build a small microservices system using CQRS and Event Sourcing
- Move on to our next tutorial: Parallel Programming Foundations
- Share your architectural insights with the community!
Remember: Every architect started with simple patterns. Keep building, keep learning, and most importantly, design systems that solve real problems!
Happy architecting!