+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Part 434 of 541

๐Ÿš€ CQRS: Command Query Separation

Master CQRS (Command Query Responsibility Segregation) in Python with practical examples, best practices, and real-world applications ๐Ÿš€

๐Ÿ’ŽAdvanced
25 min read

Prerequisites

  • Basic understanding of programming concepts ๐Ÿ“
  • Python installation (3.8+) ๐Ÿ
  • VS Code or preferred IDE ๐Ÿ’ป

What you'll learn

  • Understand the concept fundamentals ๐ŸŽฏ
  • Apply the concept in real projects ๐Ÿ—๏ธ
  • Debug common issues ๐Ÿ›
  • Write clean, Pythonic code โœจ

๐ŸŽฏ Introduction

Welcome to this exciting tutorial on CQRS (Command Query Responsibility Segregation)! ๐ŸŽ‰ In this guide, weโ€™ll explore how to separate read and write operations to build more scalable and maintainable Python applications.

Youโ€™ll discover how CQRS can transform your Python development experience. Whether youโ€™re building microservices ๐ŸŒ, event-driven systems ๐Ÿ–ฅ๏ธ, or complex domain models ๐Ÿ“š, understanding CQRS is essential for writing robust, maintainable code.

By the end of this tutorial, youโ€™ll feel confident using CQRS patterns in your own projects! Letโ€™s dive in! ๐ŸŠโ€โ™‚๏ธ

๐Ÿ“š Understanding CQRS

๐Ÿค” What is CQRS?

CQRS is like having two separate doors in a building ๐Ÿข - one for entering (commands) and one for exiting (queries). Think of it as splitting your applicationโ€™s brain into two specialized parts: one optimized for reading data and another for writing data.

In Python terms, CQRS means separating your code that changes state (commands) from code that reads state (queries). This means you can:

  • โœจ Optimize read and write operations independently
  • ๐Ÿš€ Scale read-heavy and write-heavy parts separately
  • ๐Ÿ›ก๏ธ Apply different security rules to commands and queries

๐Ÿ’ก Why Use CQRS?

Hereโ€™s why developers love CQRS:

  1. Performance Optimization ๐Ÿš€: Different data models for reads and writes
  2. Scalability ๐Ÿ’ป: Scale read and write sides independently
  3. Security ๐Ÿ›ก๏ธ: Apply different security policies to commands and queries
  4. Flexibility ๐Ÿ”ง: Use different storage technologies for each side

Real-world example: Imagine building an e-commerce platform ๐Ÿ›’. With CQRS, you can have a fast, denormalized read model for product browsing while keeping a normalized write model for order processing.

๐Ÿ”ง Basic Syntax and Usage

๐Ÿ“ Simple Example

Letโ€™s start with a friendly example:

# ๐Ÿ‘‹ Hello, CQRS!
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional
import uuid

# ๐ŸŽจ Creating simple command and query base classes
class Command(ABC):
    """Base class for all commands"""
    pass

class Query(ABC):
    """Base class for all queries"""
    pass

# ๐Ÿ›’ Let's build a simple shopping cart system!
@dataclass
class AddItemToCartCommand(Command):
    cart_id: str      # ๐Ÿ›’ Cart identifier
    product_id: str   # ๐Ÿ“ฆ Product to add
    quantity: int     # ๐Ÿ”ข How many to add

@dataclass
class GetCartQuery(Query):
    cart_id: str      # ๐Ÿ›’ Which cart to retrieve

๐Ÿ’ก Explanation: Notice how we separate commands (that change state) from queries (that read state)! This separation is the heart of CQRS.

๐ŸŽฏ Common Patterns

Here are patterns youโ€™ll use daily:

# ๐Ÿ—๏ธ Pattern 1: Command Handler
class CommandHandler(ABC):
    @abstractmethod
    def handle(self, command: Command) -> None:
        pass

# ๐ŸŽจ Pattern 2: Query Handler
class QueryHandler(ABC):
    @abstractmethod
    def handle(self, query: Query):
        pass

