Prerequisites
- Basic understanding of programming concepts 📝
- Python installation (3.8+) 🐍
- VS Code or preferred IDE 💻
What you'll learn
- Understand JWT authentication fundamentals 🎯
- Apply JWT authentication in real FastAPI projects 🏗️
- Debug common authentication issues 🐛
- Write clean, Pythonic code ✨
🎯 Introduction
Welcome to this exciting tutorial on FastAPI authentication with JWT tokens! 🎉 In this guide, we’ll explore how to build secure authentication systems for your FastAPI applications.
You’ll discover how JWT (JSON Web Tokens) can transform your API security game. Whether you’re building web applications 🌐, mobile backends 📱, or microservices 🔧, understanding JWT authentication is essential for protecting your users’ data and creating secure APIs.
By the end of this tutorial, you’ll feel confident implementing JWT authentication in your own FastAPI projects! Let’s dive in! 🏊‍♂️
📚 Understanding JWT Authentication
🤔 What is JWT Authentication?
JWT authentication is like having a special wristband at a concert 🎫. Think of it as a secure pass that proves who you are without having to show your ID every single time. Once you get your wristband (JWT token), you can access all the areas you’re allowed to visit!
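In technical terms, a JWT is a compact, signed way to transmit claims between parties as a JSON object 🔐. The payload is signed, not encrypted, so anyone can read it but only the key holder can produce a valid signature. This means you can: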
- ✨ Authenticate users without storing sessions
- 🚀 Scale your application easily
- 🛡️ Secure your API endpoints effectively
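To make the wristband idea concrete, here is a minimal sketch of what an HS256 token looks like under the hood, using only the standard library. The helper names (`sign_demo_token`, `verify_demo_token`) are made up for this illustration and it skips claim checks such as expiration, so treat it as a learning aid and keep using a maintained library like python-jose in real code.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign_demo_token(payload: dict, key: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_demo_token(token: str, key: bytes) -> dict:
    """Recompute the signature and return the payload only if it matches."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(b64url_decode(signature), expected):
        raise ValueError("signature mismatch")
    payload_segment = signing_input.split(".")[1]
    return json.loads(b64url_decode(payload_segment))
```

Notice that verification needs only the token and the key, with no session lookup. That is what "stateless" means in the list above.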
💡 Why Use JWT with FastAPI?
Here’s why developers love JWT authentication:
- Stateless Authentication 🔒: No server-side sessions needed
- Cross-Domain Support 💻: Works across different domains
- Mobile-Friendly 📱: Perfect for mobile app backends
- Scalable 🔧: Easy to scale horizontally
Real-world example: Imagine building a food delivery app 🍕. With JWT authentication, users can log in once and their token works across ordering, tracking, and payment services!
🔧 Basic Syntax and Usage
📝 Simple JWT Example
Let’s start with a friendly example:
# 👋 Hello, JWT Authentication!
from datetime import datetime, timedelta
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# 🎨 Setup our security tools
SECRET_KEY = "your-secret-key-here" # 🔐 Keep this secret!
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 🛡️ Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 🎫 OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
# 📋 User model
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None
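💡 Explanation: Notice how we set up all our security tools! The SECRET_KEY is super important: it signs every token, so anyone who learns it can forge valid tokens. It’s like the secret recipe that makes your tokens secure 🍳. The placeholder value above is for demonstration only.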
🎯 Creating JWT Tokens
Here’s how to create tokens and handle password hashing (token verification comes in the next section):
# 🎨 Create access token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # ⏰ Set expiration time
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    # 📝 Add expiration to token data
    to_encode.update({"exp": expire})
    # 🔐 Create the JWT token
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# 🔍 Verify password
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

# 🔐 Hash password
def get_password_hash(password):
    return pwd_context.hash(password)
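If you are curious what a password hasher does for you, here is a rough standard-library sketch of the same hash-then-verify idea. It uses PBKDF2 rather than the bcrypt scheme that passlib is configured with above, and the function names and iteration count are assumptions picked for the demo, not recommended production values.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # hypothetical work factor, chosen only for this demo


def demo_hash_password(password: str) -> str:
    """Hash with a fresh random salt and store salt and digest together."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def demo_verify_password(password: str, stored: str) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
```

Because every hash gets its own salt, hashing the same password twice gives two different strings, yet both verify correctly. That is why you always compare through a verify function and never with `==` on the stored hashes.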
💡 Practical Examples
🛒 Example 1: E-Commerce API Authentication
Let’s build a real authentication system:
# 🛍️ Fake database of users
fake_users_db = {
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderland",
        "email": "alice@example.com",
        "hashed_password": get_password_hash("secret123"),
        "disabled": False,
    }
}

# 👤 Get user from database (returns None when the user is unknown)
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return User(**user_dict)
    return None

# 🔍 Authenticate user
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, fake_db[username]["hashed_password"]):
        return False
    return user
# 🎯 Get current user from token
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials 😢",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 🔓 Decode the token
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    # 👤 Get user from database
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise credentials_exception
    return user
# 🚀 Login endpoint
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # 🔍 Check username and password
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password 😔",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # ⏰ Create token with expiration
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    # 🎉 Return the token
    return {"access_token": access_token, "token_type": "bearer"}

# 🛡️ Protected endpoint
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return {"username": current_user.username, "message": "Welcome back! 🎉"}
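To see how a client talks to these two endpoints, here is a small standard-library sketch that only builds the requests without sending them. The base URL is an assumption (a local dev server), and the helper names are invented for this example. The login body is form-encoded because `OAuth2PasswordRequestForm` reads form fields, not JSON.

```python
import urllib.parse
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumption: the app runs on a local dev server


def build_login_request(username: str, password: str) -> urllib.request.Request:
    """POST /token with the credentials as form fields."""
    body = urllib.parse.urlencode(
        {"username": username, "password": password}
    ).encode("ascii")
    return urllib.request.Request(
        f"{BASE_URL}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_me_request(access_token: str) -> urllib.request.Request:
    """GET /users/me with the token in the Authorization header."""
    return urllib.request.Request(
        f"{BASE_URL}/users/me",
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )
```

With the server running, passing either request to `urllib.request.urlopen` sends it. The `access_token` value comes from the JSON returned by the login call.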
🎯 Try it yourself: Add a registration endpoint that creates new users!
🎮 Example 2: Game API with Role-Based Access
Let’s make it more fun with roles:
# 🏆 Enhanced user model with roles
class UserInDB(User):
    hashed_password: str
    role: str  # 🎮 admin, player, moderator

# 🎨 Token data model
class TokenData(BaseModel):
    username: Optional[str] = None
    role: Optional[str] = None

# 🛡️ Role-based access control
def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user 😴")
    return current_user

# 👑 Admin only access
def get_admin_user(current_user: User = Depends(get_current_active_user)):
    # 🔍 Check if user has admin role
    user_data = fake_users_db.get(current_user.username)
    if user_data.get("role") != "admin":
        raise HTTPException(
            status_code=403,
            detail="Not enough permissions 🚫"
        )
    return current_user

# 🎮 Game endpoints with different access levels
@app.get("/scores/public")
async def get_public_scores():
    return {"scores": [
        {"player": "Alice", "score": 1000, "emoji": "🏆"},
        {"player": "Bob", "score": 950, "emoji": "🥈"},
        {"player": "Charlie", "score": 900, "emoji": "🥉"}
    ]}

@app.get("/scores/my")
async def get_my_scores(current_user: User = Depends(get_current_active_user)):
    # 🎯 Return user's personal scores
    return {
        "player": current_user.username,
        "scores": [850, 920, 1050],
        "best": 1050,
        "emoji": "🌟"
    }

@app.post("/scores/admin/reset")
async def reset_all_scores(admin_user: User = Depends(get_admin_user)):
    # 🔄 Admin-only endpoint
    return {"message": "All scores reset! 🔄", "admin": admin_user.username}
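The admin check above hardcodes one role. If you need several role gates, one option is a small factory that builds a checker per set of roles. The sketch below is framework-free so it runs on its own: `Forbidden` stands in for `HTTPException(status_code=403)` and `make_role_checker` is a hypothetical helper name, not a FastAPI API.

```python
from typing import Callable, Dict, Iterable, Optional


class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403) in this framework-free sketch."""


UserRecord = Dict[str, Optional[str]]


def make_role_checker(allowed_roles: Iterable[str]) -> Callable[[UserRecord], UserRecord]:
    """Return a function that lets a user record through only for allowed roles."""
    allowed = frozenset(allowed_roles)

    def check(user_record: UserRecord) -> UserRecord:
        role = user_record.get("role")  # a missing role is treated as no access
        if role not in allowed:
            raise Forbidden(f"role {role!r} is not permitted")
        return user_record

    return check


# One checker per access level, built from the same factory
require_admin = make_role_checker(["admin"])
require_staff = make_role_checker(["admin", "moderator"])
```

In a FastAPI app, the inner function would take `current_user` through `Depends(get_current_active_user)` and raise `HTTPException` instead, following the same shape as `get_admin_user`.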
🚀 Advanced Concepts
🧙‍♂️ Refresh Tokens
When you’re ready to level up, implement refresh tokens:
# 🎯 Advanced token management
REFRESH_TOKEN_EXPIRE_DAYS = 7

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str

# 🔄 Create refresh token
def create_refresh_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({"exp": expire, "type": "refresh"})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# 🎨 Enhanced login with refresh token
# ⚠️ This replaces the earlier /token route: register only one of the two,
# because FastAPI serves the first matching route it finds.
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password 😔",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # 🎫 Create both tokens
    access_token = create_access_token(data={"sub": user.username})
    refresh_token = create_refresh_token(data={"sub": user.username})
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

# 🔄 Refresh endpoint
@app.post("/refresh")
async def refresh_access_token(refresh_token: str):
    try:
        # 🔓 Decode refresh token
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        token_type: str = payload.get("type")
        if username is None or token_type != "refresh":
            raise HTTPException(status_code=401, detail="Invalid refresh token 😢")
        # 🎫 Create new access token
        new_access_token = create_access_token(data={"sub": username})
        return {"access_token": new_access_token, "token_type": "bearer"}
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid refresh token 😢")
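One thing to watch: the refresh endpoint checks the "type" claim, but `get_current_user` from earlier does not, so a refresh token would also pass as an access token. A possible fix is to run every decoded payload through one shared check. This sketch works on an already decoded payload dict; the function and exception names are hypothetical, and treating untyped tokens as access tokens is an assumption that matches how `create_access_token` builds them.

```python
from typing import Any, Dict


class InvalidTokenUse(Exception):
    """Raised when a token is presented where another kind is expected."""


def require_token_type(payload: Dict[str, Any], expected: str) -> str:
    """Return the subject if the payload is of the expected kind, else raise."""
    # Assumption: tokens without a "type" claim are access tokens
    token_type = payload.get("type", "access")
    subject = payload.get("sub")
    if token_type != expected or not subject:
        raise InvalidTokenUse(f"expected a {expected} token")
    return subject
```

Calling `require_token_type(payload, "access")` right after `jwt.decode` in `get_current_user` would reject refresh tokens on protected endpoints, and the refresh endpoint could call it with `"refresh"`.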
🏗️ Token Blacklisting
For the brave developers, implement token revocation:
# 🚀 Token blacklist storage (use Redis in production!)
blacklisted_tokens = set()

# 🛡️ Check if token is blacklisted
async def get_current_user_with_blacklist(token: str = Depends(oauth2_scheme)):
    if token in blacklisted_tokens:
        raise HTTPException(
            status_code=401,
            detail="Token has been revoked 🚫"
        )
    # Continue with normal verification
    return await get_current_user(token)

# 🚪 Logout endpoint
@app.post("/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    # 🗑️ Add token to blacklist
    blacklisted_tokens.add(token)
    return {"message": "Successfully logged out! 👋"}
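A plain set grows forever, even though an expired token is rejected anyway. One way around that is to remember each revoked token's expiration and forget it once that time has passed. The class below is an in-memory sketch with invented names. A Redis key with a TTL would play the same role in production.

```python
import time
from typing import Dict, Optional


class ExpiringBlacklist:
    """Remembers revoked tokens only until they would have expired anyway."""

    def __init__(self) -> None:
        self._revoked: Dict[str, float] = {}  # token -> expiry as Unix seconds

    def revoke(self, token: str, expires_at: float) -> None:
        self._revoked[token] = expires_at

    def is_revoked(self, token: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        self._purge(now)
        return token in self._revoked

    def _purge(self, now: float) -> None:
        # Drop entries whose tokens have already expired
        expired = [tok for tok, exp in self._revoked.items() if exp <= now]
        for tok in expired:
            del self._revoked[tok]

    def __len__(self) -> int:
        return len(self._revoked)
```

The `expires_at` value would come from the token's own "exp" claim, read at logout time.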
⚠️ Common Pitfalls and Solutions
😱 Pitfall 1: Hardcoded Secret Keys
# ❌ Wrong way - exposing your secret!
SECRET_KEY = "my-super-secret-key" # 💥 Never commit this!
# ✅ Correct way - use environment variables!
import os
from dotenv import load_dotenv
load_dotenv()
SECRET_KEY = os.getenv("SECRET_KEY") # 🛡️ Safe and secure!
if not SECRET_KEY:
    raise ValueError("No SECRET_KEY set for JWT 😱")
🤯 Pitfall 2: No Token Expiration
# ❌ Dangerous - tokens that never expire!
def create_bad_token(data: dict):
    return jwt.encode(data, SECRET_KEY, algorithm=ALGORITHM)

# ✅ Safe - always set expiration!
def create_good_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})  # ⏰ Token expires!
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
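Under the hood, the "exp" claim is just a number: seconds since the Unix epoch. The sketch below shows that arithmetic with timezone-aware datetimes. The helper names and the leeway parameter are assumptions for illustration, since your JWT library performs this check for you when it decodes a token.

```python
from datetime import datetime, timedelta, timezone


def expiry_timestamp(issued_at: datetime, lifetime: timedelta) -> int:
    """Return the "exp" value as whole seconds since the Unix epoch.

    issued_at should be timezone-aware; a naive datetime would be
    interpreted as local time by .timestamp().
    """
    return int((issued_at + lifetime).timestamp())


def is_expired(exp: int, now: datetime, leeway_seconds: int = 0) -> bool:
    """A token is expired once the current time reaches exp (plus any leeway)."""
    return now.timestamp() >= exp + leeway_seconds


issued = datetime.now(timezone.utc)
exp = expiry_timestamp(issued, timedelta(hours=1))
```

A small leeway can absorb clock drift between servers, at the cost of accepting tokens slightly past their stated expiry.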
🛠️ Best Practices
- 🎯 Use Strong Secret Keys: Generate cryptographically secure keys!
- 📝 Set Reasonable Expiration: Access tokens: 15-30 min, Refresh tokens: 7-30 days
- 🛡️ HTTPS Only: Always use HTTPS in production
- 🎨 Validate Everything: Check token claims and user permissions
- ✨ Handle Errors Gracefully: Provide clear error messages, without revealing whether the username or the password was wrong
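For the first tip, the standard library's `secrets` module is one straightforward way to generate a key. The function name and the 32-byte default below are choices made for this sketch, not requirements of JWT or FastAPI.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random key as hex text, drawn from the OS's secure random source."""
    return secrets.token_hex(num_bytes)


new_key = generate_secret_key()
```

Run this once, put the result in your `.env` file as `SECRET_KEY=...`, and keep that file out of version control.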
🧪 Hands-On Exercise
🎯 Challenge: Build a Secure Blog API
Create a JWT-authenticated blog system:
📋 Requirements:
- ✅ User registration with email verification
- 🏷️ Different roles (reader, writer, admin)
- 👤 User profile management
- 📝 CRUD operations for blog posts
- 🎨 Only writers can create posts!
🚀 Bonus Points:
- Add password reset functionality
- Implement email notifications
- Create rate limiting for login attempts
💡 Solution
# 🎯 Our secure blog API!
from typing import List, Optional
from datetime import datetime
from pydantic import Field

# 📝 Blog post model
class BlogPost(BaseModel):
    id: Optional[int] = None
    title: str
    content: str
    author: str
    # default_factory gives each post its own timestamp
    # (a plain default would be evaluated only once, at import time)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    tags: List[str] = []
    emoji: str = "📝"

# 👤 Enhanced user with roles
class BlogUser(BaseModel):
    username: str
    email: str
    role: str  # reader, writer, admin
    verified: bool = False

# 🗄️ Fake blog database
blog_posts = []
blog_users = {}

# 🛡️ Role checking dependency
def require_writer(current_user: User = Depends(get_current_active_user)):
    user_data = blog_users.get(current_user.username)
    # Users missing from blog_users are treated as having no role
    if user_data is None or user_data["role"] not in ["writer", "admin"]:
        raise HTTPException(
            status_code=403,
            detail="Only writers can perform this action ✍️"
        )
    return current_user
# 📝 Create blog post
@app.post("/posts", response_model=BlogPost)
async def create_post(
    post: BlogPost,
    current_user: User = Depends(require_writer)
):
    # ✨ Add post with author
    post.id = len(blog_posts) + 1
    post.author = current_user.username
    blog_posts.append(post)
    return post

# 📖 Get all posts (public)
@app.get("/posts", response_model=List[BlogPost])
async def get_posts(skip: int = 0, limit: int = 10):
    return blog_posts[skip : skip + limit]

# ✏️ Update post (only author or admin)
@app.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    updated_post: BlogPost,
    current_user: User = Depends(get_current_active_user)
):
    if post_id <= 0 or post_id > len(blog_posts):
        raise HTTPException(status_code=404, detail="Post not found 😢")
    post = blog_posts[post_id - 1]
    user_data = blog_users.get(current_user.username)
    # Users missing from blog_users are treated as having no role
    role = user_data["role"] if user_data else None
    # 🔍 Check permissions
    if post.author != current_user.username and role != "admin":
        raise HTTPException(
            status_code=403,
            detail="You can only edit your own posts 🚫"
        )
    # ✅ Update the post
    post.title = updated_post.title
    post.content = updated_post.content
    post.tags = updated_post.tags
    return {"message": "Post updated successfully! ✨", "post": post}
# 📨 Registration payload (a JSON body keeps the password out of the URL)
class RegisterRequest(BaseModel):
    username: str
    email: str
    password: str

# 👤 User registration
# Note: get_current_user looks users up in fake_users_db, so point it at
# blog_users (or merge the two stores) before logging in as a registered user.
@app.post("/register")
async def register(data: RegisterRequest):
    if data.username in blog_users:
        raise HTTPException(
            status_code=400,
            detail="Username already exists 😔"
        )
    # 🎨 Create new user
    blog_users[data.username] = {
        "username": data.username,
        "email": data.email,
        "hashed_password": get_password_hash(data.password),
        "role": "reader",  # Default role
        "verified": False
    }
    # 📧 Send verification email (mock)
    print(f"📧 Verification email sent to {data.email}")
    return {"message": f"Welcome {data.username}! Please verify your email 📧"}

# 📊 Admin stats endpoint
# Note: get_admin_user checks the role stored in fake_users_db, not blog_users.
@app.get("/admin/stats")
async def get_stats(admin_user: User = Depends(get_admin_user)):
    return {
        "total_posts": len(blog_posts),
        "total_users": len(blog_users),
        "roles": {
            "readers": sum(1 for u in blog_users.values() if u["role"] == "reader"),
            "writers": sum(1 for u in blog_users.values() if u["role"] == "writer"),
            "admins": sum(1 for u in blog_users.values() if u["role"] == "admin")
        },
        "emoji": "📊"
    }
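The solution leaves the bonus items open. As a possible starting point for the login rate limiting bonus, here is a small sliding-window limiter in plain Python. The class name, limits, and the idea of keying by username are all assumptions for this sketch. A real deployment would more likely keep the counters in a shared store such as Redis.

```python
import time
from collections import defaultdict, deque
from typing import DefaultDict, Deque, Optional


class LoginRateLimiter:
    """Allow at most max_attempts per key within a sliding time window."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0) -> None:
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._attempts: DefaultDict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        """Record an attempt and report whether it is within the limit."""
        now = time.monotonic() if now is None else now
        attempts = self._attempts[key]
        # Forget attempts that have slid out of the window
        while attempts and now - attempts[0] >= self.window_seconds:
            attempts.popleft()
        if len(attempts) >= self.max_attempts:
            return False
        attempts.append(now)
        return True
```

In the login endpoint you could call `allow(form_data.username)` before checking the password and respond with status 429 when it returns `False`.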
🎓 Key Takeaways
You’ve learned so much! Here’s what you can now do:
- ✅ Create JWT authentication with confidence 💪
- ✅ Avoid common security mistakes that trip up beginners 🛡️
- ✅ Apply best practices in real FastAPI projects 🎯
- ✅ Debug authentication issues like a pro 🐛
- ✅ Build secure APIs with FastAPI and JWT! 🚀
Remember: Security is not optional - it’s essential! JWT helps you build secure, scalable APIs. 🤝
🤝 Next Steps
Congratulations! 🎉 You’ve mastered JWT authentication in FastAPI!
Here’s what to do next:
- 💻 Practice with the exercises above
- 🏗️ Build a small API with JWT authentication
- 📚 Learn about OAuth2 and social login integration
- 🌟 Share your secure API with the world!
Remember: Every security expert was once a beginner. Keep coding, keep learning, and most importantly, keep your APIs secure! 🚀
Happy coding! 🎉🚀✨