Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the fundamentals of CQRS
- Apply CQRS in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to this tutorial on CQRS (Command Query Responsibility Segregation)! In this guide, we'll explore how to separate read and write operations to build more scalable and maintainable Python applications.
You'll see where CQRS fits into Python development. Whether you're building microservices, event-driven systems, or complex domain models, CQRS is a useful pattern to know when reads and writes have different needs.
By the end of this tutorial, you'll feel confident using CQRS patterns in your own projects! Let's dive in!
Understanding CQRS
What is CQRS?
CQRS is like having two separate doors in a building: one for entering (commands) and one for exiting (queries). Think of it as splitting your application into two specialized parts: one optimized for writing data and another for reading it.
In Python terms, CQRS means separating the code that changes state (commands) from the code that reads state (queries). This means you can:
- Optimize read and write operations independently
- Scale read-heavy and write-heavy parts separately
- Apply different security rules to commands and queries
Why Use CQRS?
Here's why developers reach for CQRS:
- Performance Optimization: Use different data models for reads and writes
- Scalability: Scale read and write sides independently
- Security: Apply different security policies to commands and queries
- Flexibility: Use different storage technologies for each side
Real-world example: Imagine building an e-commerce platform. With CQRS, you can have a fast, denormalized read model for product browsing while keeping a normalized write model for order processing.
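To make the e-commerce example a little more concrete, here is a minimal sketch of a normalized write model next to a denormalized read model in plain Python. The names `ProductRow`, `CategoryRow`, and `build_catalog_view` are invented for this illustration and do not come from any library.

```python
from dataclasses import dataclass


# Write side: normalized rows that reference each other by ID.
@dataclass
class CategoryRow:
    category_id: str
    title: str


@dataclass
class ProductRow:
    product_id: str
    name: str
    category_id: str
    price_cents: int


def build_catalog_view(products, categories):
    """Flatten products and categories into one ready-to-display dict per product."""
    titles = {c.category_id: c.title for c in categories}
    return {
        p.product_id: {
            "name": p.name,
            "category": titles.get(p.category_id, "unknown"),
            "price": f"${p.price_cents / 100:.2f}",
        }
        for p in products
    }


categories = [CategoryRow("c1", "Books")]
products = [ProductRow("p1", "Python Book", "c1", 2999)]

# Read side: a denormalized view that needs no joins at query time.
catalog_view = build_catalog_view(products, categories)
print(catalog_view["p1"])
```

The write side stays easy to keep consistent because each fact lives in one place, while the read side trades some duplication for lookups that need no joins.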
Basic Syntax and Usage
Simple Example
Let's start with a friendly example:

# Hello, CQRS!
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional
import uuid

# Creating simple command and query base classes
class Command(ABC):
    """Base class for all commands"""
    pass

class Query(ABC):
    """Base class for all queries"""
    pass

# Let's build a simple shopping cart system!
@dataclass
class AddItemToCartCommand(Command):
    cart_id: str     # Cart identifier
    product_id: str  # Product to add
    quantity: int    # How many to add

@dataclass
class GetCartQuery(Query):
    cart_id: str  # Which cart to retrieve

Explanation: Notice how we separate commands (that change state) from queries (that read state)! This separation is the heart of CQRS.
Common Patterns
Here are patterns you'll use daily:

# Pattern 1: Command Handler
class CommandHandler(ABC):
    @abstractmethod
    def handle(self, command: Command) -> None:
        pass

# Pattern 2: Query Handler
class QueryHandler(ABC):
    @abstractmethod
    def handle(self, query: Query):
        pass

# Pattern 3: Command Bus
class CommandBus:
    def __init__(self):
        self.handlers = {}  # Maps each command type to its handler

    def register(self, command_type, handler):
        self.handlers[command_type] = handler
        print(f"Registered handler for {command_type.__name__}")

    def dispatch(self, command: Command):
        # Look up the handler by the command's concrete type
        handler = self.handlers.get(type(command))
        if handler:
            handler.handle(command)
            print(f"Executed {type(command).__name__}")
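The command bus above has a natural counterpart on the read side. The following is a minimal sketch of a query bus, assuming the same register/dispatch shape; `QueryBus`, `GetGreetingQuery`, and `GreetingHandler` are hypothetical names used only for this example. Unlike the command bus, it returns the handler's result and raises when no handler is registered.

```python
from dataclasses import dataclass


class QueryBus:
    """Routes each query to the handler registered for its type."""

    def __init__(self):
        self.handlers = {}

    def register(self, query_type, handler):
        self.handlers[query_type] = handler

    def dispatch(self, query):
        handler = self.handlers.get(type(query))
        if handler is None:
            # Fail loudly instead of silently returning nothing
            raise LookupError(f"No handler registered for {type(query).__name__}")
        return handler.handle(query)


@dataclass(frozen=True)
class GetGreetingQuery:
    name: str


class GreetingHandler:
    def handle(self, query):
        # A query handler only reads; it never changes state
        return f"Hello, {query.name}!"


query_bus = QueryBus()
query_bus.register(GetGreetingQuery, GreetingHandler())
greeting = query_bus.dispatch(GetGreetingQuery(name="CQRS"))
print(greeting)
```

Raising on a missing handler is a design choice: a query that silently returns `None` is hard to tell apart from a legitimate "not found" result.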
Practical Examples
Example 1: E-Commerce Order System
Let's build something real:

# Domain models
@dataclass
class Product:
    id: str
    name: str
    price: float
    emoji: str  # Every product needs an emoji!

@dataclass
class Order:
    id: str
    customer_id: str
    items: List[dict]
    total: float
    status: str

# Commands for write operations
@dataclass
class CreateOrderCommand(Command):
    customer_id: str
    items: List[dict]

@dataclass
class CancelOrderCommand(Command):
    order_id: str
    reason: str

# Queries for read operations
@dataclass
class GetOrderByIdQuery(Query):
    order_id: str

@dataclass
class GetCustomerOrdersQuery(Query):
    customer_id: str
    status: Optional[str] = None
# Write Model (Command Side)
class OrderCommandHandler(CommandHandler):
    def __init__(self):
        self.orders = {}  # In-memory storage (use DB in production!)

    def handle(self, command: Command):
        if isinstance(command, CreateOrderCommand):
            # Create new order
            order_id = str(uuid.uuid4())
            total = sum(item['price'] * item['quantity'] for item in command.items)
            order = Order(
                id=order_id,
                customer_id=command.customer_id,
                items=command.items,
                total=total,
                status="pending"
            )
            self.orders[order_id] = order
            print(f"Order {order_id} created! Total: ${total:.2f}")
        elif isinstance(command, CancelOrderCommand):
            # Cancel existing order
            if command.order_id in self.orders:
                self.orders[command.order_id].status = "cancelled"
                print(f"Order {command.order_id} cancelled: {command.reason}")
# Read Model (Query Side)
class OrderQueryHandler(QueryHandler):
    def __init__(self, orders):
        self.orders = orders  # Reference to order data

    def handle(self, query: Query):
        if isinstance(query, GetOrderByIdQuery):
            # Find single order
            order = self.orders.get(query.order_id)
            if order:
                print(f"Found order: {order.id} - ${order.total:.2f}")
                return order
            else:
                print(f"Order {query.order_id} not found")
                return None
        elif isinstance(query, GetCustomerOrdersQuery):
            # Find customer's orders, optionally filtered by status
            customer_orders = [
                order for order in self.orders.values()
                if order.customer_id == query.customer_id
                and (not query.status or order.status == query.status)
            ]
            print(f"Found {len(customer_orders)} orders for customer")
            return customer_orders
# Let's use it!
# Initialize handlers; both sides share one dict here for simplicity
orders_storage = {}
command_handler = OrderCommandHandler()
command_handler.orders = orders_storage
query_handler = OrderQueryHandler(orders_storage)

# Create an order
create_cmd = CreateOrderCommand(
    customer_id="customer-123",
    items=[
        {"name": "Python Book", "price": 29.99, "quantity": 1, "emoji": "📚"},
        {"name": "Coffee", "price": 4.99, "quantity": 2, "emoji": "☕"}
    ]
)
command_handler.handle(create_cmd)

# Query the order
query = GetCustomerOrdersQuery(customer_id="customer-123")
customer_orders = query_handler.handle(query)

Try it yourself: Add an UpdateOrderCommand to modify order items and a GetOrdersByDateRangeQuery for reporting!
Example 2: Real-time Analytics System
Let's make it fun with event tracking:

# Event tracking system with CQRS
from datetime import datetime
from collections import defaultdict

# Events (Commands)
@dataclass
class TrackEventCommand(Command):
    user_id: str
    event_type: str
    properties: dict
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        # Default to the current time when no timestamp is supplied
        if self.timestamp is None:
            self.timestamp = datetime.now()

@dataclass
class UpdateUserProfileCommand(Command):
    user_id: str
    attributes: dict

# Analytics Queries
@dataclass
class GetUserEventsQuery(Query):
    user_id: str
    event_type: Optional[str] = None
    limit: int = 100

@dataclass
class GetEventStatsQuery(Query):
    event_type: str
    time_range: Optional[tuple] = None
# Write Model - Optimized for fast writes
class EventCommandHandler(CommandHandler):
    def __init__(self):
        self.events = []         # Append-only event log
        self.user_profiles = {}  # User data

    def handle(self, command: Command):
        if isinstance(command, TrackEventCommand):
            # Fast event tracking
            event = {
                "id": str(uuid.uuid4()),
                "user_id": command.user_id,
                "type": command.event_type,
                "properties": command.properties,
                "timestamp": command.timestamp,
                "emoji": self._get_event_emoji(command.event_type)
            }
            self.events.append(event)
            print(f"{event['emoji']} Tracked: {command.event_type} for user {command.user_id}")
        elif isinstance(command, UpdateUserProfileCommand):
            # Update user profile
            if command.user_id not in self.user_profiles:
                self.user_profiles[command.user_id] = {}
            self.user_profiles[command.user_id].update(command.attributes)
            print(f"Updated profile for user {command.user_id}")

    def _get_event_emoji(self, event_type):
        emoji_map = {
            "page_view": "👀",
            "button_click": "🖱️",
            "purchase": "💰",
            "signup": "🎉",
            "login": "🔑",
            "logout": "👋",
            "share": "📤",
            "like": "❤️"
        }
        return emoji_map.get(event_type, "📊")
# Read Model - Optimized for analytics
class EventQueryHandler(QueryHandler):
    def __init__(self, events, user_profiles):
        self.events = events
        self.user_profiles = user_profiles
        self._build_indexes()  # Pre-compute for fast queries

    def _build_indexes(self):
        # Build indexes for fast lookups
        self.events_by_user = defaultdict(list)
        self.events_by_type = defaultdict(list)
        for event in self.events:
            self.events_by_user[event["user_id"]].append(event)
            self.events_by_type[event["type"]].append(event)

    def handle(self, query: Query):
        # Rebuild so events tracked after construction are visible.
        # A production read model would update its indexes incrementally.
        self._build_indexes()
        if isinstance(query, GetUserEventsQuery):
            # Get user's events
            user_events = self.events_by_user.get(query.user_id, [])
            if query.event_type:
                user_events = [e for e in user_events if e["type"] == query.event_type]
            # Apply limit (keep the most recent events)
            user_events = user_events[-query.limit:]
            print(f"Found {len(user_events)} events for user {query.user_id}")
            return user_events
        elif isinstance(query, GetEventStatsQuery):
            # Calculate event statistics
            events = self.events_by_type.get(query.event_type, [])
            if query.time_range:
                start, end = query.time_range
                events = [e for e in events if start <= e["timestamp"] <= end]
            stats = {
                "event_type": query.event_type,
                "total_count": len(events),
                "unique_users": len(set(e["user_id"] for e in events)),
                "emoji": self._get_emoji_for_type(query.event_type)
            }
            print(f"{stats['emoji']} Stats: {stats['total_count']} {query.event_type} events")
            return stats

    def _get_emoji_for_type(self, event_type):
        return {"page_view": "👀", "purchase": "💰"}.get(event_type, "📊")
# Demo time!
events_storage = []
profiles_storage = {}
event_cmd_handler = EventCommandHandler()
event_cmd_handler.events = events_storage
event_cmd_handler.user_profiles = profiles_storage
event_query_handler = EventQueryHandler(events_storage, profiles_storage)

# Track some events
event_cmd_handler.handle(TrackEventCommand(
    user_id="user-456",
    event_type="page_view",
    properties={"page": "/products", "device": "mobile"}
))
event_cmd_handler.handle(TrackEventCommand(
    user_id="user-456",
    event_type="purchase",
    properties={"product_id": "python-book", "amount": 29.99}
))

# Query user events
user_events = event_query_handler.handle(
    GetUserEventsQuery(user_id="user-456")
)
Advanced Concepts
Advanced Topic 1: Event Sourcing with CQRS
When you're ready to level up, try combining CQRS with Event Sourcing:

# Advanced: Event Sourcing + CQRS
@dataclass
class Event:
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int

class EventStore:
    def __init__(self):
        self.events = []     # Immutable event log
        self.snapshots = {}  # Performance optimization

    def append(self, event: Event):
        self.events.append(event)
        print(f"Stored event: {event.event_type} v{event.version}")

    def get_events(self, aggregate_id: str, after_version: int = 0):
        return [e for e in self.events
                if e.aggregate_id == aggregate_id and e.version > after_version]

# Projection builder for read models
class ProjectionBuilder:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.projections = {}

    def build_user_summary_projection(self):
        # Build denormalized read model from events
        user_summaries = {}
        for event in self.event_store.events:
            if event.event_type == "UserCreated":
                user_summaries[event.aggregate_id] = {
                    "id": event.aggregate_id,
                    "name": event.data["name"],
                    "total_purchases": 0,
                    "last_activity": event.timestamp,
                    "sparkles": "✨"
                }
            elif event.event_type == "PurchaseCompleted":
                if event.aggregate_id in user_summaries:
                    user_summaries[event.aggregate_id]["total_purchases"] += 1
                    user_summaries[event.aggregate_id]["last_activity"] = event.timestamp
        return user_summaries
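The core idea of event sourcing is that current state is never stored directly; it is recomputed by replaying the event log. Here is a small, self-contained sketch of that replay step. The event types `Deposited` and `Withdrawn` and the function `replay_balance` are assumptions made for this illustration, and the `Event` class is redefined so the snippet runs on its own.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Event:
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int


def replay_balance(events, aggregate_id):
    """Rebuild one account's balance by folding over its events in version order."""
    balance = 0
    for event in sorted(events, key=lambda e: e.version):
        if event.aggregate_id != aggregate_id:
            continue  # Skip events that belong to other aggregates
        if event.event_type == "Deposited":
            balance += event.data["amount"]
        elif event.event_type == "Withdrawn":
            balance -= event.data["amount"]
    return balance


now = datetime.now(timezone.utc)
log = [
    Event("acc-1", "Deposited", {"amount": 100}, now, 1),
    Event("acc-2", "Deposited", {"amount": 50}, now, 1),
    Event("acc-1", "Withdrawn", {"amount": 30}, now, 2),
]

balance = replay_balance(log, "acc-1")
print(balance)
```

Because the log is the source of truth, a new read model can be added later simply by replaying the same events through a different fold.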
Advanced Topic 2: CQRS with Different Databases
For the brave developers - using different databases for commands and queries:

# Multiple database CQRS pattern
# PostgreSQLConnection, MongoDBConnection, and EventBus are placeholders for
# your own adapters; they are not classes from any real library.
class MultiDatabaseCQRS:
    def __init__(self):
        # Write side - PostgreSQL for consistency
        self.write_db = PostgreSQLConnection()  # ACID compliance
        # Read side - MongoDB for flexible queries
        self.read_db = MongoDBConnection()  # Fast, denormalized
        # Sync mechanism
        self.event_bus = EventBus()

    def handle_command(self, command: Command):
        # Write to primary database
        result = self.write_db.execute(command)
        # Publish event for read model update
        event = self._command_to_event(command, result)
        self.event_bus.publish(event)
        return result

    def handle_query(self, query: Query):
        # Read from optimized read database
        return self.read_db.find(query)

    def _command_to_event(self, command, result):
        # Transform command to event, reusing the Event dataclass from Topic 1
        return Event(
            aggregate_id=str(result),  # Assumes the write returns the aggregate's ID
            event_type=f"{type(command).__name__}Completed",
            data={"command": command, "result": result},
            timestamp=datetime.now(),
            version=1  # Placeholder; a real event store assigns versions
        )

# Async event handler for eventual consistency
class ReadModelUpdater:
    async def handle_event(self, event: Event):
        # Update read model asynchronously
        # (_update_order_summaries and _update_customer_stats are left for you to implement)
        if event.event_type == "CreateOrderCommandCompleted":
            await self._update_order_summaries(event.data)
            await self._update_customer_stats(event.data)
            print(f"Read models updated for {event.event_type}")
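The synchronization step can be tried out without any real database. Below is a minimal in-memory sketch in which two dicts stand in for the write and read stores and a tiny publish/subscribe bus connects them. `InMemoryEventBus`, `create_order`, and `project_order_created` are hypothetical names, and delivery here is synchronous, whereas a real deployment would typically deliver events asynchronously.

```python
from collections import defaultdict


class InMemoryEventBus:
    """Tiny synchronous publish/subscribe bus, for illustration only."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, callback):
        self._subscribers[event_type].append(callback)

    def publish(self, event_type, payload):
        for callback in self._subscribers[event_type]:
            callback(payload)


write_store = {}  # Stands in for the normalized write database
read_store = {}   # Stands in for the denormalized read database


def project_order_created(payload):
    # Keep a per-customer summary up to date on the read side
    summary = read_store.setdefault(
        payload["customer_id"], {"order_count": 0, "total_spent": 0.0}
    )
    summary["order_count"] += 1
    summary["total_spent"] += payload["total"]


bus = InMemoryEventBus()
bus.subscribe("OrderCreated", project_order_created)


def create_order(order_id, customer_id, total):
    # Command side: write first, then announce what happened
    write_store[order_id] = {"customer_id": customer_id, "total": total}
    bus.publish(
        "OrderCreated",
        {"order_id": order_id, "customer_id": customer_id, "total": total},
    )


create_order("o-1", "customer-123", 40.0)
create_order("o-2", "customer-123", 10.0)
print(read_store["customer-123"])
```

The command side never touches `read_store` directly; the read model changes only in response to published events, which is what allows the two stores to be different technologies.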
Common Pitfalls and Solutions
Pitfall 1: Synchronization Issues

import time

# Wrong way - assuming immediate consistency
class BadCQRS:
    def __init__(self, command_handler, query_handler):
        self.command_handler = command_handler
        self.query_handler = query_handler

    def create_order_and_get_it(self, order_data):
        # Create order (command); assumes the handler returns the new order's ID
        order_id = self.command_handler.handle(
            CreateOrderCommand(order_data["customer_id"], order_data["items"])
        )
        # Immediately query - the read model might not be synchronized yet!
        order = self.query_handler.handle(GetOrderByIdQuery(order_id))
        return order  # Could be None!

# Correct way - handle eventual consistency
class GoodCQRS:
    def __init__(self, command_handler, query_handler):
        self.command_handler = command_handler
        self.query_handler = query_handler

    def create_order_and_get_it(self, order_data):
        # Create order and return command result
        order_id = self.command_handler.handle(
            CreateOrderCommand(order_data["customer_id"], order_data["items"])
        )
        # Return the ID, let client poll or use events
        return {
            "order_id": order_id,
            "status": "processing",
            "message": "Order is being processed!"
        }

    def wait_for_order(self, order_id, timeout=5):
        # Poll with timeout for eventual consistency
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            order = self.query_handler.handle(GetOrderByIdQuery(order_id))
            if order:
                print(f"Order {order_id} is ready!")
                return order
            time.sleep(0.1)
        print(f"Order {order_id} is still processing...")
        return None
Pitfall 2: Over-engineering Simple Apps

# Overkill - CQRS for a simple todo app
# (QueryBus is a hypothetical read-side counterpart of CommandBus)
class OverEngineeredTodo:
    def __init__(self):
        self.command_bus = CommandBus()
        self.query_bus = QueryBus()
        self.event_store = EventStore()
        self.projection_builder = ProjectionBuilder(self.event_store)
        # Too much for a simple todo list!

# Better - Use CQRS only when needed
class SimpleApp:
    def should_use_cqrs(self):
        # Set each flag to match your own project
        criteria = {
            "different_read_write_models": False,
            "high_read_write_ratio": False,
            "complex_queries": False,
            "multiple_teams": False,
            "event_sourcing": False
        }
        # Use CQRS only if multiple criteria are true
        if sum(criteria.values()) >= 3:
            print("CQRS makes sense for this app!")
            return True
        else:
            print("Consider simpler architecture first")
            return False
Best Practices
- Start Simple: Don't use CQRS everywhere - only where it adds value!
- Clear Boundaries: Keep commands and queries completely separate
- Validate Commands: Always validate before changing state
- Design for Eventual Consistency: Don't assume immediate consistency
- Use Domain Events: Communicate between command and query sides
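As one way to apply the "Validate Commands" practice, a command can check its own fields when it is constructed, so an invalid command never reaches a handler. This is a minimal sketch using a dataclass `__post_init__` hook; `ValidatedWithdrawCommand` and its rules are assumptions chosen for illustration.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class ValidatedWithdrawCommand:
    account_id: str
    amount: Decimal

    def __post_init__(self):
        # Reject malformed commands before any state is touched
        if not self.account_id:
            raise ValueError("account_id must not be empty")
        if self.amount <= 0:
            raise ValueError("amount must be positive")


ok_command = ValidatedWithdrawCommand(account_id="ACC-1", amount=Decimal("25"))

try:
    ValidatedWithdrawCommand(account_id="ACC-1", amount=Decimal("-5"))
    rejected = False
except ValueError as error:
    rejected = True
    print(f"Rejected: {error}")
```

Checks that need current state, such as whether the account holds enough funds, still belong in the command handler; only checks on the command's own fields fit here.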
Hands-On Exercise
Challenge: Build a Banking System with CQRS
Create a banking system with separate command and query models:
Requirements:
- Commands: OpenAccount, Deposit, Withdraw, Transfer
- Queries: GetBalance, GetTransactionHistory, GetAccountSummary
- Support multiple account types (checking, savings)
- Track all transactions with timestamps
- Each transaction type needs an emoji!
Bonus Points:
- Add fraud detection on the command side
- Implement account statistics on the query side
- Create monthly statement generation
Solution
# Banking system with CQRS!
# Reuses Command, Query, CommandHandler, and QueryHandler from earlier sections
import uuid
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from enum import Enum

class AccountType(Enum):
    CHECKING = "checking"
    SAVINGS = "savings"

# Commands
@dataclass
class OpenAccountCommand(Command):
    customer_id: str
    account_type: AccountType
    initial_deposit: Decimal

@dataclass
class DepositCommand(Command):
    account_id: str
    amount: Decimal
    description: str = "Deposit"

@dataclass
class WithdrawCommand(Command):
    account_id: str
    amount: Decimal
    description: str = "Withdrawal"

@dataclass
class TransferCommand(Command):
    from_account_id: str
    to_account_id: str
    amount: Decimal
    description: str = "Transfer"

# Queries
@dataclass
class GetBalanceQuery(Query):
    account_id: str

@dataclass
class GetTransactionHistoryQuery(Query):
    account_id: str
    limit: int = 50

@dataclass
class GetAccountSummaryQuery(Query):
    customer_id: str
# Write Model
class BankingCommandHandler(CommandHandler):
    def __init__(self):
        self.accounts = {}
        self.transactions = []

    def handle(self, command: Command):
        if isinstance(command, OpenAccountCommand):
            account_id = f"ACC-{uuid.uuid4().hex[:8].upper()}"
            account = {
                "id": account_id,
                "customer_id": command.customer_id,
                "type": command.account_type.value,
                "balance": command.initial_deposit,
                "created_at": datetime.now(),
                "emoji": "💳" if command.account_type == AccountType.CHECKING else "🏦"
            }
            self.accounts[account_id] = account
            # Record initial deposit
            self._record_transaction(
                account_id=account_id,
                type="deposit",
                amount=command.initial_deposit,
                description="Initial deposit",
                emoji="🎉"
            )
            print(f"{account['emoji']} Opened {command.account_type.value} account: {account_id}")
            return account_id
        elif isinstance(command, DepositCommand):
            if command.account_id not in self.accounts:
                raise ValueError(f"Unknown account {command.account_id}")
            self.accounts[command.account_id]["balance"] += command.amount
            self._record_transaction(
                account_id=command.account_id,
                type="deposit",
                amount=command.amount,
                description=command.description,
                emoji="💵"
            )
            print(f"Deposited ${command.amount} to {command.account_id}")
        elif isinstance(command, WithdrawCommand):
            account = self.accounts.get(command.account_id)
            if account is None:
                raise ValueError(f"Unknown account {command.account_id}")
            if account["balance"] >= command.amount:
                account["balance"] -= command.amount
                self._record_transaction(
                    account_id=command.account_id,
                    type="withdrawal",
                    amount=-command.amount,
                    description=command.description,
                    emoji="💸"
                )
                print(f"Withdrew ${command.amount} from {command.account_id}")
            else:
                print(f"Insufficient funds in {command.account_id}")
                raise ValueError("Insufficient funds")
        elif isinstance(command, TransferCommand):
            # Validate both accounts exist and funds available
            from_account = self.accounts.get(command.from_account_id)
            to_account = self.accounts.get(command.to_account_id)
            if not (from_account and to_account):
                raise ValueError("Transfer failed: unknown account")
            if from_account["balance"] < command.amount:
                raise ValueError("Transfer failed: insufficient funds")
            # Execute transfer
            from_account["balance"] -= command.amount
            to_account["balance"] += command.amount
            # Record both transactions
            self._record_transaction(
                account_id=command.from_account_id,
                type="transfer_out",
                amount=-command.amount,
                description=f"Transfer to {command.to_account_id}",
                emoji="➡️"
            )
            self._record_transaction(
                account_id=command.to_account_id,
                type="transfer_in",
                amount=command.amount,
                description=f"Transfer from {command.from_account_id}",
                emoji="⬅️"
            )
            print(f"Transferred ${command.amount}")

    def _record_transaction(self, **kwargs):
        # Every transaction gets a unique ID and a timestamp
        transaction = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.now(),
            **kwargs
        }
        self.transactions.append(transaction)
# Read Model
class BankingQueryHandler(QueryHandler):
    def __init__(self, accounts, transactions):
        self.accounts = accounts
        self.transactions = transactions

    def handle(self, query: Query):
        if isinstance(query, GetBalanceQuery):
            account = self.accounts.get(query.account_id)
            if account:
                return {
                    "account_id": query.account_id,
                    "balance": float(account["balance"]),
                    "emoji": account["emoji"]
                }
        elif isinstance(query, GetTransactionHistoryQuery):
            # Get transactions for account
            account_transactions = [
                t for t in self.transactions
                if t["account_id"] == query.account_id
            ]
            # Sort newest first and limit
            account_transactions.sort(key=lambda x: x["timestamp"], reverse=True)
            return account_transactions[:query.limit]
        elif isinstance(query, GetAccountSummaryQuery):
            # Get all accounts for customer
            customer_accounts = [
                acc for acc in self.accounts.values()
                if acc["customer_id"] == query.customer_id
            ]
            summary = {
                "customer_id": query.customer_id,
                "accounts": [],
                "total_balance": Decimal("0"),
                "emoji": "📊"
            }
            for account in customer_accounts:
                account_info = {
                    "id": account["id"],
                    "type": account["type"],
                    "balance": float(account["balance"]),
                    "emoji": account["emoji"]
                }
                summary["accounts"].append(account_info)
                summary["total_balance"] += account["balance"]
            summary["total_balance"] = float(summary["total_balance"])
            return summary
# Test the banking system!
# Initialize
accounts_db = {}
transactions_db = []
command_handler = BankingCommandHandler()
command_handler.accounts = accounts_db
command_handler.transactions = transactions_db
query_handler = BankingQueryHandler(accounts_db, transactions_db)

# Open accounts
checking_id = command_handler.handle(OpenAccountCommand(
    customer_id="CUST-001",
    account_type=AccountType.CHECKING,
    initial_deposit=Decimal("1000")
))
savings_id = command_handler.handle(OpenAccountCommand(
    customer_id="CUST-001",
    account_type=AccountType.SAVINGS,
    initial_deposit=Decimal("5000")
))

# Make transactions
command_handler.handle(DepositCommand(
    account_id=checking_id,
    amount=Decimal("500"),
    description="Paycheck"
))
command_handler.handle(TransferCommand(
    from_account_id=checking_id,
    to_account_id=savings_id,
    amount=Decimal("200")
))

# Query the results
balance = query_handler.handle(GetBalanceQuery(account_id=checking_id))
print(f"\nChecking balance: ${balance['balance']}")
history = query_handler.handle(GetTransactionHistoryQuery(account_id=checking_id))
print(f"\nTransaction history: {len(history)} transactions")
summary = query_handler.handle(GetAccountSummaryQuery(customer_id="CUST-001"))
print(f"\nTotal balance across all accounts: ${summary['total_balance']}")
Key Takeaways
You've learned so much! Here's what you can now do:
- Implement CQRS patterns with confidence
- Separate read and write concerns effectively
- Design scalable architectures using CQRS
- Handle eventual consistency like a pro
- Build awesome distributed systems with Python!
Remember: CQRS is a powerful pattern, but use it wisely! It's here to help you build better, more scalable systems when you need that complexity.
Next Steps
Congratulations! You've worked through the core CQRS patterns!
Here's what to do next:
- Practice with the banking system exercise above
- Try combining CQRS with Event Sourcing
- Explore how CQRS works with microservices
- Share your CQRS implementations with others!
Remember: Every distributed systems expert was once a beginner. Keep coding, keep learning, and most importantly, have fun!
Happy coding!