# ๐Ÿ”„ Pattern 3: Command Bus
class CommandBus:
    def __init__(self):
        self.handlers = {}  # ๐Ÿ“š Store command handlers
    
    def register(self, command_type, handler):
        self.handlers[command_type] = handler
        print(f"โœ… Registered handler for {command_type.__name__}")
    
    def dispatch(self, command: Command):
        handler = self.handlers.get(type(command))
        if handler:
            handler.handle(command)
            print(f"๐Ÿš€ Executed {type(command).__name__}")

๐Ÿ’ก Practical Examples

๐Ÿ›’ Example 1: E-Commerce Order System

Letโ€™s build something real:

# ๐Ÿ›๏ธ Domain models
@dataclass
class Product:
    id: str
    name: str
    price: float
    emoji: str  # Every product needs an emoji! 

@dataclass
class Order:
    id: str
    customer_id: str
    items: List[dict]
    total: float
    status: str

# ๐Ÿ“ Commands for write operations
@dataclass
class CreateOrderCommand(Command):
    customer_id: str
    items: List[dict]

@dataclass
class CancelOrderCommand(Command):
    order_id: str
    reason: str

# ๐Ÿ” Queries for read operations
@dataclass
class GetOrderByIdQuery(Query):
    order_id: str

@dataclass
class GetCustomerOrdersQuery(Query):
    customer_id: str
    status: Optional[str] = None

# ๐Ÿ—๏ธ Write Model (Command Side)
class OrderCommandHandler(CommandHandler):
    def __init__(self):
        self.orders = {}  # ๐Ÿ’พ In-memory storage (use DB in production!)
        
    def handle(self, command: Command):
        if isinstance(command, CreateOrderCommand):
            # โœจ Create new order
            order_id = str(uuid.uuid4())
            total = sum(item['price'] * item['quantity'] for item in command.items)
            
            order = Order(
                id=order_id,
                customer_id=command.customer_id,
                items=command.items,
                total=total,
                status="pending"
            )
            
            self.orders[order_id] = order
            print(f"๐ŸŽ‰ Order {order_id} created! Total: ${total}")
            
        elif isinstance(command, CancelOrderCommand):
            # ๐Ÿšซ Cancel existing order
            if command.order_id in self.orders:
                self.orders[command.order_id].status = "cancelled"
                print(f"โŒ Order {command.order_id} cancelled: {command.reason}")

# ๐Ÿ“Š Read Model (Query Side)
class OrderQueryHandler(QueryHandler):
    def __init__(self, orders):
        self.orders = orders  # ๐Ÿ“š Reference to order data
        
    def handle(self, query: Query):
        if isinstance(query, GetOrderByIdQuery):
            # ๐Ÿ” Find single order
            order = self.orders.get(query.order_id)
            if order:
                print(f"๐Ÿ“ฆ Found order: {order.id} - ${order.total}")
                return order
            else:
                print(f"โ“ Order {query.order_id} not found")
                return None
                
        elif isinstance(query, GetCustomerOrdersQuery):
            # ๐Ÿ“‹ Find customer's orders
            customer_orders = [
                order for order in self.orders.values()
                if order.customer_id == query.customer_id
                and (not query.status or order.status == query.status)
            ]
            print(f"๐Ÿ›’ Found {len(customer_orders)} orders for customer")
            return customer_orders

# ๐ŸŽฎ Let's use it!
# Initialize handlers
orders_storage = {}
command_handler = OrderCommandHandler()
command_handler.orders = orders_storage
query_handler = OrderQueryHandler(orders_storage)

# Create an order
create_cmd = CreateOrderCommand(
    customer_id="customer-123",
    items=[
        {"name": "Python Book", "price": 29.99, "quantity": 1, "emoji": "๐Ÿ“˜"},
        {"name": "Coffee", "price": 4.99, "quantity": 2, "emoji": "โ˜•"}
    ]
)
command_handler.handle(create_cmd)

# Query the order
query = GetCustomerOrdersQuery(customer_id="customer-123")
customer_orders = query_handler.handle(query)

๐ŸŽฏ Try it yourself: Add an UpdateOrderCommand to modify order items and a GetOrdersByDateRangeQuery for reporting!

