+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Part 190 of 343

๐Ÿ“˜ Security: Authentication and Authorization

Master security: authentication and authorization in Python with practical examples, best practices, and real-world applications ๐Ÿš€

๐Ÿ’ŽAdvanced
25 min read

Prerequisites

  • Basic understanding of programming concepts ๐Ÿ“
  • Python installation (3.8+) ๐Ÿ
  • VS Code or preferred IDE ๐Ÿ’ป

What you'll learn

  • Understand the concept fundamentals ๐ŸŽฏ
  • Apply the concept in real projects ๐Ÿ—๏ธ
  • Debug common issues ๐Ÿ›
  • Write clean, Pythonic code โœจ

๐ŸŽฏ Introduction

Welcome to the world of security in Python! ๐ŸŽ‰ Ever wondered how websites know who you are when you log in? Or how they decide if youโ€™re allowed to delete a post or not? Thatโ€™s the magic of authentication and authorization! ๐Ÿ”

Think of it like a VIP nightclub ๐ŸŽช:

  • Authentication is the bouncer checking your ID at the door (โ€œAre you who you say you are?โ€) ๐Ÿ†”
  • Authorization is checking if you have a VIP pass for certain areas (โ€œAre you allowed to be here?โ€) ๐ŸŽซ

Today, weโ€™ll master these essential security concepts to build secure Python applications that keep the bad guys out! ๐Ÿ’ช

๐Ÿ“š Understanding Authentication and Authorization

Letโ€™s break down these two crucial concepts that often get confused:

Authentication (Who are you?) ๐Ÿค”

Authentication is verifying someoneโ€™s identity. Itโ€™s like when you enter your password to unlock your phone ๐Ÿ“ฑ - youโ€™re proving youโ€™re the rightful owner!

Common authentication methods:

  • Passwords - Something you know ๐Ÿ”‘
  • Biometrics - Something you are (fingerprint, face) ๐Ÿ‘†
  • Tokens - Something you have (access card, phone) ๐Ÿ“ฒ

Authorization (What can you do?) ๐ŸŽฏ

Authorization determines what resources a user can access. Itโ€™s like having different keys for different rooms in a building ๐Ÿข - your key might open your office but not the server room!

Common authorization patterns:

  • Role-Based Access Control (RBAC) - Access based on job roles ๐Ÿ‘”
  • Permission-Based - Specific permissions for actions โœ…
  • Attribute-Based - Access based on attributes (time, location) ๐Ÿ•

๐Ÿ”ง Basic Syntax and Usage

Letโ€™s start with a simple authentication system:

import hashlib
import secrets
import time

class SimpleAuth:
    def __init__(self):
        self.users = {}  # ๐Ÿ“š Our user database
        self.sessions = {}  # ๐ŸŽซ Active sessions
        
    def hash_password(self, password, salt=None):
        """Hash a password with salt ๐Ÿง‚"""
        if salt is None:
            salt = secrets.token_hex(16)  # ๐ŸŽฒ Generate random salt
        
        # ๐Ÿ” Create secure hash
        hash_obj = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            100_000  # ๐Ÿ’ช 100k iterations for security
        )
        return salt + ':' + hash_obj.hex()
    
    def register(self, username, password):
        """Register a new user ๐Ÿ“"""
        if username in self.users:
            return False, "User already exists! ๐Ÿ˜•"
        
        # ๐ŸŽ‰ Store hashed password
        self.users[username] = {
            'password': self.hash_password(password),
            'role': 'user'  # ๐Ÿ‘ค Default role
        }
        return True, "Registration successful! ๐ŸŽŠ"
    
    def login(self, username, password):
        """Authenticate a user ๐Ÿ”‘"""
        if username not in self.users:
            return None, "Invalid credentials! โŒ"
        
        stored = self.users[username]['password']
        salt = stored.split(':')[0]
        
        # ๐Ÿ” Verify password
        if self.hash_password(password, salt) == stored:
            # ๐ŸŽซ Create session token
            token = secrets.token_urlsafe(32)
            self.sessions[token] = {
                'username': username,
                'expires': time.time() + 3600  # โฐ 1 hour
            }
            return token, "Login successful! ๐Ÿš€"
        
        return None, "Invalid credentials! โŒ"

# ๐ŸŽฎ Let's try it out!
auth = SimpleAuth()

# Register users
print(auth.register("alice", "super_secret123"))  # โœ…
print(auth.register("bob", "password456"))        # โœ…

# Login attempts
token, msg = auth.login("alice", "super_secret123")
print(f"Alice: {msg}")  # ๐ŸŽ‰ Success!

token, msg = auth.login("alice", "wrong_password")
print(f"Alice (wrong): {msg}")  # โŒ Failed!

๐Ÿ’ก Practical Examples

Example 1: Role-Based Access Control (RBAC) System ๐Ÿข

Letโ€™s build a company system where employees have different access levels:

from functools import wraps
from datetime import datetime, timedelta

