+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Part 469 of 541

๐Ÿš€ Network Monitoring: Traffic Analysis

Master network monitoring and traffic analysis in Python with practical examples, best practices, and real-world applications ๐Ÿš€

๐Ÿ’ŽAdvanced
25 min read

Prerequisites

  • Basic understanding of programming concepts ๐Ÿ“
  • Python installation (3.8+) ๐Ÿ
  • VS Code or preferred IDE ๐Ÿ’ป

What you'll learn

  • Understand network monitoring fundamentals ๐ŸŽฏ
  • Apply traffic analysis in real projects ๐Ÿ—๏ธ
  • Debug common network monitoring issues ๐Ÿ›
  • Write clean, Pythonic network analysis code โœจ

๐ŸŽฏ Introduction

Welcome to this exciting tutorial on Network Monitoring and Traffic Analysis! ๐ŸŽ‰ In this guide, weโ€™ll explore how to monitor network traffic, analyze patterns, and gain insights into network behavior using Python.

Youโ€™ll discover how network monitoring can transform your understanding of system performance, security threats, and user behavior. Whether youโ€™re building security tools ๐Ÿ”’, performance monitors ๐Ÿ“Š, or network diagnostics ๐Ÿ”, understanding traffic analysis is essential for robust network applications.

By the end of this tutorial, youโ€™ll feel confident implementing network monitoring solutions in your own projects! Letโ€™s dive in! ๐ŸŠโ€โ™‚๏ธ

๐Ÿ“š Understanding Network Monitoring

๐Ÿค” What is Network Monitoring?

Network monitoring is like being a traffic controller at a busy intersection ๐Ÿšฆ. Think of it as watching and analyzing all the cars (data packets) that pass through, understanding where theyโ€™re going, how fast theyโ€™re moving, and identifying any unusual patterns.

In Python terms, network monitoring involves capturing, analyzing, and interpreting network packets in real-time. This means you can:

  • โœจ Detect security threats and intrusions
  • ๐Ÿš€ Identify performance bottlenecks
  • ๐Ÿ›ก๏ธ Monitor bandwidth usage and traffic patterns

๐Ÿ’ก Why Use Network Monitoring?

Hereโ€™s why developers love network monitoring:

  1. Security Analysis ๐Ÿ”’: Detect malicious activities and intrusions
  2. Performance Optimization ๐Ÿ’ป: Identify and fix network bottlenecks
  3. Traffic Insights ๐Ÿ“–: Understand usage patterns and behaviors
  4. Troubleshooting ๐Ÿ”ง: Quickly diagnose network issues

Real-world example: Imagine monitoring a web server ๐ŸŒ. With traffic analysis, you can detect DDoS attacks, identify slow endpoints, and optimize resource allocation!

๐Ÿ”ง Basic Syntax and Usage

๐Ÿ“ Simple Packet Capture

Letโ€™s start with a friendly example using Scapy:

# ๐Ÿ‘‹ Hello, Network Monitor!
from scapy.all import sniff, IP, TCP, UDP

# ๐ŸŽจ Simple packet handler
def packet_handler(packet):
    # ๐Ÿ‘ค Check if it's an IP packet
    if IP in packet:
        src_ip = packet[IP].src  # ๐Ÿ  Source address
        dst_ip = packet[IP].dst  # ๐ŸŽฏ Destination address
        print(f"๐Ÿ“ฆ Packet: {src_ip} โ†’ {dst_ip}")
        
        # ๐Ÿ” Check protocol
        if TCP in packet:
            print(f"   ๐Ÿšข TCP Port: {packet[TCP].dport}")
        elif UDP in packet:
            print(f"   โœˆ๏ธ UDP Port: {packet[UDP].dport}")

# ๐ŸŽฃ Start capturing packets!
print("๐Ÿ” Starting network monitor... Press Ctrl+C to stop")
sniff(prn=packet_handler, count=10)  # Capture 10 packets

