Prerequisites
- Basic understanding of programming concepts 📝
- Python installation (3.8+) 🐍
- VS Code or preferred IDE 💻
What you'll learn
- Understand Prometheus and Grafana fundamentals 🎯
- Apply monitoring in real projects 🏗️
- Debug common issues 🐛
- Write clean, Pythonic code ✨
🎯 Introduction
Welcome to this exciting tutorial on monitoring with Prometheus and Grafana! 🎉 In this guide, we’ll explore how to build robust monitoring systems that give you superpowers to see what’s happening in your Python applications!
You’ll discover how Prometheus and Grafana can transform your DevOps experience. Whether you’re building web applications 🌐, microservices 🖥️, or data pipelines 📊, understanding monitoring is essential for keeping your systems healthy and happy!
By the end of this tutorial, you’ll feel confident setting up professional monitoring for your own projects! Let’s dive in! 🏊‍♂️
📚 Understanding Monitoring with Prometheus and Grafana
🤔 What is Prometheus?
Prometheus is like a super-smart health tracker for your applications! 🏃‍♂️ Think of it as a fitness watch that constantly checks your app’s vital signs - CPU usage, memory, response times, and more!
In technical terms, Prometheus is a monitoring system built around a time-series database: it periodically scrapes metrics over HTTP from endpoints that your applications expose. This means you can:
- ✨ Track performance over time
- 🚀 Set up alerts when things go wrong
- 🛡️ Prevent issues before users notice them
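Tracking performance over time usually starts from counter-style metrics: values that only increase and drop back to zero when the process restarts. The sketch below shows, in plain Python, roughly how a per-second rate can be derived from such samples. The `per_second_rate` helper and the sample data are made up for illustration, and Prometheus's own `rate()` function does more (it also extrapolates to the edges of the query window), so treat this as a simplified model rather than the real algorithm.

```python
def per_second_rate(samples):
    """Average per-second increase over a list of (timestamp, value) samples.

    A drop in value is treated as a counter reset (for example a process
    restart): the counter is assumed to have started again from zero.
    """
    if len(samples) < 2:
        return 0.0
    increase = 0.0
    for (_, prev), (_, curr) in zip(samples, samples[1:]):
        if curr >= prev:
            increase += curr - prev
        else:
            increase += curr  # reset: everything since zero counts as new
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed if elapsed > 0 else 0.0


# Hypothetical scrape results: (seconds, counter value), with a restart at t=45
samples = [(0, 100), (15, 130), (30, 160), (45, 10), (60, 40)]
print(per_second_rate(samples))
```

Here the counter grew by 100 in total across 60 seconds despite the restart, so the result is about 1.67 requests per second.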
💡 What is Grafana?
Grafana is like the beautiful dashboard in a luxury car! 🚗 While Prometheus collects all the data, Grafana makes it look amazing with colorful charts and graphs.
Here’s why developers love this combo:
- Real-time Visibility 👁️: See what’s happening right now
- Historical Analysis 📈: Track trends over days, weeks, or months
- Beautiful Dashboards 🎨: Impress your team with stunning visuals
- Instant Alerts 🚨: Get notified before disasters strike
Real-world example: Imagine running an online pizza delivery service 🍕. With Prometheus and Grafana, you can track order processing times, delivery speeds, and customer satisfaction - all in real-time!
🔧 Basic Setup and Usage
📝 Installing the Components
Let’s start by setting up our monitoring stack:
# 👋 Hello, Monitoring!
# First, let's install the Python Prometheus client
# pip install prometheus-client
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import random
# 🎨 Creating our first metrics
request_count = Counter('app_requests_total', 'Total number of requests')
request_duration = Histogram('app_request_duration_seconds', 'Request duration')
active_users = Gauge('app_active_users', 'Number of active users')
# 🚀 Start the metrics server
start_http_server(8000) # Prometheus will scrape metrics from port 8000
print("Metrics server started on port 8000! 🎉")
💡 Explanation: We’ve created three types of metrics: Counter (always goes up), Histogram (tracks distributions), and Gauge (can go up or down)!
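To make "tracks distributions" a little more concrete: a Prometheus histogram stores cumulative buckets, so every observation is counted in each bucket whose upper bound (the `le` label) is greater than or equal to the value, plus a final `+Inf` bucket. The following plain-Python sketch imitates that bookkeeping; `bucket_counts` and the sample durations are illustrative assumptions, not part of the client library.

```python
import math


def bucket_counts(observations, upper_bounds):
    """Return cumulative counts per bucket, keyed by upper bound ("le")."""
    bounds = sorted(upper_bounds) + [math.inf]
    counts = {bound: 0 for bound in bounds}
    for value in observations:
        for bound in bounds:
            if value <= bound:
                counts[bound] += 1  # cumulative: larger buckets also count it
    return counts


# Hypothetical request durations in seconds
durations = [0.05, 0.2, 0.3, 0.7, 3.0]
counts = bucket_counts(durations, [0.1, 0.5, 1.0])
for bound, count in counts.items():
    print(f"le={bound}: {count}")
```

Because the buckets are cumulative, the `+Inf` bucket always equals the total number of observations, and picking bucket bounds close to the latencies you care about gives more useful percentile estimates.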
🎯 Creating Your First Metrics
Here’s how to instrument your Python code:
# 🏗️ A simple web application with monitoring
# pip install flask prometheus-client
import random
import time
from functools import wraps

from flask import Flask, request
from prometheus_client import make_wsgi_app, Counter, Histogram
from werkzeug.middleware.dispatcher import DispatcherMiddleware

app = Flask(__name__)

# 🎨 Define our metrics
request_count = Counter(
    'flask_requests_total',
    'Total requests',
    ['method', 'endpoint', 'status']
)
request_latency = Histogram(
    'flask_request_latency_seconds',
    'Request latency'
)

# 🔄 Decorator to track request metrics
def track_metrics(f):
    @wraps(f)  # Keep f.__name__, otherwise Flask sees every view as "wrapper"
    def wrapper(*args, **kwargs):
        start_time = time.time()
        # Execute the actual function
        result = f(*args, **kwargs)
        # 📊 Record metrics (status is hard-coded: this demo only counts successful calls)
        request_count.labels(
            method=request.method,
            endpoint=f.__name__,
            status='200'
        ).inc()
        request_latency.observe(time.time() - start_time)
        return result
    return wrapper

@app.route('/')
@track_metrics
def home():
    # 🎲 Simulate some processing time
    time.sleep(random.uniform(0.1, 0.5))
    return "Welcome to our monitored app! 🚀"

@app.route('/api/data')
@track_metrics
def get_data():
    # 📊 Simulate data processing
    time.sleep(random.uniform(0.2, 1.0))
    return {"data": "Here's your data! 📦"}

# 🛡️ Add Prometheus metrics endpoint
app.wsgi_app = DispatcherMiddleware(app.wsgi_app, {
    '/metrics': make_wsgi_app()
})

if __name__ == '__main__':
    print("Starting Flask app with Prometheus metrics! 🎉")
    app.run(port=5000)
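Two details of a timing decorator are easy to get wrong: the wrapper should keep the wrapped function's name (Flask derives endpoint names from it by default), and the duration should be recorded even when the view raises. Here is a minimal standard-library sketch of both ideas; the `timed` decorator and the `recorded` list are hypothetical stand-ins for real metric objects.

```python
import time
from functools import wraps

recorded = []  # stand-in for a metrics backend: (function name, seconds, outcome)


def timed(func):
    @wraps(func)  # preserves func.__name__ and the docstring
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        outcome = "error"
        try:
            result = func(*args, **kwargs)
            outcome = "ok"
            return result
        finally:
            # Runs on success and on exceptions alike
            recorded.append((func.__name__, time.perf_counter() - start, outcome))
    return wrapper


@timed
def home():
    return "hello"


@timed
def broken():
    raise ValueError("boom")


home()
try:
    broken()
except ValueError:
    pass

print(home.__name__, broken.__name__)
print([(name, outcome) for name, _, outcome in recorded])
```

In a real app, the `finally` branch would call something like a histogram's `observe()` and increment a counter labelled with the outcome instead of appending to a list.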
💡 Practical Examples
🛒 Example 1: E-commerce Order Tracking
Let’s build a real monitoring system for an online store:
# 🛍️ E-commerce monitoring system
from prometheus_client import Counter, Histogram, Gauge, Info, start_http_server
import random
import time
import threading

# 📊 Define our business metrics
order_counter = Counter(
    'shop_orders_total',
    'Total number of orders',
    ['product_category', 'payment_method']
)
order_value = Histogram(
    'shop_order_value_dollars',
    'Order value in dollars',
    buckets=[10, 25, 50, 100, 250, 500, 1000]
)
inventory_level = Gauge(
    'shop_inventory_items',
    'Current inventory level',
    ['product_name']
)
shop_info = Info('shop_info', 'Shop information')
shop_info.info({
    'version': '1.0.0',
    'location': 'San Francisco',
    'emoji': '🛒'
})

class EcommerceMonitor:
    def __init__(self):
        self.products = {
            'laptop': {'emoji': '💻', 'price': 999, 'stock': 50},
            'phone': {'emoji': '📱', 'price': 699, 'stock': 100},
            'headphones': {'emoji': '🎧', 'price': 199, 'stock': 200},
            'keyboard': {'emoji': '⌨️', 'price': 89, 'stock': 150}
        }

    # 🛒 Process an order
    def process_order(self, product, quantity, payment_method):
        product_info = self.products[product]
        # 🚫 Never sell more than we have, so the inventory gauge can't go negative
        if quantity > product_info['stock']:
            print(f"❌ Not enough stock for {quantity}x {product}")
            return
        total_value = product_info['price'] * quantity
        # 📈 Update metrics
        order_counter.labels(
            product_category=product,
            payment_method=payment_method
        ).inc()
        order_value.observe(total_value)
        # 📦 Update inventory
        product_info['stock'] -= quantity
        inventory_level.labels(product_name=product).set(product_info['stock'])
        print(f"✅ Order processed: {quantity}x {product_info['emoji']} {product} = ${total_value}")
        # 🎊 Check for low stock alert
        if product_info['stock'] < 20:
            print(f"⚠️ Low stock alert for {product}! Only {product_info['stock']} left!")

    # 🔄 Simulate orders
    def simulate_orders(self):
        products = list(self.products.keys())
        payment_methods = ['credit_card', 'paypal', 'crypto']
        while True:
            # 🎲 Random order
            product = random.choice(products)
            quantity = random.randint(1, 5)
            payment = random.choice(payment_methods)
            self.process_order(product, quantity, payment)
            # 😴 Wait before next order
            time.sleep(random.uniform(1, 5))

# 🎮 Let's run it!
start_http_server(8000)  # Expose the metrics so the URL printed below works
monitor = EcommerceMonitor()

# Initialize inventory gauges
for product, info in monitor.products.items():
    inventory_level.labels(product_name=product).set(info['stock'])

# Start order simulation in background
order_thread = threading.Thread(target=monitor.simulate_orders, daemon=True)
order_thread.start()
print("E-commerce monitoring started! Check metrics at http://localhost:8000 📊")

# Keep the main thread alive (the simulation runs in a daemon thread)
while True:
    time.sleep(1)
🎮 Example 2: Game Server Monitoring
Let’s monitor a multiplayer game server:
# 🏆 Game server monitoring
from prometheus_client import Counter, Histogram, Gauge, Enum, start_http_server
import time
import random
import threading
from dataclasses import dataclass
from typing import Dict, List

# 🎯 Game metrics
player_actions = Counter(
    'game_player_actions_total',
    'Player actions in game',
    ['action_type', 'player_level']
)
match_duration = Histogram(
    'game_match_duration_seconds',
    'Match duration in seconds',
    buckets=[60, 120, 300, 600, 900, 1800]
)
active_players = Gauge(
    'game_active_players',
    'Currently active players'
)
server_status = Enum(
    'game_server_status',
    'Current server status',
    states=['starting', 'running', 'maintenance', 'stopping']
)
queue_size = Gauge(
    'game_matchmaking_queue_size',
    'Players waiting in matchmaking queue',
    ['game_mode']
)

@dataclass
class Player:
    id: str
    name: str
    level: int
    emoji: str
    score: int = 0

class GameServer:
    def __init__(self):
        self.players: Dict[str, Player] = {}
        self.next_player_number = 0  # Only ever increases, so IDs are never reused
        self.matches_played = 0
        self.game_modes = ['battle_royale', 'team_deathmatch', 'capture_flag']
        server_status.state('running')

    # 🎮 Player joins the game
    def player_join(self, player_id: str, name: str):
        emojis = ['🦸', '🧙‍♂️', '🥷', '🤖', '👾']
        level = random.randint(1, 50)
        player = Player(
            id=player_id,
            name=name,
            level=level,
            emoji=random.choice(emojis)
        )
        self.players[player_id] = player
        active_players.inc()
        print(f"{player.emoji} {name} (Level {level}) joined the game! 🎉")
        return player

    # 🎯 Player performs action
    def player_action(self, player_id: str, action: str):
        if player_id not in self.players:
            return
        player = self.players[player_id]
        # 📊 Track the action, grouping levels by tens (level_0s, level_10s, ...)
        # so the label stays low-cardinality
        player_actions.labels(
            action_type=action,
            player_level=f"level_{player.level // 10 * 10}s"
        ).inc()
        # 🎯 Update player score
        points = {
            'attack': 10,
            'defend': 5,
            'heal': 3,
            'special_move': 20
        }
        player.score += points.get(action, 1)
        print(f"{player.emoji} {player.name} performed {action}! Score: {player.score}")

    # 🏁 Run a match (blocks the simulation loop until the match is over)
    def run_match(self, mode: str, players: List[str]):
        print(f"\n🏁 Starting {mode} match with {len(players)} players!")
        # 📊 Update queue metrics
        queue_size.labels(game_mode=mode).set(0)
        start_time = time.time()
        match_length = random.uniform(180, 900)  # 3-15 minutes
        # 🔄 Simulate match actions
        actions = ['attack', 'defend', 'heal', 'special_move']
        action_count = 0
        while time.time() - start_time < match_length:
            # Random player performs random action
            if players and random.random() < 0.3:
                player_id = random.choice(players)
                action = random.choice(actions)
                self.player_action(player_id, action)
                action_count += 1
            time.sleep(0.5)
        # 📊 Record match duration
        duration = time.time() - start_time
        match_duration.observe(duration)
        self.matches_played += 1
        print(f"🏆 Match ended! Duration: {duration:.1f}s, Actions: {action_count}")
        # 👋 Players leave after match
        for player_id in players[:]:
            if random.random() < 0.3:  # 30% chance to leave
                self.player_leave(player_id)

    # 👋 Player leaves
    def player_leave(self, player_id: str):
        if player_id in self.players:
            player = self.players[player_id]
            del self.players[player_id]
            active_players.dec()
            print(f"👋 {player.emoji} {player.name} left the game")

    # 🎲 Simulate game activity
    def simulate_activity(self):
        player_names = ['DragonSlayer', 'NinjaWarrior', 'SpaceKnight',
                        'CyberMage', 'ShadowHunter', 'PhoenixRider']
        while True:
            # 🎮 New players join
            if len(self.players) < 20 and random.random() < 0.3:
                name = random.choice(player_names) + str(random.randint(100, 999))
                self.player_join(f"player_{self.next_player_number}", name)
                self.next_player_number += 1
            # 🏁 Start matches
            if len(self.players) >= 4:
                mode = random.choice(self.game_modes)
                # Add players to queue
                queue_size.labels(game_mode=mode).set(len(self.players))
                # Select players for match
                num_players = min(random.randint(4, 10), len(self.players))
                match_players = random.sample(list(self.players.keys()), num_players)
                # Run the match
                self.run_match(mode, match_players)
            time.sleep(2)

# 🚀 Start the game server
start_http_server(8000)  # Expose the metrics for Prometheus to scrape
server = GameServer()

# Start simulation in background
game_thread = threading.Thread(target=server.simulate_activity, daemon=True)
game_thread.start()
print("🎮 Game server monitoring active! Watch the action unfold! 🚀")

# Keep the main thread alive (the simulation runs in a daemon thread)
while True:
    time.sleep(1)
🚀 Advanced Concepts
🧙‍♂️ Custom Metrics and Exporters
When you’re ready to level up, create custom exporters:
# 🎯 Advanced custom exporter
# pip install psutil requests prometheus-client
from prometheus_client import CollectorRegistry, Gauge, generate_latest
import psutil
import requests

class SystemHealthExporter:
    def __init__(self):
        self.registry = CollectorRegistry()
        # 💻 System metrics
        self.cpu_usage = Gauge(
            'system_cpu_usage_percent',
            'CPU usage percentage',
            registry=self.registry
        )
        self.memory_usage = Gauge(
            'system_memory_usage_percent',
            'Memory usage percentage',
            registry=self.registry
        )
        self.disk_usage = Gauge(
            'system_disk_usage_percent',
            'Disk usage percentage',
            ['mount_point'],
            registry=self.registry
        )
        # 🌐 External service health
        self.api_health = Gauge(
            'external_api_health',
            'External API health status',
            ['service_name'],
            registry=self.registry
        )

    # 📊 Collect system metrics
    def collect_system_metrics(self):
        # CPU usage (interval=1 blocks for one second while sampling)
        self.cpu_usage.set(psutil.cpu_percent(interval=1))
        # Memory usage
        memory = psutil.virtual_memory()
        self.memory_usage.set(memory.percent)
        # Disk usage for each partition
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                self.disk_usage.labels(
                    mount_point=partition.mountpoint
                ).set(usage.percent)
            except OSError:
                pass  # Skip inaccessible partitions

    # 🌐 Check external services
    def check_external_services(self):
        # Placeholder URLs. Note that 5432 and 6379 are the default PostgreSQL
        # and Redis ports, which don't speak HTTP: these two checks report 0
        # unless you point them at a real HTTP health endpoint.
        services = {
            'payment_api': 'https://api.example.com/health',
            'database': 'http://localhost:5432/health',
            'cache': 'http://localhost:6379/ping'
        }
        for service_name, url in services.items():
            try:
                response = requests.get(url, timeout=5)
                # 1 = healthy, 0 = unhealthy
                health_status = 1 if response.status_code == 200 else 0
            except requests.RequestException:
                health_status = 0
            self.api_health.labels(service_name=service_name).set(health_status)

    # 📤 Export metrics (returns bytes in the Prometheus text format)
    def get_metrics(self):
        self.collect_system_metrics()
        self.check_external_services()
        return generate_latest(self.registry)

# 🪄 Using the custom exporter
exporter = SystemHealthExporter()
metrics_data = exporter.get_metrics()
print("Custom metrics collected! ✨")
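Databases and caches such as PostgreSQL and Redis do not serve HTTP on their default ports, so an HTTP GET is not a meaningful health check for them. One simple alternative, sketched below with only the standard library, is to test whether a TCP connection can be opened. The `tcp_health` helper is a hypothetical name, and a successful connection only proves the port is reachable, not that the service behind it is healthy.

```python
import socket


def tcp_health(host, port, timeout=2.0):
    """Return 1.0 if a TCP connection succeeds, else 0.0."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return 1.0
    except OSError:  # refused, unreachable, or timed out
        return 0.0


# Demo against a throwaway local listener instead of a real database
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
port = listener.getsockname()[1]

up = tcp_health("127.0.0.1", port)
listener.close()
down = tcp_health("127.0.0.1", port)
print(up, down)
```

The returned value can be passed straight to a gauge's `set()`, the same way the exporter above records `1` or `0` for HTTP services.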
🏗️ Alert Rules and Automation
Here’s a simplified, in-process alert checker that shows the idea behind alert rules. In a production setup, alert rules are normally defined in Prometheus itself and notifications are routed through Alertmanager:
# 🚨 A toy alert checker (not the real Prometheus Alertmanager)
from datetime import datetime

class AlertManager:
    def __init__(self):
        self.alert_rules = []
        self.alert_history = []

    # 📋 Define alert rule
    def add_rule(self, name, condition, message, severity='warning'):
        rule = {
            'name': name,
            'condition': condition,
            'message': message,
            'severity': severity,
            'emoji': self._get_severity_emoji(severity)
        }
        self.alert_rules.append(rule)
        print(f"✅ Alert rule '{name}' added!")

    def _get_severity_emoji(self, severity):
        return {
            'info': 'ℹ️',
            'warning': '⚠️',
            'critical': '🚨',
            'disaster': '💥'
        }.get(severity, '📢')

    # 🔍 Check rules
    def check_rules(self, metrics):
        triggered_alerts = []
        for rule in self.alert_rules:
            if rule['condition'](metrics):
                alert = {
                    'rule': rule['name'],
                    'message': rule['message'],
                    'severity': rule['severity'],
                    'emoji': rule['emoji'],
                    'timestamp': datetime.now()
                }
                triggered_alerts.append(alert)
                self.alert_history.append(alert)
                # 📧 Send notification
                self._send_alert(alert)
        return triggered_alerts

    def _send_alert(self, alert):
        # This demo only prints; a real system would send an email, chat message, etc.
        print(f"\n{alert['emoji']} ALERT: {alert['rule']}")
        print(f"Message: {alert['message']}")
        print(f"Severity: {alert['severity']}")
        print(f"Time: {alert['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}\n")

# 🎯 Example usage
alert_manager = AlertManager()

# Define alert rules
alert_manager.add_rule(
    name='High CPU Usage',
    condition=lambda m: m.get('cpu_usage', 0) > 80,
    message='CPU usage is above 80%! Check for resource-intensive processes.',
    severity='warning'
)
alert_manager.add_rule(
    name='Low Disk Space',
    condition=lambda m: m.get('disk_free_gb', 100) < 10,
    message='Less than 10GB disk space remaining!',
    severity='critical'
)
alert_manager.add_rule(
    name='API Errors Spike',
    condition=lambda m: m.get('error_rate', 0) > 5,
    message='Error rate exceeded 5%! Investigate immediately!',
    severity='critical'
)

# Simulate metrics and check alerts (CPU and disk rules fire, error rate does not)
test_metrics = {
    'cpu_usage': 85,
    'disk_free_gb': 8,
    'error_rate': 2
}
triggered = alert_manager.check_rules(test_metrics)
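A rule that fires on a single bad reading tends to be noisy, because one short CPU spike is enough to page someone. Prometheus alerting rules address this with a `for` clause that requires the condition to hold for a period of time. The sketch below imitates that idea by requiring several consecutive failing checks; `SustainedRule` and the sample readings are assumptions made up for this illustration.

```python
class SustainedRule:
    """Fires only after `condition` holds for `required` consecutive checks."""

    def __init__(self, condition, required=3):
        self.condition = condition
        self.required = required
        self.streak = 0

    def check(self, metrics):
        if self.condition(metrics):
            self.streak += 1
        else:
            self.streak = 0  # any healthy reading resets the streak
        return self.streak >= self.required


rule = SustainedRule(lambda m: m.get('cpu_usage', 0) > 80, required=3)

# Hypothetical CPU readings, one per check interval
readings = [85, 90, 70, 88, 92, 95, 60]
fired = [rule.check({'cpu_usage': cpu}) for cpu in readings]
print(fired)
```

Only the sixth reading triggers the alert: it is the first point at which CPU usage has been above 80% for three checks in a row.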
⚠️ Common Pitfalls and Solutions
😱 Pitfall 1: Metric Explosion
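from prometheus_client import Counter

# The two definitions below are alternatives: registering both in the same
# process raises a ValueError because the metric name is duplicated.

# ❌ Wrong way - too many label combinations!
request_metric = Counter(
    'requests_total',
    'Total requests',
    ['user_id', 'ip_address', 'user_agent']  # 💥 Millions of combinations!
)

# ✅ Correct way - limited cardinality
request_metric = Counter(
    'requests_total',
    'Total requests',
    ['method', 'endpoint', 'status_code']  # ✅ Limited combinations
)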
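The reason high-cardinality labels hurt is simple arithmetic: each distinct combination of label values becomes its own time series, so the worst case is the product of the number of values per label. A quick sketch, with invented value counts purely for illustration:

```python
import math


def series_count(label_values):
    """Upper bound on time series: the product of distinct values per label."""
    return math.prod(len(values) for values in label_values.values())


low = series_count({
    'method': ['GET', 'POST', 'PUT', 'DELETE'],
    'endpoint': ['/', '/api/data', '/metrics'],
    'status_code': ['200', '400', '404', '500'],
})

# Hypothetical sizes: 100,000 users, 50,000 IP addresses, 200 user agents
high = series_count({
    'user_id': range(100_000),
    'ip_address': range(50_000),
    'user_agent': range(200),
})
print(low, high)
```

The bounded labels give at most 48 series, while the per-user labels allow up to a trillion. Real traffic only creates the combinations that actually occur, but even a small fraction of that space is far more than a Prometheus server should be asked to store.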
🤯 Pitfall 2: Missing Error Handling
# `metric` is assumed to be a Histogram (or Summary) defined elsewhere.

# ❌ Dangerous - a bad value raises inside your application code!
def record_metric(value):
    metric.observe(value)  # 💥 What if value is None or not a number?

# ✅ Safe - always handle errors in monitoring!
def record_metric(value):
    try:
        if value is not None and isinstance(value, (int, float)):
            metric.observe(value)
        else:
            print(f"⚠️ Invalid metric value: {value}")
    except Exception as e:
        print(f"🚨 Error recording metric: {e}")
        # Never let monitoring crash your app!
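Repeating that `try`/`except` in every recording function gets tedious, so one option is to move it into a decorator. This is a sketch of one possible approach using only the standard library; `never_raise`, `record_latency`, and the `observed` list are hypothetical names, with the list standing in for a real metric object.

```python
import logging
from functools import wraps

logger = logging.getLogger("metrics")


def never_raise(func):
    """Log and swallow any exception raised while recording a metric."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.exception("Failed to record metric in %s", func.__name__)
            return None
    return wrapper


observed = []  # stand-in for a real metric object


@never_raise
def record_latency(seconds):
    observed.append(float(seconds))  # float(None) raises TypeError


record_latency(0.25)
record_latency(None)   # logged, not raised
record_latency("0.5")  # float() accepts numeric strings
print(observed)
```

Using `logger.exception` keeps the traceback in the logs, so broken instrumentation is still visible even though it no longer interrupts the request.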
🛠️ Best Practices
- 🎯 Choose Meaningful Metrics: Track what matters to your business
- 📝 Use Consistent Naming: service_component_unit (e.g., api_requests_total)
- 🛡️ Limit Label Cardinality: Keep label combinations under control
- 🎨 Create Beautiful Dashboards: Make data easy to understand
- ✨ Set Smart Alerts: Alert on symptoms, not just thresholds
🧪 Hands-On Exercise
🎯 Challenge: Build a Microservices Monitor
Create a monitoring system for a microservices architecture:
📋 Requirements:
- ✅ Track request rates between services
- 🏷️ Monitor service health and uptime
- 👤 Track user journey through services
- 📅 Calculate end-to-end latency
- 🎨 Create service dependency visualization
🚀 Bonus Points:
- Add circuit breaker metrics
- Implement SLA tracking
- Create automated remediation
💡 Solution
# 🎯 Microservices monitoring solution!
from prometheus_client import Counter, Histogram, Gauge, Enum, start_http_server
import time
import random
import threading
from datetime import datetime

class MicroserviceMonitor:
    def __init__(self):
        # 📊 Service communication metrics
        self.service_requests = Counter(
            'service_requests_total',
            'Requests between services',
            ['from_service', 'to_service', 'status']
        )
        self.request_duration = Histogram(
            'service_request_duration_seconds',
            'Request duration between services',
            ['from_service', 'to_service'],
            buckets=[0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5]
        )
        # 🏥 Health metrics
        self.service_health = Gauge(
            'service_health_score',
            'Service health score (0-100)',
            ['service']
        )
        self.service_uptime = Gauge(
            'service_uptime_seconds',
            'Service uptime in seconds',
            ['service']
        )
        # 🔄 Circuit breaker metrics
        self.circuit_breaker_state = Enum(
            'circuit_breaker_state',
            'Circuit breaker state',
            ['service'],
            states=['closed', 'open', 'half_open']
        )
        self.failed_requests = Counter(
            'circuit_breaker_failed_requests',
            'Failed requests triggering circuit breaker',
            ['service']
        )
        # 📈 SLA metrics
        self.sla_compliance = Gauge(
            'service_sla_compliance_percent',
            'SLA compliance percentage',
            ['service', 'sla_type']
        )
        # 🏗️ Service definitions
        self.services = {
            'api_gateway': {'emoji': '🚪', 'start_time': datetime.now()},
            'user_service': {'emoji': '👤', 'start_time': datetime.now()},
            'order_service': {'emoji': '🛒', 'start_time': datetime.now()},
            'payment_service': {'emoji': '💳', 'start_time': datetime.now()},
            'notification_service': {'emoji': '📧', 'start_time': datetime.now()}
        }
        # Initialize circuit breakers. The Enum metric has no public way to
        # read its state back, so we keep our own copy alongside it.
        self.breaker_states = {}
        self.failure_counts = {}
        for service in self.services:
            self.failure_counts[service] = 0
            self._set_breaker_state(service, 'closed')

    def _set_breaker_state(self, service, state):
        self.breaker_states[service] = state
        self.circuit_breaker_state.labels(service=service).state(state)

    # 🔄 Simulate service communication
    def service_call(self, from_service, to_service):
        # Simulate different scenarios: (status, probability, duration)
        scenarios = [
            ('success', 0.85, lambda: random.uniform(0.01, 0.1)),
            ('timeout', 0.05, lambda: 5.0),
            ('error', 0.05, lambda: random.uniform(0.001, 0.01)),
            ('slow', 0.05, lambda: random.uniform(0.5, 2.0))
        ]
        # Pick a scenario
        rand = random.random()
        cumulative = 0
        for status, probability, duration_func in scenarios:
            cumulative += probability
            if rand <= cumulative:
                duration = duration_func()
                break
        # Record metrics
        self.service_requests.labels(
            from_service=from_service,
            to_service=to_service,
            status=status
        ).inc()
        self.request_duration.labels(
            from_service=from_service,
            to_service=to_service
        ).observe(duration)
        # Handle circuit breaker
        if status in ['timeout', 'error']:
            self.failed_requests.labels(service=to_service).inc()
            self._check_circuit_breaker(to_service)
        print(f"{self.services[from_service]['emoji']} → "
              f"{self.services[to_service]['emoji']} "
              f"{status} ({duration:.3f}s)")
        return status, duration

    # 🔌 Circuit breaker logic
    def _check_circuit_breaker(self, service):
        # Simplified circuit breaker (opens after 5 failures)
        self.failure_counts[service] += 1
        if self.failure_counts[service] >= 5 and self.breaker_states[service] != 'open':
            self.failure_counts[service] = 0
            self._set_breaker_state(service, 'open')
            print(f"⚡ Circuit breaker OPEN for {service}!")

            # Schedule half-open after 30 seconds
            def half_open():
                time.sleep(30)
                self._set_breaker_state(service, 'half_open')
                print(f"🔄 Circuit breaker HALF-OPEN for {service}")

            threading.Thread(target=half_open, daemon=True).start()

    # 📊 Update service health
    def update_health_metrics(self):
        for service, info in self.services.items():
            # Calculate health score (simplified)
            health_score = random.uniform(85, 100)
            if self.breaker_states[service] == 'open':
                health_score = random.uniform(20, 40)
            self.service_health.labels(service=service).set(health_score)
            # Update uptime
            uptime = (datetime.now() - info['start_time']).total_seconds()
            self.service_uptime.labels(service=service).set(uptime)
            # SLA compliance
            self.sla_compliance.labels(
                service=service,
                sla_type='availability'
            ).set(min(99.9, health_score + random.uniform(0, 5)))
            self.sla_compliance.labels(
                service=service,
                sla_type='response_time'
            ).set(random.uniform(95, 99.9))

    # 🎭 Simulate user journey
    def simulate_user_journey(self):
        journeys = [
            # Login flow
            [('api_gateway', 'user_service'),
             ('user_service', 'notification_service')],
            # Order flow
            [('api_gateway', 'order_service'),
             ('order_service', 'user_service'),
             ('order_service', 'payment_service'),
             ('payment_service', 'notification_service')],
            # Browse flow
            [('api_gateway', 'order_service'),
             ('order_service', 'user_service')]
        ]
        journey = random.choice(journeys)
        print(f"\n🚶 Starting user journey with {len(journey)} steps")
        total_duration = 0
        for from_svc, to_svc in journey:
            status, duration = self.service_call(from_svc, to_svc)
            total_duration += duration
            if status in ['timeout', 'error']:
                print(f"❌ Journey failed at {to_svc}!")
                break
            time.sleep(0.1)  # Small delay between calls
        else:
            # for/else: runs only when the loop finished without a break
            print(f"✅ Journey completed in {total_duration:.3f}s!")

    # 🏃 Run monitoring simulation
    def run_simulation(self):
        while True:
            # User journeys
            if random.random() < 0.7:
                self.simulate_user_journey()
            # Update health metrics
            self.update_health_metrics()
            # Random service-to-service calls
            if random.random() < 0.3:
                services = list(self.services.keys())
                from_svc = random.choice(services)
                to_svc = random.choice([s for s in services if s != from_svc])
                self.service_call(from_svc, to_svc)
            time.sleep(random.uniform(0.5, 2))

# 🚀 Start microservices monitoring
start_http_server(8000)  # Expose the metrics for Prometheus to scrape
monitor = MicroserviceMonitor()

# Run simulation
sim_thread = threading.Thread(target=monitor.run_simulation, daemon=True)
sim_thread.start()
print("🎯 Microservices monitoring active!")
print("📊 Watch service interactions and health metrics!")

# Keep the main thread alive (the simulation runs in a daemon thread)
while True:
    time.sleep(1)
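The solution only simulates its circuit breaker, so it may help to see the state machine on its own: closed until enough failures accumulate, open for a cool-down period, then half-open to let trial requests through, and closed again after a success. The sketch below is one minimal way to write it in plain Python; the class name, thresholds, and the injectable `clock` parameter are assumptions chosen to keep the demo fast and testable.

```python
import time


class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None
        self.state = 'closed'

    def allow_request(self):
        # After the cool-down, move from open to half_open so trial
        # requests can probe whether the service has recovered
        if self.state == 'open' and self.clock() - self.opened_at >= self.reset_timeout:
            self.state = 'half_open'
        return self.state != 'open'

    def record_success(self):
        self.failures = 0
        self.state = 'closed'

    def record_failure(self):
        self.failures += 1
        # A failed trial request, or too many failures, (re)opens the breaker
        if self.state == 'half_open' or self.failures >= self.failure_threshold:
            self.state = 'open'
            self.opened_at = self.clock()


# Fake clock so the demo doesn't have to wait 30 real seconds
now = [0.0]
breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30.0, clock=lambda: now[0])

states = []
for _ in range(3):
    breaker.record_failure()
states.append(breaker.state)            # breaker has tripped
states.append(breaker.allow_request())  # still cooling down
now[0] = 31.0
states.append(breaker.allow_request())  # cool-down elapsed
states.append(breaker.state)
breaker.record_success()
states.append(breaker.state)
print(states)
```

Each state change is a natural place to update a metric such as the `circuit_breaker_state` Enum from the solution, which makes breaker behaviour visible on a dashboard.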
🎓 Key Takeaways
You’ve learned so much! Here’s what you can now do:
- ✅ Set up Prometheus to collect metrics from Python apps 💪
- ✅ Create custom metrics for your specific needs 🛡️
- ✅ Build beautiful Grafana dashboards that impress 🎯
- ✅ Configure alerts to catch issues early 🐛
- ✅ Monitor complex systems like microservices 🚀
Remember: Good monitoring is like having X-ray vision for your applications! It helps you see problems before your users do. 🦸‍♂️
🤝 Next Steps
Congratulations! 🎉 You’ve mastered monitoring with Prometheus and Grafana!
Here’s what to do next:
- 💻 Set up Prometheus and Grafana locally using Docker
- 🏗️ Add monitoring to one of your existing projects
- 📚 Explore PromQL (Prometheus Query Language) for advanced queries
- 🌟 Share your beautiful dashboards with your team!
Remember: Every DevOps expert started by monitoring their first metric. Keep experimenting, keep measuring, and most importantly, have fun building observable systems! 🚀
Happy monitoring! 🎉🚀✨