๐ŸŽฎ Example 2: Real-time Analytics System

Letโ€™s make it fun with event tracking:

# ๐Ÿ† Event tracking system with CQRS
from datetime import datetime
from collections import defaultdict

# ๐Ÿ“Š Events (Commands)
@dataclass
class TrackEventCommand(Command):
    user_id: str
    event_type: str
    properties: dict
    timestamp: datetime = None
    
    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.now()

@dataclass
class UpdateUserProfileCommand(Command):
    user_id: str
    attributes: dict

# ๐Ÿ“ˆ Analytics Queries
@dataclass
class GetUserEventsQuery(Query):
    user_id: str
    event_type: Optional[str] = None
    limit: int = 100

@dataclass
class GetEventStatsQuery(Query):
    event_type: str
    time_range: Optional[tuple] = None

# ๐Ÿš€ Write Model - Optimized for fast writes
class EventCommandHandler(CommandHandler):
    def __init__(self):
        self.events = []  # ๐Ÿ“ Append-only event log
        self.user_profiles = {}  # ๐Ÿ‘ค User data
        
    def handle(self, command: Command):
        if isinstance(command, TrackEventCommand):
            # โšก Fast event tracking
            event = {
                "id": str(uuid.uuid4()),
                "user_id": command.user_id,
                "type": command.event_type,
                "properties": command.properties,
                "timestamp": command.timestamp,
                "emoji": self._get_event_emoji(command.event_type)
            }
            self.events.append(event)
            print(f"{event['emoji']} Tracked: {command.event_type} for user {command.user_id}")
            
        elif isinstance(command, UpdateUserProfileCommand):
            # ๐Ÿ‘ค Update user profile
            if command.user_id not in self.user_profiles:
                self.user_profiles[command.user_id] = {}
            self.user_profiles[command.user_id].update(command.attributes)
            print(f"โœจ Updated profile for user {command.user_id}")
    
    def _get_event_emoji(self, event_type):
        emoji_map = {
            "page_view": "๐Ÿ‘€",
            "button_click": "๐Ÿ–ฑ๏ธ",
            "purchase": "๐Ÿ’ฐ",
            "signup": "๐ŸŽ‰",
            "login": "๐Ÿ”",
            "logout": "๐Ÿ‘‹",
            "share": "๐Ÿ“ค",
            "like": "โค๏ธ"
        }
        return emoji_map.get(event_type, "๐Ÿ“Š")

# ๐Ÿ“Š Read Model - Optimized for analytics
class EventQueryHandler(QueryHandler):
    def __init__(self, events, user_profiles):
        self.events = events
        self.user_profiles = user_profiles
        self._build_indexes()  # ๐Ÿ—๏ธ Pre-compute for fast queries
        
    def _build_indexes(self):
        # ๐Ÿš€ Build indexes for fast lookups
        self.events_by_user = defaultdict(list)
        self.events_by_type = defaultdict(list)
        
        for event in self.events:
            self.events_by_user[event["user_id"]].append(event)
            self.events_by_type[event["type"]].append(event)
    
    def handle(self, query: Query):
        if isinstance(query, GetUserEventsQuery):
            # ๐Ÿ” Get user's events
            user_events = self.events_by_user.get(query.user_id, [])
            
            if query.event_type:
                user_events = [e for e in user_events if e["type"] == query.event_type]
            
            # ๐Ÿ“‰ Apply limit
            user_events = user_events[-query.limit:]
            
            print(f"๐Ÿ“Š Found {len(user_events)} events for user {query.user_id}")
            return user_events
            
        elif isinstance(query, GetEventStatsQuery):
            # ๐Ÿ“ˆ Calculate event statistics
            events = self.events_by_type.get(query.event_type, [])
            
            if query.time_range:
                start, end = query.time_range
                events = [e for e in events if start <= e["timestamp"] <= end]
            
            stats = {
                "event_type": query.event_type,
                "total_count": len(events),
                "unique_users": len(set(e["user_id"] for e in events)),
                "emoji": self._get_emoji_for_type(query.event_type)
            }
            
            print(f"{stats['emoji']} Stats: {stats['total_count']} {query.event_type} events")
            return stats
    
    def _get_emoji_for_type(self, event_type):
        return {"page_view": "๐Ÿ‘€", "purchase": "๐Ÿ’ฐ"}.get(event_type, "๐Ÿ“Š")