๐Ÿ’ก Explanation: Notice how we use emojis to make the output more readable! The sniff() function captures packets and calls our handler for each one.

๐ŸŽฏ Common Monitoring Patterns

Here are patterns youโ€™ll use daily:

# ๐Ÿ—๏ธ Pattern 1: Traffic statistics collector
from collections import defaultdict
import time

class TrafficMonitor:
    def __init__(self):
        self.stats = defaultdict(int)  # ๐Ÿ“Š Traffic stats
        self.start_time = time.time()  # โฑ๏ธ Start timer
        
    # ๐Ÿ“ˆ Update statistics
    def update_stats(self, packet):
        if IP in packet:
            # ๐ŸŽฏ Count packets per IP
            self.stats[packet[IP].src] += 1
            
    # ๐Ÿ“Š Display statistics
    def show_stats(self):
        print("\n๐Ÿ“Š Traffic Statistics:")
        for ip, count in sorted(self.stats.items(), 
                               key=lambda x: x[1], reverse=True)[:5]:
            print(f"   ๐ŸŒ {ip}: {count} packets")

# ๐ŸŽจ Pattern 2: Protocol analyzer
def analyze_protocols(packet):
    protocols = []
    
    if TCP in packet:
        protocols.append("TCP ๐Ÿšข")
    if UDP in packet:
        protocols.append("UDP โœˆ๏ธ")
    if packet.haslayer("ICMP"):
        protocols.append("ICMP ๐Ÿ“")
        
    return protocols

# ๐Ÿ”„ Pattern 3: Bandwidth monitor
class BandwidthMonitor:
    def __init__(self):
        self.bytes_total = 0  # ๐Ÿ“ฆ Total bytes
        self.start_time = time.time()  # โฑ๏ธ Timer
        
    def process_packet(self, packet):
        if hasattr(packet, 'len'):
            self.bytes_total += packet.len
            
    def get_bandwidth_mbps(self):
        elapsed = time.time() - self.start_time
        if elapsed > 0:
            mbps = (self.bytes_total * 8) / (elapsed * 1_000_000)
            return round(mbps, 2)
        return 0

๐Ÿ’ก Practical Examples

๐Ÿ›ก๏ธ Example 1: Security Monitor

Letโ€™s build a security monitoring system:

# ๐Ÿ”’ Security monitor for suspicious activity
import datetime
from scapy.all import *

class SecurityMonitor:
    def __init__(self):
        self.suspicious_ips = set()  # ๐Ÿšจ Suspicious IPs
        self.port_scans = defaultdict(set)  # ๐ŸŽฏ Port scan detection
        self.syn_flood = defaultdict(int)  # ๐Ÿ’ฅ SYN flood detection
        
    # ๐Ÿ” Analyze packet for threats
    def analyze_packet(self, packet):
        if not packet.haslayer(IP):
            return
            
        src_ip = packet[IP].src
        
        # ๐ŸŽฏ Detect port scanning
        if TCP in packet and packet[TCP].flags == 2:  # SYN flag
            dst_port = packet[TCP].dport
            self.port_scans[src_ip].add(dst_port)
            
            # ๐Ÿšจ Alert if scanning multiple ports
            if len(self.port_scans[src_ip]) > 10:
                self.alert(f"๐Ÿšจ Port scan detected from {src_ip}!")
                self.suspicious_ips.add(src_ip)
                
        # ๐Ÿ’ฅ Detect SYN flood
        if TCP in packet and packet[TCP].flags == 2:
            self.syn_flood[src_ip] += 1
            
            # ๐Ÿšจ Alert if too many SYN packets
            if self.syn_flood[src_ip] > 100:
                self.alert(f"๐Ÿ’ฅ Possible SYN flood from {src_ip}!")
                self.suspicious_ips.add(src_ip)
                
    # ๐Ÿ“ข Alert function
    def alert(self, message):
        timestamp = datetime.datetime.now().strftime("%H:%M:%S")
        print(f"[{timestamp}] {message}")
        
    # ๐Ÿ“Š Generate security report
    def generate_report(self):
        print("\n๐Ÿ›ก๏ธ Security Report:")
        print(f"   ๐Ÿšจ Suspicious IPs: {len(self.suspicious_ips)}")
        
        if self.suspicious_ips:
            print("   ๐Ÿ“‹ List of suspicious IPs:")
            for ip in self.suspicious_ips:
                ports = len(self.port_scans.get(ip, []))
                syns = self.syn_flood.get(ip, 0)
                print(f"      ๐Ÿ”ด {ip} - Ports scanned: {ports}, SYN packets: {syns}")