class CompanyPortal:
    def __init__(self):
        self.users = {}
        self.sessions = {}
        self.roles = {
            'admin': ['read', 'write', 'delete', 'manage_users'],  # ๐Ÿ‘‘
            'manager': ['read', 'write', 'delete'],                # ๐Ÿ’ผ
            'employee': ['read', 'write'],                         # ๐Ÿ‘”
            'intern': ['read']                                     # ๐Ÿ“š
        }
    
    def add_user(self, username, password, role='employee'):
        """Add a new user with a role ๐Ÿ‘ค"""
        salt = secrets.token_hex(16)
        hashed = hashlib.sha256(
            (password + salt).encode()
        ).hexdigest()
        
        self.users[username] = {
            'password_hash': hashed,
            'salt': salt,
            'role': role,
            'created': datetime.now()
        }
        print(f"โœ… User {username} added as {role}!")
    
    def authenticate(self, username, password):
        """Verify user credentials ๐Ÿ”"""
        if username not in self.users:
            return None
        
        user = self.users[username]
        hashed = hashlib.sha256(
            (password + user['salt']).encode()
        ).hexdigest()
        
        if hashed == user['password_hash']:
            # ๐ŸŽซ Create session
            session_id = secrets.token_urlsafe(32)
            self.sessions[session_id] = {
                'username': username,
                'role': user['role'],
                'expires': datetime.now() + timedelta(hours=8)
            }
            return session_id
        return None
    
    def check_permission(self, session_id, permission):
        """Check if user has permission ๐Ÿ”"""
        if session_id not in self.sessions:
            return False
        
        session = self.sessions[session_id]
        if datetime.now() > session['expires']:
            del self.sessions[session_id]  # ๐Ÿ—‘๏ธ Clean expired
            return False
        
        user_role = session['role']
        return permission in self.roles.get(user_role, [])
    
    # ๐ŸŽจ Decorator for protecting functions
    def require_permission(self, permission):
        def decorator(func):
            @wraps(func)
            def wrapper(session_id, *args, **kwargs):
                if not self.check_permission(session_id, permission):
                    return f"โŒ Access denied! You need '{permission}' permission."
                return func(session_id, *args, **kwargs)
            return wrapper
        return decorator

# ๐Ÿ—๏ธ Create our portal
portal = CompanyPortal()

# Add users with different roles
portal.add_user("alice", "admin123", "admin")      # ๐Ÿ‘‘ Admin
portal.add_user("bob", "manager456", "manager")    # ๐Ÿ’ผ Manager
portal.add_user("charlie", "emp789", "employee")   # ๐Ÿ‘” Employee
portal.add_user("diana", "intern321", "intern")    # ๐Ÿ“š Intern

# ๐Ÿ“‹ Protected functions
@portal.require_permission('read')
def view_documents(session_id):
    return "๐Ÿ“„ Here are the documents!"

@portal.require_permission('write')
def edit_document(session_id, doc_name):
    return f"โœ๏ธ Editing {doc_name}..."

@portal.require_permission('delete')
def delete_document(session_id, doc_name):
    return f"๐Ÿ—‘๏ธ Deleted {doc_name}!"

@portal.require_permission('manage_users')
def manage_users(session_id):
    return "๐Ÿ‘ฅ User management panel"

# ๐ŸŽฎ Test different access levels
alice_session = portal.authenticate("alice", "admin123")
print(f"Alice can manage users: {manage_users(alice_session)}")  # โœ…

diana_session = portal.authenticate("diana", "intern321")
print(f"Diana tries to delete: {delete_document(diana_session, 'report.pdf')}")  # โŒ

Example 2: JWT-Based Authentication ๐ŸŽซ

Modern web apps often use JSON Web Tokens (JWT) for stateless authentication:

import jwt
import json
from datetime import datetime, timedelta, timezone

