Part 503 of 541

📘 Event Sourcing: Database Design

Learn event sourcing and database design in Python with practical examples, best practices, and real-world applications 🚀

🚀 Intermediate
25 min read

Prerequisites

  • Basic understanding of programming concepts 📝
  • Python installation (3.8+) 🐍
  • VS Code or preferred IDE 💻

What you'll learn

  • Understand event sourcing fundamentals 🎯
  • Apply event sourcing in real projects 🏗️
  • Debug common issues 🐛
  • Write clean, Pythonic code ✨

🎯 Introduction

Welcome to this exciting journey into Event Sourcing and database design! 🎉 Have you ever wished you could track every single change in your application, like having a time machine for your data? That's exactly what event sourcing gives you!

In this tutorial, we'll explore how to design databases that capture every business event as an immutable fact. Whether you're building financial systems 🏦, e-commerce platforms 🛒, or collaborative tools 📝, event sourcing can revolutionize how you think about data persistence.

By the end of this tutorial, you'll understand how to implement event sourcing patterns in Python and design databases that tell the complete story of your application! Let's dive in! 🏊‍♂️

📚 Understanding Event Sourcing

🤔 What is Event Sourcing?

Event sourcing is like keeping a detailed diary 📔 of everything that happens in your application. Instead of just storing the current state (like a snapshot 📸), you store every event that led to that state.

Think of it like a bank account 🏦: traditional databases would just store your current balance ($500), but event sourcing stores every transaction:

  • ➕ Deposited $1000
  • ➖ Withdrew $300
  • ➕ Interest earned $50
  • ➖ Fee charged $250

In Python terms, event sourcing means:

  • ✨ Every state change is an event
  • 🚀 Events are immutable (never changed)
  • 🛡️ Current state is derived from events
  • 📜 Complete audit trail automatically
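
To make "current state is derived from events" concrete, here is a minimal sketch that folds the four bank transactions listed above into a balance. The `Transaction` class, the `CREDITS` set, and the `apply` function are names invented for this illustration, not part of any library.

```python
from dataclasses import dataclass
from decimal import Decimal
from functools import reduce
from typing import List


@dataclass(frozen=True)
class Transaction:
    kind: str        # "deposit", "withdrawal", "interest", or "fee"
    amount: Decimal


# Kinds that add to the balance; every other kind subtracts.
CREDITS = {"deposit", "interest"}


def apply(balance: Decimal, tx: Transaction) -> Decimal:
    """Return the balance after applying a single event."""
    if tx.kind in CREDITS:
        return balance + tx.amount
    return balance - tx.amount


history: List[Transaction] = [
    Transaction("deposit", Decimal("1000")),
    Transaction("withdrawal", Decimal("300")),
    Transaction("interest", Decimal("50")),
    Transaction("fee", Decimal("250")),
]

# The balance is never stored; it is recomputed by replaying the history.
balance = reduce(apply, history, Decimal("0"))
print(balance)  # 500
```

Replaying only a prefix of `history` gives the balance at that earlier point, which is the idea behind the "time travel" queries discussed later.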

💡 Why Use Event Sourcing?

Here's why developers love event sourcing:

  1. Complete History 📜: Know exactly what happened and when
  2. Time Travel ⏰: Reconstruct state at any point in time
  3. Audit Trail 🔍: Built-in compliance and debugging
  4. Event-Driven Architecture 🎯: Natural fit for microservices
  5. Temporal Queries 📊: "What was the state on January 1st?"

Real-world example: Imagine an e-commerce system 🛒. With event sourcing, you can track:

  • When items were added/removed from cart
  • Price changes over time
  • Exact sequence of user actions
  • Why an order failed

🔧 Basic Syntax and Usage

📝 Simple Event Store Design

Let's start with a basic event sourcing implementation:

# 👋 Hello, Event Sourcing!
from datetime import datetime
from dataclasses import dataclass
from typing import List, Dict, Any
import json
import sqlite3

# 🎨 Define our base event structure
@dataclass
class Event:
    aggregate_id: str    # 🎯 Which entity this event belongs to
    event_type: str      # 📝 What happened
    event_data: Dict     # 📦 Event details
    timestamp: datetime  # ⏰ When it happened
    version: int         # 🔢 Event sequence number

# 🏗️ Create event store
class EventStore:
    def __init__(self, db_path: str = "events.db"):
        self.conn = sqlite3.connect(db_path)
        self._create_tables()
    
    def _create_tables(self):
        # 📊 Create events table
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                aggregate_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                event_data TEXT NOT NULL,
                timestamp TIMESTAMP NOT NULL,
                version INTEGER NOT NULL
            )
        """)
        # SQLite has no inline INDEX clause in CREATE TABLE,
        # so the index is created with its own statement
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_aggregate ON events (aggregate_id)"
        )
        self.conn.commit()
    
    # ➕ Store an event
    def append_event(self, event: Event):
        # Store the timestamp as ISO text; passing datetime objects directly
        # relies on sqlite3's default adapter, deprecated since Python 3.12
        self.conn.execute(
            """INSERT INTO events 
               (aggregate_id, event_type, event_data, timestamp, version)
               VALUES (?, ?, ?, ?, ?)""",
            (event.aggregate_id, event.event_type, 
             json.dumps(event.event_data), event.timestamp.isoformat(" "),
             event.version)
        )
        self.conn.commit()
        print(f"✨ Event stored: {event.event_type} for {event.aggregate_id}")

💡 Explanation: Notice how the store only ever inserts rows and never updates or deletes them, so recorded events stay immutable! Each event carries a version number that orders it within its aggregate.
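
The version number can also guard against two writers appending the same version to one aggregate. The sketch below is an assumption layered on top of the tutorial's schema, not part of it: it adds a `UNIQUE (aggregate_id, version)` constraint to a trimmed-down table and uses a hypothetical `try_append` helper that reports a conflict instead of storing a duplicate.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregate_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        version INTEGER NOT NULL,
        UNIQUE (aggregate_id, version)
    )
""")


def try_append(aggregate_id: str, event_type: str, version: int) -> bool:
    """Insert one event; return False if that version already exists."""
    try:
        with conn:  # commits on success, rolls back on an exception
            conn.execute(
                "INSERT INTO events (aggregate_id, event_type, version) "
                "VALUES (?, ?, ?)",
                (aggregate_id, event_type, version),
            )
        return True
    except sqlite3.IntegrityError:
        return False


first = try_append("acc-1", "AccountCreated", 1)
second = try_append("acc-1", "MoneyDeposited", 2)
# A second writer that also believes the next version is 2:
conflict = try_append("acc-1", "MoneyWithdrawn", 2)
print(first, second, conflict)  # True True False
```

When an append is rejected this way, a typical response is to reload the aggregate's events, re-validate the command, and retry with the new version number.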

🎯 Common Event Patterns

Here are patterns you'll use frequently:

# 🏗️ Pattern 1: Domain Events
class AccountCreated(Event):
    def __init__(self, account_id: str, owner: str, initial_balance: float):
        super().__init__(
            aggregate_id=account_id,
            event_type="AccountCreated",
            event_data={
                "owner": owner,
                "initial_balance": initial_balance
            },
            timestamp=datetime.now(),
            version=1
        )

class MoneyDeposited(Event):
    def __init__(self, account_id: str, amount: float, version: int):
        super().__init__(
            aggregate_id=account_id,
            event_type="MoneyDeposited",
            event_data={"amount": amount},
            timestamp=datetime.now(),
            version=version
        )

# 🎨 Pattern 2: Event Replay
class EventPlayer:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    # 🔄 Replay events to rebuild state
    def replay_events(self, aggregate_id: str) -> Dict[str, Any]:
        cursor = self.event_store.conn.execute(
            """SELECT * FROM events 
               WHERE aggregate_id = ? 
               ORDER BY version""",
            (aggregate_id,)
        )
        
        state = {}
        for row in cursor:
            event_type = row[2]
            event_data = json.loads(row[3])
            
            # 🎯 Apply event to state
            if event_type == "AccountCreated":
                state = {
                    "balance": event_data["initial_balance"],
                    "owner": event_data["owner"]
                }
            elif event_type == "MoneyDeposited":
                state["balance"] += event_data["amount"]
        
        return state

💡 Practical Examples

🛒 Example 1: Shopping Cart Event Store

Let's build a real e-commerce event system:

# 🛍️ Shopping cart with event sourcing
from uuid import uuid4
from enum import Enum

class CartEvent(Enum):
    CART_CREATED = "CartCreated"
    ITEM_ADDED = "ItemAdded"
    ITEM_REMOVED = "ItemRemoved"
    CART_CHECKED_OUT = "CartCheckedOut"

class ShoppingCartEventStore:
    def __init__(self):
        self.event_store = EventStore("shopping_carts.db")
        self.carts = {}  # 📦 In-memory cache
    
    # 🛒 Create new cart
    def create_cart(self, user_id: str) -> str:
        cart_id = str(uuid4())
        event = Event(
            aggregate_id=cart_id,
            event_type=CartEvent.CART_CREATED.value,
            event_data={"user_id": user_id},
            timestamp=datetime.now(),
            version=1
        )
        self.event_store.append_event(event)
        
        # 🎯 Update cache
        self.carts[cart_id] = {
            "user_id": user_id,
            "items": {},
            "total": 0.0,
            "status": "active"
        }
        
        print(f"🛒 Created cart {cart_id} for user {user_id}")
        return cart_id
    
    # ➕ Add item to cart
    def add_item(self, cart_id: str, product_id: str, 
                 product_name: str, price: float, quantity: int):
        cart = self._get_cart(cart_id)
        version = self._get_next_version(cart_id)
        
        event = Event(
            aggregate_id=cart_id,
            event_type=CartEvent.ITEM_ADDED.value,
            event_data={
                "product_id": product_id,
                "product_name": product_name,
                "price": price,
                "quantity": quantity,
                "emoji": "🛍️"  # Every product needs an emoji!
            },
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        
        # 🔄 Update cache
        if product_id in cart["items"]:
            cart["items"][product_id]["quantity"] += quantity
        else:
            cart["items"][product_id] = {
                "name": product_name,
                "price": price,
                "quantity": quantity
            }
        cart["total"] = sum(
            item["price"] * item["quantity"] 
            for item in cart["items"].values()
        )
        
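        print(f"✨ Added {quantity}x {product_name} to cart!")
    
    # 📜 Get cart history
    def get_cart_history(self, cart_id: str) -> List[Dict[str, Any]]:
        cursor = self.event_store.conn.execute(
            """SELECT * FROM events 
               WHERE aggregate_id = ? 
               ORDER BY version""",
            (cart_id,)
        )
        
        events = []
        for row in cursor:
            events.append({
                "type": row[2],
                "data": json.loads(row[3]),
                "timestamp": row[4]
            })
        
        return events
    
    # 🔄 Rebuild cart from events
    def rebuild_cart(self, cart_id: str) -> Dict:
        events = self.get_cart_history(cart_id)
        cart = None
        
        for event in events:
            data = event["data"]
            if event["type"] == CartEvent.CART_CREATED.value:
                cart = {
                    "user_id": data["user_id"],
                    "items": {},
                    "total": 0.0,
                    "status": "active"
                }
            elif event["type"] == CartEvent.ITEM_ADDED.value:
                # Same update that add_item applies to the cache
                item = cart["items"].setdefault(
                    data["product_id"],
                    {"name": data["product_name"],
                     "price": data["price"], "quantity": 0}
                )
                item["quantity"] += data["quantity"]
                cart["total"] = sum(
                    i["price"] * i["quantity"]
                    for i in cart["items"].values()
                )
        
        return cart
    
    # 📦 Look up a cart, rebuilding it from events on a cache miss
    def _get_cart(self, cart_id: str) -> Dict:
        if cart_id not in self.carts:
            cart = self.rebuild_cart(cart_id)
            if cart is None:
                raise ValueError(f"Cart {cart_id} not found")
            self.carts[cart_id] = cart
        return self.carts[cart_id]
    
    # 🔢 Next version number in this cart's event stream
    def _get_next_version(self, cart_id: str) -> int:
        row = self.event_store.conn.execute(
            "SELECT MAX(version) FROM events WHERE aggregate_id = ?",
            (cart_id,)
        ).fetchone()
        return (row[0] or 0) + 1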

# 🎮 Let's use it!
cart_store = ShoppingCartEventStore()
cart_id = cart_store.create_cart("user123")
cart_store.add_item(cart_id, "prod1", "Python Book 📘", 29.99, 1)
cart_store.add_item(cart_id, "prod2", "Coffee ☕", 4.99, 2)

🎯 Try it yourself: Add methods for removing items and checking out!

🎮 Example 2: Game State Event Sourcing

Let's make a game with complete history tracking:

# 🏆 Game state with event sourcing
class GameEvents(Enum):
    GAME_STARTED = "GameStarted"
    PLAYER_MOVED = "PlayerMoved"
    ITEM_COLLECTED = "ItemCollected"
    ENEMY_DEFEATED = "EnemyDefeated"
    LEVEL_COMPLETED = "LevelCompleted"

class GameEventStore:
    def __init__(self):
        self.event_store = EventStore("game_states.db")
        self.snapshots = {}  # 📸 Performance optimization
    
    # 🎮 Start new game
    def start_game(self, player_name: str) -> str:
        game_id = str(uuid4())
        event = Event(
            aggregate_id=game_id,
            event_type=GameEvents.GAME_STARTED.value,
            event_data={
                "player_name": player_name,
                "starting_position": {"x": 0, "y": 0},
                "health": 100,
                "score": 0
            },
            timestamp=datetime.now(),
            version=1
        )
        self.event_store.append_event(event)
        print(f"🎮 {player_name} started a new game!")
        return game_id
    
    # 🏃 Record player movement
    def move_player(self, game_id: str, new_x: int, new_y: int):
        version = self._get_next_version(game_id)
        event = Event(
            aggregate_id=game_id,
            event_type=GameEvents.PLAYER_MOVED.value,
            event_data={"x": new_x, "y": new_y},
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        print(f"🏃 Player moved to ({new_x}, {new_y})")
    
    # 💎 Collect item
    def collect_item(self, game_id: str, item_name: str, points: int):
        version = self._get_next_version(game_id)
        event = Event(
            aggregate_id=game_id,
            event_type=GameEvents.ITEM_COLLECTED.value,
            event_data={
                "item_name": item_name,
                "points": points,
                "emoji": "💎"
            },
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        print(f"💎 Collected {item_name} for {points} points!")
    
    # 📊 Get game state at any point
    def get_game_state_at(self, game_id: str, timestamp: datetime) -> Dict:
        cursor = self.event_store.conn.execute(
            """SELECT * FROM events 
               WHERE aggregate_id = ? AND timestamp <= ?
               ORDER BY version""",
            (game_id, timestamp)
        )
        
        state = None
        for row in cursor:
            event_type = row[2]
            event_data = json.loads(row[3])
            
            if event_type == GameEvents.GAME_STARTED.value:
                state = {
                    "player_name": event_data["player_name"],
                    "position": event_data["starting_position"],
                    "health": event_data["health"],
                    "score": event_data["score"],
                    "items": []
                }
            elif event_type == GameEvents.PLAYER_MOVED.value:
                state["position"] = {"x": event_data["x"], "y": event_data["y"]}
            elif event_type == GameEvents.ITEM_COLLECTED.value:
                state["items"].append(event_data["item_name"])
                state["score"] += event_data["points"]
        
        return state
    
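    # 🔄 Create snapshot for performance
    def create_snapshot(self, game_id: str):
        current_state = self.replay_game(game_id)
        version = self._get_current_version(game_id)
        
        self.snapshots[game_id] = {
            "state": current_state,
            "version": version,
            "timestamp": datetime.now()
        }
        print(f"📸 Snapshot created at version {version}")
    
    # 🔄 Replay every event to get the latest state
    def replay_game(self, game_id: str) -> Dict:
        return self.get_game_state_at(game_id, datetime.max)
    
    # 🔢 Version helpers used by the methods above
    def _get_current_version(self, game_id: str) -> int:
        row = self.event_store.conn.execute(
            "SELECT MAX(version) FROM events WHERE aggregate_id = ?",
            (game_id,)
        ).fetchone()
        return row[0] or 0
    
    def _get_next_version(self, game_id: str) -> int:
        return self._get_current_version(game_id) + 1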

🚀 Advanced Concepts

🧙‍♂️ Advanced Topic 1: Event Projections

When you're ready to level up, implement projections, which are read-optimized views built by running stored events through handlers:

# 🎯 Advanced event projections
class ProjectionEngine:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.projections = {}
    
    # 🔄 Register projection handler
    def register_projection(self, event_type: str, handler):
        if event_type not in self.projections:
            self.projections[event_type] = []
        self.projections[event_type].append(handler)
    
    # 🚀 Process events through projections
    def process_events(self, start_from: int = 0):
        cursor = self.event_store.conn.execute(
            "SELECT * FROM events WHERE id > ? ORDER BY id",
            (start_from,)
        )
        
        for row in cursor:
            event_type = row[2]
            if event_type in self.projections:
                event_data = json.loads(row[3])
                for handler in self.projections[event_type]:
                    handler(event_data)

# 📊 Example: User activity projection
class UserActivityProjection:
    def __init__(self):
        self.user_actions = {}  # 📈 Track user activity
    
    def handle_cart_created(self, event_data):
        user_id = event_data["user_id"]
        if user_id not in self.user_actions:
            self.user_actions[user_id] = {"carts": 0, "purchases": 0}
        self.user_actions[user_id]["carts"] += 1
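
To see how a projection keeps up with new events without reprocessing old ones, here is a self-contained, in-memory sketch. It mirrors the `start_from` idea above but replaces the SQLite table with a plain list; `InMemoryProjector`, `count_cart`, and the event dictionaries are hypothetical names used only for this illustration.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, List

Handler = Callable[[dict], None]


class InMemoryProjector:
    """Dispatches events to handlers and remembers how far it has read."""

    def __init__(self) -> None:
        self.handlers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.position = 0  # index of the next unprocessed event

    def register(self, event_type: str, handler: Handler) -> None:
        self.handlers[event_type].append(handler)

    def process(self, log: List[dict]) -> None:
        # Only events appended since the last call are handled.
        for event in log[self.position:]:
            for handler in self.handlers.get(event["type"], []):
                handler(event["data"])
        self.position = len(log)


carts_per_user: Dict[str, int] = defaultdict(int)


def count_cart(data: dict) -> None:
    carts_per_user[data["user_id"]] += 1


projector = InMemoryProjector()
projector.register("CartCreated", count_cart)

log = [
    {"type": "CartCreated", "data": {"user_id": "u1"}},
    {"type": "ItemAdded", "data": {"product_id": "p1"}},
    {"type": "CartCreated", "data": {"user_id": "u1"}},
]
projector.process(log)

log.append({"type": "CartCreated", "data": {"user_id": "u2"}})
projector.process(log)  # handles just the newly appended event

print(dict(carts_per_user))  # {'u1': 2, 'u2': 1}
```

Persisting `position` alongside the projection would let a restarted process resume where it stopped, at the cost of having to keep both in sync.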

๐Ÿ—๏ธ Advanced Topic 2: CQRS with Event Sourcing

Combine Command Query Responsibility Segregation:

# ๐Ÿš€ CQRS + Event Sourcing pattern
class CommandHandler:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
    
    # ๐Ÿ“ Handle commands
    def handle_create_account(self, account_id: str, owner: str):
        # Validate command
        if self._account_exists(account_id):
            raise ValueError("Account already exists! ๐Ÿ˜ฑ")
        
        # Create event
        event = AccountCreated(account_id, owner, 0.0)
        self.event_store.append_event(event)
        
        # Publish for read model update
        self._publish_event(event)

class QueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db  # 📊 Optimized for queries
    
    # 🔍 Fast queries on read model
    def get_account_balance(self, account_id: str) -> float:
        # Query optimized read model, not event store
        result = self.read_db.execute(
            "SELECT balance FROM accounts WHERE id = ?",
            (account_id,)
        ).fetchone()
        return result[0] if result else 0.0
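
The query side above assumes an `accounts` table that something keeps up to date. One possible way to fill it, sketched here under the assumption that events arrive as plain dictionaries, is a small `project` function that applies each event to the read database; the function name and the event shape are hypothetical.

```python
import sqlite3

# Read model: one row per account, shaped for the balance query.
read_db = sqlite3.connect(":memory:")
read_db.execute(
    "CREATE TABLE accounts (id TEXT PRIMARY KEY, balance REAL NOT NULL)"
)


def project(event: dict) -> None:
    """Apply one event to the read model."""
    if event["type"] == "AccountCreated":
        read_db.execute(
            "INSERT INTO accounts (id, balance) VALUES (?, ?)",
            (event["aggregate_id"], event["data"]["initial_balance"]),
        )
    elif event["type"] == "MoneyDeposited":
        read_db.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            (event["data"]["amount"], event["aggregate_id"]),
        )
    read_db.commit()


events = [
    {"type": "AccountCreated", "aggregate_id": "acc-1",
     "data": {"owner": "Alice", "initial_balance": 100.0}},
    {"type": "MoneyDeposited", "aggregate_id": "acc-1",
     "data": {"amount": 25.5}},
]
for event in events:
    project(event)

balance = read_db.execute(
    "SELECT balance FROM accounts WHERE id = ?", ("acc-1",)
).fetchone()[0]
print(balance)  # 125.5
```

Because the read model is derived data, it can be dropped and rebuilt from the event store at any time, for example after changing its schema.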

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Mutable Events

# โŒ Wrong way - mutable events are dangerous!
event = Event(...)
event.event_data["amount"] = 100  # ๐Ÿ’ฅ Never modify events!

# โœ… Correct way - events are immutable!
# Create new event instead of modifying
new_event = Event(
    aggregate_id=event.aggregate_id,
    event_type="AmountCorrected",
    event_data={"amount": 100, "reason": "correction"},
    timestamp=datetime.now(),
    version=event.version + 1
)
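
A plain dataclass will not stop the wrong-way mutation shown above. One option, sketched here as an assumption rather than the tutorial's own `Event` class, is a frozen dataclass whose payload is wrapped in a read-only `MappingProxyType`; `FrozenEvent` is a hypothetical name.

```python
from dataclasses import FrozenInstanceError, dataclass
from types import MappingProxyType
from typing import Any, Mapping


@dataclass(frozen=True)
class FrozenEvent:
    aggregate_id: str
    event_type: str
    event_data: Mapping[str, Any]
    version: int

    def __post_init__(self) -> None:
        # Copy the payload and expose it read-only. A frozen dataclass
        # blocks normal assignment, hence object.__setattr__.
        object.__setattr__(
            self, "event_data", MappingProxyType(dict(self.event_data))
        )


event = FrozenEvent("acc-1", "MoneyDeposited", {"amount": 50}, version=2)

attribute_blocked = False
try:
    event.version = 3  # rejected by the frozen dataclass
except FrozenInstanceError:
    attribute_blocked = True

payload_blocked = False
try:
    event.event_data["amount"] = 100  # rejected by the read-only mapping
except TypeError:
    payload_blocked = True

print(attribute_blocked, payload_blocked)  # True True
```

The protection is shallow: a list or dict nested inside the payload can still be changed. Also note that `json.dumps` does not accept a mapping proxy, so serialize with `dict(event.event_data)`.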

🤯 Pitfall 2: Loading All Events

# ❌ Dangerous - loading millions of events!
def get_current_state(self, aggregate_id: str):
    all_events = self.load_all_events(aggregate_id)  # 💥 Memory explosion!
    return self.replay_all(all_events)

# ✅ Safe - use snapshots and pagination!
def get_current_state(self, aggregate_id: str):
    # Load from latest snapshot
    snapshot = self.get_latest_snapshot(aggregate_id)
    if snapshot:
        # Only replay events after snapshot
        recent_events = self.load_events_after(
            aggregate_id, 
            snapshot.version
        )
        return self.replay_from_snapshot(snapshot, recent_events)
    else:
        # Paginate if no snapshot
        return self.replay_with_pagination(aggregate_id)
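
The helper methods in the outline above are left undefined, so here is a small runnable sketch of the snapshot idea on its own. It assumes events are dictionaries with a `version` field and uses hypothetical names (`Snapshot`, `apply`, `current_state`); in a real store the version filter would be a `WHERE version > ?` clause rather than a Python loop.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Snapshot:
    version: int            # version of the last event folded into `state`
    state: Dict[str, int]


def apply(state: Dict[str, int], event: dict) -> Dict[str, int]:
    """Return a new state with one event applied."""
    new_state = dict(state)
    balance = new_state.get("balance", 0)
    if event["type"] == "Deposited":
        new_state["balance"] = balance + event["amount"]
    elif event["type"] == "Withdrawn":
        new_state["balance"] = balance - event["amount"]
    return new_state


def current_state(
    events: List[dict], snapshot: Optional[Snapshot] = None
) -> Tuple[Dict[str, int], int]:
    """Return (state, number of events replayed)."""
    state = dict(snapshot.state) if snapshot else {}
    from_version = snapshot.version if snapshot else 0
    replayed = 0
    for event in events:
        if event["version"] > from_version:
            state = apply(state, event)
            replayed += 1
    return state, replayed


events = [
    {"version": 1, "type": "Deposited", "amount": 100},
    {"version": 2, "type": "Withdrawn", "amount": 30},
    {"version": 3, "type": "Deposited", "amount": 5},
]

full_state, full_count = current_state(events)
snapshot = Snapshot(version=2, state={"balance": 70})
fast_state, fast_count = current_state(events, snapshot)

print(full_state, full_count)  # {'balance': 75} 3
print(fast_state, fast_count)  # {'balance': 75} 1
```

Both paths reach the same state; the snapshot path simply replays fewer events.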

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Events Are Facts: Never delete or modify events - they happened!
  2. ๐Ÿ“ Meaningful Names: Use domain language for event types
  3. ๐Ÿ›ก๏ธ Validate Commands: Check business rules before creating events
  4. ๐Ÿ“ธ Use Snapshots: Create periodic snapshots for performance
  5. โœจ Keep Events Small: Store only what changed, not entire state
  6. ๐Ÿ”„ Version Everything: Track schema versions for event evolution
  7. ๐Ÿ“Š Separate Read/Write: Use CQRS for optimal performance
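
Practice 6 is easier to picture with an example. A common approach, sketched here with invented names and an invented schema change, is to store a `schema_version` with each event and convert old payloads to the current shape when they are read ("upcasting"), leaving the stored events untouched.

```python
from typing import Callable, Dict

CURRENT_SCHEMA = 2


def upcast_v1_to_v2(data: dict) -> dict:
    # Hypothetical change: v1 stored a single "name" field,
    # v2 splits it into first and last name.
    first, _, last = data["name"].partition(" ")
    return {"first_name": first, "last_name": last}


# Maps a schema version to the function that lifts it one version up.
UPCASTERS: Dict[int, Callable[[dict], dict]] = {1: upcast_v1_to_v2}


def upcast(event: dict) -> dict:
    """Return a copy of the event converted to the current schema."""
    version = event.get("schema_version", 1)
    data = event["data"]
    while version < CURRENT_SCHEMA:
        data = UPCASTERS[version](data)
        version += 1
    return {**event, "schema_version": version, "data": data}


old_event = {
    "type": "AccountOpened",
    "schema_version": 1,
    "data": {"name": "Ada Lovelace"},
}
new_event = upcast(old_event)

print(new_event["data"])  # {'first_name': 'Ada', 'last_name': 'Lovelace'}
```

The stored event keeps its original shape, which respects practice 1, while replay code only ever has to understand the newest schema.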

🧪 Hands-On Exercise

🎯 Challenge: Build a Banking Event Store

Create an event-sourced banking system:

📋 Requirements:

  • ✅ Account creation with initial deposit
  • 💰 Deposit and withdrawal operations
  • 🔄 Transfer between accounts
  • 📊 Balance calculation from events
  • 📜 Transaction history with timestamps
  • 🎨 Each transaction type needs an emoji!

🚀 Bonus Points:

  • Add interest calculation events
  • Implement account freeze/unfreeze
  • Create monthly statement projections

💡 Solution

๐Ÿ” Click to see solution
# 🎯 Event-sourced banking system!
from decimal import Decimal
from typing import Optional

class BankingEvents(Enum):
    ACCOUNT_OPENED = "AccountOpened"
    MONEY_DEPOSITED = "MoneyDeposited"
    MONEY_WITHDRAWN = "MoneyWithdrawn"
    TRANSFER_INITIATED = "TransferInitiated"
    TRANSFER_COMPLETED = "TransferCompleted"
    INTEREST_APPLIED = "InterestApplied"

class BankEventStore:
    def __init__(self):
        self.event_store = EventStore("bank.db")
        self.accounts = {}  # 💾 In-memory projection
    
    # 🏦 Open new account
    def open_account(self, account_number: str, 
                    owner_name: str, initial_deposit: Decimal) -> None:
        if initial_deposit < 0:
            raise ValueError("Initial deposit cannot be negative! 😅")
        
        event = Event(
            aggregate_id=account_number,
            event_type=BankingEvents.ACCOUNT_OPENED.value,
            event_data={
                "owner_name": owner_name,
                "initial_deposit": str(initial_deposit),
                "emoji": "🏦"
            },
            timestamp=datetime.now(),
            version=1
        )
        self.event_store.append_event(event)
        
        # Update projection
        self.accounts[account_number] = {
            "owner": owner_name,
            "balance": initial_deposit,
            "transactions": 1
        }
        print(f"🏦 Account {account_number} opened for {owner_name}!")
    
    # 💰 Deposit money
    def deposit(self, account_number: str, amount: Decimal) -> None:
        if amount <= 0:
            raise ValueError("Deposit amount must be positive! 💸")
        
        version = self._get_next_version(account_number)
        event = Event(
            aggregate_id=account_number,
            event_type=BankingEvents.MONEY_DEPOSITED.value,
            event_data={
                "amount": str(amount),
                "emoji": "💰"
            },
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        
        # Update projection
        self.accounts[account_number]["balance"] += amount
        self.accounts[account_number]["transactions"] += 1
        print(f"💰 Deposited ${amount} successfully!")
    
    # 💸 Withdraw money
    def withdraw(self, account_number: str, amount: Decimal) -> None:
        account = self.accounts.get(account_number)
        if not account:
            raise ValueError("Account not found! 😱")
        
        if account["balance"] < amount:
            raise ValueError("Insufficient funds! 😢")
        
        version = self._get_next_version(account_number)
        event = Event(
            aggregate_id=account_number,
            event_type=BankingEvents.MONEY_WITHDRAWN.value,
            event_data={
                "amount": str(amount),
                "emoji": "💸"
            },
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        
        # Update projection
        account["balance"] -= amount
        account["transactions"] += 1
        print(f"💸 Withdrew ${amount} successfully!")
    
    # 🔄 Transfer between accounts
    def transfer(self, from_account: str, to_account: str, 
                amount: Decimal) -> None:
        # Validate
        from_acc = self.accounts.get(from_account)
        to_acc = self.accounts.get(to_account)
        
        if not from_acc or not to_acc:
            raise ValueError("Account not found! 😱")
        
        if from_acc["balance"] < amount:
            raise ValueError("Insufficient funds! 😢")
        
        # Create transfer events
        transfer_id = str(uuid4())
        
        # Debit event
        from_version = self._get_next_version(from_account)
        debit_event = Event(
            aggregate_id=from_account,
            event_type=BankingEvents.TRANSFER_INITIATED.value,
            event_data={
                "transfer_id": transfer_id,
                "to_account": to_account,
                "amount": str(amount),
                "emoji": "🔄"
            },
            timestamp=datetime.now(),
            version=from_version
        )
        self.event_store.append_event(debit_event)
        
        # Credit event
        to_version = self._get_next_version(to_account)
        credit_event = Event(
            aggregate_id=to_account,
            event_type=BankingEvents.TRANSFER_COMPLETED.value,
            event_data={
                "transfer_id": transfer_id,
                "from_account": from_account,
                "amount": str(amount),
                "emoji": "✅"
            },
            timestamp=datetime.now(),
            version=to_version
        )
        self.event_store.append_event(credit_event)
        
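        # Update projections
        from_acc["balance"] -= amount
        from_acc["transactions"] += 1
        to_acc["balance"] += amount
        to_acc["transactions"] += 1
        print(f"🔄 Transferred ${amount} from {from_account} to {to_account}!")
    
    # 📊 Get account statement
    def get_statement(self, account_number: str) -> Dict:
        events = self._get_account_events(account_number)
        
        statement = {
            "account": account_number,
            "transactions": [],
            "final_balance": Decimal("0")
        }
        
        balance = Decimal("0")
        for event in events:
            event_type = event["type"]
            event_data = event["data"]
            
            transaction = {
                "date": event["timestamp"],
                "type": event_type,
                "description": event_type,  # Fallback for unhandled types
                "emoji": event_data.get("emoji", "📝")
            }
            
            if event_type == BankingEvents.ACCOUNT_OPENED.value:
                amount = Decimal(event_data["initial_deposit"])
                balance += amount
                transaction["description"] = "Account opened"
                transaction["credit"] = amount
                
            elif event_type == BankingEvents.MONEY_DEPOSITED.value:
                amount = Decimal(event_data["amount"])
                balance += amount
                transaction["description"] = "Deposit"
                transaction["credit"] = amount
                
            elif event_type == BankingEvents.MONEY_WITHDRAWN.value:
                amount = Decimal(event_data["amount"])
                balance -= amount
                transaction["description"] = "Withdrawal"
                transaction["debit"] = amount
                
            elif event_type == BankingEvents.TRANSFER_INITIATED.value:
                # Outgoing side of a transfer
                amount = Decimal(event_data["amount"])
                balance -= amount
                transaction["description"] = f"Transfer to {event_data['to_account']}"
                transaction["debit"] = amount
                
            elif event_type == BankingEvents.TRANSFER_COMPLETED.value:
                # Incoming side of a transfer
                amount = Decimal(event_data["amount"])
                balance += amount
                transaction["description"] = f"Transfer from {event_data['from_account']}"
                transaction["credit"] = amount
            
            transaction["balance"] = balance
            statement["transactions"].append(transaction)
        
        statement["final_balance"] = balance
        return statement
    
    # Load this account's events in version order
    def _get_account_events(self, account_number: str) -> List[Dict[str, Any]]:
        cursor = self.event_store.conn.execute(
            """SELECT * FROM events 
               WHERE aggregate_id = ? 
               ORDER BY version""",
            (account_number,)
        )
        return [
            {"type": row[2], "data": json.loads(row[3]), "timestamp": row[4]}
            for row in cursor
        ]
    
    def _get_next_version(self, account_number: str) -> int:
        cursor = self.event_store.conn.execute(
            "SELECT MAX(version) FROM events WHERE aggregate_id = ?",
            (account_number,)
        )
        result = cursor.fetchone()
        return (result[0] or 0) + 1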

# 🎮 Test it out!
bank = BankEventStore()
bank.open_account("ACC001", "Alice", Decimal("1000"))
bank.deposit("ACC001", Decimal("500"))
bank.open_account("ACC002", "Bob", Decimal("750"))
bank.transfer("ACC001", "ACC002", Decimal("200"))

# 📊 Print statement
statement = bank.get_statement("ACC001")
print("\n📊 Account Statement:")
for tx in statement["transactions"]:
    print(f"{tx['emoji']} {tx['description']}: ", end="")
    if 'credit' in tx:
        print(f"+${tx['credit']}", end="")
    elif 'debit' in tx:
        print(f"-${tx['debit']}", end="")
    print(f" | Balance: ${tx['balance']}")

🎓 Key Takeaways

You've learned so much! Here's what you can now do:

  • ✅ Design event-sourced databases with confidence 💪
  • ✅ Implement immutable event stores in Python 🛡️
  • ✅ Build projections and read models for queries 🎯
  • ✅ Handle temporal queries and time travel 🐛
  • ✅ Apply CQRS patterns with event sourcing! 🚀

Remember: Event sourcing isn't just a pattern, it's a mindset shift. Think in events, not states! 🤝

🤝 Next Steps

Congratulations! 🎉 You've mastered event sourcing database design!

Here's what to do next:

  1. 💻 Practice with the banking exercise above
  2. 🏗️ Build an event-sourced application for your domain
  3. 📚 Explore event streaming platforms like Kafka
  4. 🌟 Share your event sourcing journey with others!

Remember: Every expert was once a beginner. Keep coding, keep learning, and most importantly, have fun with events! 🚀


Happy event sourcing! 🎉🚀✨