# ๐ŸŽฎ Let's use it!
monitor = SecurityMonitor()

def packet_handler(packet):
    monitor.analyze_packet(packet)

# ๐Ÿš€ Start monitoring
print("๐Ÿ›ก๏ธ Security monitor started! Looking for threats...")
sniff(prn=packet_handler, timeout=30)
monitor.generate_report()

๐ŸŽฏ Try it yourself: Add detection for other attacks like ARP spoofing or DNS hijacking!

๐Ÿ“Š Example 2: Performance Monitor

Letโ€™s make a network performance monitor:

# ๐Ÿš€ Network performance monitor
import time
from datetime import datetime
from scapy.all import *

class PerformanceMonitor:
    def __init__(self):
        self.start_time = time.time()  # โฑ๏ธ Start timer
        self.packet_count = 0  # ๐Ÿ“ฆ Total packets
        self.byte_count = 0  # ๐Ÿ’พ Total bytes
        self.protocol_stats = defaultdict(int)  # ๐Ÿ“Š Protocol breakdown
        self.latency_samples = []  # โšก Latency measurements
        
    # ๐Ÿ“ˆ Process packet for performance metrics
    def process_packet(self, packet):
        self.packet_count += 1
        
        # ๐Ÿ’พ Count bytes
        if hasattr(packet, 'len'):
            self.byte_count += packet.len
            
        # ๐Ÿ“Š Track protocols
        if TCP in packet:
            self.protocol_stats['TCP'] += 1
        elif UDP in packet:
            self.protocol_stats['UDP'] += 1
        elif packet.haslayer('ICMP'):
            self.protocol_stats['ICMP'] += 1
            # โšก Measure ICMP latency
            if packet.haslayer('ICMP') and packet['ICMP'].type == 0:
                self.measure_latency(packet)
                
    # โšก Measure network latency
    def measure_latency(self, packet):
        # Simple latency estimation based on timestamp
        if hasattr(packet, 'time'):
            latency = (time.time() - packet.time) * 1000  # Convert to ms
            self.latency_samples.append(latency)
            
    # ๐Ÿ“Š Calculate current metrics
    def get_metrics(self):
        elapsed = time.time() - self.start_time
        
        # ๐Ÿš€ Calculate rates
        pps = self.packet_count / elapsed if elapsed > 0 else 0
        bps = (self.byte_count * 8) / elapsed if elapsed > 0 else 0
        mbps = bps / 1_000_000
        
        # โšก Calculate average latency
        avg_latency = sum(self.latency_samples) / len(self.latency_samples) \
                     if self.latency_samples else 0
                     
        return {
            'packets_per_second': round(pps, 2),
            'megabits_per_second': round(mbps, 2),
            'average_latency_ms': round(avg_latency, 2),
            'total_packets': self.packet_count,
            'uptime_seconds': round(elapsed, 2)
        }
        
    # ๐ŸŽจ Display dashboard
    def show_dashboard(self):
        metrics = self.get_metrics()
        
        print("\n" + "="*50)
        print("๐Ÿ“Š Network Performance Dashboard")
        print("="*50)
        print(f"โฑ๏ธ  Uptime: {metrics['uptime_seconds']}s")
        print(f"๐Ÿ“ฆ Packets: {metrics['total_packets']:,} ({metrics['packets_per_second']} pps)")
        print(f"๐Ÿš€ Bandwidth: {metrics['megabits_per_second']} Mbps")
        print(f"โšก Latency: {metrics['average_latency_ms']} ms avg")
        
        print("\n๐Ÿ“Š Protocol Distribution:")
        total = sum(self.protocol_stats.values())
        for proto, count in self.protocol_stats.items():
            percentage = (count / total * 100) if total > 0 else 0
            bar = "โ–ˆ" * int(percentage / 5)
            print(f"   {proto:5} {bar:20} {percentage:.1f}%")