class JWTAuthenticator:
    def __init__(self, secret_key='super_secret_key_๐Ÿ”'):
        self.secret_key = secret_key
        self.users = {}
        
    def register_user(self, username, password, email):
        """Register a new user ๐Ÿ“"""
        if username in self.users:
            return {"error": "User exists! ๐Ÿ˜•"}
        
        # ๐Ÿง‚ Salt and hash password
        salt = secrets.token_hex(16)
        password_hash = hashlib.sha256(
            (password + salt).encode()
        ).hexdigest()
        
        self.users[username] = {
            'password_hash': password_hash,
            'salt': salt,
            'email': email,
            'created': datetime.now(timezone.utc).isoformat(),
            'permissions': ['read']  # ๐Ÿ“– Default permissions
        }
        
        return {"message": f"Welcome {username}! ๐ŸŽ‰"}
    
    def create_token(self, username):
        """Create a JWT token ๐ŸŽซ"""
        payload = {
            'username': username,
            'email': self.users[username]['email'],
            'permissions': self.users[username]['permissions'],
            'exp': datetime.now(timezone.utc) + timedelta(hours=24),
            'iat': datetime.now(timezone.utc)
        }
        
        token = jwt.encode(payload, self.secret_key, algorithm='HS256')
        return token
    
    def verify_token(self, token):
        """Verify and decode JWT token ๐Ÿ”"""
        try:
            payload = jwt.decode(
                token, 
                self.secret_key, 
                algorithms=['HS256']
            )
            return {'valid': True, 'data': payload}
        except jwt.ExpiredSignatureError:
            return {'valid': False, 'error': 'Token expired! โฐ'}
        except jwt.InvalidTokenError:
            return {'valid': False, 'error': 'Invalid token! โŒ'}
    
    def login(self, username, password):
        """Authenticate and return JWT ๐Ÿ”‘"""
        if username not in self.users:
            return {"error": "Invalid credentials! โŒ"}
        
        user = self.users[username]
        password_hash = hashlib.sha256(
            (password + user['salt']).encode()
        ).hexdigest()
        
        if password_hash == user['password_hash']:
            token = self.create_token(username)
            return {
                "message": "Login successful! ๐Ÿš€",
                "token": token
            }
        
        return {"error": "Invalid credentials! โŒ"}
    
    def authorize(self, token, required_permission):
        """Check if token has permission ๐ŸŽฏ"""
        result = self.verify_token(token)
        
        if not result['valid']:
            return False, result['error']
        
        user_permissions = result['data'].get('permissions', [])
        if required_permission in user_permissions:
            return True, "Access granted! โœ…"
        
        return False, f"Missing permission: {required_permission} ๐Ÿšซ"

# ๐ŸŽฎ Let's test JWT authentication!
jwt_auth = JWTAuthenticator()

# Register users
print(jwt_auth.register_user("alice", "secret123", "[email protected]"))
print(jwt_auth.register_user("bob", "password456", "[email protected]"))

# Login and get tokens
alice_result = jwt_auth.login("alice", "secret123")
print(f"Alice login: {alice_result}")

if 'token' in alice_result:
    # ๐Ÿ” Verify token
    token_info = jwt_auth.verify_token(alice_result['token'])
    print(f"Token valid: {token_info['valid']}")
    print(f"User data: {token_info['data']['username']} ๐Ÿ‘ค")
    
    # ๐ŸŽฏ Check authorization
    can_read, msg = jwt_auth.authorize(alice_result['token'], 'read')
    print(f"Can read: {msg}")
    
    can_write, msg = jwt_auth.authorize(alice_result['token'], 'write')
    print(f"Can write: {msg}")

Example 3: OAuth-Style Token System ๐ŸŒ

Letโ€™s build a simplified OAuth-like system for API authentication:

import uuid
from datetime import datetime, timedelta

class OAuthProvider:
    def __init__(self):
        self.apps = {}      # ๐Ÿ“ฑ Registered applications
        self.users = {}     # ๐Ÿ‘ฅ User accounts
        self.tokens = {}    # ๐ŸŽซ Access tokens
        self.codes = {}     # ๐Ÿ”‘ Authorization codes
        
    def register_app(self, app_name, redirect_uri):
        """Register a new application ๐Ÿ“ฑ"""
        client_id = str(uuid.uuid4())
        client_secret = secrets.token_urlsafe(32)
        
        self.apps[client_id] = {
            'name': app_name,
            'secret': client_secret,
            'redirect_uri': redirect_uri,
            'scopes': ['read', 'write']  # ๐Ÿ“‹ Available scopes
        }
        
        return {
            'client_id': client_id,
            'client_secret': client_secret,
            'message': f"App '{app_name}' registered! ๐ŸŽ‰"
        }
    
    def create_user(self, username, password):
        """Create a user account ๐Ÿ‘ค"""
        salt = secrets.token_hex(16)
        password_hash = hashlib.sha256(
            (password + salt).encode()
        ).hexdigest()
        
        self.users[username] = {
            'password_hash': password_hash,
            'salt': salt,
            'authorized_apps': {}  # ๐Ÿ” Apps user has authorized
        }
        return f"User {username} created! ๐ŸŽŠ"
    
    def authorize_app(self, username, password, client_id, scopes):
        """User authorizes an app ๐Ÿค"""
        # Verify user
        if username not in self.users:
            return {"error": "Invalid user! โŒ"}
        
        user = self.users[username]
        password_hash = hashlib.sha256(
            (password + user['salt']).encode()
        ).hexdigest()
        
        if password_hash != user['password_hash']:
            return {"error": "Invalid password! โŒ"}
        
        # Verify app
        if client_id not in self.apps:
            return {"error": "Invalid app! โŒ"}
        
        # ๐ŸŽซ Generate authorization code
        auth_code = secrets.token_urlsafe(32)
        self.codes[auth_code] = {
            'username': username,
            'client_id': client_id,
            'scopes': scopes,
            'expires': datetime.now() + timedelta(minutes=10)
        }
        
        # ๐Ÿ“ Record authorization
        self.users[username]['authorized_apps'][client_id] = scopes
        
        return {
            'code': auth_code,
            'message': f"App authorized! ๐ŸŽฏ"
        }
    
    def exchange_code(self, auth_code, client_id, client_secret):
        """Exchange auth code for access token ๐Ÿ”„"""
        # Verify code exists and not expired
        if auth_code not in self.codes:
            return {"error": "Invalid code! โŒ"}
        
        code_data = self.codes[auth_code]
        if datetime.now() > code_data['expires']:
            del self.codes[auth_code]
            return {"error": "Code expired! โฐ"}
        
        # Verify client credentials
        if (code_data['client_id'] != client_id or 
            self.apps[client_id]['secret'] != client_secret):
            return {"error": "Invalid client credentials! โŒ"}
        
        # ๐ŸŽซ Generate access token
        access_token = secrets.token_urlsafe(32)
        self.tokens[access_token] = {
            'username': code_data['username'],
            'client_id': client_id,
            'scopes': code_data['scopes'],
            'expires': datetime.now() + timedelta(hours=1)
        }
        
        # ๐Ÿ—‘๏ธ Delete used code
        del self.codes[auth_code]
        
        return {
            'access_token': access_token,
            'token_type': 'Bearer',
            'expires_in': 3600,
            'message': 'Token issued! ๐Ÿš€'
        }
    
    def validate_token(self, token, required_scope=None):
        """Validate access token ๐Ÿ”"""
        if token not in self.tokens:
            return False, "Invalid token! โŒ"
        
        token_data = self.tokens[token]
        if datetime.now() > token_data['expires']:
            del self.tokens[token]
            return False, "Token expired! โฐ"
        
        if required_scope and required_scope not in token_data['scopes']:
            return False, f"Missing scope: {required_scope} ๐Ÿšซ"
        
        return True, token_data

