Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the fundamentals of authentication and authorization
- Apply both in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to the world of security in Python! Ever wondered how websites know who you are when you log in? Or how they decide whether you're allowed to delete a post? That's the job of authentication and authorization!
Think of it like a VIP nightclub:
- Authentication is the bouncer checking your ID at the door ("Are you who you say you are?")
- Authorization is checking if you have a VIP pass for certain areas ("Are you allowed to be here?")
Today, we'll master these essential security concepts to build secure Python applications that keep the bad guys out!
Understanding Authentication and Authorization
Let's break down these two crucial concepts that often get confused:
Authentication (Who are you?)
Authentication is verifying someone's identity. It's like entering your password to unlock your phone: you're proving you're the rightful owner!
Common authentication methods:
- Passwords - Something you know
- Biometrics - Something you are (fingerprint, face)
- Tokens - Something you have (access card, phone)
Authorization (What can you do?)
Authorization determines what resources a user can access. It's like having different keys for different rooms in a building: your key might open your office but not the server room!
Common authorization patterns:
- Role-Based Access Control (RBAC) - Access based on job roles
- Permission-Based - Specific permissions for actions
- Attribute-Based - Access based on attributes (time, location)
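Role-based and permission-based checks get full examples later, so here is a minimal sketch of the attribute-based pattern only. The policy (same department, office hours, office network) and every name in it are hypothetical, chosen just to show a decision that depends on attributes of the user, the resource, and the request context.

```python
from datetime import time

# Hypothetical policy: a report is readable only by the owning department,
# during office hours, from the office network.
OFFICE_HOURS = (time(9, 0), time(17, 0))

def can_read_report(user, resource, context):
    """Return True only if every attribute condition holds."""
    same_department = user["department"] == resource["department"]
    start, end = OFFICE_HOURS
    in_office_hours = start <= context["time"] <= end
    on_office_network = context["location"] == "office"
    return same_department and in_office_hours and on_office_network

user = {"name": "alice", "department": "finance"}
report = {"name": "q3.pdf", "department": "finance"}

print(can_read_report(user, report, {"time": time(10, 30), "location": "office"}))  # True
print(can_read_report(user, report, {"time": time(22, 0), "location": "office"}))   # False
```

Unlike a role lookup, the same user gets different answers at different times, which is the point of attribute-based rules.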
Basic Syntax and Usage
Let's start with a simple authentication system:
import hashlib
import hmac
import secrets
import time

class SimpleAuth:
    def __init__(self):
        self.users = {}     # Our user database
        self.sessions = {}  # Active sessions

    def hash_password(self, password, salt=None):
        """Hash a password with a salt."""
        if salt is None:
            salt = secrets.token_hex(16)  # Generate a random salt
        # Derive a slow, salted hash with PBKDF2
        hash_obj = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            100_000  # 100k iterations for security
        )
        return salt + ':' + hash_obj.hex()

    def register(self, username, password):
        """Register a new user."""
        if username in self.users:
            return False, "User already exists!"
        # Store the hashed password, never the plain text
        self.users[username] = {
            'password': self.hash_password(password),
            'role': 'user'  # Default role
        }
        return True, "Registration successful!"

    def login(self, username, password):
        """Authenticate a user."""
        if username not in self.users:
            return None, "Invalid credentials!"
        stored = self.users[username]['password']
        salt = stored.split(':')[0]
        # Verify the password with a constant-time comparison
        if hmac.compare_digest(self.hash_password(password, salt), stored):
            # Create a session token
            token = secrets.token_urlsafe(32)
            self.sessions[token] = {
                'username': username,
                'expires': time.time() + 3600  # 1 hour
            }
            return token, "Login successful!"
        return None, "Invalid credentials!"

# Let's try it out!
auth = SimpleAuth()
# Register users
print(auth.register("alice", "super_secret123"))  # (True, 'Registration successful!')
print(auth.register("bob", "password456"))        # (True, 'Registration successful!')
# Login attempts
token, msg = auth.login("alice", "super_secret123")
print(f"Alice: {msg}")  # Login successful!
token, msg = auth.login("alice", "wrong_password")
print(f"Alice (wrong): {msg}")  # Invalid credentials!
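The login method stores an expiry time with each session, but nothing in the class checks it yet. A minimal sketch of that check follows. It assumes a dictionary shaped like SimpleAuth.sessions; the function name validate_session and the injectable now parameter are illustrative additions, not part of the class above.

```python
import time

def validate_session(sessions, token, now=None):
    """Return the username for a live session, or None.

    `sessions` is assumed to look like SimpleAuth.sessions:
    {token: {"username": str, "expires": float}}.
    """
    now = time.time() if now is None else now
    session = sessions.get(token)
    if session is None:
        return None  # unknown token
    if now >= session["expires"]:
        del sessions[token]  # drop the expired session
        return None
    return session["username"]

sessions = {"abc": {"username": "alice", "expires": 1_000.0}}
print(validate_session(sessions, "abc", now=500.0))    # alice
print(validate_session(sessions, "abc", now=2_000.0))  # None
print("abc" in sessions)                               # False
```

Passing now explicitly keeps the check easy to test; in the running application you would leave it out so the current time is used.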
Practical Examples
Example 1: Role-Based Access Control (RBAC) System
Let's build a company system where employees have different access levels:
import hashlib
import secrets
from functools import wraps
from datetime import datetime, timedelta
class CompanyPortal:
def __init__(self):
self.users = {}
self.sessions = {}
self.roles = {
'admin': ['read', 'write', 'delete', 'manage_users'], # ๐
'manager': ['read', 'write', 'delete'], # ๐ผ
'employee': ['read', 'write'], # ๐
'intern': ['read'] # ๐
}
def add_user(self, username, password, role='employee'):
"""Add a new user with a role ๐ค"""
salt = secrets.token_hex(16)
hashed = hashlib.sha256(
(password + salt).encode()
).hexdigest()
self.users[username] = {
'password_hash': hashed,
'salt': salt,
'role': role,
'created': datetime.now()
}
print(f"✅ User {username} added as {role}!")
def authenticate(self, username, password):
"""Verify user credentials ๐"""
if username not in self.users:
return None
user = self.users[username]
hashed = hashlib.sha256(
(password + user['salt']).encode()
).hexdigest()
if hashed == user['password_hash']:
# ๐ซ Create session
session_id = secrets.token_urlsafe(32)
self.sessions[session_id] = {
'username': username,
'role': user['role'],
'expires': datetime.now() + timedelta(hours=8)
}
return session_id
return None
def check_permission(self, session_id, permission):
"""Check if user has permission ๐"""
if session_id not in self.sessions:
return False
session = self.sessions[session_id]
if datetime.now() > session['expires']:
del self.sessions[session_id] # ๐๏ธ Clean expired
return False
user_role = session['role']
return permission in self.roles.get(user_role, [])
# ๐จ Decorator for protecting functions
def require_permission(self, permission):
def decorator(func):
@wraps(func)
def wrapper(session_id, *args, **kwargs):
if not self.check_permission(session_id, permission):
return f"โ Access denied! You need '{permission}' permission."
return func(session_id, *args, **kwargs)
return wrapper
return decorator
# ๐๏ธ Create our portal
portal = CompanyPortal()
# Add users with different roles
portal.add_user("alice", "admin123", "admin") # ๐ Admin
portal.add_user("bob", "manager456", "manager") # ๐ผ Manager
portal.add_user("charlie", "emp789", "employee") # ๐ Employee
portal.add_user("diana", "intern321", "intern") # ๐ Intern
# ๐ Protected functions
@portal.require_permission('read')
def view_documents(session_id):
return "๐ Here are the documents!"
@portal.require_permission('write')
def edit_document(session_id, doc_name):
return f"โ๏ธ Editing {doc_name}..."
@portal.require_permission('delete')
def delete_document(session_id, doc_name):
return f"๐๏ธ Deleted {doc_name}!"
@portal.require_permission('manage_users')
def manage_users(session_id):
return "๐ฅ User management panel"
# ๐ฎ Test different access levels
alice_session = portal.authenticate("alice", "admin123")
print(f"Alice can manage users: {manage_users(alice_session)}") # โ
diana_session = portal.authenticate("diana", "intern321")
print(f"Diana tries to delete: {delete_document(diana_session, 'report.pdf')}") # โ
Example 2: JWT-Based Authentication
Modern web apps often use JSON Web Tokens (JWT) for stateless authentication:
import hashlib
import json
import secrets
import jwt  # pip install PyJWT
from datetime import datetime, timedelta, timezone
class JWTAuthenticator:
def __init__(self, secret_key='super_secret_key'):  # Demo default only; pass a long random key in real code
self.secret_key = secret_key
self.users = {}
def register_user(self, username, password, email):
"""Register a new user ๐"""
if username in self.users:
return {"error": "User exists! ๐"}
# ๐ง Salt and hash password
salt = secrets.token_hex(16)
password_hash = hashlib.sha256(
(password + salt).encode()
).hexdigest()
self.users[username] = {
'password_hash': password_hash,
'salt': salt,
'email': email,
'created': datetime.now(timezone.utc).isoformat(),
'permissions': ['read'] # ๐ Default permissions
}
return {"message": f"Welcome {username}! ๐"}
def create_token(self, username):
"""Create a JWT token ๐ซ"""
payload = {
'username': username,
'email': self.users[username]['email'],
'permissions': self.users[username]['permissions'],
'exp': datetime.now(timezone.utc) + timedelta(hours=24),
'iat': datetime.now(timezone.utc)
}
token = jwt.encode(payload, self.secret_key, algorithm='HS256')
return token
def verify_token(self, token):
"""Verify and decode JWT token ๐"""
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=['HS256']
)
return {'valid': True, 'data': payload}
except jwt.ExpiredSignatureError:
return {'valid': False, 'error': 'Token expired! โฐ'}
except jwt.InvalidTokenError:
return {'valid': False, 'error': 'Invalid token! โ'}
def login(self, username, password):
"""Authenticate and return JWT ๐"""
if username not in self.users:
return {"error": "Invalid credentials! โ"}
user = self.users[username]
password_hash = hashlib.sha256(
(password + user['salt']).encode()
).hexdigest()
if password_hash == user['password_hash']:
token = self.create_token(username)
return {
"message": "Login successful! ๐",
"token": token
}
return {"error": "Invalid credentials! โ"}
def authorize(self, token, required_permission):
"""Check if token has permission ๐ฏ"""
result = self.verify_token(token)
if not result['valid']:
return False, result['error']
user_permissions = result['data'].get('permissions', [])
if required_permission in user_permissions:
return True, "Access granted! ✅"
return False, f"Missing permission: {required_permission} ๐ซ"
# ๐ฎ Let's test JWT authentication!
jwt_auth = JWTAuthenticator()
# Register users
print(jwt_auth.register_user("alice", "secret123", "[email protected]"))
print(jwt_auth.register_user("bob", "password456", "[email protected]"))
# Login and get tokens
alice_result = jwt_auth.login("alice", "secret123")
print(f"Alice login: {alice_result}")
if 'token' in alice_result:
# ๐ Verify token
token_info = jwt_auth.verify_token(alice_result['token'])
print(f"Token valid: {token_info['valid']}")
print(f"User data: {token_info['data']['username']} ๐ค")
# ๐ฏ Check authorization
can_read, msg = jwt_auth.authorize(alice_result['token'], 'read')
print(f"Can read: {msg}")
can_write, msg = jwt_auth.authorize(alice_result['token'], 'write')
print(f"Can write: {msg}")
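To see why a JWT payload should never carry secrets, it helps to look at what the token is made of. The sketch below builds and checks an HS256 token using only the standard library. It is for illustration, assuming a plain HMAC-SHA256 signature over the base64url-encoded header and payload; the helper names are made up here, and it skips the exp check that jwt.decode performs, so keep using PyJWT in real code.

```python
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> bytes:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=")

def b64url_decode(data: str) -> bytes:
    """Decode base64url text, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))

def sign_hs256(payload: dict, key: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = b".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(key, signing_input, hashlib.sha256).digest()
    return (signing_input + b"." + b64url(signature)).decode()

def read_payload_unverified(token: str) -> dict:
    """Anyone can do this: the payload is encoded, not encrypted."""
    return json.loads(b64url_decode(token.split(".")[1]))

def verify_hs256(token: str, key: bytes) -> bool:
    """Recompute the signature and compare it in constant time."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(key, signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected), signature.encode())

key = b"demo-key-for-illustration-only-32b"
token = sign_hs256({"username": "alice", "permissions": ["read"]}, key)
print(read_payload_unverified(token))  # {'username': 'alice', 'permissions': ['read']}
print(verify_hs256(token, key))        # True
print(verify_hs256(token, b"wrong"))   # False
```

The payload can be read without the key, so the signature only protects against tampering. That is why authorize can trust the permissions claim after verify_token succeeds, and why passwords or other secrets do not belong in the payload.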
Example 3: OAuth-Style Token System
Let's build a simplified OAuth-like system for API authentication:
import uuid
from datetime import datetime, timedelta
class OAuthProvider:
def __init__(self):
self.apps = {} # ๐ฑ Registered applications
self.users = {} # ๐ฅ User accounts
self.tokens = {} # ๐ซ Access tokens
self.codes = {} # ๐ Authorization codes
def register_app(self, app_name, redirect_uri):
"""Register a new application ๐ฑ"""
client_id = str(uuid.uuid4())
client_secret = secrets.token_urlsafe(32)
self.apps[client_id] = {
'name': app_name,
'secret': client_secret,
'redirect_uri': redirect_uri,
'scopes': ['read', 'write'] # ๐ Available scopes
}
return {
'client_id': client_id,
'client_secret': client_secret,
'message': f"App '{app_name}' registered! ๐"
}
def create_user(self, username, password):
"""Create a user account ๐ค"""
salt = secrets.token_hex(16)
password_hash = hashlib.sha256(
(password + salt).encode()
).hexdigest()
self.users[username] = {
'password_hash': password_hash,
'salt': salt,
'authorized_apps': {} # ๐ Apps user has authorized
}
return f"User {username} created! ๐"
def authorize_app(self, username, password, client_id, scopes):
"""User authorizes an app ๐ค"""
# Verify user
if username not in self.users:
return {"error": "Invalid user! โ"}
user = self.users[username]
password_hash = hashlib.sha256(
(password + user['salt']).encode()
).hexdigest()
if password_hash != user['password_hash']:
return {"error": "Invalid password! โ"}
# Verify app
if client_id not in self.apps:
return {"error": "Invalid app! โ"}
# ๐ซ Generate authorization code
auth_code = secrets.token_urlsafe(32)
self.codes[auth_code] = {
'username': username,
'client_id': client_id,
'scopes': scopes,
'expires': datetime.now() + timedelta(minutes=10)
}
# ๐ Record authorization
self.users[username]['authorized_apps'][client_id] = scopes
return {
'code': auth_code,
'message': f"App authorized! ๐ฏ"
}
def exchange_code(self, auth_code, client_id, client_secret):
"""Exchange auth code for access token ๐"""
# Verify code exists and not expired
if auth_code not in self.codes:
return {"error": "Invalid code! โ"}
code_data = self.codes[auth_code]
if datetime.now() > code_data['expires']:
del self.codes[auth_code]
return {"error": "Code expired! โฐ"}
# Verify client credentials
if (code_data['client_id'] != client_id or
self.apps[client_id]['secret'] != client_secret):
return {"error": "Invalid client credentials! โ"}
# ๐ซ Generate access token
access_token = secrets.token_urlsafe(32)
self.tokens[access_token] = {
'username': code_data['username'],
'client_id': client_id,
'scopes': code_data['scopes'],
'expires': datetime.now() + timedelta(hours=1)
}
# ๐๏ธ Delete used code
del self.codes[auth_code]
return {
'access_token': access_token,
'token_type': 'Bearer',
'expires_in': 3600,
'message': 'Token issued! ๐'
}
def validate_token(self, token, required_scope=None):
"""Validate access token ๐"""
if token not in self.tokens:
return False, "Invalid token! โ"
token_data = self.tokens[token]
if datetime.now() > token_data['expires']:
del self.tokens[token]
return False, "Token expired! โฐ"
if required_scope and required_scope not in token_data['scopes']:
return False, f"Missing scope: {required_scope} ๐ซ"
return True, token_data
# ๐ฎ OAuth flow demonstration
oauth = OAuthProvider()
# 1๏ธโฃ Register an app
app_info = oauth.register_app("CoolApp", "https://coolapp.com/callback")
print(f"App registered: {app_info}")
# 2๏ธโฃ Create users
print(oauth.create_user("alice", "secure123"))
print(oauth.create_user("bob", "password456"))
# 3๏ธโฃ User authorizes app
auth_result = oauth.authorize_app(
"alice",
"secure123",
app_info['client_id'],
['read', 'write']
)
print(f"Authorization: {auth_result}")
# 4๏ธโฃ App exchanges code for token
if 'code' in auth_result:
token_result = oauth.exchange_code(
auth_result['code'],
app_info['client_id'],
app_info['client_secret']
)
print(f"Token exchange: {token_result}")
# 5๏ธโฃ Use the token
if 'access_token' in token_result:
valid, data = oauth.validate_token(
token_result['access_token'],
'read'
)
if valid:
print(f"✅ Token valid for user: {data['username']}")
print(f"๐ Scopes: {data['scopes']}")
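One gap worth noticing in the simplified provider: register_app records the scopes an app may use, but authorize_app stores whatever scopes it is asked for. A possible guard is sketched below. The function name grant_scopes and its error message are hypothetical; it only assumes the allowed scopes are a list like self.apps[client_id]['scopes'].

```python
def grant_scopes(requested, allowed):
    """Return the requested scopes if the app may use all of them.

    Raises ValueError when any requested scope is not in `allowed`.
    """
    requested_set = set(requested)
    unknown = requested_set - set(allowed)
    if unknown:
        raise ValueError(f"Scopes not available to this app: {sorted(unknown)}")
    return sorted(requested_set)

app_scopes = ['read', 'write']  # same shape as self.apps[client_id]['scopes']
print(grant_scopes(['read'], app_scopes))  # ['read']
try:
    grant_scopes(['read', 'admin'], app_scopes)
except ValueError as exc:
    print(exc)  # Scopes not available to this app: ['admin']
```

Calling a check like this before generating the authorization code would keep an app from obtaining scopes it never registered for.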
Advanced Concepts
Multi-Factor Authentication (MFA)
Let's implement a basic MFA system with TOTP codes and backup codes:
import pyotp # pip install pyotp
import qrcode # pip install qrcode
from io import BytesIO
class MFASystem:
def __init__(self):
self.users = {}
self.backup_codes = {}
def setup_mfa(self, username):
"""Setup MFA for a user ๐ฑ"""
# Generate secret key
secret = pyotp.random_base32()
# Store secret
if username not in self.users:
self.users[username] = {}
self.users[username]['mfa_secret'] = secret
self.users[username]['mfa_enabled'] = False
# Generate backup codes
backup = [secrets.token_hex(4) for _ in range(10)]
self.backup_codes[username] = backup.copy()
# Create TOTP URI for QR code
totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
name=username,
issuer_name='SecureApp'
)
return {
'secret': secret,
'uri': totp_uri,
'backup_codes': backup,
'message': 'MFA setup ready! ๐ฒ'
}
def enable_mfa(self, username, token):
"""Verify and enable MFA ๐"""
if username not in self.users:
return False, "User not found! โ"
secret = self.users[username]['mfa_secret']
totp = pyotp.TOTP(secret)
if totp.verify(token, valid_window=1):
self.users[username]['mfa_enabled'] = True
return True, "MFA enabled successfully! ๐"
return False, "Invalid token! โ"
def verify_mfa(self, username, token):
"""Verify MFA token ๐"""
if username not in self.users:
return False, "User not found! โ"
if not self.users[username].get('mfa_enabled', False):
return False, "MFA not enabled! ๐ซ"
# Check if it's a backup code
if username in self.backup_codes:
if token in self.backup_codes[username]:
self.backup_codes[username].remove(token)
return True, "Backup code used! ๐ซ"
# Verify TOTP
secret = self.users[username]['mfa_secret']
totp = pyotp.TOTP(secret)
if totp.verify(token, valid_window=1):
return True, "MFA verified! ✅"
return False, "Invalid token! โ"
# ๐ฎ Test MFA system
mfa = MFASystem()
# Setup MFA for Alice
setup_result = mfa.setup_mfa("alice")
print(f"MFA Setup: {setup_result['message']}")
print(f"Secret: {setup_result['secret']}")
print(f"Backup codes: {setup_result['backup_codes'][:3]}... (showing first 3)")
# Generate current token (in real app, user would use authenticator app)
totp = pyotp.TOTP(setup_result['secret'])
current_token = totp.now()
print(f"Current token: {current_token}")
# Enable MFA
enabled, msg = mfa.enable_mfa("alice", current_token)
print(f"Enable MFA: {msg}")
# Verify with MFA
time.sleep(2)  # Short pause only; TOTP codes rotate every 30 seconds, so this is usually still the same code
new_token = totp.now()
verified, msg = mfa.verify_mfa("alice", new_token)
print(f"Verify MFA: {msg}")
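For a feel of what happens behind totp.now() and totp.verify(), here is a standard-library sketch of the underlying algorithms as specified in RFC 4226 (HOTP) and RFC 6238 (TOTP). It is not pyotp's code: the function names are illustrative, it takes the secret as raw bytes (pyotp secrets are base32 strings), and it only covers the default SHA-1 variant. The sample secret and expected codes are the published RFC test vectors.

```python
import hashlib
import hmac
import struct

def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """HMAC-based one-time password (RFC 4226) using SHA-1."""
    digest = hmac.new(secret, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation: low nibble picks the offset
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)

def totp(secret: bytes, unix_time: float, step: int = 30, digits: int = 6) -> str:
    """Time-based one-time password (RFC 6238): HOTP over the time-step counter."""
    return hotp(secret, int(unix_time // step), digits)

secret = b"12345678901234567890"      # test secret from the RFCs
print(hotp(secret, 0))                # 755224
print(totp(secret, 59, digits=8))     # 94287082
print(totp(secret, 30) == totp(secret, 59))  # True: same 30-second step
```

Because the code depends only on the 30-second step, two calls a couple of seconds apart normally return the same value. The valid_window=1 argument used above additionally accepts the codes from the neighbouring steps, which tolerates small clock drift.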
API Key Management System
Let's build an API key manager with permissions, rate limiting, and usage tracking:
class APIKeyManager:
def __init__(self):
self.api_keys = {}
self.rate_limits = {}
self.usage_stats = {}
def generate_api_key(self, user_id, name, permissions=None):
"""Generate a new API key ๐ฏ"""
# Create unique key
key_id = str(uuid.uuid4())
api_key = f"sk_{secrets.token_urlsafe(32)}"
# Store key data
self.api_keys[api_key] = {
'key_id': key_id,
'user_id': user_id,
'name': name,
'permissions': permissions or ['read'],
'created': datetime.now(),
'last_used': None,
'active': True,
'rate_limit': 1000 # ๐ฆ Requests per hour
}
# Initialize usage stats
self.usage_stats[api_key] = {
'total_requests': 0,
'hourly_requests': [],
'endpoints': {}
}
return {
'api_key': api_key,
'key_id': key_id,
'message': f"API key '{name}' created! ๐"
}
def validate_request(self, api_key, endpoint, permission='read'):
"""Validate API request ๐"""
# Check if key exists
if api_key not in self.api_keys:
return False, "Invalid API key! โ", None
key_data = self.api_keys[api_key]
# Check if active
if not key_data['active']:
return False, "API key is inactive! ๐ซ", None
# Check permissions
if permission not in key_data['permissions']:
return False, f"Missing permission: {permission} ๐ซ", None
# Check rate limit
current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
hourly_count = self._get_hourly_count(api_key, current_hour)
if hourly_count >= key_data['rate_limit']:
return False, "Rate limit exceeded! ๐ฆ", None
# Update usage
self._update_usage(api_key, endpoint)
return True, "Request authorized! ✅", key_data['user_id']
def _get_hourly_count(self, api_key, hour):
"""Get request count for current hour ๐"""
stats = self.usage_stats[api_key]
hourly = stats['hourly_requests']
# Clean old entries
hourly[:] = [
(h, count) for h, count in hourly
if h >= hour - timedelta(hours=1)
]
# Count current hour
return sum(count for h, count in hourly if h == hour)
def _update_usage(self, api_key, endpoint):
"""Update usage statistics ๐"""
stats = self.usage_stats[api_key]
current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
# Update total
stats['total_requests'] += 1
# Update hourly
hourly = stats['hourly_requests']
if hourly and hourly[-1][0] == current_hour:
hourly[-1] = (current_hour, hourly[-1][1] + 1)
else:
hourly.append((current_hour, 1))
# Update endpoints
stats['endpoints'][endpoint] = stats['endpoints'].get(endpoint, 0) + 1
# Update last used
self.api_keys[api_key]['last_used'] = datetime.now()
def get_usage_report(self, api_key):
"""Get usage report for API key ๐"""
if api_key not in self.api_keys:
return None
key_data = self.api_keys[api_key]
stats = self.usage_stats[api_key]
return {
'key_name': key_data['name'],
'total_requests': stats['total_requests'],
'rate_limit': key_data['rate_limit'],
'top_endpoints': sorted(
stats['endpoints'].items(),
key=lambda x: x[1],
reverse=True
)[:5],
'last_used': key_data['last_used'],
'status': 'Active ✅' if key_data['active'] else 'Inactive 🚫'
}
# ๐ฎ Test API key system
api_mgr = APIKeyManager()
# Generate API keys
key1 = api_mgr.generate_api_key("user123", "Production Key", ['read', 'write'])
key2 = api_mgr.generate_api_key("user123", "Testing Key", ['read'])
print(f"Key 1: {key1}")
# Simulate API requests
api_key = key1['api_key']
# Valid requests
for i in range(5):
valid, msg, user = api_mgr.validate_request(api_key, '/api/users', 'read')
print(f"Request {i+1}: {msg}")
# Try write permission
valid, msg, user = api_mgr.validate_request(api_key, '/api/users/create', 'write')
print(f"Write request: {msg}")
# Get usage report
report = api_mgr.get_usage_report(api_key)
print(f"\n๐ Usage Report:")
print(f"Total requests: {report['total_requests']}")
print(f"Top endpoints: {report['top_endpoints']}")
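The manager above uses the plaintext API key as its dictionary key, so anyone who can read that store can use every key. A common alternative is to keep only a digest of each key and show the plaintext to the user once. The sketch below assumes SHA-256 is acceptable here because the keys are long random strings rather than guessable passwords; the function names are illustrative.

```python
import hashlib
import secrets

def new_api_key():
    """Return (plaintext_key, digest). Only the digest should be stored."""
    key = f"sk_{secrets.token_urlsafe(32)}"
    return key, hashlib.sha256(key.encode()).hexdigest()

def lookup(stored, presented_key):
    """Find key metadata by the SHA-256 digest of the presented key."""
    digest = hashlib.sha256(presented_key.encode()).hexdigest()
    return stored.get(digest)

key, digest = new_api_key()
stored = {digest: {"user_id": "user123", "permissions": ["read"]}}

print(lookup(stored, key))         # {'user_id': 'user123', 'permissions': ['read']}
print(lookup(stored, "sk_guess"))  # None
print(key in stored)               # False: the plaintext key is never stored
```

The rest of validate_request (active flag, permissions, rate limit) would work unchanged on the metadata returned by lookup.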
Common Pitfalls and Solutions
1. Storing Passwords in Plain Text
# ❌ NEVER DO THIS!
def bad_store_password(username, password):
    users[username] = password  # Plain text!
# ✅ Always hash passwords
def good_store_password(username, password):
    salt = secrets.token_hex(16)
    hashed = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),
        salt.encode('utf-8'),
        100_000
    )
    users[username] = {
        'salt': salt,
        'hash': hashed.hex()
    }
2. Weak Session Management
# ❌ Predictable session IDs
session_id = f"session_{user_id}_{timestamp}"  # Guessable!
# ✅ Use cryptographically secure tokens
session_id = secrets.token_urlsafe(32)  # Secure!
3. Not Validating Token Expiration
# ❌ Tokens that never expire
def validate_token(token):
    return token in valid_tokens  # No expiration check!
# ✅ Always check expiration
def validate_token(token):
    if token not in valid_tokens:
        return False
    token_data = valid_tokens[token]
    if datetime.now() > token_data['expires']:
        del valid_tokens[token]  # Clean up
        return False
    return True
4. Timing Attacks in Authentication
# ❌ Early return reveals information
def authenticate(username, password):
    if username not in users:
        return False  # Returns faster for unknown users!
    return check_password(users[username], password)
# ✅ Do the same work whether or not the user exists
def authenticate(username, password):
    # The dummy must be a real, well-formed hash from the same scheme as the
    # stored ones; the string below is only a placeholder for one.
    dummy_hash = "$2b$12$dummy.hash.for.timing.attack.prevention"
    if username in users:
        stored_hash = users[username]['password']
    else:
        stored_hash = dummy_hash  # Same computation time
    # Always verify (even with the dummy)
    result = verify_password(password, stored_hash)
    # Only return True if the user exists AND the password matches
    return username in users and result
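The snippet above leaves verify_password undefined. One way to fill it in with the standard library is sketched below, assuming PBKDF2 hashes like the ones used earlier in this tutorial. The record layout (salt and hash stored as bytes) and the module-level dummy values are assumptions made for this sketch.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000

def hash_password(password, salt):
    """Derive a PBKDF2-HMAC-SHA256 hash; `salt` is bytes."""
    return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)

# A real hash of a throwaway password, computed once at startup, so unknown
# usernames cost the same work as known ones.
_DUMMY_SALT = secrets.token_bytes(16)
_DUMMY_HASH = hash_password(secrets.token_urlsafe(16), _DUMMY_SALT)

def authenticate(users, username, password):
    """Return True only for an existing user with the right password."""
    record = users.get(username)
    if record is not None:
        salt, stored = record["salt"], record["hash"]
    else:
        salt, stored = _DUMMY_SALT, _DUMMY_HASH
    candidate = hash_password(password, salt)         # always computed
    matches = hmac.compare_digest(candidate, stored)  # constant-time comparison
    return record is not None and matches

salt = secrets.token_bytes(16)
users = {"alice": {"salt": salt, "hash": hash_password("s3cret", salt)}}
print(authenticate(users, "alice", "s3cret"))    # True
print(authenticate(users, "alice", "wrong"))     # False
print(authenticate(users, "mallory", "s3cret"))  # False
```

Two separate protections are at work: the dummy hash evens out the time spent on unknown usernames, and hmac.compare_digest avoids leaking how many leading bytes of the hash matched.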
Best Practices
1. Use Established Libraries
# ✅ Use well-tested libraries
from passlib.hash import bcrypt # Password hashing
import jwt # JSON Web Tokens
from cryptography.fernet import Fernet # Encryption
# Example with bcrypt
password_hash = bcrypt.hash("user_password")
is_valid = bcrypt.verify("user_password", password_hash)
2. Implement Proper Logging ๐
import logging
class SecureAuthLogger:
def __init__(self):
self.logger = logging.getLogger('auth')
def log_auth_attempt(self, username, success, ip_address):
"""Log authentication attempts ๐"""
if success:
self.logger.info(
f"✅ Successful login: {username} from {ip_address}"
)
else:
self.logger.warning(
f"โ Failed login attempt: {username} from {ip_address}"
)
def log_permission_denied(self, user, resource, permission):
"""Log authorization failures ๐ซ"""
self.logger.warning(
f"๐ซ Permission denied: {user} tried to {permission} on {resource}"
)
3. Implement Account Lockout ๐
class AccountLockout:
def __init__(self, max_attempts=5, lockout_duration=300):
self.max_attempts = max_attempts
self.lockout_duration = lockout_duration # seconds
self.attempts = {}
def check_lockout(self, username):
"""Check if account is locked ๐"""
if username not in self.attempts:
return False, 0
data = self.attempts[username]
if data['locked_until'] and datetime.now() < data['locked_until']:
remaining = int((data['locked_until'] - datetime.now()).total_seconds())  # .seconds would drop whole days
return True, remaining
return False, 0
def record_attempt(self, username, success):
"""Record login attempt ๐"""
if username not in self.attempts:
self.attempts[username] = {
'count': 0,
'locked_until': None
}
if success:
# Reset on success
self.attempts[username] = {
'count': 0,
'locked_until': None
}
else:
# Increment failed attempts
self.attempts[username]['count'] += 1
if self.attempts[username]['count'] >= self.max_attempts:
self.attempts[username]['locked_until'] = (
datetime.now() + timedelta(seconds=self.lockout_duration)
)
4. Secure Token Storage ๐๏ธ
class SecureTokenStore:
def __init__(self, encryption_key=None):
if encryption_key is None:
encryption_key = Fernet.generate_key()
self.cipher = Fernet(encryption_key)
self.tokens = {}
def store_token(self, token_id, sensitive_data):
"""Store encrypted token data ๐"""
# Encrypt sensitive data
encrypted = self.cipher.encrypt(
json.dumps(sensitive_data).encode()
)
self.tokens[token_id] = {
'encrypted_data': encrypted,
'created': datetime.now(),
'expires': datetime.now() + timedelta(hours=1)
}
def retrieve_token(self, token_id):
"""Retrieve and decrypt token data ๐"""
if token_id not in self.tokens:
return None
token_data = self.tokens[token_id]
# Check expiration
if datetime.now() > token_data['expires']:
del self.tokens[token_id]
return None
# Decrypt data
decrypted = self.cipher.decrypt(token_data['encrypted_data'])
return json.loads(decrypted.decode())
Hands-On Exercise
Build a secure blog API with authentication and authorization!
Requirements:
- User registration and login with secure passwords
- JWT tokens for API authentication
- Role-based access (admin, author, reader)
- Permissions: readers can read, authors can write, admins can delete
- Rate limiting per API key
Solution:
import jwt
import hashlib
import secrets
from datetime import datetime, timedelta
from functools import wraps
class BlogAPI:
def __init__(self):
self.users = {}
self.posts = {}
self.tokens = {}
self.secret_key = "blog_secret_key"  # Demo only; load a long random key from configuration in real code
self.rate_limits = {}
# ๐ค User Management
def register(self, username, password, email, role='reader'):
"""Register a new user ๐"""
if username in self.users:
return {"error": "Username taken! ๐"}
# Hash password
salt = secrets.token_hex(16)
password_hash = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
salt.encode('utf-8'),
100_000
)
self.users[username] = {
'password_hash': password_hash.hex(),
'salt': salt,
'email': email,
'role': role,
'created': datetime.now()
}
return {"message": f"Welcome {username}! ๐", "role": role}
def login(self, username, password):
"""Authenticate user and return JWT ๐"""
if username not in self.users:
return {"error": "Invalid credentials! โ"}
user = self.users[username]
password_hash = hashlib.pbkdf2_hmac(
'sha256',
password.encode('utf-8'),
user['salt'].encode('utf-8'),
100_000
).hex()
if password_hash != user['password_hash']:
return {"error": "Invalid credentials! โ"}
# Generate JWT
payload = {
'username': username,
'role': user['role'],
'exp': datetime.utcnow() + timedelta(hours=24),
'iat': datetime.utcnow()
}
token = jwt.encode(payload, self.secret_key, algorithm='HS256')
return {
"message": "Login successful! ๐",
"token": token,
"role": user['role']
}
    # Authorization decorator
    # Defined without `self` so it can decorate the methods below while the
    # class body is still being executed; the wrapper receives `self` at call time.
    def require_role(allowed_roles):
        def decorator(func):
            @wraps(func)
            def wrapper(self, token, *args, **kwargs):
                # Verify token
                try:
                    payload = jwt.decode(
                        token,
                        self.secret_key,
                        algorithms=['HS256']
                    )
                except jwt.ExpiredSignatureError:
                    return {"error": "Token expired!"}
                except jwt.InvalidTokenError:
                    return {"error": "Invalid token!"}
                # Check role
                user_role = payload.get('role')
                if user_role not in allowed_roles:
                    return {"error": f"Access denied! Need role: {allowed_roles}"}
                # Check rate limit
                username = payload.get('username')
                if not self._check_rate_limit(username):
                    return {"error": "Rate limit exceeded!"}
                # Call the original method with the decoded user info
                return func(self, payload, *args, **kwargs)
            return wrapper
        return decorator
def _check_rate_limit(self, username):
"""Check rate limit (100 requests per hour) ๐ฆ"""
current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
if username not in self.rate_limits:
self.rate_limits[username] = {}
user_limits = self.rate_limits[username]
if current_hour not in user_limits:
user_limits[current_hour] = 0
user_limits[current_hour] += 1
# Clean old entries
for hour in list(user_limits.keys()):
if hour < current_hour - timedelta(hours=1):
del user_limits[hour]
return user_limits[current_hour] <= 100
# ๐ Blog Operations
@require_role(['reader', 'author', 'admin'])
def get_posts(self, user_info):
"""Get all posts ๐"""
return {
"posts": list(self.posts.values()),
"count": len(self.posts)
}
@require_role(['author', 'admin'])
def create_post(self, user_info, title, content):
"""Create a new post โ๏ธ"""
post_id = str(len(self.posts) + 1)
self.posts[post_id] = {
'id': post_id,
'title': title,
'content': content,
'author': user_info['username'],
'created': datetime.now().isoformat(),
'updated': datetime.now().isoformat()
}
return {
"message": "Post created! ๐",
"post_id": post_id
}
@require_role(['author', 'admin'])
def update_post(self, user_info, post_id, title=None, content=None):
"""Update a post โ๏ธ"""
if post_id not in self.posts:
return {"error": "Post not found! ๐"}
post = self.posts[post_id]
# Authors can only edit their own posts
if (user_info['role'] == 'author' and
post['author'] != user_info['username']):
return {"error": "You can only edit your own posts! ๐ซ"}
if title:
post['title'] = title
if content:
post['content'] = content
post['updated'] = datetime.now().isoformat()
return {"message": "Post updated! โจ"}
@require_role(['admin'])
def delete_post(self, user_info, post_id):
"""Delete a post ๐๏ธ"""
if post_id not in self.posts:
return {"error": "Post not found! ๐"}
del self.posts[post_id]
return {"message": "Post deleted! ๐๏ธ"}
# ๐ฎ Test the Blog API
blog = BlogAPI()
# Register users
print("=== User Registration ===")
print(blog.register("alice", "admin123", "[email protected]", "admin"))
print(blog.register("bob", "author456", "[email protected]", "author"))
print(blog.register("charlie", "reader789", "[email protected]", "reader"))
# Login
print("\n=== User Login ===")
alice_login = blog.login("alice", "admin123")
bob_login = blog.login("bob", "author456")
charlie_login = blog.login("charlie", "reader789")
print(f"Alice (admin): {alice_login['message']}")
print(f"Bob (author): {bob_login['message']}")
print(f"Charlie (reader): {charlie_login['message']}")
# Test operations
print("\n=== Blog Operations ===")
# Charlie tries to create a post (should fail)
result = blog.create_post(
charlie_login['token'],
"My First Post",
"Hello World!"
)
print(f"Charlie create: {result}")
# Bob creates a post (should succeed)
result = blog.create_post(
bob_login['token'],
"Bob's Post",
"This is Bob's awesome post! ๐"
)
print(f"Bob create: {result}")
# Alice creates a post
result = blog.create_post(
alice_login['token'],
"Admin Announcement",
"Important news from admin! ๐ข"
)
print(f"Alice create: {result}")
# Everyone can read
result = blog.get_posts(charlie_login['token'])
print(f"\nAll posts: {len(result['posts'])} posts found")
# Bob tries to delete (should fail)
result = blog.delete_post(bob_login['token'], "1")
print(f"Bob delete: {result}")
# Alice deletes (should succeed)
result = blog.delete_post(alice_login['token'], "1")
print(f"Alice delete: {result}")
Key Takeaways
You've mastered authentication and authorization in Python! Here's what you learned:
- Authentication vs Authorization
  - Authentication = Who are you?
  - Authorization = What can you do?
- Password Security
  - Always hash with salt
  - Use strong algorithms (bcrypt, PBKDF2)
  - Never store plain text!
- Token Management
  - JWTs for stateless auth
  - Session tokens for stateful
  - Always set expiration
- Access Control
  - Role-based (RBAC)
  - Permission-based
  - Always validate!
- Best Practices
  - Use established libraries
  - Implement rate limiting
  - Log security events
  - Handle errors securely
Next Steps
Congratulations on mastering security fundamentals! You're now equipped to build secure Python applications that protect user data and control access properly.
Continue your security journey:
- Explore OAuth 2.0 and OpenID Connect
- Learn about HTTPS and TLS
- Study OWASP security guidelines
- Practice with security challenges
Remember: Security is not a feature, it's a mindset! Always think about what could go wrong and protect against it. Keep learning, keep securing, and keep building amazing things!
Happy secure coding!