# ๐ŸŽฎ Real-time monitoring loop
def monitor_network():
    monitor = PerformanceMonitor()
    
    def update_display(packet):
        monitor.process_packet(packet)
        # Update dashboard every 100 packets
        if monitor.packet_count % 100 == 0:
            monitor.show_dashboard()
            
    print("๐Ÿš€ Performance monitor started! Press Ctrl+C to stop...")
    sniff(prn=update_display, store=False)

๐Ÿš€ Advanced Concepts

๐Ÿง™โ€โ™‚๏ธ Advanced Topic 1: Deep Packet Inspection

When youโ€™re ready to level up, try deep packet inspection:

# ๐ŸŽฏ Advanced packet inspector
from scapy.all import *

class DeepPacketInspector:
    def __init__(self):
        self.http_requests = []  # ๐ŸŒ HTTP tracking
        self.dns_queries = []  # ๐Ÿ” DNS tracking
        self.application_data = defaultdict(list)  # ๐Ÿ“ฑ App data
        
    # ๐Ÿ”ฌ Inspect application layer
    def inspect_packet(self, packet):
        # ๐ŸŒ HTTP inspection
        if packet.haslayer(TCP) and packet[TCP].dport == 80:
            if packet.haslayer(Raw):
                payload = packet[Raw].load.decode('utf-8', errors='ignore')
                if 'GET' in payload or 'POST' in payload:
                    self.extract_http_info(payload, packet)
                    
        # ๐Ÿ” DNS inspection
        if packet.haslayer(DNS) and packet[DNS].qr == 0:
            self.extract_dns_info(packet)
            
    # ๐ŸŒ Extract HTTP information
    def extract_http_info(self, payload, packet):
        lines = payload.split('\n')
        if lines:
            request_line = lines[0].strip()
            src_ip = packet[IP].src if IP in packet else 'Unknown'
            
            self.http_requests.append({
                'time': datetime.now(),
                'src_ip': src_ip,
                'request': request_line,
                'headers': self.extract_headers(lines[1:])
            })
            
            print(f"๐ŸŒ HTTP: {src_ip} โ†’ {request_line}")
            
    # ๐Ÿ” Extract DNS queries
    def extract_dns_info(self, packet):
        if packet.haslayer(DNSQR):
            query = packet[DNSQR].qname.decode('utf-8', errors='ignore')
            src_ip = packet[IP].src if IP in packet else 'Unknown'
            
            self.dns_queries.append({
                'time': datetime.now(),
                'src_ip': src_ip,
                'query': query
            })
            
            print(f"๐Ÿ” DNS Query: {src_ip} โ†’ {query}")

๐Ÿ—๏ธ Advanced Topic 2: Real-time Alerting System

For production monitoring systems:

# ๐Ÿš€ Real-time alert system
import threading
import queue
from collections import deque

