Prerequisites
- Basic understanding of programming concepts ๐
- Python installation (3.8+) ๐
- VS Code or preferred IDE ๐ป
What you'll learn
- Understand the concept fundamentals ๐ฏ
- Apply the concept in real projects ๐๏ธ
- Debug common issues ๐
- Write clean, Pythonic code โจ
๐ฏ Introduction
Welcome to this exciting tutorial on FastAPI WebSockets! ๐ In this guide, weโll explore the magical world of real-time communication between your Python server and clients.
Imagine building a live chat app ๐ฌ, a real-time dashboard ๐, or a multiplayer game ๐ฎ - thatโs the power of WebSockets! Unlike traditional HTTP requests that are like sending letters ๐ฎ, WebSockets create a phone line ๐ that stays open for instant, two-way communication.
By the end of this tutorial, youโll be creating amazing real-time features that will make your applications come alive! Letโs dive in! ๐โโ๏ธ
๐ Understanding WebSockets
๐ค What are WebSockets?
WebSockets are like having a direct phone line between your server and client! ๐ฑ Think of regular HTTP as sending text messages - you send one, wait for a reply, then send another. WebSockets? Thatโs a live phone call where both sides can talk whenever they want!
In Python terms, WebSockets provide a persistent, bidirectional communication channel between client and server. This means you can:
- โจ Send messages instantly without polling
- ๐ Push updates from server to client immediately
- ๐ก๏ธ Maintain a single connection for continuous communication
- ๐ก Build truly interactive applications
๐ก Why Use WebSockets with FastAPI?
Hereโs why developers love WebSockets in FastAPI:
- Real-time Updates ๐: Push live data without client requests
- Low Latency โก: Instant message delivery
- Efficient Resource Usage ๐ฐ: One connection, many messages
- Native FastAPI Support ๐ฏ: Built-in WebSocket handling
Real-world example: Imagine building a stock trading app ๐. With WebSockets, you can push price updates instantly to all connected traders - no refresh button needed!
๐ง Basic Syntax and Usage
๐ Simple WebSocket Example
Letโs start with a friendly example:
# ๐ Hello, WebSockets!
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
# ๐จ Simple HTML client for testing
html = """
<!DOCTYPE html>
<html>
<head>
<title>Chat ๐ฌ</title>
</head>
<body>
<h1>WebSocket Chat ๐</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send ๐ค</button>
</form>
<ul id='messages'></ul>
<script>
// ๐ Create WebSocket connection
const ws = new WebSocket("ws://localhost:8000/ws");
// ๐จ Handle incoming messages
ws.onmessage = function(event) {
const messages = document.getElementById('messages');
const message = document.createElement('li');
message.textContent = event.data;
messages.appendChild(message);
};
// ๐ค Send message function
function sendMessage(event) {
const input = document.getElementById("messageText");
ws.send(input.value);
input.value = '';
event.preventDefault();
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
# ๐ค Accept the connection
await websocket.accept()
# ๐ฌ Echo messages back
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data} ๐")
๐ก Explanation: This creates a simple echo server! The client connects, sends messages, and receives them back with an echo prefix. Perfect for testing!
๐ฏ WebSocket Connection Lifecycle
Hereโs how WebSocket connections work:
# ๐๏ธ Complete WebSocket lifecycle
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
app = FastAPI()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
# 1๏ธโฃ Connection phase
await websocket.accept()
print(f"โ
Client {client_id} connected!")
try:
# 2๏ธโฃ Communication phase
while True:
# ๐ฅ Receive message
message = await websocket.receive_text()
print(f"๐จ From {client_id}: {message}")
# ๐ค Send response
response = f"Server received: {message} โจ"
await websocket.send_text(response)
except WebSocketDisconnect:
# 3๏ธโฃ Disconnection phase
print(f"๐ Client {client_id} disconnected")
๐ก Practical Examples
๐ฎ Example 1: Live Chat Room
Letโs build a real chat application:
# ๐ฌ Real-time chat room
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List
import json
from datetime import datetime
app = FastAPI()
# ๐ Chat room manager
class ConnectionManager:
def __init__(self):
# ๐ฅ Active connections
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, user_id: str):
await websocket.accept()
self.active_connections[user_id] = websocket
# ๐ Announce new user
await self.broadcast(f"{user_id} joined the chat! ๐", "system")
def disconnect(self, user_id: str):
if user_id in self.active_connections:
del self.active_connections[user_id]
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str, sender: str):
# ๐ข Send to all connected users
timestamp = datetime.now().strftime("%H:%M:%S")
formatted_message = {
"sender": sender,
"message": message,
"timestamp": timestamp,
"emoji": "๐ฌ" if sender != "system" else "๐ข"
}
disconnected = []
for user_id, connection in self.active_connections.items():
try:
await connection.send_text(json.dumps(formatted_message))
except:
disconnected.append(user_id)
# ๐งน Clean up disconnected users
for user_id in disconnected:
self.disconnect(user_id)
# ๐ฏ Create manager instance
manager = ConnectionManager()
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
await manager.connect(websocket, user_id)
try:
while True:
# ๐จ Receive and broadcast messages
data = await websocket.receive_text()
await manager.broadcast(data, user_id)
except WebSocketDisconnect:
manager.disconnect(user_id)
await manager.broadcast(f"{user_id} left the chat ๐", "system")
๐ฏ Try it yourself: Add private messaging between users or typing indicators!
๐ Example 2: Real-time Dashboard
Letโs create a live dashboard:
# ๐ Real-time metrics dashboard
from fastapi import FastAPI, WebSocket
import asyncio
import random
import json
from datetime import datetime
app = FastAPI()
# ๐ Simulated metrics generator
async def generate_metrics():
while True:
metrics = {
"timestamp": datetime.now().isoformat(),
"cpu": random.randint(20, 80),
"memory": random.randint(30, 90),
"requests_per_second": random.randint(100, 1000),
"active_users": random.randint(50, 500),
"emoji_mood": random.choice(["๐", "๐", "๐ช", "๐ฏ"])
}
return metrics
# ๐ฏ Connected dashboards
class DashboardManager:
def __init__(self):
self.connections: List[WebSocket] = []
self.is_broadcasting = False
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.connections.append(websocket)
# ๐ Start broadcasting if not already running
if not self.is_broadcasting:
self.is_broadcasting = True
asyncio.create_task(self.broadcast_metrics())
def disconnect(self, websocket: WebSocket):
self.connections.remove(websocket)
if not self.connections:
self.is_broadcasting = False
async def broadcast_metrics(self):
while self.is_broadcasting and self.connections:
# ๐ Generate new metrics
metrics = await generate_metrics()
# ๐ค Send to all dashboards
disconnected = []
for connection in self.connections:
try:
await connection.send_text(json.dumps(metrics))
except:
disconnected.append(connection)
# ๐งน Clean up
for conn in disconnected:
self.disconnect(conn)
# โฑ๏ธ Update every second
await asyncio.sleep(1)
dashboard_manager = DashboardManager()
@app.websocket("/ws/dashboard")
async def dashboard_websocket(websocket: WebSocket):
await dashboard_manager.connect(websocket)
try:
# ๐ Keep connection alive
while True:
await websocket.receive_text()
except:
dashboard_manager.disconnect(websocket)
๐ Advanced Concepts
๐งโโ๏ธ Advanced Topic 1: Authentication & Security
When youโre ready to level up, add WebSocket authentication:
# ๐ Secure WebSocket with authentication
from fastapi import FastAPI, WebSocket, Query, WebSocketDisconnect
from fastapi.exceptions import WebSocketException
import jwt
app = FastAPI()
SECRET_KEY = "your-secret-key-here-๐"
async def verify_token(token: str) -> str:
try:
# ๐ Verify JWT token
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload.get("user_id")
except:
raise WebSocketException(code=1008, reason="Invalid authentication")
@app.websocket("/ws/secure")
async def secure_websocket(
websocket: WebSocket,
token: str = Query(...) # ๐ซ Token required
):
# ๐ก๏ธ Verify authentication
user_id = await verify_token(token)
await websocket.accept()
await websocket.send_text(f"โ
Welcome {user_id}! You're authenticated ๐")
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Secure echo: {data} ๐ก๏ธ")
except WebSocketDisconnect:
print(f"๐ Secure connection closed for {user_id}")
๐๏ธ Advanced Topic 2: Broadcasting with Rooms
For the brave developers, hereโs room-based broadcasting:
# ๐ Room-based WebSocket system
from typing import Dict, Set
from fastapi import FastAPI, WebSocket
import json
app = FastAPI()
class RoomManager:
def __init__(self):
# ๐ช Rooms with their connections
self.rooms: Dict[str, Set[WebSocket]] = {}
async def join_room(self, room_id: str, websocket: WebSocket):
# ๐ Create room if doesn't exist
if room_id not in self.rooms:
self.rooms[room_id] = set()
self.rooms[room_id].add(websocket)
await self.room_broadcast(room_id, {
"type": "user_joined",
"message": "A new user joined! ๐",
"room_members": len(self.rooms[room_id])
})
async def leave_room(self, room_id: str, websocket: WebSocket):
if room_id in self.rooms:
self.rooms[room_id].discard(websocket)
# ๐งน Clean empty rooms
if not self.rooms[room_id]:
del self.rooms[room_id]
else:
await self.room_broadcast(room_id, {
"type": "user_left",
"message": "A user left ๐",
"room_members": len(self.rooms[room_id])
})
async def room_broadcast(self, room_id: str, message: dict):
if room_id in self.rooms:
# ๐ข Send to everyone in the room
disconnected = set()
for connection in self.rooms[room_id]:
try:
await connection.send_text(json.dumps(message))
except:
disconnected.add(connection)
# ๐งน Remove disconnected
self.rooms[room_id] -= disconnected
โ ๏ธ Common Pitfalls and Solutions
๐ฑ Pitfall 1: Forgetting to Handle Disconnections
# โ Wrong way - no disconnect handling!
@app.websocket("/ws")
async def bad_websocket(websocket: WebSocket):
await websocket.accept()
while True: # ๐ฅ This will crash on disconnect!
data = await websocket.receive_text()
await websocket.send_text(data)
# โ
Correct way - handle disconnections gracefully!
@app.websocket("/ws")
async def good_websocket(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(data)
except WebSocketDisconnect:
print("๐ Client disconnected gracefully")
except Exception as e:
print(f"โ ๏ธ Error: {e}")
๐คฏ Pitfall 2: Blocking Operations in WebSocket Handlers
# โ Dangerous - blocks all connections!
import time
@app.websocket("/ws")
async def blocking_websocket(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
time.sleep(5) # ๐ฅ This blocks everything!
await websocket.send_text("Response")
# โ
Safe - use async operations!
import asyncio
@app.websocket("/ws")
async def async_websocket(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await asyncio.sleep(5) # โ
Non-blocking!
await websocket.send_text("Response")
๐ ๏ธ Best Practices
- ๐ฏ Always Handle Disconnections: Use try-except for WebSocketDisconnect
- ๐ Use Connection Managers: Organize connections for scalability
- ๐ก๏ธ Implement Authentication: Secure your WebSocket endpoints
- ๐จ Send Structured Data: Use JSON for complex messages
- โจ Keep Connections Alive: Implement heartbeat/ping-pong mechanism
๐งช Hands-On Exercise
๐ฏ Challenge: Build a Collaborative Drawing App
Create a real-time collaborative drawing application:
๐ Requirements:
- โ Multiple users can draw simultaneously
- ๐จ Different colors for each user
- ๐ฅ Show active users list
- ๐ Broadcast cursor positions
- ๐ Clear canvas option
๐ Bonus Points:
- Add drawing tools (pen, eraser, shapes)
- Implement undo/redo functionality
- Save and load drawings
๐ก Solution
๐ Click to see solution
# ๐จ Collaborative drawing app backend
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List
import json
import uuid
app = FastAPI()
class DrawingRoom:
def __init__(self):
# ๐จ Active artists
self.artists: Dict[str, WebSocket] = {}
# ๐ Drawing history
self.drawing_history: List[dict] = []
# ๐จ User colors
self.user_colors: Dict[str, str] = {}
async def add_artist(self, websocket: WebSocket) -> str:
artist_id = str(uuid.uuid4())
self.artists[artist_id] = websocket
# ๐จ Assign random color
import random
colors = ["#FF6B6B", "#4ECDC4", "#45B7D1", "#96CEB4", "#FFEAA7"]
self.user_colors[artist_id] = random.choice(colors)
# ๐ค Send initial data
await websocket.send_text(json.dumps({
"type": "init",
"artist_id": artist_id,
"color": self.user_colors[artist_id],
"history": self.drawing_history,
"active_users": len(self.artists)
}))
# ๐ข Announce new artist
await self.broadcast({
"type": "user_joined",
"message": f"New artist joined! ๐จ",
"active_users": len(self.artists)
}, exclude=artist_id)
return artist_id
async def remove_artist(self, artist_id: str):
if artist_id in self.artists:
del self.artists[artist_id]
del self.user_colors[artist_id]
await self.broadcast({
"type": "user_left",
"message": f"Artist left ๐",
"active_users": len(self.artists)
})
async def handle_drawing(self, artist_id: str, data: dict):
# ๐จ Add artist info to drawing data
data["artist_id"] = artist_id
data["color"] = self.user_colors[artist_id]
# ๐ Store in history
if data["type"] == "draw":
self.drawing_history.append(data)
# ๐ข Broadcast to all other artists
await self.broadcast(data, exclude=artist_id)
async def clear_canvas(self):
self.drawing_history = []
await self.broadcast({
"type": "clear",
"message": "Canvas cleared! ๐งน"
})
async def broadcast(self, data: dict, exclude: str = None):
disconnected = []
for artist_id, websocket in self.artists.items():
if artist_id != exclude:
try:
await websocket.send_text(json.dumps(data))
except:
disconnected.append(artist_id)
# ๐งน Clean up
for artist_id in disconnected:
await self.remove_artist(artist_id)
# ๐ฏ Create drawing room
drawing_room = DrawingRoom()
@app.websocket("/ws/draw")
async def drawing_websocket(websocket: WebSocket):
await websocket.accept()
artist_id = await drawing_room.add_artist(websocket)
try:
while True:
# ๐จ Receive drawing data
data = await websocket.receive_text()
drawing_data = json.loads(data)
if drawing_data["type"] == "clear":
await drawing_room.clear_canvas()
else:
await drawing_room.handle_drawing(artist_id, drawing_data)
except WebSocketDisconnect:
await drawing_room.remove_artist(artist_id)
๐ Key Takeaways
Youโve learned so much! Hereโs what you can now do:
- โ Create WebSocket endpoints with FastAPI ๐ช
- โ Handle real-time communication between server and clients ๐ก๏ธ
- โ Build connection managers for scalable applications ๐ฏ
- โ Implement authentication for secure WebSockets ๐
- โ Create amazing real-time features like chat, dashboards, and collaborative tools! ๐
Remember: WebSockets open up a world of real-time possibilities! Theyโre perfect for building interactive, engaging applications. ๐ค
๐ค Next Steps
Congratulations! ๐ Youโve mastered FastAPI WebSockets!
Hereโs what to do next:
- ๐ป Build the collaborative drawing app from the exercise
- ๐๏ธ Add WebSockets to an existing FastAPI project
- ๐ Explore WebSocket subprotocols and advanced features
- ๐ Create your own real-time application!
Remember: Real-time features can transform user experience. Keep experimenting, keep building, and most importantly, have fun creating live, interactive applications! ๐
Happy real-time coding! ๐๐โจ