# ๐ŸŽฎ Demo time!
events_storage = []
profiles_storage = {}

event_cmd_handler = EventCommandHandler()
event_cmd_handler.events = events_storage
event_cmd_handler.user_profiles = profiles_storage

event_query_handler = EventQueryHandler(events_storage, profiles_storage)

# Track some events
event_cmd_handler.handle(TrackEventCommand(
    user_id="user-456",
    event_type="page_view",
    properties={"page": "/products", "device": "mobile"}
))

event_cmd_handler.handle(TrackEventCommand(
    user_id="user-456", 
    event_type="purchase",
    properties={"product_id": "python-book", "amount": 29.99}
))

# Query user events
user_events = event_query_handler.handle(
    GetUserEventsQuery(user_id="user-456")
)

๐Ÿš€ Advanced Concepts

๐Ÿง™โ€โ™‚๏ธ Advanced Topic 1: Event Sourcing with CQRS

When youโ€™re ready to level up, try combining CQRS with Event Sourcing:

# ๐ŸŽฏ Advanced: Event Sourcing + CQRS
@dataclass
class Event:
    aggregate_id: str
    event_type: str
    data: dict
    timestamp: datetime
    version: int

class EventStore:
    def __init__(self):
        self.events = []  # ๐Ÿ“š Immutable event log
        self.snapshots = {}  # ๐Ÿ“ธ Performance optimization
        
    def append(self, event: Event):
        self.events.append(event)
        print(f"โœจ Stored event: {event.event_type} v{event.version}")
        
    def get_events(self, aggregate_id: str, after_version: int = 0):
        return [e for e in self.events 
                if e.aggregate_id == aggregate_id and e.version > after_version]

# ๐Ÿช„ Projection builder for read models
class ProjectionBuilder:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.projections = {}
        
    def build_user_summary_projection(self):
        # ๐Ÿ—๏ธ Build denormalized read model from events
        user_summaries = {}
        
        for event in self.event_store.events:
            if event.event_type == "UserCreated":
                user_summaries[event.aggregate_id] = {
                    "id": event.aggregate_id,
                    "name": event.data["name"],
                    "total_purchases": 0,
                    "last_activity": event.timestamp,
                    "sparkles": "โœจ"
                }
            elif event.event_type == "PurchaseCompleted":
                if event.aggregate_id in user_summaries:
                    user_summaries[event.aggregate_id]["total_purchases"] += 1
                    user_summaries[event.aggregate_id]["last_activity"] = event.timestamp
                    
        return user_summaries

๐Ÿ—๏ธ Advanced Topic 2: CQRS with Different Databases

For the brave developers - using different databases for commands and queries:

# ๐Ÿš€ Multiple database CQRS pattern
class MultiDatabaseCQRS:
    def __init__(self):
        # ๐Ÿ“ Write side - PostgreSQL for consistency
        self.write_db = PostgreSQLConnection()  # ACID compliance
        
        # ๐Ÿ“Š Read side - MongoDB for flexible queries  
        self.read_db = MongoDBConnection()  # Fast, denormalized
        
        # ๐Ÿ”„ Sync mechanism
        self.event_bus = EventBus()
        
    def handle_command(self, command: Command):
        # ๐Ÿ’พ Write to primary database
        result = self.write_db.execute(command)
        
        # ๐Ÿ“ค Publish event for read model update
        event = self._command_to_event(command, result)
        self.event_bus.publish(event)
        
        return result
        
    def handle_query(self, query: Query):
        # โšก Read from optimized read database
        return self.read_db.find(query)
        
    def _command_to_event(self, command, result):
        # ๐ŸŽจ Transform command to event
        return Event(
            type=f"{type(command).__name__}Completed",
            data={"command": command, "result": result},
            timestamp=datetime.now()
        )