class AlertingSystem:
    def __init__(self):
        self.alert_queue = queue.Queue()  # ๐Ÿ“ฌ Alert queue
        self.thresholds = {  # ๐ŸŽฏ Alert thresholds
            'bandwidth_mbps': 100,
            'packets_per_second': 10000,
            'suspicious_ips': 5,
            'error_rate': 0.05
        }
        self.alert_history = deque(maxlen=100)  # ๐Ÿ“œ Alert history
        
    # ๐Ÿšจ Check thresholds and generate alerts
    def check_thresholds(self, metrics):
        alerts = []
        
        # ๐Ÿš€ Bandwidth alert
        if metrics.get('bandwidth_mbps', 0) > self.thresholds['bandwidth_mbps']:
            alerts.append({
                'level': 'HIGH',
                'type': 'BANDWIDTH',
                'message': f"๐Ÿš€ High bandwidth usage: {metrics['bandwidth_mbps']} Mbps",
                'emoji': '๐Ÿš€'
            })
            
        # ๐Ÿ“ฆ Packet rate alert
        if metrics.get('pps', 0) > self.thresholds['packets_per_second']:
            alerts.append({
                'level': 'MEDIUM',
                'type': 'PACKET_RATE',
                'message': f"๐Ÿ“ฆ High packet rate: {metrics['pps']} pps",
                'emoji': '๐Ÿ“ฆ'
            })
            
        return alerts
        
    # ๐Ÿ“ข Send alerts
    def send_alert(self, alert):
        # Add timestamp
        alert['timestamp'] = datetime.now()
        
        # Queue for processing
        self.alert_queue.put(alert)
        self.alert_history.append(alert)
        
        # ๐Ÿ”” Display alert
        level_emoji = {
            'CRITICAL': '๐Ÿ”ด',
            'HIGH': '๐ŸŸ ',
            'MEDIUM': '๐ŸŸก',
            'LOW': '๐ŸŸข'
        }
        
        emoji = level_emoji.get(alert['level'], 'โšช')
        print(f"\n{emoji} ALERT: {alert['message']}")
        
    # ๐Ÿ“Š Alert statistics
    def get_alert_stats(self):
        stats = defaultdict(int)
        for alert in self.alert_history:
            stats[alert['type']] += 1
        return dict(stats)

โš ๏ธ Common Pitfalls and Solutions

๐Ÿ˜ฑ Pitfall 1: Missing Permissions

# โŒ Wrong way - forgetting about permissions!
try:
    sniff(count=10)  # ๐Ÿ’ฅ Might fail without sudo/admin!
except PermissionError:
    print("๐Ÿ˜ฐ Need admin privileges!")

# โœ… Correct way - check permissions first!
import os
import sys

def check_permissions():
    if os.name == 'posix' and os.geteuid() != 0:
        print("โš ๏ธ This script requires root privileges!")
        print("๐Ÿ”ง Run with: sudo python script.py")
        sys.exit(1)
    elif os.name == 'nt' and not ctypes.windll.shell32.IsUserAnAdmin():
        print("โš ๏ธ This script requires administrator privileges!")
        sys.exit(1)
    print("โœ… Permissions OK! Starting monitor...")

check_permissions()

๐Ÿคฏ Pitfall 2: Memory Leaks in Long-Running Monitors

# โŒ Dangerous - storing all packets!
all_packets = []  # ๐Ÿ’ฅ Memory will explode!

def bad_handler(packet):
    all_packets.append(packet)  # ๐Ÿ˜ฑ Never-ending growth!

# โœ… Safe - use circular buffer or process immediately!
from collections import deque

class SafeMonitor:
    def __init__(self, max_packets=1000):
        # ๐Ÿ›ก๏ธ Limited size buffer
        self.recent_packets = deque(maxlen=max_packets)
        self.stats = defaultdict(int)  # ๐Ÿ“Š Aggregated stats only
        
    def process_packet(self, packet):
        # Process and aggregate, don't store raw packets
        if IP in packet:
            self.stats['total'] += 1
            self.stats[packet[IP].src] += 1
            
        # Only keep recent packets for analysis
        self.recent_packets.append({
            'time': time.time(),
            'size': len(packet),
            'src': packet[IP].src if IP in packet else None
        })

๐Ÿ› ๏ธ Best Practices

  1. ๐ŸŽฏ Use Filters: Donโ€™t capture everything - filter what you need!
  2. ๐Ÿ“ Process in Real-time: Analyze packets as they arrive, donโ€™t store all
  3. ๐Ÿ›ก๏ธ Handle Errors Gracefully: Network operations can fail anytime
  4. ๐ŸŽจ Use Threading: Separate capture from analysis for performance
  5. โœจ Monitor Resources: Track your monitorโ€™s CPU and memory usage

