Prerequisites
- Basic understanding of programming concepts ๐
- Python installation (3.8+) ๐
- VS Code or preferred IDE ๐ป
What you'll learn
- Understand the concept fundamentals ๐ฏ
- Apply the concept in real projects ๐๏ธ
- Debug common issues ๐
- Write clean, Pythonic code โจ
๐ฏ Introduction
Welcome to this exciting tutorial on Event Sourcing! ๐ In this guide, weโll explore how to implement event sourcing patterns in Python - a powerful architectural pattern that stores all changes to application state as a sequence of events.
Youโll discover how event sourcing can transform your Python applications by providing complete audit trails, time-travel debugging, and the ability to reconstruct any past state. Whether youโre building financial systems ๐ฐ, e-commerce platforms ๐, or distributed applications ๐, understanding event sourcing is essential for writing scalable, maintainable systems.
By the end of this tutorial, youโll feel confident implementing event sourcing in your own projects! Letโs dive in! ๐โโ๏ธ
๐ Understanding Event Sourcing
๐ค What is Event Sourcing?
Event sourcing is like keeping a detailed diary of everything that happens in your application ๐. Think of it as recording every single action (event) that changes your applicationโs state, rather than just storing the final result.
In Python terms, instead of updating a database record directly, you store events that describe what happened. This means you can:
- โจ Replay events to reconstruct any past state
- ๐ Debug issues by examining the exact sequence of events
- ๐ก๏ธ Never lose data - everything is preserved in the event log
๐ก Why Use Event Sourcing?
Hereโs why developers love event sourcing:
- Complete Audit Trail ๐: Every change is recorded with who, what, when
- Time Travel ๐ป: Reconstruct application state at any point in time
- Event-Driven Architecture ๐: Natural fit for microservices and CQRS
- Business Intelligence ๐ง: Rich data for analytics and reporting
Real-world example: Imagine building a banking system ๐ฆ. With event sourcing, you can track every transaction, reconstruct account balances at any date, and provide complete audit trails for compliance.
๐ง Basic Syntax and Usage
๐ Simple Event Store Implementation
Letโs start with a friendly example:
# ๐ Hello, Event Sourcing!
from datetime import datetime
from typing import List, Dict, Any
from dataclasses import dataclass, field
import json
# ๐จ Creating a simple event
@dataclass
class Event:
event_type: str # ๐ฏ Type of event
aggregate_id: str # ๐ Entity this event belongs to
data: Dict[str, Any] # ๐ฆ Event payload
timestamp: datetime = field(default_factory=datetime.now) # โฐ When it happened
version: int = 1 # ๐ Event version
# ๐๏ธ Basic Event Store
class EventStore:
def __init__(self):
self.events: List[Event] = [] # ๐ Our event log
def append(self, event: Event):
"""โ Add event to the store"""
self.events.append(event)
print(f"๐ Stored event: {event.event_type} for {event.aggregate_id}")
def get_events(self, aggregate_id: str) -> List[Event]:
"""๐ Get all events for an aggregate"""
return [e for e in self.events if e.aggregate_id == aggregate_id]
๐ก Explanation: Notice how we store events as immutable records! Each event captures what happened, when, and to which entity.
๐ฏ Common Event Patterns
Here are patterns youโll use daily:
# ๐๏ธ Pattern 1: Domain Events
class AccountCreated(Event):
def __init__(self, account_id: str, owner: str, initial_balance: float):
super().__init__(
event_type="AccountCreated",
aggregate_id=account_id,
data={"owner": owner, "balance": initial_balance}
)
class MoneyDeposited(Event):
def __init__(self, account_id: str, amount: float):
super().__init__(
event_type="MoneyDeposited",
aggregate_id=account_id,
data={"amount": amount}
)
# ๐จ Pattern 2: Event Handlers
class EventHandler:
def handle(self, event: Event):
"""๐ Process events based on type"""
handlers = {
"AccountCreated": self.handle_account_created,
"MoneyDeposited": self.handle_money_deposited
}
handler = handlers.get(event.event_type)
if handler:
handler(event)
def handle_account_created(self, event: Event):
print(f"โจ Creating account for {event.data['owner']}")
def handle_money_deposited(self, event: Event):
print(f"๐ฐ Depositing ${event.data['amount']}")
๐ก Practical Examples
๐ Example 1: Shopping Cart Event Store
Letโs build something real:
# ๐๏ธ Shopping cart with event sourcing
class ShoppingCart:
def __init__(self, cart_id: str, event_store: EventStore):
self.cart_id = cart_id
self.event_store = event_store
self.items = {} # ๐ Current state
self._rebuild_state()
def _rebuild_state(self):
"""๐ Replay events to rebuild current state"""
events = self.event_store.get_events(self.cart_id)
for event in events:
self._apply_event(event)
def _apply_event(self, event: Event):
"""๐ฏ Apply event to update state"""
if event.event_type == "ItemAdded":
item = event.data
if item['product_id'] in self.items:
self.items[item['product_id']]['quantity'] += item['quantity']
else:
self.items[item['product_id']] = item
elif event.event_type == "ItemRemoved":
product_id = event.data['product_id']
if product_id in self.items:
del self.items[product_id]
def add_item(self, product_id: str, name: str, price: float, quantity: int = 1):
"""โ Add item to cart"""
event = Event(
event_type="ItemAdded",
aggregate_id=self.cart_id,
data={
"product_id": product_id,
"name": name,
"price": price,
"quantity": quantity,
"emoji": "๐๏ธ"
}
)
self.event_store.append(event)
self._apply_event(event)
print(f"Added {event.data['emoji']} {name} to cart!")
def remove_item(self, product_id: str):
"""โ Remove item from cart"""
event = Event(
event_type="ItemRemoved",
aggregate_id=self.cart_id,
data={"product_id": product_id}
)
self.event_store.append(event)
self._apply_event(event)
def get_total(self) -> float:
"""๐ฐ Calculate total"""
return sum(item['price'] * item['quantity']
for item in self.items.values())
# ๐ฎ Let's use it!
store = EventStore()
cart = ShoppingCart("cart-123", store)
cart.add_item("1", "Python Book", 29.99)
cart.add_item("2", "Coffee", 4.99, quantity=3)
print(f"Total: ${cart.get_total():.2f}")
๐ฏ Try it yourself: Add a checkout
event and implement discount codes!
๐ฎ Example 2: Game State Event Sourcing
Letโs make it fun:
# ๐ Game with event-sourced state
class GameEventStore:
def __init__(self):
self.events = []
self.snapshots = {} # ๐ธ State snapshots for performance
def append(self, event: Event):
self.events.append(event)
def create_snapshot(self, game_id: str, state: Dict):
"""๐ธ Save state snapshot"""
self.snapshots[game_id] = {
"state": state.copy(),
"version": len([e for e in self.events if e.aggregate_id == game_id])
}
class RPGGame:
def __init__(self, game_id: str, player_name: str, event_store: GameEventStore):
self.game_id = game_id
self.event_store = event_store
self.state = {
"player": player_name,
"level": 1,
"hp": 100,
"xp": 0,
"inventory": [],
"achievements": ["๐ New Adventure"]
}
# ๐ฎ Start game event
start_event = Event(
event_type="GameStarted",
aggregate_id=game_id,
data={"player": player_name}
)
self.event_store.append(start_event)
def attack_monster(self, monster: str, damage: int):
"""โ๏ธ Battle event"""
event = Event(
event_type="MonsterDefeated",
aggregate_id=self.game_id,
data={
"monster": monster,
"damage": damage,
"xp_gained": damage * 10
}
)
self.event_store.append(event)
# ๐ฏ Update state
self.state["xp"] += damage * 10
print(f"โ๏ธ Defeated {monster}! Gained {damage * 10} XP!")
# ๐ Check for level up
if self.state["xp"] >= self.state["level"] * 100:
self.level_up()
def level_up(self):
"""๐ Level up event"""
self.state["level"] += 1
self.state["hp"] = 100 + (self.state["level"] * 20)
event = Event(
event_type="LevelUp",
aggregate_id=self.game_id,
data={
"new_level": self.state["level"],
"new_hp": self.state["hp"]
}
)
self.event_store.append(event)
self.state["achievements"].append(f"๐ Level {self.state['level']} Hero")
print(f"๐ Level up! Now level {self.state['level']}!")
def replay_game(self) -> List[str]:
"""๐ Replay all game events"""
events = [e for e in self.event_store.events if e.aggregate_id == self.game_id]
history = []
for event in events:
if event.event_type == "GameStarted":
history.append(f"๐ฎ {event.data['player']} started their adventure!")
elif event.event_type == "MonsterDefeated":
history.append(f"โ๏ธ Defeated {event.data['monster']}!")
elif event.event_type == "LevelUp":
history.append(f"๐ Reached level {event.data['new_level']}!")
return history
๐ Advanced Concepts
๐งโโ๏ธ Advanced Topic 1: Event Sourcing with Projections
When youโre ready to level up, try this advanced pattern:
# ๐ฏ Advanced projection system
class Projection:
"""๐ฎ Build read models from events"""
def __init__(self):
self.handlers = {}
def register(self, event_type: str, handler):
"""๐ Register event handler"""
self.handlers[event_type] = handler
def project(self, events: List[Event]) -> Dict:
"""๐๏ธ Build projection from events"""
state = {}
for event in events:
handler = self.handlers.get(event.event_type)
if handler:
state = handler(state, event)
return state
# ๐ช Account balance projection
def create_account_projection():
projection = Projection()
def handle_account_created(state: Dict, event: Event) -> Dict:
state[event.aggregate_id] = {
"balance": event.data["balance"],
"owner": event.data["owner"],
"transactions": []
}
return state
def handle_money_deposited(state: Dict, event: Event) -> Dict:
if event.aggregate_id in state:
state[event.aggregate_id]["balance"] += event.data["amount"]
state[event.aggregate_id]["transactions"].append({
"type": "deposit",
"amount": event.data["amount"],
"timestamp": event.timestamp
})
return state
projection.register("AccountCreated", handle_account_created)
projection.register("MoneyDeposited", handle_money_deposited)
return projection
๐๏ธ Advanced Topic 2: Event Store with Persistence
For the brave developers:
# ๐ Persistent event store with JSON
import json
from pathlib import Path
class PersistentEventStore:
def __init__(self, filepath: str):
self.filepath = Path(filepath)
self.events = self._load_events()
def _load_events(self) -> List[Event]:
"""๐ Load events from file"""
if not self.filepath.exists():
return []
with open(self.filepath, 'r') as f:
data = json.load(f)
return [self._deserialize_event(e) for e in data]
def _deserialize_event(self, data: Dict) -> Event:
"""๐ Convert JSON to Event"""
return Event(
event_type=data["event_type"],
aggregate_id=data["aggregate_id"],
data=data["data"],
timestamp=datetime.fromisoformat(data["timestamp"]),
version=data["version"]
)
def append(self, event: Event):
"""๐พ Append and persist event"""
self.events.append(event)
self._save_events()
def _save_events(self):
"""๐พ Save events to file"""
data = [{
"event_type": e.event_type,
"aggregate_id": e.aggregate_id,
"data": e.data,
"timestamp": e.timestamp.isoformat(),
"version": e.version
} for e in self.events]
with open(self.filepath, 'w') as f:
json.dump(data, f, indent=2)
โ ๏ธ Common Pitfalls and Solutions
๐ฑ Pitfall 1: Mutable Events
# โ Wrong way - mutable events are dangerous!
event = Event("UserUpdated", "user-123", {"name": "Alice"})
event.data["name"] = "Bob" # ๐ฅ Modifying event data!
# โ
Correct way - events are immutable!
from dataclasses import dataclass, field
from typing import Dict, Any
@dataclass(frozen=True) # ๐ก๏ธ Make it immutable!
class ImmutableEvent:
event_type: str
aggregate_id: str
data: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
# ๐ Make data immutable too
object.__setattr__(self, 'data', dict(self.data))
๐คฏ Pitfall 2: Loading All Events
# โ Dangerous - loading millions of events!
def get_current_state(aggregate_id: str):
all_events = event_store.get_all_events() # ๐ฅ Memory explosion!
return rebuild_from_events(all_events)
# โ
Safe - use snapshots and pagination!
def get_current_state(aggregate_id: str):
# ๐ธ Start from latest snapshot
snapshot = event_store.get_latest_snapshot(aggregate_id)
# ๐ Only replay events after snapshot
events_after_snapshot = event_store.get_events_after(
aggregate_id,
snapshot.version if snapshot else 0
)
state = snapshot.state if snapshot else {}
return apply_events(state, events_after_snapshot)
๐ ๏ธ Best Practices
- ๐ฏ Events are Immutable: Never modify events after creation
- ๐ Event Names are Past Tense:
OrderPlaced
notPlaceOrder
- ๐ก๏ธ Version Your Events: Handle schema evolution gracefully
- ๐จ Keep Events Small: Store only what changed, not entire state
- โจ Use Projections: Build read models for queries
๐งช Hands-On Exercise
๐ฏ Challenge: Build a Bank Account System
Create an event-sourced banking system:
๐ Requirements:
- โ Create accounts with initial balance
- ๐ท๏ธ Deposit and withdraw money
- ๐ค Transfer between accounts
- ๐ Generate account statements
- ๐จ Calculate interest over time
๐ Bonus Points:
- Add transaction limits
- Implement overdraft protection
- Create monthly statements from events
๐ก Solution
๐ Click to see solution
# ๐ฏ Event-sourced banking system!
from datetime import datetime, timedelta
from typing import List, Optional
class BankEventStore:
def __init__(self):
self.events = []
def append(self, event: Event):
self.events.append(event)
def get_account_events(self, account_id: str) -> List[Event]:
return [e for e in self.events if e.aggregate_id == account_id]
class BankAccount:
def __init__(self, account_id: str, owner: str, event_store: BankEventStore):
self.account_id = account_id
self.owner = owner
self.event_store = event_store
self.balance = 0.0
self.is_active = True
# ๐ Create account event
create_event = Event(
event_type="AccountCreated",
aggregate_id=account_id,
data={
"owner": owner,
"initial_balance": 0.0,
"created_at": datetime.now().isoformat()
}
)
self.event_store.append(create_event)
def deposit(self, amount: float, description: str = ""):
"""๐ฐ Deposit money"""
if amount <= 0:
raise ValueError("Amount must be positive")
event = Event(
event_type="MoneyDeposited",
aggregate_id=self.account_id,
data={
"amount": amount,
"description": description,
"balance_after": self.balance + amount
}
)
self.event_store.append(event)
self.balance += amount
print(f"๐ฐ Deposited ${amount:.2f}. New balance: ${self.balance:.2f}")
def withdraw(self, amount: float, description: str = ""):
"""๐ธ Withdraw money"""
if amount <= 0:
raise ValueError("Amount must be positive")
if amount > self.balance:
raise ValueError("Insufficient funds!")
event = Event(
event_type="MoneyWithdrawn",
aggregate_id=self.account_id,
data={
"amount": amount,
"description": description,
"balance_after": self.balance - amount
}
)
self.event_store.append(event)
self.balance -= amount
print(f"๐ธ Withdrew ${amount:.2f}. New balance: ${self.balance:.2f}")
def transfer_to(self, target_account: 'BankAccount', amount: float):
"""๐ฑ Transfer money to another account"""
# ๐ค Withdrawal event
self.withdraw(amount, f"Transfer to {target_account.account_id}")
# ๐ฅ Deposit event
target_account.deposit(amount, f"Transfer from {self.account_id}")
# ๐ Transfer event for audit
transfer_event = Event(
event_type="MoneyTransferred",
aggregate_id=self.account_id,
data={
"from_account": self.account_id,
"to_account": target_account.account_id,
"amount": amount
}
)
self.event_store.append(transfer_event)
def get_statement(self, start_date: Optional[datetime] = None) -> List[str]:
"""๐ Generate account statement"""
events = self.event_store.get_account_events(self.account_id)
statement = [f"๐ Statement for {self.owner} (Account: {self.account_id})"]
statement.append("=" * 50)
running_balance = 0.0
for event in events:
if start_date and event.timestamp < start_date:
continue
if event.event_type == "AccountCreated":
statement.append(f"โจ Account opened")
elif event.event_type == "MoneyDeposited":
running_balance = event.data["balance_after"]
statement.append(
f"๐ฐ {event.timestamp.strftime('%Y-%m-%d %H:%M')} | "
f"Deposit: ${event.data['amount']:.2f} | "
f"Balance: ${running_balance:.2f}"
)
elif event.event_type == "MoneyWithdrawn":
running_balance = event.data["balance_after"]
statement.append(
f"๐ธ {event.timestamp.strftime('%Y-%m-%d %H:%M')} | "
f"Withdrawal: ${event.data['amount']:.2f} | "
f"Balance: ${running_balance:.2f}"
)
statement.append("=" * 50)
statement.append(f"๐ฐ Current Balance: ${self.balance:.2f}")
return statement
# ๐ฎ Test it out!
event_store = BankEventStore()
# Create accounts
alice_account = BankAccount("ACC-001", "Alice", event_store)
bob_account = BankAccount("ACC-002", "Bob", event_store)
# Transactions
alice_account.deposit(1000, "Initial deposit")
alice_account.withdraw(50, "ATM withdrawal")
alice_account.transfer_to(bob_account, 200)
# Generate statements
print("\n".join(alice_account.get_statement()))
print("\n")
print("\n".join(bob_account.get_statement()))
๐ Key Takeaways
Youโve learned so much! Hereโs what you can now do:
- โ Implement event sourcing with confidence ๐ช
- โ Build event stores that preserve history ๐ก๏ธ
- โ Create projections for read models ๐ฏ
- โ Handle common pitfalls like a pro ๐
- โ Design event-driven systems with Python! ๐
Remember: Event sourcing is powerful but not always necessary. Use it when you need audit trails, temporal queries, or event-driven architectures! ๐ค
๐ค Next Steps
Congratulations! ๐ Youโve mastered event sourcing patterns!
Hereโs what to do next:
- ๐ป Practice with the banking exercise above
- ๐๏ธ Build an event-sourced order management system
- ๐ Learn about CQRS (Command Query Responsibility Segregation)
- ๐ Explore event streaming with Kafka or RabbitMQ
Remember: Every distributed systems expert started with simple event stores. Keep coding, keep learning, and most importantly, have fun! ๐
Happy coding! ๐๐โจ