Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the concept fundamentals
- Apply the concept in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to this exciting tutorial on the ELK Stack with Python! In this guide, we'll explore how to implement powerful centralized logging for your Python applications using Elasticsearch, Logstash, and Kibana.
You'll discover how the ELK Stack can transform your logging experience from scattered log files into a centralized, searchable, and visualizable system. Whether you're building microservices, debugging production issues, or monitoring application health, understanding ELK Stack integration is essential for modern Python development.
By the end of this tutorial, you'll feel confident implementing comprehensive logging solutions in your own projects! Let's dive in!
Understanding the ELK Stack
What is the ELK Stack?
The ELK Stack is like having a super-powered detective agency for your logs. Think of it as a three-person team where each member has a special skill:
- Elasticsearch: The search expert who can find any log instantly
- Logstash: The organizer who collects and processes logs
- Kibana: The artist who creates beautiful visualizations
In Python terms, the ELK Stack helps you:
- Centralize logs from multiple applications
- Search through millions of logs in milliseconds
- Monitor application health in real time
- Create dashboards and alerts
- Debug issues faster with powerful queries
Why Use ELK Stack with Python?
Here's why developers love the ELK Stack for logging:
- Scalability: Handle logs from one app or thousands
- Real-time Processing: See logs as they happen
- Powerful Search: Find specific logs instantly
- Beautiful Dashboards: Visualize trends and patterns
- Alerting: Get notified when things go wrong
Real-world example: Imagine monitoring an e-commerce platform. With the ELK Stack, you can track user actions, system errors, and performance metrics all in one place!
Basic Syntax and Usage
Setting Up Python for ELK
Let's start with a friendly example of sending logs to Elasticsearch:
# Hello, ELK Stack!
import datetime
import logging

from elasticsearch import Elasticsearch

# Create the Elasticsearch connection (adjust the URL for your cluster)
es = Elasticsearch(['http://localhost:9200'])

# Custom handler that ships each record to Elasticsearch
class ElasticsearchHandler(logging.Handler):
    def __init__(self, es_client, index_name='python-logs'):
        super().__init__()
        self.es_client = es_client
        self.index_name = index_name

    def emit(self, record):
        # Convert the log record to a dict
        log_entry = {
            'timestamp': datetime.datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        # Send to a daily index (python-logs-YYYY-MM-DD)
        self.es_client.index(
            index=f"{self.index_name}-{datetime.date.today()}",
            body=log_entry  # on elasticsearch-py 8.x, use document=log_entry
        )

# Set up logging
logger = logging.getLogger('my_app')
logger.setLevel(logging.INFO)

# Add the Elasticsearch handler
es_handler = ElasticsearchHandler(es)
logger.addHandler(es_handler)

# Log some messages!
logger.info("Application started!")
logger.warning("This is a warning")
Explanation: We create a custom handler that sends each log entry to Elasticsearch with a timestamp and useful metadata.
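Once logs are indexed, you can query them back from Python as well. Here is a minimal sketch, assuming the daily python-logs-* indices created above and a cluster on localhost:

# Query the logs we just shipped (assumes the python-logs-* indices from above)
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])

response = es.search(
    index='python-logs-*',  # match all daily indices
    body={                  # on elasticsearch-py 8.x, pass query=... instead of body=
        'query': {
            'match': {'level': 'WARNING'}  # find warning-level entries
        },
        'sort': [{'timestamp': {'order': 'desc'}}],
        'size': 10
    }
)

for hit in response['hits']['hits']:
    print(hit['_source']['timestamp'], hit['_source']['message'])

Kibana runs the same Query DSL against Elasticsearch under the hood, so anything you explore there can also be automated from Python.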
Using python-logstash
Here's how to send logs to Logstash:
# Using the python-logstash library
import logging

import logstash

# Create a logger
logger = logging.getLogger('python-logstash-logger')
logger.setLevel(logging.INFO)

# Add a Logstash handler (TCP input listening on port 5959)
logstash_handler = logstash.TCPLogstashHandler(
    host='localhost',
    port=5959,
    version=1  # Logstash message format version
)
logger.addHandler(logstash_handler)

# Log with extra fields
extra = {
    'user_id': '12345',
    'action': 'purchase',
    'product': 'Python Book',
    'price': 29.99
}
logger.info('User made a purchase!', extra=extra)
Practical Examples
Example 1: E-commerce Application Logging
Let's build a comprehensive logging system for an online store:
# E-commerce logging system
import logging
from datetime import datetime

import logstash
from elasticsearch import Elasticsearch

class EcommerceLogger:
    def __init__(self):
        # Set up Elasticsearch
        self.es = Elasticsearch(['http://localhost:9200'])

        # Create the logger
        self.logger = logging.getLogger('ecommerce')
        self.logger.setLevel(logging.INFO)

        # Add a Logstash handler
        logstash_handler = logstash.TCPLogstashHandler(
            host='localhost',
            port=5959
        )
        self.logger.addHandler(logstash_handler)

    def log_user_action(self, user_id, action, details):
        # Log user activities
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'action': action,
            'details': details,
            'session_id': self._get_session_id()
        }
        self.logger.info(f"User action: {action}", extra=log_data)

    def log_purchase(self, user_id, items, total):
        # Log purchase events
        purchase_data = {
            'user_id': user_id,
            'items': items,
            'total': total,
            'timestamp': datetime.utcnow().isoformat()
        }
        # Also send to Elasticsearch for analytics
        self.es.index(
            index='purchases',
            body=purchase_data
        )
        self.logger.info(f"Purchase completed! Total: ${total}",
                         extra=purchase_data)

    def log_error(self, error_type, message, stack_trace=None):
        # Log errors with context
        error_data = {
            'error_type': error_type,
            'error_message': message,  # 'message' itself would clash with the LogRecord field
            'stack_trace': stack_trace,
            'timestamp': datetime.utcnow().isoformat()
        }
        self.logger.error(f"Error occurred: {error_type}",
                          extra=error_data)

    def _get_session_id(self):
        # Simulate a session ID
        return f"session_{datetime.now().timestamp()}"

# Let's use it!
logger = EcommerceLogger()

# Log user browsing
logger.log_user_action(
    user_id="user_123",
    action="view_product",
    details={'product_id': 'py_book_001', 'category': 'books'}
)

# Log a purchase
items = [
    {'name': 'Python Cookbook', 'price': 45.99},
    {'name': 'ELK Stack Guide', 'price': 39.99}
]
logger.log_purchase(
    user_id="user_123",
    items=items,
    total=85.98
)
Try it yourself: Add cart abandonment tracking and performance metrics logging!
Example 2: Microservices Log Aggregation
Let's create a logging system for microservices:
# Microservices logging with correlation IDs
import logging
import uuid
from contextvars import ContextVar

import logstash
from pythonjsonlogger import jsonlogger

# Correlation ID for request tracking
correlation_id = ContextVar('correlation_id', default=None)

class MicroserviceLogger:
    def __init__(self, service_name):
        self.service_name = service_name
        self.logger = self._setup_logger()

    def _setup_logger(self):
        # Create a logger with a JSON formatter
        logger = logging.getLogger(self.service_name)
        logger.setLevel(logging.INFO)

        # JSON formatter for structured console output
        json_handler = logging.StreamHandler()
        formatter = jsonlogger.JsonFormatter()
        json_handler.setFormatter(formatter)

        # Logstash handler
        logstash_handler = logstash.TCPLogstashHandler(
            host='logstash.internal',
            port=5959
        )

        logger.addHandler(json_handler)
        logger.addHandler(logstash_handler)
        return logger

    def _get_base_fields(self):
        # Common fields added to every log entry
        return {
            'service': self.service_name,
            'correlation_id': correlation_id.get() or str(uuid.uuid4()),
            'environment': 'production',
            'version': '1.0.0'
        }

    def info(self, message, **kwargs):
        # Log info with context
        extra = {**self._get_base_fields(), **kwargs}
        self.logger.info(message, extra=extra)

    def error(self, message, exception=None, **kwargs):
        # Log errors with exception details
        extra = {**self._get_base_fields(), **kwargs}
        if exception:
            extra['exception_type'] = type(exception).__name__
            extra['exception_message'] = str(exception)
        self.logger.error(message, extra=extra, exc_info=exception)

    def log_api_request(self, method, path, duration_ms, status_code):
        # Log API metrics
        self.info(
            f"API Request: {method} {path}",
            method=method,
            path=path,
            duration_ms=duration_ms,
            status_code=status_code,
            request_type='api'
        )

    def log_database_query(self, query, duration_ms, rows_affected):
        # Log database operations
        self.info(
            "Database query executed",
            query=query[:100],  # Truncate long queries
            duration_ms=duration_ms,
            rows_affected=rows_affected,
            operation_type='database'
        )

# Example usage in a Flask microservice
import time

from flask import Flask, request, g

app = Flask(__name__)
logger = MicroserviceLogger('user-service')

@app.before_request
def before_request():
    # Set the correlation ID for this request
    request_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))
    correlation_id.set(request_id)
    g.start_time = time.time()

    logger.info(
        "Request started",
        method=request.method,
        path=request.path,
        remote_addr=request.remote_addr
    )

@app.after_request
def after_request(response):
    # Log request completion
    duration = (time.time() - g.start_time) * 1000
    logger.log_api_request(
        method=request.method,
        path=request.path,
        duration_ms=duration,
        status_code=response.status_code
    )
    return response

@app.route('/users/<user_id>')
def get_user(user_id):
    try:
        # Log business logic
        logger.info("Fetching user", user_id=user_id)

        # Simulate a database query
        start = time.time()
        user = fetch_user_from_db(user_id)  # Your DB function
        duration = (time.time() - start) * 1000

        logger.log_database_query(
            query=f"SELECT * FROM users WHERE id = {user_id}",
            duration_ms=duration,
            rows_affected=1
        )

        return {'user': user, 'status': 'success'}
    except Exception as e:
        # Log errors with full context
        logger.error(
            "Failed to fetch user",
            exception=e,
            user_id=user_id
        )
        return {'error': 'User not found'}, 404
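One detail the example leaves implicit: the correlation ID only ties services together if every outgoing call forwards it. A minimal sketch of propagating the header to a downstream service (the order-service URL is hypothetical):

# Forward the correlation ID on outgoing HTTP calls (hypothetical downstream URL)
import requests

def call_order_service(user_id):
    headers = {'X-Correlation-ID': correlation_id.get() or ''}
    # The downstream service picks the header up in its own before_request hook
    return requests.get(
        f'http://order-service.internal/orders/{user_id}',
        headers=headers,
        timeout=2
    )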
Advanced Concepts
Advanced Topic 1: Custom Log Enrichment
When you're ready to level up, try this advanced pattern:
# Advanced log enrichment with context
import logging
import platform
import time
from functools import wraps

import psutil

class EnrichedLogger:
    def __init__(self, service_name):
        self.service_name = service_name
        self.logger = self._setup_logger()
        self.enrichers = []

    def _setup_logger(self):
        # Use whatever handler setup you prefer (a plain console handler here for brevity)
        logger = logging.getLogger(self.service_name)
        logger.setLevel(logging.INFO)
        logger.addHandler(logging.StreamHandler())
        return logger

    def add_enricher(self, enricher_func):
        # Register custom enrichment functions
        self.enrichers.append(enricher_func)

    def _enrich_log_data(self, data):
        # Apply all enrichers
        enriched = data.copy()

        # System metrics
        enriched.update({
            'cpu_percent': psutil.cpu_percent(),
            'memory_percent': psutil.virtual_memory().percent,
            'hostname': platform.node(),
            'python_version': platform.python_version()
        })

        # Apply custom enrichers
        for enricher in self.enrichers:
            enriched.update(enricher())
        return enriched

    def info(self, message, **kwargs):
        # Log info with enriched fields
        self.logger.info(message, extra=self._enrich_log_data(kwargs))

    def error(self, message, exception=None, **kwargs):
        # Log errors with enriched fields and exception info
        data = self._enrich_log_data(kwargs)
        if exception:
            data['exception_type'] = type(exception).__name__
        self.logger.error(message, extra=data, exc_info=exception)

    def log_with_timing(self, func):
        # Decorator that times a function and logs the outcome
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            error = None
            try:
                return func(*args, **kwargs)
            except Exception as e:
                error = e
                raise
            finally:
                duration = (time.time() - start) * 1000
                log_data = {
                    'function': func.__name__,
                    'duration_ms': duration,
                    'success': error is None
                }
                if error:
                    self.error(
                        f"Function {func.__name__} failed",
                        exception=error,
                        **log_data
                    )
                else:
                    self.info(
                        f"Function {func.__name__} completed",
                        **log_data
                    )
        return wrapper

# Using the enriched logger
logger = EnrichedLogger('analytics-service')

# Add custom enrichers
def user_context_enricher():
    # Add user context to logs (these two helpers are app-specific placeholders)
    return {
        'user_tier': get_current_user_tier(),
        'feature_flags': get_active_feature_flags()
    }

logger.add_enricher(user_context_enricher)

# Use the timing decorator
@logger.log_with_timing
def process_analytics_batch(batch_id):
    # Process analytics data
    logger.info("Processing batch", batch_id=batch_id)
    # ... processing logic ...
    return "processed"
Advanced Topic 2: Log Pipeline with Filters
For production-ready logging:
# Production log pipeline
import re
from typing import Any, Dict

class LogPipeline:
    def __init__(self):
        self.filters = []
        self.transformers = []
        self.destinations = []

    def add_filter(self, filter_func):
        # Register a log filter
        self.filters.append(filter_func)

    def add_transformer(self, transformer_func):
        # Register a log transformer
        self.transformers.append(transformer_func)

    def add_destination(self, destination):
        # Register a log destination
        self.destinations.append(destination)

    def process_log(self, log_data: Dict[str, Any]):
        # Run a log entry through the pipeline
        # 1. Apply filters
        for filter_func in self.filters:
            if not filter_func(log_data):
                return  # Log filtered out

        # 2. Apply transformations
        transformed = log_data
        for transformer in self.transformers:
            transformed = transformer(transformed)

        # 3. Send to destinations
        for destination in self.destinations:
            destination.send(transformed)

# Security filter
def security_filter(log_data):
    # Redact sensitive data before it leaves the application
    sensitive_patterns = [
        r'password=\S+',
        r'api_key=\S+',
        r'token=\S+',
        r'\b\d{16}\b'  # Credit card numbers
    ]
    message = log_data.get('message', '')
    for pattern in sensitive_patterns:
        message = re.sub(pattern, '[REDACTED]', message)
    log_data['message'] = message
    return True

# Metrics transformer
def metrics_transformer(log_data):
    # Categorize performance by duration
    if 'duration_ms' in log_data:
        log_data['performance_category'] = (
            'fast' if log_data['duration_ms'] < 100
            else 'normal' if log_data['duration_ms'] < 1000
            else 'slow'
        )
    return log_data

# Set up the pipeline
pipeline = LogPipeline()
pipeline.add_filter(security_filter)
pipeline.add_transformer(metrics_transformer)
pipeline.add_destination(ElasticsearchDestination())
pipeline.add_destination(S3BackupDestination())
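Neither destination class above comes from a library; the pipeline only expects objects with a send(log_data) method. Here is a minimal sketch of what an ElasticsearchDestination might look like, assuming the same local cluster as earlier (an S3 backup destination would follow the same interface, e.g. with boto3):

# Minimal destination sketch: anything exposing send(log_data) fits the pipeline
from elasticsearch import Elasticsearch

class ElasticsearchDestination:
    def __init__(self, hosts=None, index='pipeline-logs'):
        self.es = Elasticsearch(hosts or ['http://localhost:9200'])
        self.index = index

    def send(self, log_data):
        # Ship the processed entry to Elasticsearch
        self.es.index(index=self.index, body=log_data)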
Common Pitfalls and Solutions
Pitfall 1: Logging Sensitive Data
# Wrong way - logging passwords!
logger.info("User login attempt",
            username=username,
            password=password)  # Never log passwords!

# Correct way - log safely!
logger.info("User login attempt",
            username=username,
            success=True)  # Log the result, not the credentials!
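Beyond being careful at call sites, you can enforce redaction centrally with a standard-library logging.Filter attached to your logger. A minimal sketch:

# Redact sensitive key=value pairs before any handler sees the record
import logging
import re

SENSITIVE = re.compile(r'(password|api_key|token)=\S+')

class RedactFilter(logging.Filter):
    def filter(self, record):
        # Rewrite the message in place; returning True keeps the record
        record.msg = SENSITIVE.sub(r'\1=[REDACTED]', str(record.msg))
        return True

logger = logging.getLogger('my_app')
logger.addFilter(RedactFilter())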
Pitfall 2: Blocking on Log Writes
# Dangerous - blocking I/O!
import requests

class BlockingLogger:
    def log(self, message):
        # This blocks the caller until the HTTP request finishes!
        response = requests.post('http://logging-server',
                                 json={'message': message})

# Safe - async logging!
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncLogger:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=5)
        self.queue = asyncio.Queue()

    async def log(self, message):
        # Non-blocking: just enqueue the message
        await self.queue.put(message)

    async def _process_logs(self):
        # Background task: drain the queue and ship logs off the event loop
        while True:
            message = await self.queue.get()
            await asyncio.get_event_loop().run_in_executor(
                self.executor,
                self._send_log,  # your blocking sender, run in a worker thread
                message
            )
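If you would rather not manage an event loop, the standard library already has a non-blocking pattern built in: a QueueHandler enqueues records and a QueueListener forwards them to slow handlers in a background thread. A minimal sketch (the StreamHandler stands in for something slow like the ElasticsearchHandler from earlier):

# Non-blocking logging with the stdlib queue-based handlers
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

log_queue = queue.Queue(-1)  # unbounded queue

# The slow handler runs in the listener's background thread, not in the caller
slow_handler = logging.StreamHandler()

listener = QueueListener(log_queue, slow_handler, respect_handler_level=True)
listener.start()

logger = logging.getLogger('async_app')
logger.setLevel(logging.INFO)
logger.addHandler(QueueHandler(log_queue))  # callers only pay for a queue put

logger.info("This call returns immediately")

# On shutdown, flush pending records and stop the background thread
listener.stop()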
Best Practices
- Structure Your Logs: Use consistent field names across services
- Log at the Right Level: INFO for business events, ERROR for failures
- Never Log Sensitive Data: Passwords, tokens, and PII must be filtered out
- Use Correlation IDs: Track requests across microservices
- Keep Logs Actionable: Include the context needed for debugging
- Set Up Retention Policies: Don't keep logs forever
- Use Bulk Operations: Send logs in batches for performance (see the sketch below)
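For the bulk point, the Elasticsearch client ships a helpers.bulk function. Here is a minimal sketch of batching buffered log entries into a single request instead of indexing them one at a time (the index name and fields are illustrative):

# Batch log entries into a single bulk request (far fewer network round trips)
from datetime import datetime

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(['http://localhost:9200'])

buffered_logs = [
    {'level': 'INFO', 'message': f'event {i}', 'timestamp': datetime.utcnow().isoformat()}
    for i in range(500)
]

actions = (
    {'_index': 'python-logs-bulk', '_source': entry}  # one action per buffered entry
    for entry in buffered_logs
)

success, errors = helpers.bulk(es, actions)
print(f"Indexed {success} entries, {len(errors)} errors")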
Hands-On Exercise
Challenge: Build a Complete Logging System
Create a production-ready logging system with these features:
Requirements:
- Send logs to both Elasticsearch and a file backup
- Add request correlation across services
- Include user context in all logs
- Implement log rotation and retention
- Create custom Kibana dashboards
Bonus Points:
- Add anomaly detection for error spikes
- Implement log sampling for high-traffic endpoints
- Create alerts for critical errors
Solution
Click to see solution
# Complete ELK logging solution!
import logging
import os
import threading
from contextlib import contextmanager
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler

import logstash
from elasticsearch import Elasticsearch

class ProductionLogger:
    def __init__(self, service_name, environment='production'):
        self.service_name = service_name
        self.environment = environment
        self.es = Elasticsearch(['http://localhost:9200'])
        self.logger = self._setup_logger()
        self._local = threading.local()

    def _setup_logger(self):
        # Create the main logger
        logger = logging.getLogger(self.service_name)
        logger.setLevel(logging.INFO)

        # 1. Elasticsearch handler (reuses the ElasticsearchHandler class from the first example)
        es_handler = ElasticsearchHandler(self.es, self.service_name)
        es_handler.setLevel(logging.INFO)

        # 2. File backup handler with rotation
        os.makedirs('logs', exist_ok=True)
        file_handler = RotatingFileHandler(
            f'logs/{self.service_name}.log',
            maxBytes=100 * 1024 * 1024,  # 100 MB
            backupCount=10
        )
        file_handler.setLevel(logging.WARNING)

        # 3. Logstash handler
        logstash_handler = logstash.TCPLogstashHandler(
            host='localhost',
            port=5959
        )

        # Attach all handlers
        logger.addHandler(es_handler)
        logger.addHandler(file_handler)
        logger.addHandler(logstash_handler)
        return logger

    @contextmanager
    def correlation_context(self, correlation_id):
        # Set the correlation ID for the current request
        old_id = getattr(self._local, 'correlation_id', None)
        self._local.correlation_id = correlation_id
        try:
            yield
        finally:
            self._local.correlation_id = old_id

    def _get_context(self):
        # Collect the current logging context
        return {
            'service': self.service_name,
            'environment': self.environment,
            'correlation_id': getattr(self._local, 'correlation_id', None),
            'timestamp': datetime.utcnow().isoformat(),
            'host': os.environ.get('HOSTNAME', 'unknown')
        }

    def info(self, message, **kwargs):
        # Log info with context
        context = {**self._get_context(), **kwargs}
        self.logger.info(message, extra={'context': context})

    def error(self, message, exception=None, **kwargs):
        # Log an error and alert if it is critical
        context = {**self._get_context(), **kwargs}
        if exception:
            context['exception'] = {
                'type': type(exception).__name__,
                'message': str(exception)
            }
        self.logger.error(message, extra={'context': context})

        # Send an alert for critical errors
        if context.get('critical', False):
            self._send_alert(message, context)

    def _send_alert(self, message, context):
        # Send alerts (plug in your own alerting service here)
        alert_data = {
            'service': self.service_name,
            'message': message,
            'context': context,
            'alert_time': datetime.utcnow().isoformat()
        }
        # Send alert_data to your alerting service

    def create_dashboard(self):
        # Build a Kibana dashboard config (import it via Kibana's saved objects API)
        dashboard_config = {
            "version": "7.10.0",
            "objects": [
                {
                    "attributes": {
                        "title": f"{self.service_name} Dashboard",
                        "type": "dashboard",
                        "description": f"Monitoring dashboard for {self.service_name}"
                    },
                    "references": []
                }
            ]
        }
        return dashboard_config

# Example usage
logger = ProductionLogger('payment-service')

# Use the correlation context
with logger.correlation_context('req-123-456'):
    logger.info("Processing payment",
                user_id='user_789',
                amount=99.99,
                currency='USD')

    try:
        # Process the payment...
        logger.info("Payment successful",
                    transaction_id='txn_abc123')
    except Exception as e:
        logger.error("Payment failed",
                     exception=e,
                     critical=True)

# Set up log retention
def cleanup_old_logs():
    # Delete logs older than 30 days
    cutoff_date = datetime.utcnow() - timedelta(days=30)
    logger.es.delete_by_query(
        index=f"{logger.service_name}-*",
        body={
            "query": {
                "range": {
                    "timestamp": {
                        "lt": cutoff_date.isoformat()
                    }
                }
            }
        }
    )
Key Takeaways
You've learned so much! Here's what you can now do:
- Set up the ELK Stack for Python applications
- Send structured logs to Elasticsearch and Logstash
- Create powerful dashboards in Kibana
- Implement correlation IDs for distributed tracing
- Build production-ready logging pipelines
Remember: Good logging is like having a time machine for debugging - it lets you see exactly what happened!
Next Steps
Congratulations! You've mastered ELK Stack logging with Python!
Here's what to do next:
- Set up a local ELK Stack using Docker Compose
- Implement logging in your current project
- Move on to our next tutorial: [Monitoring: Prometheus and Grafana]
- Create custom Kibana dashboards for your apps!
Remember: Every debugging session becomes easier with good logs. Keep logging, keep learning, and most importantly, have fun!
Happy coding!