Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the fundamentals of event sourcing
- Apply event sourcing in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to this exciting journey into event sourcing and database design! Have you ever wished you could track every single change in your application, like having a time machine for your data? That's exactly what event sourcing gives you!
In this tutorial, we'll explore how to design databases that capture every business event as an immutable fact. Whether you're building financial systems, e-commerce platforms, or collaborative tools, event sourcing can fundamentally change how you think about data persistence.
By the end of this tutorial, you'll understand how to implement event sourcing patterns in Python and design databases that tell the complete story of your application! Let's dive in!
Understanding Event Sourcing
What is Event Sourcing?
Event sourcing is like keeping a detailed diary of everything that happens in your application. Instead of storing only the current state (like a snapshot), you store every event that led to that state.
Think of it like a bank account: a traditional database would store just your current balance ($500), but event sourcing stores every transaction:
- Deposited $1000
- Withdrew $300
- Interest earned $50
- Fee charged $250
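The $500 balance is never stored anywhere. It falls out of replaying the four transactions in order (1000 - 300 + 50 - 250 = 500).

Here is a minimal sketch of that fold. It uses plain `(description, amount)` tuples as hypothetical stand-ins for real event records:

```python
from itertools import accumulate

# Each tuple stands in for one stored event: (description, signed amount in dollars)
transactions = [
    ("Deposited", 1000),
    ("Withdrew", -300),
    ("Interest earned", 50),
    ("Fee charged", -250),
]

# Running balance after each event: this is the audit trail
running = list(accumulate(amount for _, amount in transactions))

for (description, amount), balance in zip(transactions, running):
    print(f"{description:<16} {amount:+6d} -> balance {balance}")

# The current balance is simply the last running total
current_balance = running[-1]
print("Current balance:", current_balance)  # Current balance: 500
```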
In practice, event sourcing means:
- Every state change is recorded as an event
- Events are immutable (never changed once written)
- Current state is derived by replaying events
- A complete audit trail comes for free
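In Python, a frozen dataclass is one lightweight way to approximate the "events are immutable" rule.

Below is a minimal sketch. `BalanceChanged` is a hypothetical event type and is not used elsewhere in this tutorial:

```python
import dataclasses
from dataclasses import dataclass
from datetime import datetime

# frozen=True turns attribute assignment after creation into an error
@dataclass(frozen=True)
class BalanceChanged:
    account_id: str
    amount: int
    occurred_at: datetime

deposit = BalanceChanged("ACC001", 1000, datetime(2024, 1, 1))

rejected = False
try:
    deposit.amount = 2000  # attempt to rewrite history
except dataclasses.FrozenInstanceError as exc:
    rejected = True
    print("Rejected:", exc)

# A mistake is fixed by appending a compensating event, not by editing the old one
correction = BalanceChanged("ACC001", 1000, datetime(2024, 1, 2))
history = [deposit, correction]

# Current state is derived from the full history
print(sum(e.amount for e in history))  # 2000
```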
Why Use Event Sourcing?
Here's why developers love event sourcing:
- Complete History: Know exactly what happened and when
- Time Travel: Reconstruct state at any point in time
- Audit Trail: Built-in support for compliance and debugging
- Event-Driven Architecture: A natural fit for microservices
- Temporal Queries: "What was the state on January 1st?"
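Time travel and temporal queries both come down to the same operation: replay only the events recorded up to a chosen moment.

A minimal sketch with made-up dates and amounts:

```python
from datetime import datetime

# Hypothetical event log: (when it happened, signed amount)
events = [
    (datetime(2023, 12, 15), 1000),
    (datetime(2023, 12, 28), -300),
    (datetime(2024, 1, 10), 50),
    (datetime(2024, 2, 1), -250),
]

def balance_as_of(log, moment: datetime) -> int:
    """Replay only the events that had happened by `moment`."""
    return sum(amount for when, amount in log if when <= moment)

print(balance_as_of(events, datetime(2024, 1, 1)))  # 700
print(balance_as_of(events, datetime.max))          # 500
```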
Real-world example: Imagine an e-commerce system. With event sourcing, you can track:
- When items were added/removed from cart
- Price changes over time
- Exact sequence of user actions
- Why an order failed
Basic Syntax and Usage
Simple Event Store Design
Let's start with a basic event sourcing implementation:
```python
# Hello, Event Sourcing!
from datetime import datetime
from dataclasses import dataclass
from typing import List, Dict, Any
import json
import sqlite3

# Base event structure
@dataclass
class Event:
    aggregate_id: str    # Which entity this event belongs to
    event_type: str      # What happened
    event_data: Dict     # Event details
    timestamp: datetime  # When it happened
    version: int         # Sequence number within the aggregate

# Append-only event store backed by SQLite
class EventStore:
    def __init__(self, db_path: str = "events.db"):
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def _create_tables(self):
        # Events table
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                aggregate_id TEXT NOT NULL,
                event_type TEXT NOT NULL,
                event_data TEXT NOT NULL,
                timestamp TIMESTAMP NOT NULL,
                version INTEGER NOT NULL
            )
        """)
        # SQLite does not accept an inline INDEX clause inside CREATE TABLE,
        # so the index is created with a separate statement
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_aggregate
            ON events (aggregate_id, version)
        """)
        self.conn.commit()

    # Store an event (append only: there are no update or delete methods)
    def append_event(self, event: Event):
        self.conn.execute(
            """INSERT INTO events
               (aggregate_id, event_type, event_data, timestamp, version)
               VALUES (?, ?, ?, ?, ?)""",
            (event.aggregate_id, event.event_type,
             json.dumps(event.event_data),
             # Store timestamps as "YYYY-MM-DD HH:MM:SS" text so they
             # compare correctly as strings in time-range queries
             event.timestamp.isoformat(" "),
             event.version)
        )
        self.conn.commit()
        print(f"Event stored: {event.event_type} for {event.aggregate_id}")
```
Explanation: Notice how we never update or delete events. The store only appends, so events are immutable! Each event also carries a version number that records its position in its aggregate's history.
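Version numbers can also guard against concurrent writers. A common extension is a UNIQUE constraint on `(aggregate_id, version)`. This is an assumption layered on top of the tutorial's `EventStore`, not part of it. If two writers both read version 1 and both try to append version 2, the database rejects the second insert.

A self-contained sketch against an in-memory database:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregate_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        version INTEGER NOT NULL,
        UNIQUE (aggregate_id, version)  -- at most one event per version
    )
""")

def append(aggregate_id: str, event_type: str, expected_version: int) -> bool:
    """Append at expected_version + 1; return False if another writer got there first."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (aggregate_id, event_type, version) VALUES (?, ?, ?)",
                (aggregate_id, event_type, expected_version + 1),
            )
        return True
    except sqlite3.IntegrityError:
        return False

results = [
    append("acc-1", "AccountCreated", 0),
    append("acc-1", "MoneyDeposited", 1),  # writer A saw version 1
    append("acc-1", "MoneyWithdrawn", 1),  # writer B also saw version 1: rejected
]
print(results)  # [True, True, False]
```

When `append` returns `False`, a caller would typically reload the aggregate, re-check its business rules, and retry.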
Common Event Patterns
Here are patterns you'll use frequently:
```python
# Pattern 1: Domain events as Event subclasses
class AccountCreated(Event):
    def __init__(self, account_id: str, owner: str, initial_balance: float):
        super().__init__(
            aggregate_id=account_id,
            event_type="AccountCreated",
            event_data={
                "owner": owner,
                "initial_balance": initial_balance
            },
            timestamp=datetime.now(),
            version=1
        )

class MoneyDeposited(Event):
    def __init__(self, account_id: str, amount: float, version: int):
        super().__init__(
            aggregate_id=account_id,
            event_type="MoneyDeposited",
            event_data={"amount": amount},
            timestamp=datetime.now(),
            version=version
        )

# Pattern 2: Event replay
class EventPlayer:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    # Replay events in version order to rebuild state
    def replay_events(self, aggregate_id: str) -> Dict[str, Any]:
        cursor = self.event_store.conn.execute(
            """SELECT * FROM events
               WHERE aggregate_id = ?
               ORDER BY version""",
            (aggregate_id,)
        )
        state = {}
        # Row layout: (id, aggregate_id, event_type, event_data, timestamp, version)
        for row in cursor:
            event_type = row[2]
            event_data = json.loads(row[3])
            # Apply event to state
            if event_type == "AccountCreated":
                state = {
                    "balance": event_data["initial_balance"],
                    "owner": event_data["owner"]
                }
            elif event_type == "MoneyDeposited":
                state["balance"] += event_data["amount"]
        return state
```
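The `if/elif` chain in `replay_events` grows with every new event type. One common alternative is a dispatch table that maps each event type to a small pure function.

The sketch below uses hypothetical handler names and plain tuples instead of database rows:

```python
from functools import reduce
from typing import Any, Callable, Dict, List, Tuple

State = Dict[str, Any]

def on_account_created(state: State, data: Dict[str, Any]) -> State:
    return {"owner": data["owner"], "balance": data["initial_balance"]}

def on_money_deposited(state: State, data: Dict[str, Any]) -> State:
    # Return a new dict instead of mutating the previous state
    return {**state, "balance": state["balance"] + data["amount"]}

HANDLERS: Dict[str, Callable[[State, Dict[str, Any]], State]] = {
    "AccountCreated": on_account_created,
    "MoneyDeposited": on_money_deposited,
}

def replay(events: List[Tuple[str, Dict[str, Any]]]) -> State:
    def apply(state: State, event: Tuple[str, Dict[str, Any]]) -> State:
        event_type, data = event
        handler = HANDLERS.get(event_type)
        # Unknown event types are skipped, like the if/elif version
        return handler(state, data) if handler else state
    return reduce(apply, events, {})

history = [
    ("AccountCreated", {"owner": "Alice", "initial_balance": 100.0}),
    ("MoneyDeposited", {"amount": 50.0}),
    ("MoneyDeposited", {"amount": 25.0}),
]
print(replay(history))  # {'owner': 'Alice', 'balance': 175.0}
```

Adding an event type then means registering one more function, and each handler can be tested in isolation.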
Practical Examples
Example 1: Shopping Cart Event Store
Let's build a real e-commerce event system:
```python
# Shopping cart with event sourcing
from uuid import uuid4
from enum import Enum

class CartEvent(Enum):
    CART_CREATED = "CartCreated"
    ITEM_ADDED = "ItemAdded"
    ITEM_REMOVED = "ItemRemoved"
    CART_CHECKED_OUT = "CartCheckedOut"

class ShoppingCartEventStore:
    def __init__(self):
        self.event_store = EventStore("shopping_carts.db")
        self.carts = {}  # In-memory cache of current cart state

    # Create new cart
    def create_cart(self, user_id: str) -> str:
        cart_id = str(uuid4())
        event = Event(
            aggregate_id=cart_id,
            event_type=CartEvent.CART_CREATED.value,
            event_data={"user_id": user_id},
            timestamp=datetime.now(),
            version=1
        )
        self.event_store.append_event(event)
        # Update cache
        self.carts[cart_id] = {
            "user_id": user_id,
            "items": {},
            "total": 0.0,
            "status": "active"
        }
        print(f"Created cart {cart_id} for user {user_id}")
        return cart_id

    # Add item to cart
    def add_item(self, cart_id: str, product_id: str,
                 product_name: str, price: float, quantity: int):
        cart = self._get_cart(cart_id)
        version = self._get_next_version(cart_id)
        event = Event(
            aggregate_id=cart_id,
            event_type=CartEvent.ITEM_ADDED.value,
            event_data={
                "product_id": product_id,
                "product_name": product_name,
                "price": price,
                "quantity": quantity
            },
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        # Update cache with the same logic that replay uses
        self._apply_item_added(cart, event.event_data)
        print(f"Added {quantity}x {product_name} to cart!")

    # Get cart history as plain dicts
    def get_cart_history(self, cart_id: str) -> List[Dict[str, Any]]:
        cursor = self.event_store.conn.execute(
            """SELECT * FROM events
               WHERE aggregate_id = ?
               ORDER BY version""",
            (cart_id,)
        )
        events = []
        for row in cursor:
            events.append({
                "type": row[2],
                "data": json.loads(row[3]),
                "timestamp": row[4]
            })
        return events

    # Rebuild cart from events
    def rebuild_cart(self, cart_id: str) -> Dict:
        cart = None
        for event in self.get_cart_history(cart_id):
            if event["type"] == CartEvent.CART_CREATED.value:
                cart = {
                    "user_id": event["data"]["user_id"],
                    "items": {},
                    "total": 0.0,
                    "status": "active"
                }
            elif event["type"] == CartEvent.ITEM_ADDED.value:
                self._apply_item_added(cart, event["data"])
        return cart

    # Shared by add_item (cache update) and rebuild_cart (replay)
    @staticmethod
    def _apply_item_added(cart: Dict, data: Dict) -> None:
        items = cart["items"]
        if data["product_id"] in items:
            items[data["product_id"]]["quantity"] += data["quantity"]
        else:
            items[data["product_id"]] = {
                "name": data["product_name"],
                "price": data["price"],
                "quantity": data["quantity"]
            }
        cart["total"] = sum(
            item["price"] * item["quantity"] for item in items.values()
        )

    # Return the cached cart, rebuilding it from events on a cache miss
    def _get_cart(self, cart_id: str) -> Dict:
        if cart_id not in self.carts:
            cart = self.rebuild_cart(cart_id)
            if cart is None:
                raise KeyError(f"Unknown cart: {cart_id}")
            self.carts[cart_id] = cart
        return self.carts[cart_id]

    # Next version = highest stored version + 1
    def _get_next_version(self, cart_id: str) -> int:
        row = self.event_store.conn.execute(
            "SELECT MAX(version) FROM events WHERE aggregate_id = ?",
            (cart_id,)
        ).fetchone()
        return (row[0] or 0) + 1

# Let's use it!
cart_store = ShoppingCartEventStore()
cart_id = cart_store.create_cart("user123")
cart_store.add_item(cart_id, "prod1", "Python Book", 29.99, 1)
cart_store.add_item(cart_id, "prod2", "Coffee", 4.99, 2)
```
Try it yourself: Add methods for removing items and checking out!
Example 2: Game State Event Sourcing
Let's make a game with complete history tracking:
```python
# Game state with event sourcing
class GameEvents(Enum):
    GAME_STARTED = "GameStarted"
    PLAYER_MOVED = "PlayerMoved"
    ITEM_COLLECTED = "ItemCollected"
    ENEMY_DEFEATED = "EnemyDefeated"
    LEVEL_COMPLETED = "LevelCompleted"

class GameEventStore:
    def __init__(self):
        self.event_store = EventStore("game_states.db")
        self.snapshots = {}  # Performance optimization: cached states

    # Start new game
    def start_game(self, player_name: str) -> str:
        game_id = str(uuid4())
        event = Event(
            aggregate_id=game_id,
            event_type=GameEvents.GAME_STARTED.value,
            event_data={
                "player_name": player_name,
                "starting_position": {"x": 0, "y": 0},
                "health": 100,
                "score": 0
            },
            timestamp=datetime.now(),
            version=1
        )
        self.event_store.append_event(event)
        print(f"{player_name} started a new game!")
        return game_id

    # Record player movement
    def move_player(self, game_id: str, new_x: int, new_y: int):
        version = self._get_next_version(game_id)
        event = Event(
            aggregate_id=game_id,
            event_type=GameEvents.PLAYER_MOVED.value,
            event_data={"x": new_x, "y": new_y},
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        print(f"Player moved to ({new_x}, {new_y})")

    # Collect item
    def collect_item(self, game_id: str, item_name: str, points: int):
        version = self._get_next_version(game_id)
        event = Event(
            aggregate_id=game_id,
            event_type=GameEvents.ITEM_COLLECTED.value,
            event_data={
                "item_name": item_name,
                "points": points
            },
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        print(f"Collected {item_name} for {points} points!")

    # Get game state at any point in time
    def get_game_state_at(self, game_id: str, timestamp: datetime) -> Dict:
        cursor = self.event_store.conn.execute(
            """SELECT * FROM events
               WHERE aggregate_id = ? AND timestamp <= ?
               ORDER BY version""",
            # Compare using the same "YYYY-MM-DD HH:MM:SS" text format
            # that the timestamps are stored in
            (game_id, timestamp.isoformat(" "))
        )
        state = None
        for row in cursor:
            event_type = row[2]
            event_data = json.loads(row[3])
            if event_type == GameEvents.GAME_STARTED.value:
                state = {
                    "player_name": event_data["player_name"],
                    "position": event_data["starting_position"],
                    "health": event_data["health"],
                    "score": event_data["score"],
                    "items": []
                }
            elif event_type == GameEvents.PLAYER_MOVED.value:
                state["position"] = {"x": event_data["x"], "y": event_data["y"]}
            elif event_type == GameEvents.ITEM_COLLECTED.value:
                state["items"].append(event_data["item_name"])
                state["score"] += event_data["points"]
        return state

    # Current state: replay everything up to the latest possible moment
    def replay_game(self, game_id: str) -> Dict:
        return self.get_game_state_at(game_id, datetime.max)

    # Highest version stored for this game (0 if none)
    def _get_current_version(self, game_id: str) -> int:
        row = self.event_store.conn.execute(
            "SELECT MAX(version) FROM events WHERE aggregate_id = ?",
            (game_id,)
        ).fetchone()
        return row[0] or 0

    def _get_next_version(self, game_id: str) -> int:
        return self._get_current_version(game_id) + 1

    # Create snapshot for performance
    def create_snapshot(self, game_id: str):
        current_state = self.replay_game(game_id)
        version = self._get_current_version(game_id)
        self.snapshots[game_id] = {
            "state": current_state,
            "version": version,
            "timestamp": datetime.now()
        }
        print(f"Snapshot created at version {version}")
```
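A snapshot only helps if "snapshot state + events after the snapshot" reproduces the same state as a full replay.

A self-contained sketch of that property follows. The `(version, dx, dy)` move tuples are hypothetical and simpler than the game store's rows:

```python
from typing import Dict, List, Tuple

Move = Tuple[int, int, int]  # (version, dx, dy)

def apply_moves(position: Dict[str, int], moves: List[Move]) -> Dict[str, int]:
    x, y = position["x"], position["y"]
    for _, dx, dy in moves:
        x, y = x + dx, y + dy
    return {"x": x, "y": y}

events: List[Move] = [(1, 1, 0), (2, 0, 2), (3, -1, 1), (4, 3, 0)]
start = {"x": 0, "y": 0}

# Take a snapshot after version 2
snapshot_version = 2
snapshot = {
    "version": snapshot_version,
    "state": apply_moves(start, [e for e in events if e[0] <= snapshot_version]),
}

# Later: rebuild from the snapshot plus only the events after it
tail = [e for e in events if e[0] > snapshot["version"]]
from_snapshot = apply_moves(snapshot["state"], tail)
from_scratch = apply_moves(start, events)

print(from_snapshot, from_scratch)  # {'x': 3, 'y': 3} {'x': 3, 'y': 3}
```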
Advanced Concepts
Advanced Topic 1: Event Projections
When you're ready to level up, implement projections: handlers that consume the event stream and build read-optimized views from it:
```python
# Advanced event projections
class ProjectionEngine:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.projections = {}  # event_type -> list of handler callables

    # Register projection handler
    def register_projection(self, event_type: str, handler):
        if event_type not in self.projections:
            self.projections[event_type] = []
        self.projections[event_type].append(handler)

    # Process events through projections; returns the last event id seen
    def process_events(self, start_from: int = 0) -> int:
        cursor = self.event_store.conn.execute(
            "SELECT * FROM events WHERE id > ? ORDER BY id",
            (start_from,)
        )
        last_id = start_from
        for row in cursor:
            last_id = row[0]
            event_type = row[2]
            if event_type in self.projections:
                event_data = json.loads(row[3])
                for handler in self.projections[event_type]:
                    handler(event_data)
        return last_id  # pass this as start_from next time to resume

# Example: User activity projection
class UserActivityProjection:
    def __init__(self):
        self.user_actions = {}  # user_id -> activity counters

    def handle_cart_created(self, event_data):
        user_id = event_data["user_id"]
        if user_id not in self.user_actions:
            self.user_actions[user_id] = {"carts": 0, "purchases": 0}
        self.user_actions[user_id]["carts"] += 1

# Wiring it up against the shopping cart events
activity = UserActivityProjection()
engine = ProjectionEngine(EventStore("shopping_carts.db"))
engine.register_projection("CartCreated", activity.handle_cart_created)
checkpoint = engine.process_events()
```
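Because `process_events` accepts `start_from`, a projection can remember the last event id it handled and resume from there instead of reprocessing the whole log.

The sketch below runs against an in-memory SQLite table. The `EventTypeCounter` class and its `checkpoint` attribute are illustrative assumptions:

```python
import sqlite3
from collections import Counter

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, event_type TEXT NOT NULL)"
)

def append(event_type: str) -> None:
    conn.execute("INSERT INTO events (event_type) VALUES (?)", (event_type,))

class EventTypeCounter:
    """A tiny projection: counts events per type and remembers where it stopped."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()
        self.checkpoint = 0  # id of the last event already applied

    def catch_up(self) -> None:
        rows = conn.execute(
            "SELECT id, event_type FROM events WHERE id > ? ORDER BY id",
            (self.checkpoint,),
        ).fetchall()
        for event_id, event_type in rows:
            self.counts[event_type] += 1
            self.checkpoint = event_id  # advance only after applying the event

projection = EventTypeCounter()
append("CartCreated")
append("ItemAdded")
projection.catch_up()
append("ItemAdded")
projection.catch_up()  # processes only the one new event

print(dict(projection.counts), projection.checkpoint)
# {'CartCreated': 1, 'ItemAdded': 2} 3
```

In a real system the checkpoint would be persisted alongside the read model, so a restarted projector resumes where it left off.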
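Advanced Topic 2: CQRS with Event Sourcing
Combine event sourcing with Command Query Responsibility Segregation (CQRS): commands append events on the write side, while queries read from separate models built from those events: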
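```python
# CQRS + Event Sourcing pattern
class CommandHandler:
    def __init__(self, event_store: EventStore, subscribers=None):
        self.event_store = event_store
        # Callables that update read models (for example, a projector)
        self.subscribers = subscribers or []

    # Handle commands: validate, record the event, then notify the read side
    def handle_create_account(self, account_id: str, owner: str):
        # Validate command
        if self._account_exists(account_id):
            raise ValueError("Account already exists!")
        # Create event
        event = AccountCreated(account_id, owner, 0.0)
        self.event_store.append_event(event)
        # Publish for read model update
        self._publish_event(event)

    def _account_exists(self, account_id: str) -> bool:
        row = self.event_store.conn.execute(
            "SELECT 1 FROM events WHERE aggregate_id = ? LIMIT 1",
            (account_id,)
        ).fetchone()
        return row is not None

    def _publish_event(self, event: Event):
        for subscriber in self.subscribers:
            subscriber(event)

class QueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db  # Separate store optimized for queries

    # Fast queries on the read model
    def get_account_balance(self, account_id: str) -> float:
        # Query the denormalized read model, not the event store
        result = self.read_db.execute(
            "SELECT balance FROM accounts WHERE id = ?",
            (account_id,)
        ).fetchone()
        return result[0] if result else 0.0
```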
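The handlers above leave open how the read model's `accounts` table gets populated. In the sketch below, the command side appends events to one database, and a projector keeps an `accounts` table current in a separate read database.

The projector runs synchronously here for simplicity; many systems update read models asynchronously instead. Table names and the `project` and `handle_command` functions are illustrative assumptions, and command validation is omitted:

```python
import json
import sqlite3

write_db = sqlite3.connect(":memory:")  # event store (write side)
read_db = sqlite3.connect(":memory:")   # read model (query side)

write_db.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "aggregate_id TEXT, event_type TEXT, event_data TEXT)"
)
read_db.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY, owner TEXT, balance REAL)")

def project(aggregate_id: str, event_type: str, data: dict) -> None:
    """Translate one event into an update of the denormalized read model."""
    if event_type == "AccountCreated":
        read_db.execute(
            "INSERT INTO accounts (id, owner, balance) VALUES (?, ?, ?)",
            (aggregate_id, data["owner"], data["initial_balance"]),
        )
    elif event_type == "MoneyDeposited":
        read_db.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            (data["amount"], aggregate_id),
        )

def handle_command(aggregate_id: str, event_type: str, data: dict) -> None:
    write_db.execute(
        "INSERT INTO events (aggregate_id, event_type, event_data) VALUES (?, ?, ?)",
        (aggregate_id, event_type, json.dumps(data)),
    )
    project(aggregate_id, event_type, data)  # publish to the read side

handle_command("acc-1", "AccountCreated", {"owner": "Alice", "initial_balance": 0.0})
handle_command("acc-1", "MoneyDeposited", {"amount": 40.0})

# The query side never touches the event store
balance = read_db.execute(
    "SELECT balance FROM accounts WHERE id = ?", ("acc-1",)
).fetchone()[0]
print(balance)  # 40.0
```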
Common Pitfalls and Solutions
Pitfall 1: Mutable Events
```python
# Wrong way: mutating a stored event rewrites history!
event = Event(
    aggregate_id="acc-1",          # example values
    event_type="MoneyDeposited",
    event_data={"amount": 50},
    timestamp=datetime.now(),
    version=2
)
event.event_data["amount"] = 100  # Never modify events!
```
```python
# Correct way: events are immutable!
# Record a new event instead of modifying the old one
new_event = Event(
    aggregate_id=event.aggregate_id,
    event_type="AmountCorrected",
    event_data={"amount": 100, "reason": "correction"},
    timestamp=datetime.now(),
    version=event.version + 1
)
```
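`@dataclass(frozen=True)` would block reassigning fields, but a dict stored in a field can still be changed in place. That is exactly the mistake shown above. One standard-library option is to expose the payload through `types.MappingProxyType`, a read-only view of a mapping.

A minimal sketch follows. `ImmutableEvent` is a hypothetical class, not the tutorial's `Event`:

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any, Mapping

@dataclass(frozen=True)
class ImmutableEvent:
    aggregate_id: str
    event_type: str
    event_data: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Copy the payload and wrap it in a read-only view; a frozen dataclass
        # has to use object.__setattr__ to set a field after __init__
        object.__setattr__(self, "event_data", MappingProxyType(dict(self.event_data)))

payload = {"amount": 50}
event = ImmutableEvent("acc-1", "MoneyDeposited", payload)

payload["amount"] = 999  # the caller's dict no longer affects the event

rejected = False
try:
    event.event_data["amount"] = 100
except TypeError as exc:
    rejected = True
    print("Rejected:", exc)

print(event.event_data["amount"])  # 50
```

The view is shallow: nested lists or dicts inside the payload stay mutable. Also, `json.dumps` needs `dict(event.event_data)`, because a `mappingproxy` is not directly JSON-serializable.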
Pitfall 2: Loading All Events
```python
# Dangerous: loading millions of events into memory at once!
def get_current_state(self, aggregate_id: str):
    all_events = self.load_all_events(aggregate_id)  # Memory explosion!
    return self.replay_all(all_events)
```
```python
# Safe: use snapshots and pagination!
# (get_latest_snapshot, load_events_after, etc. are placeholder helpers)
def get_current_state(self, aggregate_id: str):
    # Start from the latest snapshot, if there is one
    snapshot = self.get_latest_snapshot(aggregate_id)
    if snapshot:
        # Only replay events recorded after the snapshot
        recent_events = self.load_events_after(
            aggregate_id,
            snapshot.version
        )
        return self.replay_from_snapshot(snapshot, recent_events)
    else:
        # No snapshot yet: replay in fixed-size pages
        return self.replay_with_pagination(aggregate_id)
```
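The `replay_with_pagination` helper above is left abstract. One way to implement it is keyset pagination on `version`; this is an assumption, sketched here against an in-memory SQLite table. Each round fetches a bounded batch, folds it into the state, and asks for the next batch after the last version seen. Memory use is then bounded by the batch size rather than the stream length:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (aggregate_id TEXT, version INTEGER, amount INTEGER)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [("acc-1", v, 10) for v in range(1, 1001)],  # 1,000 deposits of 10
)

def replay_in_batches(aggregate_id: str, balance: int = 0,
                      after_version: int = 0, batch_size: int = 100) -> int:
    last_version = after_version
    while True:
        batch = conn.execute(
            "SELECT version, amount FROM events "
            "WHERE aggregate_id = ? AND version > ? ORDER BY version LIMIT ?",
            (aggregate_id, last_version, batch_size),
        ).fetchall()
        if not batch:
            return balance
        for version, amount in batch:
            balance += amount        # apply each event to the running state
            last_version = version   # resume point for the next query

print(replay_in_batches("acc-1"))  # 10000
```

Passing a snapshot's state and version as `balance` and `after_version` combines this with the snapshot approach.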
Best Practices
- Events Are Facts: Never delete or modify events; they happened!
- Meaningful Names: Use domain language for event types
- Validate Commands: Check business rules before creating events
- Use Snapshots: Create periodic snapshots for performance
- Keep Events Small: Store only what changed, not the entire state
- Version Everything: Track schema versions for event evolution
- Separate Read/Write: Use CQRS so reads and writes can be optimized independently
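"Version everything" usually means storing a schema version with each event and upgrading old payloads when they are read, a step often called upcasting. Replay code then only has to understand the newest shape.

In this minimal sketch, the `schema_version` field and the v1-to-v2 change (a float `amount` becoming integer `amount_cents`) are hypothetical:

```python
from typing import Any, Callable, Dict

Payload = Dict[str, Any]

def v1_to_v2(data: Payload) -> Payload:
    # v1 stored dollars as a float; v2 stores integer cents
    upgraded = {k: v for k, v in data.items() if k != "amount"}
    upgraded["amount_cents"] = round(data["amount"] * 100)
    upgraded["schema_version"] = 2
    return upgraded

UPCASTERS: Dict[int, Callable[[Payload], Payload]] = {1: v1_to_v2}
LATEST = 2

def upcast(data: Payload) -> Payload:
    """Apply upgrade steps until the payload reaches the latest schema."""
    version = data.get("schema_version", 1)
    while version < LATEST:
        data = UPCASTERS[version](data)
        version = data["schema_version"]
    return data

old_event = {"schema_version": 1, "amount": 19.99}
new_event = {"schema_version": 2, "amount_cents": 500}
print(upcast(old_event))  # {'schema_version': 2, 'amount_cents': 1999}
print(upcast(new_event))  # {'schema_version': 2, 'amount_cents': 500}
```

Stored events are never rewritten; only the in-memory copy handed to replay code is upgraded.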
Hands-On Exercise
Challenge: Build a Banking Event Store
Create an event-sourced banking system:
Requirements:
- Account creation with initial deposit
- Deposit and withdrawal operations
- Transfer between accounts
- Balance calculation from events
- Transaction history with timestamps
- Each transaction type needs an emoji!
Bonus Points:
- Add interest calculation events
- Implement account freeze/unfreeze
- Create monthly statement projections
Solution
```python
# Event-sourced banking system!
from decimal import Decimal

class BankingEvents(Enum):
    ACCOUNT_OPENED = "AccountOpened"
    MONEY_DEPOSITED = "MoneyDeposited"
    MONEY_WITHDRAWN = "MoneyWithdrawn"
    TRANSFER_INITIATED = "TransferInitiated"
    TRANSFER_COMPLETED = "TransferCompleted"
    INTEREST_APPLIED = "InterestApplied"

class BankEventStore:
    # In-memory database by default: the projection below starts empty,
    # so it only matches a fresh event store
    def __init__(self, db_path: str = ":memory:"):
        self.event_store = EventStore(db_path)
        self.accounts = {}  # In-memory projection

    # Open new account
    def open_account(self, account_number: str,
                     owner_name: str, initial_deposit: Decimal) -> None:
        if initial_deposit < 0:
            raise ValueError("Initial deposit cannot be negative!")
        if account_number in self.accounts:
            raise ValueError("Account already exists!")
        event = Event(
            aggregate_id=account_number,
            event_type=BankingEvents.ACCOUNT_OPENED.value,
            event_data={
                "owner_name": owner_name,
                "initial_deposit": str(initial_deposit),  # Decimal as text for JSON
                "emoji": "🏦"
            },
            timestamp=datetime.now(),
            version=1
        )
        self.event_store.append_event(event)
        # Update projection
        self.accounts[account_number] = {
            "owner": owner_name,
            "balance": initial_deposit,
            "transactions": 1
        }
        print(f"Account {account_number} opened for {owner_name}!")

    # Deposit money
    def deposit(self, account_number: str, amount: Decimal) -> None:
        if amount <= 0:
            raise ValueError("Deposit amount must be positive!")
        account = self._require_account(account_number)
        version = self._get_next_version(account_number)
        event = Event(
            aggregate_id=account_number,
            event_type=BankingEvents.MONEY_DEPOSITED.value,
            event_data={
                "amount": str(amount),
                "emoji": "💰"
            },
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        # Update projection
        account["balance"] += amount
        account["transactions"] += 1
        print(f"Deposited ${amount} successfully!")

    # Withdraw money
    def withdraw(self, account_number: str, amount: Decimal) -> None:
        if amount <= 0:
            raise ValueError("Withdrawal amount must be positive!")
        account = self._require_account(account_number)
        if account["balance"] < amount:
            raise ValueError("Insufficient funds!")
        version = self._get_next_version(account_number)
        event = Event(
            aggregate_id=account_number,
            event_type=BankingEvents.MONEY_WITHDRAWN.value,
            event_data={
                "amount": str(amount),
                "emoji": "💸"
            },
            timestamp=datetime.now(),
            version=version
        )
        self.event_store.append_event(event)
        # Update projection
        account["balance"] -= amount
        account["transactions"] += 1
        print(f"Withdrew ${amount} successfully!")

    # Transfer between accounts
    def transfer(self, from_account: str, to_account: str,
                 amount: Decimal) -> None:
        # Validate
        if amount <= 0:
            raise ValueError("Transfer amount must be positive!")
        if from_account == to_account:
            raise ValueError("Cannot transfer to the same account!")
        from_acc = self._require_account(from_account)
        to_acc = self._require_account(to_account)
        if from_acc["balance"] < amount:
            raise ValueError("Insufficient funds!")
        # One transfer_id links the debit and credit events
        transfer_id = str(uuid4())
        # Note: the two appends are committed separately; a production
        # system would need both to succeed or fail together
        # Debit event
        from_version = self._get_next_version(from_account)
        debit_event = Event(
            aggregate_id=from_account,
            event_type=BankingEvents.TRANSFER_INITIATED.value,
            event_data={
                "transfer_id": transfer_id,
                "to_account": to_account,
                "amount": str(amount),
                "emoji": "🔄"
            },
            timestamp=datetime.now(),
            version=from_version
        )
        self.event_store.append_event(debit_event)
        # Credit event
        to_version = self._get_next_version(to_account)
        credit_event = Event(
            aggregate_id=to_account,
            event_type=BankingEvents.TRANSFER_COMPLETED.value,
            event_data={
                "transfer_id": transfer_id,
                "from_account": from_account,
                "amount": str(amount),
                "emoji": "✅"
            },
            timestamp=datetime.now(),
            version=to_version
        )
        self.event_store.append_event(credit_event)
        # Update projections
        from_acc["balance"] -= amount
        from_acc["transactions"] += 1
        to_acc["balance"] += amount
        to_acc["transactions"] += 1
        print(f"Transferred ${amount} from {from_account} to {to_account}!")

    # Get account statement by replaying the account's events
    def get_statement(self, account_number: str) -> Dict:
        events = self._get_account_events(account_number)
        statement = {
            "account": account_number,
            "transactions": [],
            "final_balance": Decimal("0")
        }
        balance = Decimal("0")
        for event in events:
            event_type = event["type"]
            event_data = event["data"]
            transaction = {
                "date": event["timestamp"],
                "type": event_type,
                "emoji": event_data.get("emoji", "📝")
            }
            if event_type == BankingEvents.ACCOUNT_OPENED.value:
                amount = Decimal(event_data["initial_deposit"])
                balance += amount
                transaction["description"] = "Account opened"
                transaction["credit"] = amount
            elif event_type == BankingEvents.MONEY_DEPOSITED.value:
                amount = Decimal(event_data["amount"])
                balance += amount
                transaction["description"] = "Deposit"
                transaction["credit"] = amount
            elif event_type == BankingEvents.MONEY_WITHDRAWN.value:
                amount = Decimal(event_data["amount"])
                balance -= amount
                transaction["description"] = "Withdrawal"
                transaction["debit"] = amount
            elif event_type == BankingEvents.TRANSFER_INITIATED.value:
                amount = Decimal(event_data["amount"])
                balance -= amount
                transaction["description"] = f"Transfer to {event_data['to_account']}"
                transaction["debit"] = amount
            elif event_type == BankingEvents.TRANSFER_COMPLETED.value:
                amount = Decimal(event_data["amount"])
                balance += amount
                transaction["description"] = f"Transfer from {event_data['from_account']}"
                transaction["credit"] = amount
            else:
                # Other types (e.g. future InterestApplied events) are listed
                # without changing the balance
                transaction["description"] = event_type
            transaction["balance"] = balance
            statement["transactions"].append(transaction)
        statement["final_balance"] = balance
        return statement

    # Load one account's events in version order
    def _get_account_events(self, account_number: str) -> List[Dict[str, Any]]:
        cursor = self.event_store.conn.execute(
            """SELECT event_type, event_data, timestamp FROM events
               WHERE aggregate_id = ?
               ORDER BY version""",
            (account_number,)
        )
        return [
            {"type": event_type, "data": json.loads(data), "timestamp": ts}
            for event_type, data, ts in cursor
        ]

    def _require_account(self, account_number: str) -> Dict[str, Any]:
        account = self.accounts.get(account_number)
        if account is None:
            raise ValueError(f"Account {account_number} not found!")
        return account

    def _get_next_version(self, account_number: str) -> int:
        cursor = self.event_store.conn.execute(
            "SELECT MAX(version) FROM events WHERE aggregate_id = ?",
            (account_number,)
        )
        result = cursor.fetchone()
        return (result[0] or 0) + 1

# Test it out!
bank = BankEventStore()
bank.open_account("ACC001", "Alice", Decimal("1000"))
bank.deposit("ACC001", Decimal("500"))
bank.open_account("ACC002", "Bob", Decimal("750"))
bank.transfer("ACC001", "ACC002", Decimal("200"))

# Print statement
statement = bank.get_statement("ACC001")
print("\nAccount Statement:")
for tx in statement["transactions"]:
    print(f"{tx['emoji']} {tx['description']}: ", end="")
    if 'credit' in tx:
        print(f"+${tx['credit']}", end="")
    elif 'debit' in tx:
        print(f"-${tx['debit']}", end="")
    print(f" | Balance: ${tx['balance']}")
```
Key Takeaways
You've learned a lot! Here's what you can now do:
- Design event-sourced databases with confidence
- Implement immutable event stores in Python
- Build projections and read models for queries
- Handle temporal queries and time travel
- Apply CQRS patterns with event sourcing!
Remember: Event sourcing isn't just a pattern; it's a mindset shift. Think in events, not states!
Next Steps
Congratulations! You've worked through the core ideas of event-sourced database design!
Here's what to do next:
- Practice with the banking exercise above
- Build an event-sourced application for your domain
- Explore event streaming platforms like Kafka
- Share your event sourcing journey with others!
Remember: Every expert was once a beginner. Keep coding, keep learning, and most importantly, have fun with events!
Happy event sourcing!