# ๐ŸŽฏ Async event handler for eventual consistency
class ReadModelUpdater:
    async def handle_event(self, event: Event):
        # ๐Ÿ”„ Update read model asynchronously
        if event.type == "OrderCreatedCompleted":
            await self._update_order_summaries(event.data)
            await self._update_customer_stats(event.data)
            print(f"โœ… Read models updated for {event.type}")

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Synchronization Issues

# โŒ Wrong way - assuming immediate consistency
class BadCQRS:
    def create_order_and_get_it(self, order_data):
        # Create order (command)
        self.command_handler.handle(CreateOrderCommand(order_data))
        
        # Immediately query - might not be synchronized! ๐Ÿ’ฅ
        order = self.query_handler.handle(GetOrderQuery(order_data.id))
        return order  # Could be None!

# โœ… Correct way - handle eventual consistency
class GoodCQRS:
    def create_order_and_get_it(self, order_data):
        # Create order and return command result
        order_id = self.command_handler.handle(CreateOrderCommand(order_data))
        
        # Return the ID, let client poll or use events
        return {
            "order_id": order_id,
            "status": "processing",
            "message": "Order is being processed! โณ"
        }
        
    def wait_for_order(self, order_id, timeout=5):
        # ๐Ÿ”„ Poll with timeout for eventual consistency
        import time
        start = time.time()
        
        while time.time() - start < timeout:
            order = self.query_handler.handle(GetOrderQuery(order_id))
            if order:
                print(f"โœ… Order {order_id} is ready!")
                return order
            time.sleep(0.1)
            
        print(f"โฑ๏ธ Order {order_id} is still processing...")
        return None

๐Ÿคฏ Pitfall 2: Over-engineering Simple Apps

# โŒ Overkill - CQRS for a simple todo app
class OverEngineeredTodo:
    def __init__(self):
        self.command_bus = CommandBus()
        self.query_bus = QueryBus()
        self.event_store = EventStore()
        self.projection_builder = ProjectionBuilder()
        # ๐Ÿ˜ฐ Too much for a simple todo list!

# โœ… Better - Use CQRS only when needed
class SimpleApp:
    def should_use_cqrs(self):
        criteria = {
            "different_read_write_models": False,
            "high_read_write_ratio": False,
            "complex_queries": False,
            "multiple_teams": False,
            "event_sourcing": False
        }
        
        # ๐ŸŽฏ Use CQRS only if multiple criteria are true
        if sum(criteria.values()) >= 3:
            print("โœ… CQRS makes sense for this app!")
            return True
        else:
            print("๐Ÿค” Consider simpler architecture first")
            return False

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Start Simple: Donโ€™t use CQRS everywhere - only where it adds value!
  2. ๐Ÿ“ Clear Boundaries: Keep commands and queries completely separate
  3. ๐Ÿ›ก๏ธ Validate Commands: Always validate before changing state
  4. ๐ŸŽจ Design for Eventually Consistent: Donโ€™t assume immediate consistency
  5. โœจ Use Domain Events: Communicate between command and query sides

๐Ÿงช Hands-On Exercise

๐ŸŽฏ Challenge: Build a Banking System with CQRS

Create a banking system with separate command and query models:

๐Ÿ“‹ Requirements:

  • โœ… Commands: OpenAccount, Deposit, Withdraw, Transfer
  • ๐Ÿท๏ธ Queries: GetBalance, GetTransactionHistory, GetAccountSummary
  • ๐Ÿ‘ค Support multiple account types (checking, savings)
  • ๐Ÿ“… Track all transactions with timestamps
  • ๐ŸŽจ Each transaction type needs an emoji!

๐Ÿš€ Bonus Points:

  • Add fraud detection on the command side
  • Implement account statistics on the query side
  • Create monthly statement generation

๐Ÿ’ก Solution

๐Ÿ” Click to see solution
# ๐ŸŽฏ Banking system with CQRS!
from decimal import Decimal
from enum import Enum
import json

class AccountType(Enum):
    CHECKING = "checking"
    SAVINGS = "savings"