# ๐ŸŽฎ OAuth flow demonstration
oauth = OAuthProvider()

# 1๏ธโƒฃ Register an app
app_info = oauth.register_app("CoolApp", "https://coolapp.com/callback")
print(f"App registered: {app_info}")

# 2๏ธโƒฃ Create users
print(oauth.create_user("alice", "secure123"))
print(oauth.create_user("bob", "password456"))

# 3๏ธโƒฃ User authorizes app
auth_result = oauth.authorize_app(
    "alice", 
    "secure123",
    app_info['client_id'],
    ['read', 'write']
)
print(f"Authorization: {auth_result}")

# 4๏ธโƒฃ App exchanges code for token
if 'code' in auth_result:
    token_result = oauth.exchange_code(
        auth_result['code'],
        app_info['client_id'],
        app_info['client_secret']
    )
    print(f"Token exchange: {token_result}")
    
    # 5๏ธโƒฃ Use the token
    if 'access_token' in token_result:
        valid, data = oauth.validate_token(
            token_result['access_token'],
            'read'
        )
        if valid:
            print(f"โœ… Token valid for user: {data['username']}")
            print(f"๐Ÿ“‹ Scopes: {data['scopes']}")

๐Ÿš€ Advanced Concepts

Multi-Factor Authentication (MFA) ๐Ÿ”

Letโ€™s implement a complete MFA system:

import pyotp  # pip install pyotp
import qrcode  # pip install qrcode
from io import BytesIO

class MFASystem:
    def __init__(self):
        self.users = {}
        self.backup_codes = {}
        
    def setup_mfa(self, username):
        """Setup MFA for a user ๐Ÿ“ฑ"""
        # Generate secret key
        secret = pyotp.random_base32()
        
        # Store secret
        if username not in self.users:
            self.users[username] = {}
        
        self.users[username]['mfa_secret'] = secret
        self.users[username]['mfa_enabled'] = False
        
        # Generate backup codes
        backup = [secrets.token_hex(4) for _ in range(10)]
        self.backup_codes[username] = backup.copy()
        
        # Create TOTP URI for QR code
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=username,
            issuer_name='SecureApp'
        )
        
        return {
            'secret': secret,
            'uri': totp_uri,
            'backup_codes': backup,
            'message': 'MFA setup ready! ๐Ÿ“ฒ'
        }
    
    def enable_mfa(self, username, token):
        """Verify and enable MFA ๐Ÿ”“"""
        if username not in self.users:
            return False, "User not found! โŒ"
        
        secret = self.users[username]['mfa_secret']
        totp = pyotp.TOTP(secret)
        
        if totp.verify(token, valid_window=1):
            self.users[username]['mfa_enabled'] = True
            return True, "MFA enabled successfully! ๐ŸŽ‰"
        
        return False, "Invalid token! โŒ"
    
    def verify_mfa(self, username, token):
        """Verify MFA token ๐Ÿ”"""
        if username not in self.users:
            return False, "User not found! โŒ"
        
        if not self.users[username].get('mfa_enabled', False):
            return False, "MFA not enabled! ๐Ÿšซ"
        
        # Check if it's a backup code
        if username in self.backup_codes:
            if token in self.backup_codes[username]:
                self.backup_codes[username].remove(token)
                return True, "Backup code used! ๐ŸŽซ"
        
        # Verify TOTP
        secret = self.users[username]['mfa_secret']
        totp = pyotp.TOTP(secret)
        
        if totp.verify(token, valid_window=1):
            return True, "MFA verified! โœ…"
        
        return False, "Invalid token! โŒ"