๐Ÿงช Hands-On Exercise

๐ŸŽฏ Challenge: Build a Network Anomaly Detector

Create a system that detects unusual network patterns:

๐Ÿ“‹ Requirements:

  • โœ… Baseline normal traffic patterns
  • ๐Ÿท๏ธ Detect traffic spikes and anomalies
  • ๐Ÿ‘ค Identify new devices on network
  • ๐Ÿ“… Time-based pattern analysis
  • ๐ŸŽจ Visual dashboard with statistics!

๐Ÿš€ Bonus Points:

  • Add machine learning for pattern recognition
  • Implement real-time notifications
  • Create traffic prediction models

๐Ÿ’ก Solution

๐Ÿ” Click to see solution
# ๐ŸŽฏ Network anomaly detection system!
import time
import numpy as np
from collections import defaultdict, deque
from datetime import datetime, timedelta
from scapy.all import *

class AnomalyDetector:
    def __init__(self, baseline_duration=300):  # 5 min baseline
        self.baseline_duration = baseline_duration
        self.start_time = time.time()
        self.is_baseline_complete = False
        
        # ๐Ÿ“Š Baseline statistics
        self.baseline_stats = {
            'avg_pps': 0,  # Packets per second
            'avg_bps': 0,  # Bytes per second
            'known_ips': set(),  # Known IP addresses
            'protocol_dist': defaultdict(int)  # Protocol distribution
        }
        
        # ๐Ÿ” Current monitoring data
        self.current_window = deque(maxlen=60)  # 1 minute window
        self.anomalies = []  # ๐Ÿšจ Detected anomalies
        self.new_devices = set()  # ๐Ÿ†• New devices detected
        
    # ๐Ÿ“Š Build baseline profile
    def build_baseline(self, packet):
        if IP in packet:
            self.baseline_stats['known_ips'].add(packet[IP].src)
            self.baseline_stats['known_ips'].add(packet[IP].dst)
            
        if TCP in packet:
            self.baseline_stats['protocol_dist']['TCP'] += 1
        elif UDP in packet:
            self.baseline_stats['protocol_dist']['UDP'] += 1
            
    # ๐Ÿ” Detect anomalies
    def detect_anomalies(self, packet):
        anomalies = []
        
        # ๐Ÿ†• New device detection
        if IP in packet:
            src_ip = packet[IP].src
            if src_ip not in self.baseline_stats['known_ips']:
                self.new_devices.add(src_ip)
                anomalies.append({
                    'type': 'NEW_DEVICE',
                    'severity': 'MEDIUM',
                    'message': f'๐Ÿ†• New device detected: {src_ip}',
                    'timestamp': datetime.now()
                })
                
        # ๐Ÿ“ˆ Traffic spike detection
        current_pps = len(self.current_window)
        if current_pps > self.baseline_stats['avg_pps'] * 2:
            anomalies.append({
                'type': 'TRAFFIC_SPIKE',
                'severity': 'HIGH',
                'message': f'๐Ÿ“ˆ Traffic spike: {current_pps} pps (baseline: {self.baseline_stats["avg_pps"]})',
                'timestamp': datetime.now()
            })
            
        # ๐Ÿ”„ Protocol anomaly detection
        if TCP in packet and self.baseline_stats['protocol_dist']['TCP'] == 0:
            anomalies.append({
                'type': 'UNEXPECTED_PROTOCOL',
                'severity': 'LOW',
                'message': '๐Ÿ”„ Unexpected TCP traffic detected',
                'timestamp': datetime.now()
            })
            
        return anomalies
        
    # ๐ŸŽฏ Process packet
    def process_packet(self, packet):
        current_time = time.time()
        
        # Build baseline first
        if not self.is_baseline_complete:
            self.build_baseline(packet)
            if current_time - self.start_time > self.baseline_duration:
                self.finalize_baseline()
                print("โœ… Baseline complete! Starting anomaly detection...")
        else:
            # Detect anomalies
            self.current_window.append(packet)
            anomalies = self.detect_anomalies(packet)
            
            for anomaly in anomalies:
                self.anomalies.append(anomaly)
                self.alert(anomaly)
                
    # ๐Ÿ”” Alert on anomaly
    def alert(self, anomaly):
        severity_emoji = {
            'CRITICAL': '๐Ÿ”ด',
            'HIGH': '๐ŸŸ ',
            'MEDIUM': '๐ŸŸก',
            'LOW': '๐ŸŸข'
        }
        
        emoji = severity_emoji.get(anomaly['severity'], 'โšช')
        print(f"\n{emoji} ANOMALY: {anomaly['message']}")
        
    # ๐Ÿ“Š Finalize baseline
    def finalize_baseline(self):
        self.is_baseline_complete = True
        # Calculate averages (simplified)
        self.baseline_stats['avg_pps'] = 10  # Simplified for demo
        
    # ๐Ÿ“ˆ Generate report
    def generate_report(self):
        print("\n" + "="*60)
        print("๐Ÿ” Anomaly Detection Report")
        print("="*60)
        print(f"๐Ÿ“Š Baseline IPs: {len(self.baseline_stats['known_ips'])}")
        print(f"๐Ÿ†• New devices detected: {len(self.new_devices)}")
        print(f"๐Ÿšจ Total anomalies: {len(self.anomalies)}")
        
        # Group anomalies by type
        anomaly_types = defaultdict(int)
        for anomaly in self.anomalies:
            anomaly_types[anomaly['type']] += 1
            
        print("\n๐Ÿ“Š Anomaly Breakdown:")
        for atype, count in anomaly_types.items():
            print(f"   {atype}: {count}")
            
        # Show recent anomalies
        if self.anomalies:
            print("\n๐Ÿ” Recent Anomalies:")
            for anomaly in self.anomalies[-5:]:
                print(f"   [{anomaly['timestamp'].strftime('%H:%M:%S')}] {anomaly['message']}")