# ๐Ÿ’ฐ Commands
@dataclass
class OpenAccountCommand(Command):
    customer_id: str
    account_type: AccountType
    initial_deposit: Decimal

@dataclass 
class DepositCommand(Command):
    account_id: str
    amount: Decimal
    description: str = "Deposit"

@dataclass
class WithdrawCommand(Command):
    account_id: str
    amount: Decimal
    description: str = "Withdrawal"

@dataclass
class TransferCommand(Command):
    from_account_id: str
    to_account_id: str
    amount: Decimal
    description: str = "Transfer"

# ๐Ÿ” Queries
@dataclass
class GetBalanceQuery(Query):
    account_id: str

@dataclass
class GetTransactionHistoryQuery(Query):
    account_id: str
    limit: int = 50

@dataclass
class GetAccountSummaryQuery(Query):
    customer_id: str

# ๐Ÿฆ Write Model
class BankingCommandHandler(CommandHandler):
    def __init__(self):
        self.accounts = {}
        self.transactions = []
        
    def handle(self, command: Command):
        if isinstance(command, OpenAccountCommand):
            account_id = f"ACC-{uuid.uuid4().hex[:8].upper()}"
            
            account = {
                "id": account_id,
                "customer_id": command.customer_id,
                "type": command.account_type.value,
                "balance": command.initial_deposit,
                "created_at": datetime.now(),
                "emoji": "๐Ÿ’ณ" if command.account_type == AccountType.CHECKING else "๐Ÿฆ"
            }
            
            self.accounts[account_id] = account
            
            # Record initial deposit
            self._record_transaction(
                account_id=account_id,
                type="deposit",
                amount=command.initial_deposit,
                description="Initial deposit",
                emoji="๐ŸŽ‰"
            )
            
            print(f"{account['emoji']} Opened {command.account_type.value} account: {account_id}")
            return account_id
            
        elif isinstance(command, DepositCommand):
            if command.account_id in self.accounts:
                self.accounts[command.account_id]["balance"] += command.amount
                self._record_transaction(
                    account_id=command.account_id,
                    type="deposit",
                    amount=command.amount,
                    description=command.description,
                    emoji="๐Ÿ’ต"
                )
                print(f"๐Ÿ’ต Deposited ${command.amount} to {command.account_id}")
                
        elif isinstance(command, WithdrawCommand):
            account = self.accounts.get(command.account_id)
            if account and account["balance"] >= command.amount:
                account["balance"] -= command.amount
                self._record_transaction(
                    account_id=command.account_id,
                    type="withdrawal",
                    amount=-command.amount,
                    description=command.description,
                    emoji="๐Ÿ’ธ"
                )
                print(f"๐Ÿ’ธ Withdrew ${command.amount} from {command.account_id}")
            else:
                print(f"โŒ Insufficient funds in {command.account_id}")
                raise ValueError("Insufficient funds")
                
        elif isinstance(command, TransferCommand):
            # Validate both accounts exist and funds available
            from_account = self.accounts.get(command.from_account_id)
            to_account = self.accounts.get(command.to_account_id)
            
            if from_account and to_account and from_account["balance"] >= command.amount:
                # Execute transfer
                from_account["balance"] -= command.amount
                to_account["balance"] += command.amount
                
                # Record both transactions
                self._record_transaction(
                    account_id=command.from_account_id,
                    type="transfer_out",
                    amount=-command.amount,
                    description=f"Transfer to {command.to_account_id}",
                    emoji="โžก๏ธ"
                )
                
                self._record_transaction(
                    account_id=command.to_account_id,
                    type="transfer_in",
                    amount=command.amount,
                    description=f"Transfer from {command.from_account_id}",
                    emoji="โฌ…๏ธ"
                )
                
                print(f"๐Ÿ’ธโžก๏ธ๐Ÿ’ฐ Transferred ${command.amount}")
                
    def _record_transaction(self, **kwargs):
        transaction = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.now(),
            **kwargs
        }
        self.transactions.append(transaction)