# ๐ŸŽฎ Test MFA system
mfa = MFASystem()

# Setup MFA for Alice
setup_result = mfa.setup_mfa("alice")
print(f"MFA Setup: {setup_result['message']}")
print(f"Secret: {setup_result['secret']}")
print(f"Backup codes: {setup_result['backup_codes'][:3]}... (showing first 3)")

# Generate current token (in real app, user would use authenticator app)
totp = pyotp.TOTP(setup_result['secret'])
current_token = totp.now()
print(f"Current token: {current_token}")

# Enable MFA
enabled, msg = mfa.enable_mfa("alice", current_token)
print(f"Enable MFA: {msg}")

# Verify with MFA
time.sleep(2)  # Wait to ensure we get a different token
new_token = totp.now()
verified, msg = mfa.verify_mfa("alice", new_token)
print(f"Verify MFA: {msg}")

API Key Management System ๐Ÿ”‘

Build a complete API key management system:

class APIKeyManager:
    def __init__(self):
        self.api_keys = {}
        self.rate_limits = {}
        self.usage_stats = {}
        
    def generate_api_key(self, user_id, name, permissions=None):
        """Generate a new API key ๐ŸŽฏ"""
        # Create unique key
        key_id = str(uuid.uuid4())
        api_key = f"sk_{secrets.token_urlsafe(32)}"
        
        # Store key data
        self.api_keys[api_key] = {
            'key_id': key_id,
            'user_id': user_id,
            'name': name,
            'permissions': permissions or ['read'],
            'created': datetime.now(),
            'last_used': None,
            'active': True,
            'rate_limit': 1000  # ๐Ÿšฆ Requests per hour
        }
        
        # Initialize usage stats
        self.usage_stats[api_key] = {
            'total_requests': 0,
            'hourly_requests': [],
            'endpoints': {}
        }
        
        return {
            'api_key': api_key,
            'key_id': key_id,
            'message': f"API key '{name}' created! ๐Ÿ”‘"
        }
    
    def validate_request(self, api_key, endpoint, permission='read'):
        """Validate API request ๐Ÿ”"""
        # Check if key exists
        if api_key not in self.api_keys:
            return False, "Invalid API key! โŒ", None
        
        key_data = self.api_keys[api_key]
        
        # Check if active
        if not key_data['active']:
            return False, "API key is inactive! ๐Ÿšซ", None
        
        # Check permissions
        if permission not in key_data['permissions']:
            return False, f"Missing permission: {permission} ๐Ÿšซ", None
        
        # Check rate limit
        current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
        hourly_count = self._get_hourly_count(api_key, current_hour)
        
        if hourly_count >= key_data['rate_limit']:
            return False, "Rate limit exceeded! ๐Ÿšฆ", None
        
        # Update usage
        self._update_usage(api_key, endpoint)
        
        return True, "Request authorized! โœ…", key_data['user_id']
    
    def _get_hourly_count(self, api_key, hour):
        """Get request count for current hour ๐Ÿ“Š"""
        stats = self.usage_stats[api_key]
        hourly = stats['hourly_requests']
        
        # Clean old entries
        hourly[:] = [
            (h, count) for h, count in hourly 
            if h >= hour - timedelta(hours=1)
        ]
        
        # Count current hour
        return sum(count for h, count in hourly if h == hour)
    
    def _update_usage(self, api_key, endpoint):
        """Update usage statistics ๐Ÿ“ˆ"""
        stats = self.usage_stats[api_key]
        current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
        
        # Update total
        stats['total_requests'] += 1
        
        # Update hourly
        hourly = stats['hourly_requests']
        if hourly and hourly[-1][0] == current_hour:
            hourly[-1] = (current_hour, hourly[-1][1] + 1)
        else:
            hourly.append((current_hour, 1))
        
        # Update endpoints
        stats['endpoints'][endpoint] = stats['endpoints'].get(endpoint, 0) + 1
        
        # Update last used
        self.api_keys[api_key]['last_used'] = datetime.now()
    
    def get_usage_report(self, api_key):
        """Get usage report for API key ๐Ÿ“Š"""
        if api_key not in self.api_keys:
            return None
        
        key_data = self.api_keys[api_key]
        stats = self.usage_stats[api_key]
        
        return {
            'key_name': key_data['name'],
            'total_requests': stats['total_requests'],
            'rate_limit': key_data['rate_limit'],
            'top_endpoints': sorted(
                stats['endpoints'].items(),
                key=lambda x: x[1],
                reverse=True
            )[:5],
            'last_used': key_data['last_used'],
            'status': 'Active โœ…' if key_data['active'] else 'Inactive ๐Ÿšซ'
        }

# ๐ŸŽฎ Test API key system
api_mgr = APIKeyManager()

# Generate API keys
key1 = api_mgr.generate_api_key("user123", "Production Key", ['read', 'write'])
key2 = api_mgr.generate_api_key("user123", "Testing Key", ['read'])

