Prerequisites
- Understanding of HTTP and browser APIs 📝
- Basic knowledge of event-driven programming ⚡
- Experience with TypeScript interfaces and types 💻
What you'll learn
- Master Server-Sent Events for real-time streaming 🎯
- Build type-safe SSE clients with automatic reconnection 🏗️
- Implement scalable server-side event streaming 🐛
- Create production-ready real-time notification systems ✨
🎯 Introduction
Welcome to the world of Server-Sent Events! 📡 If WebSockets are like having a phone conversation (two-way), then Server-Sent Events (SSE) are like listening to a live radio broadcast - perfect for when you need real-time updates flowing from server to client!
Think of SSE as a persistent connection 📻 where your server can push live updates, notifications, news feeds, stock prices, or any real-time data directly to your clients. It’s simpler than WebSockets when you only need one-way communication, and it comes with built-in browser support for automatic reconnection!
By the end of this tutorial, you’ll be a master of real-time streaming, able to build live dashboards, notification systems, chat feeds, and any application that needs instant server-to-client updates. Let’s start streaming! 🌊
📚 Understanding Server-Sent Events
🤔 What Are Server-Sent Events?
Server-Sent Events provide a simple way for servers to push data to web pages over HTTP. Unlike WebSockets, SSE is unidirectional - only the server can send data to the client. But that’s perfect for many real-time use cases!
// 🌟 Basic Server-Sent Events connection
// client.ts - Browser SSE client
class BasicSSEClient {
private eventSource: EventSource | null = null;
connect(url: string): void {
console.log('📡 Connecting to SSE stream...');
// 🔌 Create EventSource connection
this.eventSource = new EventSource(url);
// 🎧 Listen for incoming messages
this.eventSource.onmessage = (event) => {
console.log('📨 Message received:', event.data);
this.handleMessage(JSON.parse(event.data));
};
// ✅ Connection opened successfully
this.eventSource.onopen = () => {
console.log('✅ SSE connection established!');
};
// ❌ Handle connection errors
this.eventSource.onerror = (error) => {
console.error('❌ SSE connection error:', error);
// 🔍 Check connection state
if (this.eventSource?.readyState === EventSource.CLOSED) {
console.log('🔌 SSE connection closed');
} else if (this.eventSource?.readyState === EventSource.CONNECTING) {
console.log('🔄 SSE reconnecting...');
}
};
}
private handleMessage(data: any): void {
// Override in subclasses or provide callback
console.log('📥 Processing message:', data);
}
disconnect(): void {
if (this.eventSource) {
console.log('🔌 Closing SSE connection...');
this.eventSource.close();
this.eventSource = null;
}
}
}
// 🚀 Simple usage
const sseClient = new BasicSSEClient();
sseClient.connect('/api/events');
💡 Key Characteristics
- 📡 Server-to-Client Only: Unidirectional communication
- 🔄 Automatic Reconnection: Built-in reconnection on connection loss
- 📦 Text-Based Protocol: UTF-8 text streamed over plain HTTP with the text/event-stream content type
- 🎯 Event-Driven: Native browser EventSource API support
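To make the "text-based protocol" point concrete, here is a minimal sketch of how a single event is framed on the wire: every field is a "name: value" line, and a blank line ends the event. The helpers formatSSEEvent and parseSSEEvent are hypothetical names used only for illustration, and the parser is simplified (it ignores the retry field and CRLF line endings). In the browser, EventSource does this parsing for you.

```typescript
// Hypothetical helpers for illustration; EventSource parses frames natively.
interface SSEFrame {
  id?: string;
  event?: string;
  data: string;
}

// Serialize one event. Multi-line data becomes one "data:" line per line.
function formatSSEEvent(frame: SSEFrame): string {
  const lines: string[] = [];
  if (frame.id !== undefined) lines.push(`id: ${frame.id}`);
  if (frame.event !== undefined) lines.push(`event: ${frame.event}`);
  for (const line of frame.data.split('\n')) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}

// Parse a single event block (simplified: no "retry", no CRLF handling).
function parseSSEEvent(block: string): SSEFrame {
  const frame: SSEFrame = { data: '' };
  const dataLines: string[] = [];
  for (const line of block.split('\n')) {
    if (line === '' || line.startsWith(':')) continue; // blank line or comment
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
    if (field === 'id') frame.id = value;
    else if (field === 'event') frame.event = value;
    else if (field === 'data') dataLines.push(value);
  }
  frame.data = dataLines.join('\n');
  return frame;
}

const wire = formatSSEEvent({ id: '42', event: 'notification', data: '{"title":"Hi"}' });
console.log(JSON.stringify(wire));
console.log(parseSSEEvent(wire));
```

Because JSON.stringify never emits raw newlines, a JSON payload always fits on a single data line, which is why JSON is such a convenient payload format for SSE.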
// 🎨 Type-safe SSE message system
interface SSEMessage {
type: string;
timestamp: number;
id: string;
}
interface NotificationMessage extends SSEMessage {
type: 'notification';
title: string;
message: string;
severity: 'info' | 'warning' | 'error' | 'success';
}
interface StockUpdateMessage extends SSEMessage {
type: 'stock_update';
symbol: string;
price: number;
change: number;
changePercent: number;
}
interface SystemStatusMessage extends SSEMessage {
type: 'system_status';
service: string;
status: 'online' | 'offline' | 'degraded';
uptime: number;
}
interface ChatMessage extends SSEMessage {
type: 'chat_message';
username: string;
content: string;
roomId: string;
}
// 🎯 Union type for all possible SSE messages
type SSEEventMessage = NotificationMessage | StockUpdateMessage | SystemStatusMessage | ChatMessage;
// 📦 Message factory for type safety
class SSEMessageFactory {
static createNotification(
title: string,
message: string,
severity: NotificationMessage['severity']
): NotificationMessage {
return {
type: 'notification',
title,
message,
severity,
timestamp: Date.now(),
id: crypto.randomUUID()
};
}
static createStockUpdate(
symbol: string,
price: number,
change: number
): StockUpdateMessage {
return {
type: 'stock_update',
symbol,
price,
change,
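// 📐 Percent change relative to the previous price (price - change); 0 if that previous price is 0
changePercent: price === change ? 0 : (change / (price - change)) * 100,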
timestamp: Date.now(),
id: crypto.randomUUID()
};
}
static createSystemStatus(
service: string,
status: SystemStatusMessage['status'],
uptime: number
): SystemStatusMessage {
return {
type: 'system_status',
service,
status,
uptime,
timestamp: Date.now(),
id: crypto.randomUUID()
};
}
}
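The interfaces above exist only at compile time, while JSON.parse hands back untyped data. A small runtime check before trusting an incoming payload can close that gap. The sketch below uses a hypothetical isNotificationMessage type guard and redeclares a trimmed-down NotificationMessage so that it runs on its own. Treat it as one possible approach, not as part of the tutorial's client.

```typescript
// Trimmed-down copy of NotificationMessage so the sketch is self-contained.
type Severity = 'info' | 'warning' | 'error' | 'success';

interface NotificationMessage {
  type: 'notification';
  id: string;
  timestamp: number;
  title: string;
  message: string;
  severity: Severity;
}

const SEVERITIES: readonly string[] = ['info', 'warning', 'error', 'success'];

// Hypothetical type guard: narrows unknown JSON to NotificationMessage.
function isNotificationMessage(value: unknown): value is NotificationMessage {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    v.type === 'notification' &&
    typeof v.id === 'string' &&
    typeof v.timestamp === 'number' &&
    typeof v.title === 'string' &&
    typeof v.message === 'string' &&
    typeof v.severity === 'string' &&
    SEVERITIES.includes(v.severity)
  );
}

// Parse without throwing: null for malformed JSON or an unexpected shape.
function parseNotification(raw: string): NotificationMessage | null {
  try {
    const parsed: unknown = JSON.parse(raw);
    return isNotificationMessage(parsed) ? parsed : null;
  } catch {
    return null;
  }
}

const ok = parseNotification(
  '{"type":"notification","id":"1","timestamp":0,"title":"Hi","message":"Hello","severity":"info"}'
);
const bad = parseNotification('{"type":"notification","title":42}');
console.log(ok?.title, bad);
```

The same pattern extends to the other message types: one guard per interface, then a switch on the type field once the shape is confirmed.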
🆚 SSE vs Other Real-Time Technologies
// 📊 Comparison of real-time communication methods
// 🐌 HTTP Polling - inefficient, not real-time
class PollingClient {
private intervalId: number | null = null;
startPolling(url: string, interval = 5000): void {
// ❌ Wasteful - constant HTTP requests
this.intervalId = window.setInterval(async () => {
try {
const response = await fetch(url);
const data = await response.json();
console.log('📊 Polled data:', data);
} catch (error) {
console.error('Polling error:', error);
}
}, interval);
}
stopPolling(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}
}
// 📡 Server-Sent Events - efficient one-way streaming
class SSEClient {
private eventSource: EventSource | null = null;
connect(url: string): void {
// ✅ Efficient persistent connection
this.eventSource = new EventSource(url);
this.eventSource.onmessage = (event) => {
console.log('📡 Real-time update:', JSON.parse(event.data));
};
}
disconnect(): void {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
}
// 🔌 WebSocket - bidirectional communication
class WebSocketClient {
private socket: WebSocket | null = null;
connect(url: string): void {
// ✅ Bidirectional but more complex
this.socket = new WebSocket(url);
this.socket.onmessage = (event) => {
console.log('🔌 WebSocket message:', JSON.parse(event.data));
};
// Can send messages back to the server once the socket is open
this.socket.onopen = () => {
this.send({ type: 'ping' });
};
}
send(data: any): void {
if (this.socket?.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(data));
}
}
}
// 🎯 Use case comparison
const useCases = {
// ✅ Use SSE for:
sse: [
'📊 Live dashboards and metrics',
'🔔 Real-time notifications',
'📰 News feeds and updates',
'💹 Stock price feeds',
'📝 Activity logs',
'🎮 Live scoreboards',
'📈 Analytics data streams'
],
// ✅ Use WebSockets for:
websockets: [
'💬 Chat applications',
'🎮 Real-time games',
'✏️ Collaborative editing',
'🎥 Video conferencing',
'🎯 Any bidirectional communication'
],
// ✅ Use Polling for:
polling: [
'🗂️ Infrequent updates',
'📊 Non-critical data',
'🔧 Simple implementations',
'🌐 Limited browser support scenarios'
]
};
🏗️ Building a Type-Safe SSE Client
🎯 Advanced SSE Client Implementation
Let’s build a more robust SSE client in TypeScript that handles reconnection, event replay via Last-Event-ID, and typed event callbacks:
// 🏗️ Advanced SSE client with full TypeScript support
interface SSEClientConfig {
autoReconnect?: boolean;
maxReconnectAttempts?: number;
reconnectDelay?: number;
withCredentials?: boolean;
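// ⚠️ The native EventSource API cannot send custom request headers, so this option
// is not applied by this client; it only matters with a fetch-based or polyfilled transport
customHeaders?: Record<string, string>;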
}
interface SSEClientEvents {
connected: () => void;
disconnected: (reason: string) => void;
message: (message: SSEEventMessage) => void;
error: (error: Error) => void;
reconnecting: (attempt: number) => void;
}
class TypeSafeSSEClient {
private eventSource: EventSource | null = null;
private reconnectAttempts = 0;
private reconnectTimer: number | null = null;
private eventListeners: Partial<SSEClientEvents> = {};
private lastEventId: string | null = null;
constructor(
private url: string,
private config: SSEClientConfig = {}
) {
// 🎛️ Default configuration
this.config = {
autoReconnect: true,
maxReconnectAttempts: 5,
reconnectDelay: 1000,
withCredentials: false,
...config
};
}
// 🎧 Event listener management
on<K extends keyof SSEClientEvents>(
event: K,
listener: SSEClientEvents[K]
): void {
this.eventListeners[event] = listener;
}
off<K extends keyof SSEClientEvents>(event: K): void {
delete this.eventListeners[event];
}
private emit<K extends keyof SSEClientEvents>(
event: K,
...args: Parameters<NonNullable<SSEClientEvents[K]>>
): void {
const listener = this.eventListeners[event];
if (listener) {
(listener as any)(...args);
}
}
// 🔌 Connection management
connect(): void {
console.log('📡 Initiating SSE connection...');
try {
// 🔧 Build URL with Last-Event-ID if available
const urlWithParams = this.buildUrlWithParams();
// 🏗️ Create EventSource with configuration
this.eventSource = new EventSource(urlWithParams, {
withCredentials: this.config.withCredentials
});
this.setupEventHandlers();
} catch (error) {
console.error('❌ Failed to create SSE connection:', error);
this.emit('error', new Error('Failed to create SSE connection'));
}
}
private buildUrlWithParams(): string {
const url = new URL(this.url, window.location.origin);
// 🔄 Add Last-Event-ID for reliable reconnection
if (this.lastEventId) {
url.searchParams.set('lastEventId', this.lastEventId);
}
return url.toString();
}
private setupEventHandlers(): void {
if (!this.eventSource) return;
// ✅ Connection opened
this.eventSource.onopen = () => {
console.log('✅ SSE connection established!');
this.reconnectAttempts = 0;
this.clearReconnectTimer();
this.emit('connected');
};
// 📨 Message received
this.eventSource.onmessage = (event) => {
try {
// 🔖 Store Last-Event-ID for reconnection
if (event.lastEventId) {
this.lastEventId = event.lastEventId;
}
const message: SSEEventMessage = JSON.parse(event.data);
console.log('📨 SSE message received:', message.type);
this.emit('message', message);
} catch (error) {
console.error('❌ Failed to parse SSE message:', error);
this.emit('error', new Error('Failed to parse SSE message'));
}
};
// ❌ Connection error or closed
this.eventSource.onerror = (error) => {
console.error('❌ SSE connection error:', error);
const readyState = this.eventSource?.readyState;
if (readyState === EventSource.CLOSED) {
console.log('🔌 SSE connection closed');
this.emit('disconnected', 'Connection closed');
if (this.config.autoReconnect) {
this.attemptReconnect();
}
} else if (readyState === EventSource.CONNECTING) {
console.log('🔄 SSE reconnecting...');
} else {
this.emit('error', new Error('SSE connection error'));
}
};
// 🎯 Custom event listeners for different message types
this.setupCustomEventListeners();
}
private setupCustomEventListeners(): void {
if (!this.eventSource) return;
// 🎯 Named events (sent with an "event:" field) never reach onmessage,
// so each event name needs its own listener
const eventTypes: SSEEventMessage['type'][] = [
'notification', // 🔔 Notification events
'stock_update', // 💹 Stock update events
'system_status', // 🔧 System status events
'chat_message' // 💬 Chat message events
];
eventTypes.forEach((eventType) => {
this.eventSource?.addEventListener(eventType, (event: MessageEvent) => {
try {
// 🔖 Track Last-Event-ID for named events too, so reconnection can resume
if (event.lastEventId) {
this.lastEventId = event.lastEventId;
}
const message: SSEEventMessage = JSON.parse(event.data);
this.emit('message', message);
} catch (error) {
console.error(`❌ Failed to parse ${eventType} event:`, error);
}
});
});
}
// 🔄 Reconnection logic
private attemptReconnect(): void {
// ?? (not ||) so that an explicit 0 in the config is respected
if (this.reconnectAttempts >= (this.config.maxReconnectAttempts ?? 5)) {
console.error('💥 Max reconnection attempts exceeded');
this.emit('error', new Error('Max reconnection attempts exceeded'));
return;
}
this.reconnectAttempts++;
// ⏳ Linear backoff: the delay grows with each attempt
const delay = (this.config.reconnectDelay ?? 1000) * this.reconnectAttempts;
console.log(`🔄 Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})...`);
this.emit('reconnecting', this.reconnectAttempts);
this.reconnectTimer = window.setTimeout(() => {
this.connect();
}, delay);
}
private clearReconnectTimer(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
}
// 🔍 Connection state
isConnected(): boolean {
return this.eventSource?.readyState === EventSource.OPEN;
}
getReadyState(): string {
if (!this.eventSource) return 'DISCONNECTED';
switch (this.eventSource.readyState) {
case EventSource.CONNECTING: return 'CONNECTING';
case EventSource.OPEN: return 'OPEN';
case EventSource.CLOSED: return 'CLOSED';
default: return 'UNKNOWN';
}
}
// 🔌 Disconnect
disconnect(): void {
console.log('🔌 Disconnecting SSE...');
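// Prevent auto-reconnect. Note that this setting persists: a later connect()
// will not auto-reconnect unless the option is re-enabled
this.config.autoReconnect = false;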
this.clearReconnectTimer();
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
this.lastEventId = null;
this.reconnectAttempts = 0;
}
}
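The attemptReconnect method above grows the delay linearly with the attempt number. A common alternative is exponential backoff with a cap and random jitter, which spreads out reconnects when many clients lose the connection at the same moment. The following is only a sketch: computeBackoffDelay and its options are hypothetical and not part of TypeSafeSSEClient.

```typescript
// Hypothetical helper, not part of the tutorial's client.
interface BackoffOptions {
  baseDelayMs: number; // delay before the first retry
  maxDelayMs: number;  // upper bound for any single delay
  jitterRatio: number; // share of the delay that may be randomly subtracted (0 to 1)
}

function computeBackoffDelay(
  attempt: number,
  options: BackoffOptions,
  random: () => number = Math.random
): number {
  const { baseDelayMs, maxDelayMs, jitterRatio } = options;
  // attempt 1 -> base, attempt 2 -> 2x base, attempt 3 -> 4x base, ...
  const exponential = baseDelayMs * 2 ** Math.max(0, attempt - 1);
  const capped = Math.min(exponential, maxDelayMs);
  // Subtract a random share of the delay so clients do not reconnect in lockstep.
  const jitter = capped * jitterRatio * random();
  return Math.round(capped - jitter);
}

const backoffOptions: BackoffOptions = { baseDelayMs: 1000, maxDelayMs: 30000, jitterRatio: 0.2 };
for (let attempt = 1; attempt <= 6; attempt++) {
  console.log(`attempt ${attempt}: ${computeBackoffDelay(attempt, backoffOptions)}ms`);
}
```

Passing the random source as a parameter keeps the function deterministic in tests. Inside the client, the result could replace the delay calculation in attemptReconnect.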
🛠️ Real-World Notification System
Let’s build a complete notification system using SSE:
// 🔔 Real-time notification system using SSE
interface NotificationOptions {
autoClose?: boolean;
autoCloseDelay?: number;
showTimestamp?: boolean;
allowDismiss?: boolean;
}
interface DisplayedNotification extends NotificationMessage {
displayId: string;
createdAt: Date;
dismissed: boolean;
}
class RealTimeNotificationSystem {
private sseClient: TypeSafeSSEClient;
private notifications: Map<string, DisplayedNotification> = new Map();
// Definite assignment (!): set in setupNotificationContainer(), which the constructor calls
private notificationContainer!: HTMLElement;
private maxNotifications = 5;
constructor(private sseUrl: string) {
this.sseClient = new TypeSafeSSEClient(sseUrl, {
autoReconnect: true,
maxReconnectAttempts: 10,
reconnectDelay: 2000
});
this.setupNotificationContainer();
this.setupSSEEventHandlers();
}
private setupNotificationContainer(): void {
// 🎨 Create notification container if it doesn't exist
this.notificationContainer = document.getElementById('notifications')
|| this.createNotificationContainer();
}
private createNotificationContainer(): HTMLElement {
const container = document.createElement('div');
container.id = 'notifications';
container.className = 'notification-container';
container.style.cssText = `
position: fixed;
top: 20px;
right: 20px;
z-index: 10000;
max-width: 400px;
pointer-events: none;
`;
document.body.appendChild(container);
return container;
}
private setupSSEEventHandlers(): void {
// 🎧 Handle SSE connection events
this.sseClient.on('connected', () => {
console.log('🎉 Notification system connected!');
this.showSystemNotification('Connected to notification service', 'success');
});
this.sseClient.on('disconnected', (reason) => {
console.log('😞 Notification system disconnected:', reason);
this.showSystemNotification('Disconnected from notification service', 'warning');
});
this.sseClient.on('reconnecting', (attempt) => {
console.log(`🔄 Reconnecting to notification service (attempt ${attempt})...`);
});
this.sseClient.on('error', (error) => {
console.error('💥 Notification system error:', error);
this.showSystemNotification('Notification service error', 'error');
});
this.sseClient.on('message', (message) => {
this.handleIncomingMessage(message);
});
}
// 🚀 Initialize the notification system
async initialize(): Promise<void> {
try {
this.sseClient.connect();
console.log('✅ Notification system initialized!');
} catch (error) {
console.error('❌ Failed to initialize notification system:', error);
throw error;
}
}
// 📨 Handle incoming SSE messages
private handleIncomingMessage(message: SSEEventMessage): void {
switch (message.type) {
case 'notification':
this.handleNotification(message);
break;
case 'stock_update':
this.handleStockUpdate(message);
break;
case 'system_status':
this.handleSystemStatus(message);
break;
case 'chat_message':
this.handleChatMessage(message);
break;
default:
console.warn('🤷 Unknown message type:', (message as any).type);
}
}
private handleNotification(notification: NotificationMessage): void {
console.log(`🔔 Notification: ${notification.title}`);
this.displayNotification(notification, {
autoClose: notification.severity !== 'error',
autoCloseDelay: 5000,
showTimestamp: true,
allowDismiss: true
});
}
private handleStockUpdate(stockUpdate: StockUpdateMessage): void {
console.log(`💹 Stock Update: ${stockUpdate.symbol} - $${stockUpdate.price}`);
// 📊 Create notification for significant price changes
if (Math.abs(stockUpdate.changePercent) > 5) {
const notification = SSEMessageFactory.createNotification(
`${stockUpdate.symbol} Alert`,
`Price ${stockUpdate.change > 0 ? 'increased' : 'decreased'} by ${stockUpdate.changePercent.toFixed(2)}%`,
stockUpdate.change > 0 ? 'success' : 'warning'
);
this.displayNotification(notification, {
autoClose: true,
autoCloseDelay: 8000
});
}
}
private handleSystemStatus(status: SystemStatusMessage): void {
console.log(`🔧 System Status: ${status.service} - ${status.status}`);
// 🚨 Show notifications for service issues
if (status.status !== 'online') {
const notification = SSEMessageFactory.createNotification(
'Service Alert',
`${status.service} is currently ${status.status}`,
status.status === 'offline' ? 'error' : 'warning'
);
this.displayNotification(notification, {
autoClose: status.status === 'degraded',
autoCloseDelay: 10000
});
}
}
private handleChatMessage(chatMessage: ChatMessage): void {
console.log(`💬 Chat: ${chatMessage.username}: ${chatMessage.content}`);
// 🔔 Show notification for chat messages (if not in focus)
if (document.hidden) {
const notification = SSEMessageFactory.createNotification(
`New message from ${chatMessage.username}`,
chatMessage.content,
'info'
);
this.displayNotification(notification, {
autoClose: true,
autoCloseDelay: 4000
});
}
}
// 🎨 Display notification in UI
private displayNotification(
notification: NotificationMessage,
options: NotificationOptions = {}
): void {
const displayNotification: DisplayedNotification = {
...notification,
displayId: `notif-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`,
createdAt: new Date(),
dismissed: false
};
// 🧹 Remove oldest notifications if at limit
this.enforceNotificationLimit();
// 📦 Store notification
this.notifications.set(displayNotification.displayId, displayNotification);
// 🎨 Create and display notification element
const notificationElement = this.createNotificationElement(displayNotification, options);
this.notificationContainer.appendChild(notificationElement);
// ⏰ Auto-close if configured
if (options.autoClose) {
setTimeout(() => {
this.dismissNotification(displayNotification.displayId);
}, options.autoCloseDelay || 5000);
}
// 📢 Trigger browser notification if page is not visible
this.triggerBrowserNotification(notification);
}
private createNotificationElement(
notification: DisplayedNotification,
options: NotificationOptions
): HTMLElement {
const element = document.createElement('div');
element.id = notification.displayId;
element.className = `notification notification-${notification.severity}`;
element.style.cssText = `
background: white;
border-radius: 8px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15);
margin-bottom: 12px;
padding: 16px;
border-left: 4px solid ${this.getSeverityColor(notification.severity)};
opacity: 0;
transform: translateX(100%);
transition: all 0.3s ease;
pointer-events: auto;
max-width: 100%;
word-wrap: break-word;
`;
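// 📝 Create notification content (static markup only; server-supplied text is set via textContent)
const content = `
<div style="display: flex; justify-content: space-between; align-items: flex-start;">
<div style="flex: 1;">
<h4 data-role="title" style="margin: 0 0 8px 0; font-weight: 600; color: #333;"></h4>
<p data-role="message" style="margin: 0; color: #666; line-height: 1.4;"></p>
${options.showTimestamp ? `
<small style="color: #999; font-size: 0.8em; margin-top: 8px; display: block;">
${notification.createdAt.toLocaleTimeString()}
</small>
` : ''}
</div>
${options.allowDismiss ? `
<button
data-role="dismiss"
style="
background: none;
border: none;
font-size: 18px;
cursor: pointer;
color: #999;
padding: 0;
margin-left: 12px;
"
>×</button>
` : ''}
</div>
`;
element.innerHTML = content;
// 🛡️ Insert title and message as text, never as HTML, so markup in a message cannot run as script
const titleElement = element.querySelector('[data-role="title"]');
if (titleElement) {
titleElement.textContent = `${this.getSeverityIcon(notification.severity)} ${notification.title}`;
}
const messageElement = element.querySelector('[data-role="message"]');
if (messageElement) {
messageElement.textContent = notification.message;
}
// 🖱️ Bind the dismiss button directly instead of relying on a global in an inline onclick
element.querySelector('[data-role="dismiss"]')?.addEventListener('click', () => {
this.dismissNotification(notification.displayId);
});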
// 🎭 Animate in
setTimeout(() => {
element.style.opacity = '1';
element.style.transform = 'translateX(0)';
}, 10);
return element;
}
private getSeverityColor(severity: NotificationMessage['severity']): string {
const colors = {
success: '#10B981',
info: '#3B82F6',
warning: '#F59E0B',
error: '#EF4444'
};
return colors[severity];
}
private getSeverityIcon(severity: NotificationMessage['severity']): string {
const icons = {
success: '✅',
info: 'ℹ️',
warning: '⚠️',
error: '❌'
};
return icons[severity];
}
// 🗑️ Dismiss notification
dismissNotification(displayId: string): void {
const notification = this.notifications.get(displayId);
if (!notification || notification.dismissed) return;
const element = document.getElementById(displayId);
if (element) {
// 🎭 Animate out
element.style.opacity = '0';
element.style.transform = 'translateX(100%)';
setTimeout(() => {
element.remove();
}, 300);
}
// 🧹 Mark as dismissed
notification.dismissed = true;
this.notifications.delete(displayId);
}
private enforceNotificationLimit(): void {
const activeNotifications = Array.from(this.notifications.values())
.filter(n => !n.dismissed)
.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
// 🗑️ Remove oldest notifications if over limit
while (activeNotifications.length >= this.maxNotifications) {
const oldest = activeNotifications.shift();
if (oldest) {
this.dismissNotification(oldest.displayId);
}
}
}
// 📢 Trigger browser notification
private async triggerBrowserNotification(notification: NotificationMessage): Promise<void> {
if (!('Notification' in window) || document.visibilityState === 'visible') {
return;
}
// 🔔 Request permission if needed
if (Notification.permission === 'default') {
await Notification.requestPermission();
}
if (Notification.permission === 'granted') {
new Notification(notification.title, {
body: notification.message,
icon: '/favicon.ico',
tag: notification.id,
requireInteraction: notification.severity === 'error'
});
}
}
// 📊 Show system notification
private showSystemNotification(message: string, severity: NotificationMessage['severity']): void {
const notification = SSEMessageFactory.createNotification(
'System',
message,
severity
);
this.displayNotification(notification, {
autoClose: true,
autoCloseDelay: 3000,
showTimestamp: false
});
}
// 📊 Get notification statistics
getStats(): any {
const all = Array.from(this.notifications.values());
const active = all.filter(n => !n.dismissed);
return {
total: all.length,
active: active.length,
dismissed: all.length - active.length,
bySeverity: {
success: all.filter(n => n.severity === 'success').length,
info: all.filter(n => n.severity === 'info').length,
warning: all.filter(n => n.severity === 'warning').length,
error: all.filter(n => n.severity === 'error').length
},
connectionState: this.sseClient.getReadyState()
};
}
// 🔌 Disconnect from SSE
disconnect(): void {
this.sseClient.disconnect();
// 🧹 Clear all notifications
this.notifications.forEach((_, displayId) => {
this.dismissNotification(displayId);
});
}
}
// 🌍 Global instance for use in HTML onclick handlers
let notificationSystem: RealTimeNotificationSystem;
// 🚀 Initialize notification system
async function initializeNotificationSystem(): Promise<void> {
notificationSystem = new RealTimeNotificationSystem('/api/sse');
await notificationSystem.initialize();
}
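Notification titles and messages come from the server, so any code that places them inside an HTML string should treat them as untrusted. One option is to assign them with textContent; another is to escape them before interpolation. Here is a minimal sketch of the second option, using a hypothetical escapeHtml helper that is not part of the notification system itself:

```typescript
// Hypothetical helper: replaces the five characters that are significant in HTML.
const HTML_ESCAPES: Record<string, string> = {
  '&': '&amp;',
  '<': '&lt;',
  '>': '&gt;',
  '"': '&quot;',
  "'": '&#39;'
};

function escapeHtml(input: string): string {
  return input.replace(/[&<>"']/g, (char) => HTML_ESCAPES[char] ?? char);
}

// Example: a hostile title ends up as inert text instead of live markup.
const untrustedTitle = '<img src=x onerror="alert(1)">';
const safeMarkup = `<h4>${escapeHtml(untrustedTitle)}</h4>`;
console.log(safeMarkup);
```

Escaping only protects text that lands in element content or quoted attribute values. Values placed in URLs, inline scripts, or style attributes need their own handling.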
🛠️ Server-Side SSE Implementation
🏗️ Node.js SSE Server with TypeScript
Let’s create a robust SSE server using Node.js and Express:
// 🖥️ Server-side SSE implementation with Node.js and Express
import express from 'express';
import cors from 'cors';
import { v4 as uuidv4 } from 'uuid';
interface SSEConnection {
id: string;
response: express.Response;
userId?: string;
lastEventId?: string;
connectedAt: Date;
isAlive: boolean;
}
interface SSEChannel {
name: string;
connections: Map<string, SSEConnection>;
messageHistory: Array<{ id: string; data: any; type?: string; timestamp: Date }>;
maxHistorySize: number;
}
class SSEServer {
private app: express.Application;
private connections = new Map<string, SSEConnection>();
private channels = new Map<string, SSEChannel>();
// Definite assignment (!): set in setupHeartbeat(), which the constructor calls
private heartbeatInterval!: NodeJS.Timeout;
constructor(private port = 3000) {
this.app = express();
this.setupMiddleware();
this.setupRoutes();
this.setupHeartbeat();
// 🏗️ Create default channels
this.createChannel('notifications', 100);
this.createChannel('stock-updates', 50);
this.createChannel('system-status', 20);
this.createChannel('chat', 200);
}
private setupMiddleware(): void {
// 🌐 CORS configuration for SSE
this.app.use(cors({
origin: true,
credentials: true
}));
this.app.use(express.json());
this.app.use(express.static('public'));
// 📝 Request logging
this.app.use((req, res, next) => {
console.log(`${new Date().toISOString()} - ${req.method} ${req.path}`);
next();
});
}
private setupRoutes(): void {
// 📡 Main SSE endpoint
this.app.get('/api/sse', (req, res) => {
this.handleSSEConnection(req, res);
});
// 📡 Channel-specific SSE endpoints
this.app.get('/api/sse/:channel', (req, res) => {
this.handleSSEConnection(req, res, req.params.channel);
});
// 📤 Send notification endpoint
this.app.post('/api/send-notification', (req, res) => {
this.sendNotification(req.body);
res.json({ success: true });
});
// 📊 Send stock update endpoint
this.app.post('/api/send-stock-update', (req, res) => {
this.sendStockUpdate(req.body);
res.json({ success: true });
});
// 🔧 Send system status endpoint
this.app.post('/api/send-system-status', (req, res) => {
this.sendSystemStatus(req.body);
res.json({ success: true });
});
// 📊 Server statistics endpoint
this.app.get('/api/sse-stats', (req, res) => {
res.json(this.getServerStats());
});
// 🏠 Demo page
this.app.get('/', (req, res) => {
res.send(this.generateDemoPage());
});
}
private handleSSEConnection(
req: express.Request,
res: express.Response,
channelName = 'general'
): void {
const connectionId = uuidv4();
const lastEventId = req.headers['last-event-id'] as string || req.query.lastEventId as string;
console.log(`📡 New SSE connection: ${connectionId} to channel: ${channelName}`);
// 🔧 Setup SSE headers. CORS headers are left to the cors() middleware:
// a wildcard origin combined with credentials would be rejected by browsers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// 📦 Create connection object
const connection: SSEConnection = {
id: connectionId,
response: res,
userId: req.query.userId as string,
lastEventId,
connectedAt: new Date(),
isAlive: true
};
// 🗂️ Store connection
this.connections.set(connectionId, connection);
// 📡 Add to channel
this.addConnectionToChannel(connection, channelName);
// 👋 Send welcome message
this.sendToConnection(connection, {
type: 'connected',
message: `Connected to SSE channel: ${channelName}`,
connectionId,
timestamp: Date.now()
});
// 📜 Send missed messages if Last-Event-ID provided
if (lastEventId) {
this.sendMissedMessages(connection, channelName, lastEventId);
}
// 🎧 Handle connection close
req.on('close', () => {
console.log(`🔌 SSE connection closed: ${connectionId}`);
this.removeConnection(connectionId, channelName);
});
req.on('error', (error) => {
console.error(`❌ SSE connection error: ${connectionId}`, error);
this.removeConnection(connectionId, channelName);
});
}
private createChannel(name: string, maxHistorySize = 100): void {
const channel: SSEChannel = {
name,
connections: new Map(),
messageHistory: [],
maxHistorySize
};
this.channels.set(name, channel);
console.log(`📺 Created SSE channel: ${name}`);
}
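private addConnectionToChannel(connection: SSEConnection, channelName: string): void {
// 🌐 The default 'general' endpoint (/api/sse) has no channel of its own,
// so subscribe it to every channel; otherwise it would receive no broadcasts
if (channelName === 'general') {
this.channels.forEach((channel) => channel.connections.set(connection.id, connection));
return;
}
const channel = this.channels.get(channelName);
if (!channel) {
console.warn(`⚠️ Channel not found: ${channelName}`);
return;
}
channel.connections.set(connection.id, connection);
}
private removeConnection(connectionId: string, channelName: string): void {
// 🗑️ Remove from global connections and stop any further writes
const connection = this.connections.get(connectionId);
if (connection) connection.isAlive = false;
this.connections.delete(connectionId);
// 🗑️ Remove from every channel (a 'general' connection is subscribed to all of them)
this.channels.forEach((channel) => channel.connections.delete(connectionId));
console.log(`🗑️ Removed connection: ${connectionId} (${channelName})`);
}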
// 📤 Send message to specific connection
// eventId defaults to a fresh UUID; broadcasts and replays pass the ID stored in the
// channel history so that a client's Last-Event-ID can be found there on reconnect
private sendToConnection(
connection: SSEConnection,
data: any,
eventType?: string,
eventId: string = uuidv4()
): void {
if (!connection.isAlive) return;
try {
const formattedData = JSON.stringify(data);
let message = `id: ${eventId}\n`;
if (eventType) {
message += `event: ${eventType}\n`;
}
message += `data: ${formattedData}\n\n`;
connection.response.write(message);
console.log(`📤 Sent to ${connection.id}: ${eventType || 'message'}`);
} catch (error) {
console.error(`❌ Failed to send to ${connection.id}:`, error);
connection.isAlive = false;
}
}
// 📢 Broadcast to all connections in channel
private broadcastToChannel(channelName: string, data: any, eventType?: string): void {
const channel = this.channels.get(channelName);
if (!channel) return;
const message = {
id: uuidv4(),
data,
type: eventType,
timestamp: new Date()
};
// 💾 Store in channel history
channel.messageHistory.push(message);
if (channel.messageHistory.length > channel.maxHistorySize) {
channel.messageHistory = channel.messageHistory.slice(-channel.maxHistorySize);
}
// 📡 Send to all connections, reusing the history ID as the SSE event ID
let sentCount = 0;
channel.connections.forEach((connection) => {
this.sendToConnection(connection, data, eventType, message.id);
sentCount++;
});
console.log(`📢 Broadcast to ${channelName}: ${sentCount} connections`);
}
// 📜 Send missed messages since Last-Event-ID
private sendMissedMessages(connection: SSEConnection, channelName: string, lastEventId: string): void {
const channel = this.channels.get(channelName);
if (!channel) return;
// 🔍 Find the last message the client received
const lastIndex = channel.messageHistory.findIndex(msg => msg.id === lastEventId);
// 📤 Replay everything after it, or the 10 most recent messages
// when lastEventId is no longer in the history
const missedMessages = lastIndex === -1
? channel.messageHistory.slice(-10)
: channel.messageHistory.slice(lastIndex + 1);
missedMessages.forEach(msg => {
this.sendToConnection(connection, msg.data, msg.type, msg.id);
});
console.log(`📜 Sent ${missedMessages.length} missed messages to ${connection.id}`);
}
// 💓 Heartbeat to detect dead connections
private setupHeartbeat(): void {
this.heartbeatInterval = setInterval(() => {
this.performHeartbeat();
}, 30000); // Every 30 seconds
}
private performHeartbeat(): void {
console.log('💓 Performing SSE heartbeat...');
this.connections.forEach((connection, connectionId) => {
// 💓 Send heartbeat ping. sendToConnection catches write errors itself
// and flags the connection as dead instead of throwing
this.sendToConnection(connection, {
type: 'heartbeat',
timestamp: Date.now()
}, 'heartbeat');
if (!connection.isAlive) {
console.log(`💀 Removing dead connection: ${connectionId}`);
this.connections.delete(connectionId);
}
});
// 🧹 Clean up dead connections from channels
this.channels.forEach((channel) => {
const deadConnections: string[] = [];
channel.connections.forEach((connection, connectionId) => {
if (!connection.isAlive) {
deadConnections.push(connectionId);
}
});
deadConnections.forEach(connectionId => {
channel.connections.delete(connectionId);
});
});
}
// 🔔 Send notification
sendNotification(notification: NotificationMessage): void {
console.log(`🔔 Sending notification: ${notification.title}`);
this.broadcastToChannel('notifications', notification, 'notification');
}
// 💹 Send stock update
sendStockUpdate(stockUpdate: StockUpdateMessage): void {
console.log(`💹 Sending stock update: ${stockUpdate.symbol}`);
this.broadcastToChannel('stock-updates', stockUpdate, 'stock_update');
}
// 🔧 Send system status
sendSystemStatus(status: SystemStatusMessage): void {
console.log(`🔧 Sending system status: ${status.service}`);
this.broadcastToChannel('system-status', status, 'system_status');
}
// 📊 Get server statistics
private getServerStats(): any {
const totalConnections = this.connections.size;
const channelStats = Array.from(this.channels.entries()).map(([name, channel]) => ({
name,
connections: channel.connections.size,
messageHistory: channel.messageHistory.length
}));
return {
totalConnections,
channels: channelStats,
uptime: process.uptime(),
memoryUsage: process.memoryUsage()
};
}
// 🎭 Generate demo page
private generateDemoPage(): string {
return `
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE Demo - Real-Time Notifications</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
.container { max-width: 1200px; margin: 0 auto; }
.controls { background: white; padding: 20px; border-radius: 8px; margin-bottom: 20px; }
.status { padding: 10px; border-radius: 4px; margin-bottom: 10px; }
.connected { background: #d4edda; color: #155724; }
.disconnected { background: #f8d7da; color: #721c24; }
button { padding: 10px 20px; margin: 5px; border: none; border-radius: 4px; cursor: pointer; }
.btn-primary { background: #007bff; color: white; }
.btn-success { background: #28a745; color: white; }
.btn-warning { background: #ffc107; color: black; }
.btn-danger { background: #dc3545; color: white; }
#notifications { position: fixed; top: 20px; right: 20px; width: 300px; z-index: 1000; }
</style>
</head>
<body>
<div class="container">
<h1>📡 Server-Sent Events Demo</h1>
<div class="controls">
<h2>Connection Control</h2>
<div id="status" class="status disconnected">Disconnected</div>
<button class="btn-primary" onclick="connect()">Connect</button>
<button class="btn-danger" onclick="disconnect()">Disconnect</button>
</div>
<div class="controls">
<h2>Send Test Messages</h2>
<button class="btn-success" onclick="sendNotification('success')">Success Notification</button>
<button class="btn-warning" onclick="sendNotification('warning')">Warning Notification</button>
<button class="btn-danger" onclick="sendNotification('error')">Error Notification</button>
<button class="btn-primary" onclick="sendStockUpdate()">Stock Update</button>
<button class="btn-primary" onclick="sendSystemStatus()">System Status</button>
</div>
<div class="controls">
<h2>Message Log</h2>
<div id="messageLog" style="height: 300px; overflow-y: auto; background: #f8f9fa; padding: 10px; border-radius: 4px;"></div>
</div>
</div>
<div id="notifications"></div>
<script>
let eventSource = null;
let messageCount = 0;
function connect() {
if (eventSource) return;
eventSource = new EventSource('/api/sse');
eventSource.onopen = () => {
updateStatus('Connected', true);
logMessage('✅ Connected to SSE stream');
};
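// Log every event; unnamed events arrive via onmessage
const logEvent = (event) => {
try {
const data = JSON.parse(event.data);
logMessage('📨 ' + event.type + ': ' + JSON.stringify(data, null, 2));
} catch (error) {
logMessage('⚠️ Unparseable message: ' + event.data);
}
};
eventSource.onmessage = logEvent;
// Named events (sent with an "event:" field) never reach onmessage,
// so register them explicitly. This assumes the server uses these event names.
['notification', 'stock_update', 'system_status'].forEach((name) => {
eventSource.addEventListener(name, logEvent);
});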
eventSource.onerror = (error) => {
updateStatus('Error/Reconnecting', false);
logMessage('❌ Connection error');
};
}
function disconnect() {
if (eventSource) {
eventSource.close();
eventSource = null;
updateStatus('Disconnected', false);
logMessage('🔌 Disconnected from SSE stream');
}
}
function updateStatus(message, connected) {
const status = document.getElementById('status');
status.textContent = message;
status.className = 'status ' + (connected ? 'connected' : 'disconnected');
}
function logMessage(message) {
const log = document.getElementById('messageLog');
const time = new Date().toLocaleTimeString();
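// Use textContent rather than innerHTML so message text is never parsed as HTML
const entry = document.createElement('div');
entry.textContent = time + ' - ' + message;
log.appendChild(entry);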
log.scrollTop = log.scrollHeight;
}
async function sendNotification(severity) {
await fetch('/api/send-notification', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
type: 'notification',
title: 'Test Notification',
message: 'This is a test ' + severity + ' notification',
severity: severity,
timestamp: Date.now(),
id: 'test-' + Date.now()
})
});
}
async function sendStockUpdate() {
await fetch('/api/send-stock-update', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
type: 'stock_update',
symbol: 'AAPL',
price: 150 + Math.random() * 20,
change: (Math.random() - 0.5) * 10,
timestamp: Date.now(),
id: 'stock-' + Date.now()
})
});
}
async function sendSystemStatus() {
const statuses = ['online', 'offline', 'degraded'];
const services = ['API', 'Database', 'Cache', 'Queue'];
await fetch('/api/send-system-status', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
type: 'system_status',
service: services[Math.floor(Math.random() * services.length)],
status: statuses[Math.floor(Math.random() * statuses.length)],
uptime: Math.random() * 1000000,
timestamp: Date.now(),
id: 'status-' + Date.now()
})
});
}
// Auto-connect on page load
window.addEventListener('load', () => {
connect();
});
// Handle page unload
window.addEventListener('beforeunload', () => {
disconnect();
});
</script>
</body>
</html>
`;
}
// 🚀 Start the server
start(): void {
this.app.listen(this.port, () => {
console.log(`🚀 SSE Server running on port ${this.port}`);
console.log(`📡 SSE endpoint: http://localhost:${this.port}/api/sse`);
console.log(`🎭 Demo page: http://localhost:${this.port}`);
});
// 🧹 Cleanup on shutdown
process.on('SIGTERM', () => this.shutdown());
process.on('SIGINT', () => this.shutdown());
}
private shutdown(): void {
console.log('🛑 Shutting down SSE server...');
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
// 👋 Close all SSE connections
this.connections.forEach((connection) => {
connection.response.end();
});
console.log('✅ SSE server shutdown complete');
process.exit(0);
}
}
// 🚀 Start the SSE server
const sseServer = new SSEServer(3000);
sseServer.start();
// 🎯 Demo data generators for testing
setInterval(() => {
// 📊 Send random stock updates
const symbols = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA'];
const symbol = symbols[Math.floor(Math.random() * symbols.length)];
const price = 100 + Math.random() * 200;
const change = (Math.random() - 0.5) * 20;
sseServer.sendStockUpdate({
type: 'stock_update',
symbol,
price: Math.round(price * 100) / 100,
change: Math.round(change * 100) / 100,
changePercent: (change / (price - change)) * 100,
timestamp: Date.now(),
id: `stock-${Date.now()}`
});
}, 5000);
setInterval(() => {
// 🔧 Send random system status updates
const services = ['API Gateway', 'Database', 'Cache Layer', 'Message Queue'];
const statuses: SystemStatusMessage['status'][] = ['online', 'degraded'];
sseServer.sendSystemStatus({
type: 'system_status',
service: services[Math.floor(Math.random() * services.length)],
status: statuses[Math.floor(Math.random() * statuses.length)],
uptime: Math.random() * 1000000,
timestamp: Date.now(),
id: `status-${Date.now()}`
});
}, 15000);
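Every broadcast helper above ends up writing a frame in the text/event-stream format. Here is a minimal sketch of that framing, assuming a hypothetical formatSSEFrame helper (the name and its options are illustrative and are not part of the SSEServer class): each field goes on its own line, a multi-line payload needs one data: line per line, and a blank line ends the event.

```typescript
// Hypothetical helper: serialises one event into the text/event-stream wire format.
interface SSEFrameOptions {
  event?: string; // optional event name, written as the `event:` field
  id?: string;    // optional event id, written as the `id:` field
  retry?: number; // optional reconnection delay hint in milliseconds
}

function formatSSEFrame(data: unknown, options: SSEFrameOptions = {}): string {
  const lines: string[] = [];
  if (options.event) lines.push(`event: ${options.event}`);
  if (options.id) lines.push(`id: ${options.id}`);
  if (options.retry !== undefined) lines.push(`retry: ${options.retry}`);

  // Strings are sent as-is; everything else is JSON-encoded.
  const serialized: string | undefined =
    typeof data === 'string' ? data : JSON.stringify(data);
  const payload = serialized ?? 'null';

  // A raw newline inside a data line would end the field early,
  // so each payload line gets its own `data:` prefix.
  for (const line of payload.split(/\r\n|\r|\n/)) {
    lines.push(`data: ${line}`);
  }

  // The trailing blank line tells the client to dispatch the event.
  return lines.join('\n') + '\n\n';
}
```

With a helper like this, a server could write the result straight to the open response, for example response.write(formatSSEFrame(notification, { event: 'notification', id: '42' })). The id value is what the browser later echoes back as Last-Event-ID when it reconnects.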
🎨 Advanced SSE Patterns
🔄 SSE with Authentication and Channels
// 🔐 Advanced SSE with authentication and channel management
interface AuthenticatedSSEClient extends TypeSafeSSEClient {
authenticate(token: string): Promise<void>;
subscribeToChannel(channel: string): void;
unsubscribeFromChannel(channel: string): void;
}
class ChannelBasedSSEClient extends TypeSafeSSEClient {
private authToken: string | null = null;
private subscribedChannels = new Set<string>();
private channelEventHandlers = new Map<string, Set<(message: any) => void>>();
constructor(baseUrl: string, config?: SSEClientConfig) {
super(baseUrl, config);
}
// 🔐 Authenticate with server
async authenticate(token: string): Promise<void> {
this.authToken = token;
// 🔄 Reconnect with authentication
if (this.isConnected()) {
this.disconnect();
}
// 🔗 Build authenticated URL (drop any previous query string so repeated calls don't stack tokens)
const baseUrl = this.url.split('?')[0];
this.url = `${baseUrl}?token=${encodeURIComponent(token)}`;
this.connect();
}
// 📺 Subscribe to specific channel
subscribeToChannel(channel: string): void {
if (this.subscribedChannels.has(channel)) {
console.warn(`⚠️ Already subscribed to channel: ${channel}`);
return;
}
console.log(`📺 Subscribing to channel: ${channel}`);
this.subscribedChannels.add(channel);
// 📤 Send subscription request (if connection supports it)
if (this.isConnected()) {
this.sendChannelCommand('subscribe', channel);
}
}
// 📺 Unsubscribe from channel
unsubscribeFromChannel(channel: string): void {
if (!this.subscribedChannels.has(channel)) {
console.warn(`⚠️ Not subscribed to channel: ${channel}`);
return;
}
console.log(`📺 Unsubscribing from channel: ${channel}`);
this.subscribedChannels.delete(channel);
this.channelEventHandlers.delete(channel);
// 📤 Send unsubscription request
if (this.isConnected()) {
this.sendChannelCommand('unsubscribe', channel);
}
}
// 🎧 Add channel-specific event handler
onChannelMessage(channel: string, handler: (message: any) => void): void {
if (!this.channelEventHandlers.has(channel)) {
this.channelEventHandlers.set(channel, new Set());
}
this.channelEventHandlers.get(channel)!.add(handler);
}
// 🗑️ Remove channel-specific event handler
offChannelMessage(channel: string, handler: (message: any) => void): void {
const handlers = this.channelEventHandlers.get(channel);
if (handlers) {
handlers.delete(handler);
}
}
private sendChannelCommand(action: 'subscribe' | 'unsubscribe', channel: string): void {
// Note: This would require a bidirectional connection or separate HTTP endpoint
// For SSE, you might use a separate HTTP POST to manage subscriptions
fetch(`${this.url.split('?')[0]}/channels/${action}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.authToken}`
},
body: JSON.stringify({ channel })
}).catch(error => {
console.error(`❌ Failed to ${action} channel ${channel}:`, error);
});
}
// 🎧 Override message handling to route to channels
protected handleMessage(message: SSEEventMessage): void {
// 📡 Handle channel-specific messages
if ((message as any).channel) {
const channel = (message as any).channel;
const handlers = this.channelEventHandlers.get(channel);
if (handlers) {
handlers.forEach(handler => {
try {
handler(message);
} catch (error) {
console.error(`❌ Error in channel handler for ${channel}:`, error);
}
});
}
}
// 🎯 Call parent handler for general messages
super.emit('message', message);
}
// 📊 Get subscription status
getSubscriptions(): string[] {
return Array.from(this.subscribedChannels);
}
// 🔌 Override disconnect to cleanup subscriptions
disconnect(): void {
this.subscribedChannels.clear();
this.channelEventHandlers.clear();
super.disconnect();
}
}
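The native EventSource constructor does not accept custom request headers, which is why the class above carries the token in the query string. As a minimal sketch of building that URL safely, the hypothetical withAuthToken helper below (its name and the placeholder base origin are assumptions, not part of the tutorial's classes) uses the standard URL API so that an existing token is replaced instead of appended a second time and other query parameters survive.

```typescript
// Hypothetical helper: returns `url` with its `token` query parameter set to `token`.
// `base` is only a placeholder origin used to parse relative URLs such as '/api/sse'.
function withAuthToken(url: string, token: string, base = 'http://localhost'): string {
  const parsed = new URL(url, base);

  // set() replaces an existing `token` value in place and leaves other parameters alone.
  parsed.searchParams.set('token', token);

  // Hand back the same shape we were given: absolute in, absolute out.
  const isAbsolute = /^[a-z][a-z0-9+.-]*:/i.test(url);
  return isAbsolute
    ? parsed.toString()
    : parsed.pathname + parsed.search + parsed.hash;
}
```

Keep in mind that a token in the URL can end up in server logs and browser history, so short-lived tokens or cookie-based authentication with withCredentials may be a better fit for sensitive streams.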
⚡ SSE with Message Buffering and Offline Support
// 💾 SSE client with offline support and message buffering
interface BufferedMessage {
message: SSEEventMessage;
receivedAt: Date;
processed: boolean;
}
class OfflineCapableSSEClient extends TypeSafeSSEClient {
private messageBuffer: BufferedMessage[] = [];
private isOnline = navigator.onLine;
private maxBufferSize = 1000;
private offlineHandlers = new Set<(isOnline: boolean) => void>();
constructor(url: string, config?: SSEClientConfig) {
super(url, config);
this.setupOfflineHandling();
}
private setupOfflineHandling(): void {
// 🌐 Listen for online/offline events
window.addEventListener('online', () => {
console.log('🌐 Device came online');
this.isOnline = true;
this.handleOnlineStateChange(true);
// 🔄 Reconnect if needed
if (!this.isConnected()) {
this.connect();
}
});
window.addEventListener('offline', () => {
console.log('📵 Device went offline');
this.isOnline = false;
this.handleOnlineStateChange(false);
});
}
private handleOnlineStateChange(isOnline: boolean): void {
// 🔔 Notify offline handlers
this.offlineHandlers.forEach(handler => {
try {
handler(isOnline);
} catch (error) {
console.error('❌ Error in offline handler:', error);
}
});
if (isOnline) {
// 📤 Process buffered messages when coming back online
this.processBufferedMessages();
}
}
// 🎧 Add offline state change handler
onOfflineStateChange(handler: (isOnline: boolean) => void): void {
this.offlineHandlers.add(handler);
}
// 🗑️ Remove offline state change handler
offOfflineStateChange(handler: (isOnline: boolean) => void): void {
this.offlineHandlers.delete(handler);
}
// 📦 Override message handling to support buffering
protected handleMessage(message: SSEEventMessage): void {
const bufferedMessage: BufferedMessage = {
message,
receivedAt: new Date(),
processed: false
};
// 💾 Add to buffer
this.addToBuffer(bufferedMessage);
// 🎯 Process immediately if online
if (this.isOnline) {
this.processMessage(bufferedMessage);
}
// 🎧 Always emit for real-time handlers
super.emit('message', message);
}
private addToBuffer(bufferedMessage: BufferedMessage): void {
// 📥 Add to buffer
this.messageBuffer.push(bufferedMessage);
// 🧹 Trim buffer if too large
if (this.messageBuffer.length > this.maxBufferSize) {
const removed = this.messageBuffer.shift();
console.warn(`⚠️ Buffer full, removed oldest message: ${removed?.message.id}`);
}
}
private processMessage(bufferedMessage: BufferedMessage): void {
if (bufferedMessage.processed) return;
try {
// 🎯 Process the message (could involve API calls, storage, etc.)
console.log(`🔄 Processing buffered message: ${bufferedMessage.message.type}`);
// Mark as processed
bufferedMessage.processed = true;
// 🎉 Emit processed event
this.emit('messageProcessed' as any, bufferedMessage.message);
} catch (error) {
console.error('❌ Failed to process buffered message:', error);
}
}
private processBufferedMessages(): void {
console.log(`🔄 Processing ${this.messageBuffer.length} buffered messages...`);
// 📤 Process all unprocessed messages
this.messageBuffer
.filter(buffered => !buffered.processed)
.forEach(buffered => {
this.processMessage(buffered);
});
// 🧹 Clean up old processed messages
this.messageBuffer = this.messageBuffer.filter(buffered => {
const age = Date.now() - buffered.receivedAt.getTime();
return !buffered.processed || age < 300000; // Keep processed messages for 5 minutes
});
}
// 📊 Get buffer statistics
getBufferStats(): any {
const processed = this.messageBuffer.filter(m => m.processed).length;
const unprocessed = this.messageBuffer.length - processed;
return {
total: this.messageBuffer.length,
processed,
unprocessed,
isOnline: this.isOnline,
connectionState: this.getReadyState()
};
}
// 🧹 Clear message buffer
clearBuffer(): void {
console.log(`🧹 Clearing message buffer (${this.messageBuffer.length} messages)`);
this.messageBuffer = [];
}
// 🔄 Manually retry processing failed messages
retryFailedMessages(): void {
console.log('🔄 Retrying failed message processing...');
this.messageBuffer
.filter(buffered => !buffered.processed)
.forEach(buffered => {
this.processMessage(buffered);
});
}
}
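The buffering rules in this class (cap the size, retry anything unprocessed, drop old processed entries) can also be pulled out into a small structure that is testable without a browser. The sketch below is one possible shape, not the tutorial's implementation: BoundedBuffer, BufferEntry and the injectable now clock are all hypothetical names introduced for illustration.

```typescript
// Hypothetical, DOM-free version of the buffering rules used above.
interface BufferEntry<T> {
  value: T;
  receivedAt: number; // milliseconds, taken from the injected clock
  processed: boolean;
}

class BoundedBuffer<T> {
  private entries: BufferEntry<T>[] = [];
  private readonly maxSize: number;
  private readonly now: () => number;

  constructor(maxSize: number, now: () => number = Date.now) {
    this.maxSize = maxSize;
    this.now = now;
  }

  // Adds a value; returns the evicted oldest value if the buffer overflowed.
  push(value: T): T | undefined {
    this.entries.push({ value, receivedAt: this.now(), processed: false });
    if (this.entries.length > this.maxSize) {
      return this.entries.shift()?.value;
    }
    return undefined;
  }

  // Runs `handler` on every unprocessed entry and returns how many succeeded.
  // Entries whose handler throws stay unprocessed so a later drain can retry them.
  drain(handler: (value: T) => void): number {
    let succeeded = 0;
    for (const entry of this.entries) {
      if (entry.processed) continue;
      try {
        handler(entry.value);
        entry.processed = true;
        succeeded++;
      } catch {
        // Leave the entry unprocessed for the next attempt.
      }
    }
    return succeeded;
  }

  // Drops processed entries older than `maxAgeMs`; unprocessed entries are always kept.
  prune(maxAgeMs: number): void {
    const cutoff = this.now() - maxAgeMs;
    this.entries = this.entries.filter(
      (entry) => !entry.processed || entry.receivedAt >= cutoff
    );
  }

  get size(): number {
    return this.entries.length;
  }
}
```

Passing the clock in as a parameter is a deliberate choice here: a test can advance time by changing one variable instead of waiting on real timers.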
🧪 Testing SSE Applications
🔬 Unit Testing SSE Clients
// 🧪 Testing utilities for SSE applications
import { jest, describe, test, expect, beforeEach, afterEach } from '@jest/globals';
// 🎭 Mock EventSource for testing
class MockEventSource {
static CONNECTING = 0;
static OPEN = 1;
static CLOSED = 2;
readyState = MockEventSource.CONNECTING;
url: string;
withCredentials: boolean;
onopen: ((event: Event) => void) | null = null;
onmessage: ((event: MessageEvent) => void) | null = null;
onerror: ((event: Event) => void) | null = null;
private listeners: { [key: string]: Function[] } = {};
constructor(url: string, eventSourceInitDict?: EventSourceInit) {
this.url = url;
this.withCredentials = eventSourceInitDict?.withCredentials || false;
// 🕐 Simulate async connection
setTimeout(() => {
this.readyState = MockEventSource.OPEN;
if (this.onopen) {
this.onopen(new Event('open'));
}
}, 10);
}
addEventListener(type: string, listener: EventListener): void {
if (!this.listeners[type]) {
this.listeners[type] = [];
}
this.listeners[type].push(listener);
}
removeEventListener(type: string, listener: EventListener): void {
if (this.listeners[type]) {
this.listeners[type] = this.listeners[type].filter(l => l !== listener);
}
}
close(): void {
// The real EventSource has no onclose handler; close() only updates readyState
this.readyState = MockEventSource.CLOSED;
}
// Test helpers
simulateMessage(data: any, eventType?: string): void {
const event = {
data: JSON.stringify(data),
type: eventType ?? 'message',
lastEventId: `id-${Date.now()}`,
origin: window.location.origin
} as MessageEvent;
if (eventType && this.listeners[eventType]) {
this.listeners[eventType].forEach(listener => {
listener(event);
});
} else if (this.onmessage) {
this.onmessage(event);
}
}
simulateError(): void {
if (this.onerror) {
this.onerror(new Event('error'));
}
}
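simulateClose(): void {
// A dropped connection surfaces as an error event with readyState CLOSED;
// EventSource has no separate close event
this.readyState = MockEventSource.CLOSED;
if (this.onerror) {
this.onerror(new Event('error'));
}
}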
}
// 🧪 Test suite for SSE client
describe('SSE Client Tests', () => {
let mockEventSource: MockEventSource;
let client: TypeSafeSSEClient;
beforeEach(() => {
// 🎭 Replace global EventSource with mock
(global as any).EventSource = MockEventSource;
client = new TypeSafeSSEClient('/api/sse');
});
afterEach(() => {
client.disconnect();
});
test('🔌 should connect successfully', async () => {
const connectPromise = new Promise<void>((resolve) => {
client.on('connected', resolve);
});
client.connect();
// ✅ Should resolve when connection opens
await expect(connectPromise).resolves.toBeUndefined();
expect(client.isConnected()).toBe(true);
});
test('📨 should handle incoming messages', async () => {
client.connect();
// ⏰ Wait for connection
await new Promise(resolve => {
client.on('connected', resolve);
});
const messageHandler = jest.fn();
client.on('message', messageHandler);
// 📨 Simulate incoming message
const testMessage = {
type: 'notification',
title: 'Test Notification',
message: 'Test message content',
severity: 'info',
timestamp: Date.now(),
id: 'test-123'
};
// Access the underlying mock EventSource
const eventSource = (client as any).eventSource as MockEventSource;
eventSource.simulateMessage(testMessage);
// ⏰ Wait for message processing
await new Promise(resolve => setTimeout(resolve, 10));
expect(messageHandler).toHaveBeenCalledWith(testMessage);
});
test('🔄 should attempt reconnection on connection error', async () => {
const client = new TypeSafeSSEClient('/api/sse', {
autoReconnect: true,
maxReconnectAttempts: 2,
reconnectDelay: 100
});
client.connect();
// ⏰ Wait for connection
await new Promise(resolve => {
client.on('connected', resolve);
});
const reconnectingHandler = jest.fn();
client.on('reconnecting', reconnectingHandler);
// 💥 Simulate connection error
const eventSource = (client as any).eventSource as MockEventSource;
eventSource.simulateClose();
// ⏰ Wait for reconnection attempt
await new Promise(resolve => setTimeout(resolve, 150));
expect(reconnectingHandler).toHaveBeenCalledWith(1);
// This local client shadows the outer one, so afterEach will not clean it up
client.disconnect();
});
test('❌ should handle connection errors', async () => {
const errorHandler = jest.fn();
client.on('error', errorHandler);
client.connect();
// 💥 Simulate connection error
setTimeout(() => {
const eventSource = (client as any).eventSource as MockEventSource;
eventSource.simulateError();
}, 5);
// ⏰ Wait for error handling
await new Promise(resolve => setTimeout(resolve, 50));
expect(errorHandler).toHaveBeenCalled();
});
test('📺 should handle channel subscriptions', async () => {
const channelClient = new ChannelBasedSSEClient('/api/sse');
// 🔐 Mock authentication
await channelClient.authenticate('test-token');
const messageHandler = jest.fn();
channelClient.onChannelMessage('notifications', messageHandler);
channelClient.subscribeToChannel('notifications');
expect(channelClient.getSubscriptions()).toContain('notifications');
});
test('💾 should buffer messages when offline', async () => {
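// 📵 Simulate offline state before constructing the client,
// because the client reads navigator.onLine in its field initializer
Object.defineProperty(navigator, 'onLine', {
configurable: true,
value: false
});
const offlineClient = new OfflineCapableSSEClient('/api/sse');
offlineClient.connect();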
// 📨 Simulate incoming message while offline
const testMessage = {
type: 'notification',
title: 'Offline Message',
message: 'This message should be buffered',
severity: 'info',
timestamp: Date.now(),
id: 'offline-123'
};
const eventSource = (offlineClient as any).eventSource as MockEventSource;
eventSource.simulateMessage(testMessage);
// 📊 Check buffer stats
const stats = offlineClient.getBufferStats();
expect(stats.total).toBe(1);
expect(stats.unprocessed).toBe(1);
expect(stats.isOnline).toBe(false);
});
});
// 🎭 Integration test for notification system
describe('Notification System Integration Tests', () => {
let notificationSystem: RealTimeNotificationSystem;
beforeEach(() => {
// 🎭 Setup DOM and mock EventSource
document.body.innerHTML = '';
(global as any).EventSource = MockEventSource;
(global as any).Notification = class {
static permission = 'granted';
static requestPermission = jest.fn().mockResolvedValue('granted');
constructor(public title: string, public options: any) {}
};
notificationSystem = new RealTimeNotificationSystem('/api/sse');
});
afterEach(() => {
notificationSystem.disconnect();
});
test('🔔 should display notifications in UI', async () => {
await notificationSystem.initialize();
// ⏰ Wait for connection
await new Promise(resolve => setTimeout(resolve, 20));
// 📨 Simulate notification message
const notification = {
type: 'notification',
title: 'Test Notification',
message: 'This is a test notification',
severity: 'success',
timestamp: Date.now(),
id: 'ui-test-123'
} as NotificationMessage;
// Get the SSE client and simulate message
const sseClient = (notificationSystem as any).sseClient;
const eventSource = sseClient.eventSource as MockEventSource;
eventSource.simulateMessage(notification);
// ⏰ Wait for UI update
await new Promise(resolve => setTimeout(resolve, 50));
// 🔍 Check that notification was added to DOM
const notificationContainer = document.getElementById('notifications');
expect(notificationContainer?.children.length).toBe(1);
});
test('📊 should track notification statistics', async () => {
await notificationSystem.initialize();
// 📨 Send multiple notifications
const notifications = [
{ type: 'notification', severity: 'success', title: 'Success', message: 'Success message', timestamp: Date.now(), id: '1' },
{ type: 'notification', severity: 'warning', title: 'Warning', message: 'Warning message', timestamp: Date.now(), id: '2' },
{ type: 'notification', severity: 'error', title: 'Error', message: 'Error message', timestamp: Date.now(), id: '3' }
] as NotificationMessage[];
const sseClient = (notificationSystem as any).sseClient;
const eventSource = sseClient.eventSource as MockEventSource;
notifications.forEach(notification => {
eventSource.simulateMessage(notification);
});
// ⏰ Wait for processing
await new Promise(resolve => setTimeout(resolve, 100));
const stats = notificationSystem.getStats();
expect(stats.total).toBe(3);
expect(stats.bySeverity.success).toBe(1);
expect(stats.bySeverity.warning).toBe(1);
expect(stats.bySeverity.error).toBe(1);
});
});
🎯 Common Pitfalls and Best Practices
❌ Common Mistakes to Avoid
// ❌ Common SSE pitfalls and their solutions
class SSEPitfalls {
// ❌ WRONG: Not handling connection states properly
static wrongConnectionHandling(): void {
const eventSource = new EventSource('/api/sse');
// 💥 No error handling or connection state checking!
eventSource.onmessage = (event) => {
console.log(event.data);
};
}
// ✅ CORRECT: Proper connection state handling
static correctConnectionHandling(): void {
const eventSource = new EventSource('/api/sse');
eventSource.onopen = () => {
console.log('✅ SSE connected');
};
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
console.log('📨 Message:', data);
} catch (error) {
console.error('❌ Failed to parse message:', error);
}
};
eventSource.onerror = (error) => {
console.error('❌ SSE error:', error);
if (eventSource.readyState === EventSource.CLOSED) {
console.log('🔌 Connection closed, attempting reconnect...');
// Implement reconnection logic
}
};
}
// ❌ WRONG: Not cleaning up EventSource
static wrongCleanup(): void {
let eventSource: EventSource | null = new EventSource('/api/sse');
// Later in the app...
eventSource = null; // 💥 Memory leak! EventSource still running
}
// ✅ CORRECT: Proper EventSource cleanup
static correctCleanup(): void {
let eventSource: EventSource | null = new EventSource('/api/sse');
// ✅ Cleanup function
function cleanup(): void {
if (eventSource) {
eventSource.close();
eventSource = null;
}
}
// Call cleanup when component unmounts or page unloads
window.addEventListener('beforeunload', cleanup);
}
// ❌ WRONG: Not implementing Last-Event-ID for reliable delivery
static wrongEventIdHandling(): void {
const eventSource = new EventSource('/api/sse');
// 💥 No handling of missed messages on reconnection!
eventSource.onmessage = (event) => {
console.log(event.data);
};
}
// ✅ CORRECT: Implementing Last-Event-ID
static correctEventIdHandling(): void {
let lastEventId: string | null = null;
function connect(): void {
const url = lastEventId
? `/api/sse?lastEventId=${encodeURIComponent(lastEventId)}`
: '/api/sse';
const eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
// ✅ Store Last-Event-ID for reliable delivery
if (event.lastEventId) {
lastEventId = event.lastEventId;
}
console.log('📨 Message:', event.data);
};
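eventSource.onerror = () => {
// While readyState is CONNECTING the browser retries on its own and sends
// the Last-Event-ID header; only a CLOSED source needs a manual reconnect
if (eventSource.readyState === EventSource.CLOSED) {
// 🔄 Reconnect manually, passing the last ID as a query parameter
setTimeout(connect, 1000);
}
};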
}
connect();
}
// ❌ WRONG: Not handling message parsing errors
static wrongMessageParsing(): void {
const eventSource = new EventSource('/api/sse');
eventSource.onmessage = (event) => {
// 💥 This could crash if data is invalid JSON!
const data = JSON.parse(event.data);
console.log(data.title.toUpperCase()); // 💥 What if title is undefined?
};
}
// ✅ CORRECT: Safe message parsing with validation
static correctMessageParsing(): void {
const eventSource = new EventSource('/api/sse');
function isValidMessage(data: any): boolean {
return (
typeof data === 'object' &&
typeof data.type === 'string' &&
typeof data.timestamp === 'number' &&
typeof data.id === 'string'
);
}
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
// ✅ Validate message structure
if (isValidMessage(data)) {
switch (data.type) {
case 'notification':
if (typeof data.title === 'string') {
console.log(data.title.toUpperCase());
}
break;
// Handle other message types...
}
} else {
console.warn('⚠️ Invalid message format:', data);
}
} catch (error) {
console.error('❌ Failed to parse SSE message:', error);
}
};
}
// ❌ WRONG: Not handling browser compatibility
static wrongBrowserSupport(): void {
// 💥 EventSource might not be supported!
const eventSource = new EventSource('/api/sse');
}
// ✅ CORRECT: Check browser support and provide fallback
static correctBrowserSupport(): void {
if (!('EventSource' in window)) {
console.warn('⚠️ EventSource not supported, falling back to polling');
// Implement polling fallback
setInterval(async () => {
try {
const response = await fetch('/api/updates');
const data = await response.json();
// Handle updates
} catch (error) {
console.error('Polling error:', error);
}
}, 5000);
return;
}
// ✅ Use SSE if supported
const eventSource = new EventSource('/api/sse');
// ... setup handlers
}
}
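Several of the examples above reconnect after a fixed one-second delay. When many clients lose the same server at once, a fixed delay makes them all retry at the same moment. A common remedy is exponential backoff with jitter. The sketch below assumes a hypothetical computeReconnectDelay helper; the name, the default values and the "half fixed, half random" split are illustrative choices, not something taken from the TypeSafeSSEClient shown earlier.

```typescript
// Hypothetical helper: delay in milliseconds before reconnect attempt number `attempt`
// (0 for the first retry). `random` is injectable so the result can be tested.
function computeReconnectDelay(
  attempt: number,
  baseDelayMs = 1000,
  maxDelayMs = 30000,
  random: () => number = Math.random
): number {
  // Double the delay on every attempt, but never exceed the cap.
  const exponential = Math.min(
    maxDelayMs,
    baseDelayMs * Math.pow(2, Math.max(0, attempt))
  );

  // Keep half of the delay fixed and randomise the other half,
  // so clients spread out without ever retrying immediately.
  const half = exponential / 2;
  return Math.floor(half + random() * half);
}
```

A manual reconnect could then schedule itself with setTimeout(connect, computeReconnectDelay(attempt)) and reset attempt to 0 once the connection opens again.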
🏆 Best Practices Summary
// 🏆 SSE best practices checklist
class SSEBestPractices {
// ✅ 1. Always implement proper error handling
static implementErrorHandling(): void {
const client = new TypeSafeSSEClient('/api/sse');
client.on('error', (error) => {
console.error('SSE error:', error);
// Implement user notification
// Log to error tracking service
// Attempt recovery if possible
});
}
// ✅ 2. Use Last-Event-ID for reliable message delivery
static implementReliableDelivery(): void {
const client = new TypeSafeSSEClient('/api/sse', {
autoReconnect: true,
maxReconnectAttempts: 10
});
// Client automatically handles Last-Event-ID
}
// ✅ 3. Implement proper message validation
static implementMessageValidation(): void {
function isValidSSEMessage(data: any): data is SSEEventMessage {
return (
typeof data === 'object' &&
typeof data.type === 'string' &&
typeof data.timestamp === 'number' &&
typeof data.id === 'string'
);
}
const client = new TypeSafeSSEClient('/api/sse');
client.on('message', (message) => {
if (isValidSSEMessage(message)) {
// ✅ Type-safe processing
console.log(`Valid message: ${message.type}`);
}
});
}
// ✅ 4. Handle offline scenarios gracefully
static handleOfflineScenarios(): void {
const client = new OfflineCapableSSEClient('/api/sse');
client.onOfflineStateChange((isOnline) => {
if (isOnline) {
console.log('🌐 Back online, processing buffered messages');
} else {
console.log('📵 Gone offline, buffering messages');
}
});
}
// ✅ 5. Implement proper cleanup
static implementCleanup(): void {
class ComponentWithSSE {
private sseClient: TypeSafeSSEClient;
constructor() {
this.sseClient = new TypeSafeSSEClient('/api/sse');
this.sseClient.connect();
}
// ✅ Always cleanup on component unmount
destroy(): void {
this.sseClient.disconnect();
// Clear any timers
// Remove event listeners
// Cancel pending operations
}
}
}
// ✅ 6. Use channels for organized messaging
static useChannelOrganization(): void {
const client = new ChannelBasedSSEClient('/api/sse');
// Subscribe to specific channels
client.subscribeToChannel('notifications');
client.subscribeToChannel('stock-updates');
// Handle channel-specific messages
client.onChannelMessage('notifications', (notification) => {
console.log('🔔 Notification:', notification);
});
}
// ✅ 7. Monitor connection health
static monitorConnectionHealth(): void {
const client = new TypeSafeSSEClient('/api/sse');
let messagesReceived = 0;
let lastMessageTime = Date.now();
client.on('message', () => {
messagesReceived++;
lastMessageTime = Date.now();
});
// Monitor connection health
setInterval(() => {
const timeSinceLastMessage = Date.now() - lastMessageTime;
console.log('SSE Health:', {
connected: client.isConnected(),
messagesReceived,
timeSinceLastMessage,
state: client.getReadyState()
});
// Alert if no messages for too long
if (timeSinceLastMessage > 60000 && client.isConnected()) {
console.warn('⚠️ No messages received for over 1 minute');
}
}, 30000);
}
// ✅ 8. Implement graceful degradation
static implementGracefulDegradation(): void {
class RealTimeService {
private strategy: 'sse' | 'polling' = 'sse';
async initialize(): Promise<void> {
if ('EventSource' in window && navigator.onLine) {
try {
await this.trySSE();
this.strategy = 'sse';
} catch (error) {
console.warn('SSE failed, falling back to polling');
this.fallbackToPolling();
}
} else {
this.fallbackToPolling();
}
}
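private async trySSE(): Promise<void> {
const client = new TypeSafeSSEClient('/api/sse');
// Test connection
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
client.disconnect(); // Stop the pending attempt before falling back
reject(new Error('SSE connection timeout'));
}, 5000);
client.on('connected', () => {
clearTimeout(timeout);
resolve();
});
client.on('error', () => {
clearTimeout(timeout);
client.disconnect(); // Avoid a background reconnect loop running alongside polling
reject(new Error('SSE connection failed'));
});
// Connect only after the handlers are registered so no event is missed
client.connect();
});
}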
private fallbackToPolling(): void {
this.strategy = 'polling';
setInterval(async () => {
try {
const response = await fetch('/api/updates');
const updates = await response.json();
// Process updates
} catch (error) {
console.error('Polling error:', error);
}
}, 5000);
}
}
}
}
🎉 Hands-On Exercises
💻 Exercise 1: Build a Live News Feed
Create a real-time news feed using SSE:
// 💻 Exercise 1: Live News Feed
// Your task: Implement a real-time news feed system
interface NewsArticle {
id: string;
title: string;
summary: string;
category: string;
publishedAt: Date;
author: string;
imageUrl?: string;
}
class LiveNewsFeed {
private sseClient: TypeSafeSSEClient;
private articles: NewsArticle[] = [];
private feedContainer: HTMLElement;
constructor() {
this.sseClient = new TypeSafeSSEClient('/api/news-feed');
this.feedContainer = document.getElementById('news-feed')!;
this.setupEventHandlers();
}
async initialize(): Promise<void> {
// TODO: Initialize the SSE connection
// TODO: Load initial articles
// Hint: Use this.sseClient.connect() and setup message handlers
}
private setupEventHandlers(): void {
// TODO: Setup SSE event listeners
// Handle: new articles, article updates, breaking news
}
private handleNewArticle(article: NewsArticle): void {
// TODO: Add new article to feed
// TODO: Update UI
// Hint: Add to this.articles array and call updateFeedDisplay()
}
private updateFeedDisplay(): void {
// TODO: Render articles in the feed container
// TODO: Sort by publishedAt (newest first)
// TODO: Limit to 20 most recent articles
}
filterByCategory(category: string): void {
// TODO: Filter articles by category
// TODO: Update display
}
}
// 🧪 Test your implementation:
// const newsFeed = new LiveNewsFeed();
// newsFeed.initialize();
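If you get stuck on the sorting and limiting steps, here is one possible sketch of the data side of Exercise 1. It assumes the server sends each article as a JSON string with `publishedAt` serialized as a date string. `parseArticle` and `selectVisibleArticles` are hypothetical helper names, not part of `TypeSafeSSEClient`.

```typescript
// Hypothetical helpers for Exercise 1 (illustrative, not the tutorial's API)
interface NewsArticle {
  id: string;
  title: string;
  summary: string;
  category: string;
  publishedAt: Date;
  author: string;
  imageUrl?: string;
}

// JSON has no Date type, so publishedAt arrives as a string and must be converted
function parseArticle(raw: string): NewsArticle {
  const data = JSON.parse(raw) as Omit<NewsArticle, 'publishedAt'> & { publishedAt: string };
  return { ...data, publishedAt: new Date(data.publishedAt) };
}

// Returns a new array: optionally filtered by category, newest first, capped at `limit`
function selectVisibleArticles(
  articles: readonly NewsArticle[],
  category?: string,
  limit = 20
): NewsArticle[] {
  return articles
    .filter((article) => category === undefined || article.category === category)
    .sort((a, b) => b.publishedAt.getTime() - a.publishedAt.getTime())
    .slice(0, limit);
}
```

Because `filter` returns a copy, sorting never mutates `this.articles`, so `updateFeedDisplay()` and `filterByCategory()` can both call the same helper and render the result.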
💻 Exercise 2: Real-Time Trading Dashboard
Build a stock trading dashboard with live price updates:
// 💻 Exercise 2: Trading Dashboard
// Your task: Create a real-time stock trading dashboard
interface StockData {
symbol: string;
price: number;
change: number;
changePercent: number;
volume: number;
high: number;
low: number;
lastUpdated: Date; // JSON has no Date type: convert the parsed string with new Date()
}
interface PriceAlert {
symbol: string;
targetPrice: number;
condition: 'above' | 'below';
active: boolean;
}
class TradingDashboard {
private sseClient: TypeSafeSSEClient;
private stocks = new Map<string, StockData>();
private priceAlerts: PriceAlert[] = [];
private watchlist: string[] = [];
constructor() {
this.sseClient = new TypeSafeSSEClient('/api/market-data');
this.setupEventHandlers();
}
async initialize(): Promise<void> {
// TODO: Initialize SSE connection
// TODO: Load watchlist
// TODO: Set up price alerts
}
private setupEventHandlers(): void {
// TODO: Handle stock price updates
// TODO: Handle market status changes
// TODO: Check price alerts on updates
}
private handleStockUpdate(stockData: StockData): void {
// TODO: Update stock data
// TODO: Update UI display
// TODO: Check if any price alerts triggered
}
addToWatchlist(symbol: string): void {
// TODO: Add symbol to watchlist
// TODO: Subscribe to updates for this symbol
}
createPriceAlert(alert: PriceAlert): void {
// TODO: Add price alert
// TODO: Check if alert should trigger immediately
}
private checkPriceAlerts(stockData: StockData): void {
// TODO: Check if any alerts should trigger
// TODO: Send notifications for triggered alerts
}
private updateStockDisplay(symbol: string): void {
// TODO: Update the UI for specific stock
// TODO: Show price change with colors (green/red)
// TODO: Animate price changes
}
}
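For the alert-checking step, a small pure function is easy to test without a live stream. This sketch makes two assumptions the exercise does not state: a price equal to the target counts as triggered, and an alert fires once and is then deactivated. `isTriggered` and `evaluateAlerts` are hypothetical names.

```typescript
// Hypothetical helpers for Exercise 2 (illustrative, not the tutorial's API)
interface PriceAlert {
  symbol: string;
  targetPrice: number;
  condition: 'above' | 'below';
  active: boolean;
}

// Assumption: reaching the target exactly counts as triggered
function isTriggered(alert: PriceAlert, symbol: string, price: number): boolean {
  if (!alert.active || alert.symbol !== symbol) return false;
  return alert.condition === 'above'
    ? price >= alert.targetPrice
    : price <= alert.targetPrice;
}

// Returns the alerts triggered by this update and deactivates them (assumption: one-shot alerts)
function evaluateAlerts(
  alerts: PriceAlert[],
  stock: { symbol: string; price: number }
): PriceAlert[] {
  const triggered = alerts.filter((alert) => isTriggered(alert, stock.symbol, stock.price));
  for (const alert of triggered) {
    alert.active = false;
  }
  return triggered;
}
```

Inside `checkPriceAlerts()`, you could call `evaluateAlerts(this.priceAlerts, stockData)` and send one notification per returned alert.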
💻 Exercise 3: Live Chat with SSE
Create a live chat system using SSE for incoming messages:
// 💻 Exercise 3: Live Chat with SSE
// Your task: Build a live chat system using SSE for real-time messages
interface ChatMessage {
id: string;
username: string;
content: string;
timestamp: Date; // JSON has no Date type: convert the parsed string with new Date()
roomId: string;
type: 'message' | 'join' | 'leave' | 'typing';
}
interface ChatRoom {
id: string;
name: string;
users: string[];
lastMessage?: ChatMessage;
}
class LiveChatSystem {
private sseClient: TypeSafeSSEClient;
private currentRoom: string | null = null;
private username: string;
private messages: ChatMessage[] = [];
private typingUsers = new Set<string>();
constructor(username: string) {
this.username = username;
this.sseClient = new TypeSafeSSEClient('/api/chat-sse');
this.setupEventHandlers();
}
async initialize(): Promise<void> {
// TODO: Initialize SSE connection
// TODO: Join default room
// TODO: Set up typing detection
}
private setupEventHandlers(): void {
// TODO: Handle incoming chat messages
// TODO: Handle user join/leave events
// TODO: Handle typing indicators
}
joinRoom(roomId: string): void {
// TODO: Leave current room
// TODO: Join new room
// TODO: Load room history
// Note: Since SSE is one-way, you'll need to use HTTP POST to join rooms
}
sendMessage(content: string): void {
// TODO: Send message via HTTP POST
// TODO: Add to local message list for immediate feedback
// Note: Use fetch() to POST message, SSE will deliver to other users
}
private handleIncomingMessage(message: ChatMessage): void {
// TODO: Add message to chat
// TODO: Update UI
// TODO: Play notification sound if needed
}
private handleUserTyping(username: string, isTyping: boolean): void {
// TODO: Update typing indicators
// TODO: Show/hide typing status
}
private displayMessage(message: ChatMessage): void {
// TODO: Render message in chat UI
// TODO: Handle different message types (join/leave/message)
// TODO: Auto-scroll to bottom
}
startTyping(): void {
// TODO: Send typing indicator via HTTP POST
// TODO: Throttle typing notifications
}
stopTyping(): void {
// TODO: Send stop typing indicator
}
}
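The "throttle typing notifications" TODO can be sketched as a small wrapper that lets a call through at most once per interval. `createThrottle` is a hypothetical helper, and the injectable `now` clock is an assumption added only to make the behavior easy to test.

```typescript
// Hypothetical helper for Exercise 3 (illustrative, not the tutorial's API)
// Returns a function that runs `fn` at most once every `intervalMs` milliseconds.
// The returned function reports whether `fn` actually ran.
function createThrottle(
  fn: () => void,
  intervalMs: number,
  now: () => number = () => Date.now()
): () => boolean {
  let lastCall = Number.NEGATIVE_INFINITY;

  return () => {
    const current = now();
    // Drop the call if the previous one was too recent
    if (current - lastCall < intervalMs) return false;
    lastCall = current;
    fn();
    return true;
  };
}
```

In `startTyping()`, you could wrap the HTTP POST in a throttle of a couple of seconds so that each keystroke does not produce its own request.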
🎊 Conclusion
Congratulations! 🎉 You’ve mastered the art of Server-Sent Events and one-way real-time communication! You’ve learned how to:
- 📡 Create robust SSE connections with automatic reconnection
- 🔔 Build real-time notification systems with proper UI integration
- 🏗️ Implement scalable server-side event streaming
- 🛡️ Handle offline scenarios and message buffering
- ⚡ Optimize performance with channels and message filtering
- 🧪 Test SSE applications thoroughly
- 🎯 Avoid common pitfalls and follow best practices
You’re now equipped to build real-time features that need server-to-client communication, from live dashboards and notifications to news feeds and activity streams.
Keep practicing, keep building, and remember: when you need efficient, reliable one-way updates flowing from server to client, Server-Sent Events are a great fit! 🚀✨
Next Steps: Try building a live sports scoreboard, a real-time analytics dashboard, or a social media activity feed. The streaming possibilities are endless with your new SSE superpowers! 🌟