# ๐Ÿ“Š Read Model
class BankingQueryHandler(QueryHandler):
    def __init__(self, accounts, transactions):
        self.accounts = accounts
        self.transactions = transactions
        
    def handle(self, query: Query):
        if isinstance(query, GetBalanceQuery):
            account = self.accounts.get(query.account_id)
            if account:
                return {
                    "account_id": query.account_id,
                    "balance": float(account["balance"]),
                    "emoji": account["emoji"]
                }
                
        elif isinstance(query, GetTransactionHistoryQuery):
            # Get transactions for account
            account_transactions = [
                t for t in self.transactions
                if t["account_id"] == query.account_id
            ]
            
            # Sort by timestamp and limit
            account_transactions.sort(key=lambda x: x["timestamp"], reverse=True)
            return account_transactions[:query.limit]
            
        elif isinstance(query, GetAccountSummaryQuery):
            # Get all accounts for customer
            customer_accounts = [
                acc for acc in self.accounts.values()
                if acc["customer_id"] == query.customer_id
            ]
            
            summary = {
                "customer_id": query.customer_id,
                "accounts": [],
                "total_balance": Decimal("0"),
                "emoji": "๐Ÿ“Š"
            }
            
            for account in customer_accounts:
                account_info = {
                    "id": account["id"],
                    "type": account["type"],
                    "balance": float(account["balance"]),
                    "emoji": account["emoji"]
                }
                summary["accounts"].append(account_info)
                summary["total_balance"] += account["balance"]
                
            summary["total_balance"] = float(summary["total_balance"])
            return summary

# ๐ŸŽฎ Test the banking system!
# Initialize
accounts_db = {}
transactions_db = []

command_handler = BankingCommandHandler()
command_handler.accounts = accounts_db
command_handler.transactions = transactions_db

query_handler = BankingQueryHandler(accounts_db, transactions_db)

# Open accounts
checking_id = command_handler.handle(OpenAccountCommand(
    customer_id="CUST-001",
    account_type=AccountType.CHECKING,
    initial_deposit=Decimal("1000")
))

savings_id = command_handler.handle(OpenAccountCommand(
    customer_id="CUST-001", 
    account_type=AccountType.SAVINGS,
    initial_deposit=Decimal("5000")
))

# Make transactions
command_handler.handle(DepositCommand(
    account_id=checking_id,
    amount=Decimal("500"),
    description="Paycheck"
))

command_handler.handle(TransferCommand(
    from_account_id=checking_id,
    to_account_id=savings_id,
    amount=Decimal("200")
))

# Query the results
balance = query_handler.handle(GetBalanceQuery(account_id=checking_id))
print(f"\n๐Ÿ’ฐ Checking balance: ${balance['balance']}")

history = query_handler.handle(GetTransactionHistoryQuery(account_id=checking_id))
print(f"\n๐Ÿ“œ Transaction history: {len(history)} transactions")

summary = query_handler.handle(GetAccountSummaryQuery(customer_id="CUST-001"))
print(f"\n๐Ÿ“Š Total balance across all accounts: ${summary['total_balance']}")

๐ŸŽ“ Key Takeaways

Youโ€™ve learned so much! Hereโ€™s what you can now do:

  • โœ… Implement CQRS patterns with confidence ๐Ÿ’ช
  • โœ… Separate read and write concerns effectively ๐Ÿ›ก๏ธ
  • โœ… Design scalable architectures using CQRS ๐ŸŽฏ
  • โœ… Handle eventual consistency like a pro ๐Ÿ›
  • โœ… Build awesome distributed systems with Python! ๐Ÿš€

Remember: CQRS is a powerful pattern, but use it wisely! Itโ€™s here to help you build better, more scalable systems when you need that complexity. ๐Ÿค

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered CQRS patterns!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the banking system exercise above
  2. ๐Ÿ—๏ธ Try combining CQRS with Event Sourcing
  3. ๐Ÿ“š Explore how CQRS works with microservices
  4. ๐ŸŒŸ Share your CQRS implementations with others!

Remember: Every distributed systems expert was once a beginner. Keep coding, keep learning, and most importantly, have fun! ๐Ÿš€


Happy coding! ๐ŸŽ‰๐Ÿš€โœจ