print(f"Key 1: {key1}")

# Simulate API requests
api_key = key1['api_key']

# Valid requests
for i in range(5):
    valid, msg, user = api_mgr.validate_request(api_key, '/api/users', 'read')
    print(f"Request {i+1}: {msg}")

# Try write permission
valid, msg, user = api_mgr.validate_request(api_key, '/api/users/create', 'write')
print(f"Write request: {msg}")

# Get usage report
report = api_mgr.get_usage_report(api_key)
print(f"\n๐Ÿ“Š Usage Report:")
print(f"Total requests: {report['total_requests']}")
print(f"Top endpoints: {report['top_endpoints']}")

โš ๏ธ Common Pitfalls and Solutions

1. Storing Passwords in Plain Text โŒ

# โŒ NEVER DO THIS!
def bad_store_password(username, password):
    users[username] = password  # ๐Ÿ˜ฑ Plain text!

# โœ… Always hash passwords
def good_store_password(username, password):
    salt = secrets.token_hex(16)
    hashed = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),
        salt.encode('utf-8'),
        100_000
    )
    users[username] = {
        'salt': salt,
        'hash': hashed.hex()
    }

2. Weak Session Management ๐Ÿช

# โŒ Predictable session IDs
session_id = f"session_{user_id}_{timestamp}"  # ๐Ÿ˜ฑ Guessable!

# โœ… Use cryptographically secure tokens
session_id = secrets.token_urlsafe(32)  # ๐Ÿ” Secure!

3. Not Validating Token Expiration โฐ

# โŒ Tokens that never expire
def validate_token(token):
    return token in valid_tokens  # ๐Ÿ˜ฑ No expiration check!

# โœ… Always check expiration
def validate_token(token):
    if token not in valid_tokens:
        return False
    
    token_data = valid_tokens[token]
    if datetime.now() > token_data['expires']:
        del valid_tokens[token]  # ๐Ÿ—‘๏ธ Clean up
        return False
    
    return True

4. Timing Attacks in Authentication ๐Ÿ•

# โŒ Early return reveals information
def authenticate(username, password):
    if username not in users:
        return False  # ๐Ÿ˜ฑ Returns faster for invalid users!
    
    return check_password(users[username], password)

# โœ… Constant-time comparison
def authenticate(username, password):
    # Always do the same work
    dummy_hash = "$2b$12$dummy.hash.for.timing.attack.prevention"
    
    if username in users:
        stored_hash = users[username]['password']
    else:
        stored_hash = dummy_hash  # ๐Ÿ›ก๏ธ Same computation time
    
    # Always verify (even with dummy)
    result = verify_password(password, stored_hash)
    
    # Only return true if user exists AND password matches
    return username in users and result

๐Ÿ› ๏ธ Best Practices

1. Use Established Libraries ๐Ÿ“š

# โœ… Use well-tested libraries
from passlib.hash import bcrypt  # Password hashing
import jwt  # JSON Web Tokens
from cryptography.fernet import Fernet  # Encryption

# Example with bcrypt
password_hash = bcrypt.hash("user_password")
is_valid = bcrypt.verify("user_password", password_hash)

2. Implement Proper Logging ๐Ÿ“

import logging

class SecureAuthLogger:
    def __init__(self):
        self.logger = logging.getLogger('auth')
        
    def log_auth_attempt(self, username, success, ip_address):
        """Log authentication attempts ๐Ÿ“Š"""
        if success:
            self.logger.info(
                f"โœ… Successful login: {username} from {ip_address}"
            )
        else:
            self.logger.warning(
                f"โŒ Failed login attempt: {username} from {ip_address}"
            )
    
    def log_permission_denied(self, user, resource, permission):
        """Log authorization failures ๐Ÿšซ"""
        self.logger.warning(
            f"๐Ÿšซ Permission denied: {user} tried to {permission} on {resource}"
        )

3. Implement Account Lockout ๐Ÿ”’

class AccountLockout:
    def __init__(self, max_attempts=5, lockout_duration=300):
        self.max_attempts = max_attempts
        self.lockout_duration = lockout_duration  # seconds
        self.attempts = {}
        
    def check_lockout(self, username):
        """Check if account is locked ๐Ÿ”"""
        if username not in self.attempts:
            return False, 0
        
        data = self.attempts[username]
        if data['locked_until'] and datetime.now() < data['locked_until']:
            remaining = (data['locked_until'] - datetime.now()).seconds
            return True, remaining
        
        return False, 0
    
    def record_attempt(self, username, success):
        """Record login attempt ๐Ÿ“"""
        if username not in self.attempts:
            self.attempts[username] = {
                'count': 0,
                'locked_until': None
            }
        
        if success:
            # Reset on success
            self.attempts[username] = {
                'count': 0,
                'locked_until': None
            }
        else:
            # Increment failed attempts
            self.attempts[username]['count'] += 1
            
            if self.attempts[username]['count'] >= self.max_attempts:
                self.attempts[username]['locked_until'] = (
                    datetime.now() + timedelta(seconds=self.lockout_duration)
                )