# ๐ŸŽฎ Test the anomaly detector!
detector = AnomalyDetector(baseline_duration=30)  # 30 sec baseline for demo

print("๐Ÿ” Starting anomaly detector...")
print("๐Ÿ“Š Building baseline for 30 seconds...")

try:
    sniff(prn=detector.process_packet, timeout=60)
except KeyboardInterrupt:
    print("\nโน๏ธ Stopping detector...")
    
detector.generate_report()

๐ŸŽ“ Key Takeaways

Youโ€™ve learned so much! Hereโ€™s what you can now do:

  • โœ… Capture network packets with confidence ๐Ÿ’ช
  • โœ… Analyze traffic patterns like a pro ๐Ÿ›ก๏ธ
  • โœ… Detect security threats in real-time ๐ŸŽฏ
  • โœ… Monitor network performance effectively ๐Ÿ›
  • โœ… Build custom monitoring tools with Python! ๐Ÿš€

Remember: Network monitoring is powerful but comes with responsibility! Always respect privacy and follow legal requirements. ๐Ÿค

๐Ÿค Next Steps

Congratulations! ๐ŸŽ‰ Youโ€™ve mastered network monitoring and traffic analysis!

Hereโ€™s what to do next:

  1. ๐Ÿ’ป Practice with the exercises above
  2. ๐Ÿ—๏ธ Build a custom monitoring tool for your network
  3. ๐Ÿ“š Move on to our next tutorial: Building Proxy Servers
  4. ๐ŸŒŸ Share your monitoring insights with others!

Remember: Every network security expert started by understanding traffic patterns. Keep monitoring, keep learning, and most importantly, stay secure! ๐Ÿš€


Happy monitoring! ๐ŸŽ‰๐Ÿš€โœจ