4. Secure Token Storage ๐Ÿ—„๏ธ

class SecureTokenStore:
    def __init__(self, encryption_key=None):
        if encryption_key is None:
            encryption_key = Fernet.generate_key()
        self.cipher = Fernet(encryption_key)
        self.tokens = {}
        
    def store_token(self, token_id, sensitive_data):
        """Store encrypted token data ๐Ÿ”"""
        # Encrypt sensitive data
        encrypted = self.cipher.encrypt(
            json.dumps(sensitive_data).encode()
        )
        
        self.tokens[token_id] = {
            'encrypted_data': encrypted,
            'created': datetime.now(),
            'expires': datetime.now() + timedelta(hours=1)
        }
    
    def retrieve_token(self, token_id):
        """Retrieve and decrypt token data ๐Ÿ”“"""
        if token_id not in self.tokens:
            return None
        
        token_data = self.tokens[token_id]
        
        # Check expiration
        if datetime.now() > token_data['expires']:
            del self.tokens[token_id]
            return None
        
        # Decrypt data
        decrypted = self.cipher.decrypt(token_data['encrypted_data'])
        return json.loads(decrypted.decode())

๐Ÿงช Hands-On Exercise

Build a secure blog API with authentication and authorization! ๐Ÿ“

Requirements:

  1. User registration and login with secure passwords ๐Ÿ”
  2. JWT tokens for API authentication ๐ŸŽซ
  3. Role-based access (admin, author, reader) ๐Ÿ‘ฅ
  4. Permissions: readers can read, authors can write, admins can delete ๐Ÿ“‹
  5. Rate limiting per API key ๐Ÿšฆ
๐Ÿ’ก Click here for the solution
import jwt
import hashlib
import secrets
from datetime import datetime, timedelta
from functools import wraps

class BlogAPI:
    def __init__(self):
        self.users = {}
        self.posts = {}
        self.tokens = {}
        self.secret_key = "blog_secret_key_๐Ÿ”"
        self.rate_limits = {}
        
    # ๐Ÿ‘ค User Management
    def register(self, username, password, email, role='reader'):
        """Register a new user ๐Ÿ“"""
        if username in self.users:
            return {"error": "Username taken! ๐Ÿ˜•"}
        
        # Hash password
        salt = secrets.token_hex(16)
        password_hash = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            100_000
        )
        
        self.users[username] = {
            'password_hash': password_hash.hex(),
            'salt': salt,
            'email': email,
            'role': role,
            'created': datetime.now()
        }
        
        return {"message": f"Welcome {username}! ๐ŸŽ‰", "role": role}
    
    def login(self, username, password):
        """Authenticate user and return JWT ๐Ÿ”‘"""
        if username not in self.users:
            return {"error": "Invalid credentials! โŒ"}
        
        user = self.users[username]
        password_hash = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            user['salt'].encode('utf-8'),
            100_000
        ).hex()
        
        if password_hash != user['password_hash']:
            return {"error": "Invalid credentials! โŒ"}
        
        # Generate JWT
        payload = {
            'username': username,
            'role': user['role'],
            'exp': datetime.utcnow() + timedelta(hours=24),
            'iat': datetime.utcnow()
        }
        
        token = jwt.encode(payload, self.secret_key, algorithm='HS256')
        
        return {
            "message": "Login successful! ๐Ÿš€",
            "token": token,
            "role": user['role']
        }
    
    # ๐Ÿ” Authorization Decorator
    def require_role(self, allowed_roles):
        def decorator(func):
            @wraps(func)
            def wrapper(token, *args, **kwargs):
                # Verify token
                try:
                    payload = jwt.decode(
                        token, 
                        self.secret_key, 
                        algorithms=['HS256']
                    )
                except jwt.ExpiredSignatureError:
                    return {"error": "Token expired! โฐ"}
                except jwt.InvalidTokenError:
                    return {"error": "Invalid token! โŒ"}
                
                # Check role
                user_role = payload.get('role')
                if user_role not in allowed_roles:
                    return {"error": f"Access denied! Need role: {allowed_roles} ๐Ÿšซ"}
                
                # Check rate limit
                username = payload.get('username')
                if not self._check_rate_limit(username):
                    return {"error": "Rate limit exceeded! ๐Ÿšฆ"}
                
                # Call original function with user info
                return func(payload, *args, **kwargs)
            return wrapper
        return decorator
    
    def _check_rate_limit(self, username):
        """Check rate limit (100 requests per hour) ๐Ÿšฆ"""
        current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
        
        if username not in self.rate_limits:
            self.rate_limits[username] = {}
        
        user_limits = self.rate_limits[username]
        
        if current_hour not in user_limits:
            user_limits[current_hour] = 0
        
        user_limits[current_hour] += 1
        
        # Clean old entries
        for hour in list(user_limits.keys()):
            if hour < current_hour - timedelta(hours=1):
                del user_limits[hour]
        
        return user_limits[current_hour] <= 100
    
    # ๐Ÿ“ Blog Operations
    @require_role(['reader', 'author', 'admin'])
    def get_posts(self, user_info):
        """Get all posts ๐Ÿ“š"""
        return {
            "posts": list(self.posts.values()),
            "count": len(self.posts)
        }
    
    @require_role(['author', 'admin'])
    def create_post(self, user_info, title, content):
        """Create a new post โœ๏ธ"""
        post_id = str(len(self.posts) + 1)
        
        self.posts[post_id] = {
            'id': post_id,
            'title': title,
            'content': content,
            'author': user_info['username'],
            'created': datetime.now().isoformat(),
            'updated': datetime.now().isoformat()
        }
        
        return {
            "message": "Post created! ๐Ÿ“",
            "post_id": post_id
        }
    
    @require_role(['author', 'admin'])
    def update_post(self, user_info, post_id, title=None, content=None):
        """Update a post โœ๏ธ"""
        if post_id not in self.posts:
            return {"error": "Post not found! ๐Ÿ˜•"}
        
        post = self.posts[post_id]
        
        # Authors can only edit their own posts
        if (user_info['role'] == 'author' and 
            post['author'] != user_info['username']):
            return {"error": "You can only edit your own posts! ๐Ÿšซ"}
        
        if title:
            post['title'] = title
        if content:
            post['content'] = content
        post['updated'] = datetime.now().isoformat()
        
        return {"message": "Post updated! โœจ"}
    
    @require_role(['admin'])
    def delete_post(self, user_info, post_id):
        """Delete a post ๐Ÿ—‘๏ธ"""
        if post_id not in self.posts:
            return {"error": "Post not found! ๐Ÿ˜•"}
        
        del self.posts[post_id]
        return {"message": "Post deleted! ๐Ÿ—‘๏ธ"}

# ๐ŸŽฎ Test the Blog API
blog = BlogAPI()

# Register users
print("=== User Registration ===")
print(blog.register("alice", "admin123", "[email protected]", "admin"))
print(blog.register("bob", "author456", "[email protected]", "author"))
print(blog.register("charlie", "reader789", "[email protected]", "reader"))

# Login
print("\n=== User Login ===")
alice_login = blog.login("alice", "admin123")
bob_login = blog.login("bob", "author456")
charlie_login = blog.login("charlie", "reader789")

print(f"Alice (admin): {alice_login['message']}")
print(f"Bob (author): {bob_login['message']}")
print(f"Charlie (reader): {charlie_login['message']}")

# Test operations
print("\n=== Blog Operations ===")

# Charlie tries to create a post (should fail)
result = blog.create_post(
    charlie_login['token'],
    "My First Post",
    "Hello World!"
)
print(f"Charlie create: {result}")

# Bob creates a post (should succeed)
result = blog.create_post(
    bob_login['token'],
    "Bob's Post",
    "This is Bob's awesome post! ๐ŸŽ‰"
)
print(f"Bob create: {result}")

# Alice creates a post
result = blog.create_post(
    alice_login['token'],
    "Admin Announcement",
    "Important news from admin! ๐Ÿ“ข"
)
print(f"Alice create: {result}")

# Everyone can read
result = blog.get_posts(charlie_login['token'])
print(f"\nAll posts: {len(result['posts'])} posts found")

# Bob tries to delete (should fail)
result = blog.delete_post(bob_login['token'], "1")
print(f"Bob delete: {result}")

# Alice deletes (should succeed)
result = blog.delete_post(alice_login['token'], "1")
print(f"Alice delete: {result}")

๐ŸŽ“ Key Takeaways

Youโ€™ve mastered authentication and authorization in Python! Hereโ€™s what you learned:

  1. Authentication vs Authorization ๐Ÿ”

    • Authentication = Who are you?
    • Authorization = What can you do?
  2. Password Security ๐Ÿ›ก๏ธ

    • Always hash with salt
    • Use strong algorithms (bcrypt, pbkdf2)
    • Never store plain text!
  3. Token Management ๐ŸŽซ

    • JWTs for stateless auth
    • Session tokens for stateful
    • Always set expiration
  4. Access Control ๐Ÿ‘ฅ

    • Role-based (RBAC)
    • Permission-based
    • Always validate!
  5. Best Practices ๐ŸŒŸ

    • Use established libraries
    • Implement rate limiting
    • Log security events
    • Handle errors securely

๐Ÿค Next Steps

Congratulations on mastering security fundamentals! ๐ŸŽ‰ Youโ€™re now equipped to build secure Python applications that protect user data and control access properly.

Continue your security journey:

  • Explore OAuth 2.0 and OpenID Connect ๐ŸŒ
  • Learn about HTTPS and TLS ๐Ÿ”’
  • Study OWASP security guidelines ๐Ÿ“‹
  • Practice with security challenges ๐Ÿ†

Remember: Security is not a feature, itโ€™s a mindset! Always think about what could go wrong and protect against it. Keep learning, keep securing, and keep building amazing things! ๐Ÿš€

Happy secure coding! ๐Ÿ’ช