Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the fundamentals of network protocol layers
- Apply layered protocols in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to this exciting tutorial on network protocols and layers! In this guide, we'll explore how networks communicate using the OSI and TCP/IP models, and how to work with different protocol layers in Python.
You'll discover how understanding network layers can transform your Python networking skills. Whether you're building web applications, creating network tools, or debugging connectivity issues, understanding protocol layers is essential for writing robust network code.
By the end of this tutorial, you'll feel confident working with different network layers in your Python projects! Let's dive in!
Understanding Network Protocols
What are Network Protocols?
Network protocols are like languages that computers use to talk to each other. Think of them as sets of rules and formats that ensure devices can communicate reliably across networks.
In practical terms, a protocol defines how data is:
- Formatted and structured for transmission
- Sent across different network layers
- Verified and error-checked
- Packaged and unpacked at each layer
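To make those four responsibilities concrete, here is a minimal sketch of a made-up framing protocol. The 8-byte header layout (payload length plus CRC32) and the names `pack_message` and `unpack_message` are assumptions for illustration, not part of any real protocol.

```python
import struct
import zlib

# Hypothetical header: payload length (4 bytes) + CRC32 (4 bytes),
# both unsigned integers in network byte order.
HEADER = struct.Struct("!II")


def pack_message(payload: bytes) -> bytes:
    """Format the data: prefix the payload with its length and checksum."""
    return HEADER.pack(len(payload), zlib.crc32(payload)) + payload


def unpack_message(frame: bytes) -> bytes:
    """Verify the header and return the payload, or raise ValueError."""
    if len(frame) < HEADER.size:
        raise ValueError("frame shorter than header")
    length, checksum = HEADER.unpack_from(frame)
    payload = frame[HEADER.size:HEADER.size + length]
    if len(payload) != length:
        raise ValueError("truncated payload")
    if zlib.crc32(payload) != checksum:
        raise ValueError("checksum mismatch")
    return payload


frame = pack_message(b"hello")
print(unpack_message(frame))  # b'hello'
```

Real protocols follow the same idea with more fields: the sender packs a header in an agreed format, and the receiver validates it before trusting the payload.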
The OSI Model vs TCP/IP Model
Here's why developers need to understand both models:
- OSI Model (7 Layers): A theoretical reference framework for reasoning about networking
- TCP/IP Model (4 Layers): The practical model implemented on the Internet
- Clear Abstraction: Each layer has specific responsibilities
- Debugging Power: Problems can be isolated to a specific layer
Real-world example: Imagine sending a package. The OSI model is like having different departments handle packaging, addressing, routing, and delivery, each with its own protocols!
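One way to keep the two models straight is a small lookup table. The sketch below uses the common four-layer naming (Application, Transport, Internet, Link); some texts name or group the lower layers differently, so treat the mapping as one conventional view rather than the only one.

```python
# OSI layer number -> (OSI name, TCP/IP layer in the common four-layer view)
OSI_TO_TCPIP = {
    7: ("Application", "Application"),
    6: ("Presentation", "Application"),
    5: ("Session", "Application"),
    4: ("Transport", "Transport"),
    3: ("Network", "Internet"),
    2: ("Data Link", "Link"),
    1: ("Physical", "Link"),
}


def tcpip_layer(osi_layer: int) -> str:
    """Return the TCP/IP layer that covers the given OSI layer number."""
    return OSI_TO_TCPIP[osi_layer][1]


for number in sorted(OSI_TO_TCPIP, reverse=True):
    osi_name, tcpip_name = OSI_TO_TCPIP[number]
    print(f"OSI {number} {osi_name:<12} -> TCP/IP {tcpip_name}")
```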
Basic Syntax and Usage
Working with Different Layers
Let's start by exploring network layers in Python:
# Hello, network layers!
import socket
import struct


# Layer 7: Application layer (HTTP)
def application_layer_example():
    # Build a simple HTTP request
    http_request = (
        "GET / HTTP/1.1\r\n"
        "Host: example.com\r\n"
        "User-Agent: Python Network Explorer\r\n"
        "\r\n"
    )
    print(f"Application Layer (HTTP):\n{http_request}")
    return http_request.encode()


# Layer 4: Transport layer (TCP)
def transport_layer_example():
    # Create a TCP socket (the caller is responsible for closing it)
    tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Show socket info
    print("Transport Layer (TCP):")
    print(f"  Protocol: {tcp_socket.proto}")
    print(f"  Type: {tcp_socket.type}")
    print(f"  Family: {tcp_socket.family}")
    return tcp_socket


# Layer 3: Network layer (IP)
def network_layer_example():
    # Parse an IPv4 header
    def parse_ip_header(data):
        # The first 20 bytes are the fixed part of the IP header
        ip_header = struct.unpack('!BBHHHBBH4s4s', data[:20])
        version = ip_header[0] >> 4
        header_length = (ip_header[0] & 0xF) * 4
        ttl = ip_header[5]
        protocol = ip_header[6]
        src_ip = socket.inet_ntoa(ip_header[8])
        dst_ip = socket.inet_ntoa(ip_header[9])
        return {
            'version': version,
            'header_length': header_length,
            'ttl': ttl,
            'protocol': protocol,
            'source_ip': src_ip,
            'destination_ip': dst_ip
        }

    print("Network Layer (IP) Parser Ready!")
    return parse_ip_header
Explanation: Notice how each layer has its own responsibilities! The application layer handles HTTP, the transport layer handles TCP connections, and the network layer handles IP addressing.
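To see the network-layer fields without capturing live traffic, you can pack a header by hand and unpack it again. This is a sketch under stated assumptions: the addresses are documentation-range placeholders, the checksum is left at zero, and the names `IPV4_HEADER`, `sample`, and `parsed` are illustrative.

```python
import socket
import struct

# Fixed 20-byte IPv4 header layout (no options), network byte order
IPV4_HEADER = struct.Struct("!BBHHHBBH4s4s")

# Hand-built sample header; the checksum is not computed here
sample = IPV4_HEADER.pack(
    (4 << 4) | 5,   # version 4, IHL 5 words = 20 bytes
    0,              # type of service
    40,             # total length: 20-byte IP header + 20-byte TCP header
    0x1234,         # identification
    0,              # flags + fragment offset
    64,             # TTL
    6,              # protocol 6 = TCP
    0,              # header checksum placeholder
    socket.inet_aton("192.0.2.1"),
    socket.inet_aton("198.51.100.7"),
)

fields = IPV4_HEADER.unpack(sample)
parsed = {
    "version": fields[0] >> 4,
    "header_length": (fields[0] & 0xF) * 4,
    "ttl": fields[5],
    "protocol": fields[6],
    "source_ip": socket.inet_ntoa(fields[8]),
    "destination_ip": socket.inet_ntoa(fields[9]),
}
print(parsed)
```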
Layer Interaction Patterns
Here are patterns for working with protocol layers:
# Pattern 1: Socket programming at multiple layers
# (uses the socket and struct imports from the previous example)
class NetworkLayerExplorer:
    def __init__(self):
        self.layers = {
            7: "Application",
            6: "Presentation",
            5: "Session",
            4: "Transport",
            3: "Network",
            2: "Data Link",
            1: "Physical"
        }

    # Create a raw socket for lower-layer access
    def create_raw_socket(self):
        try:
            # Requires admin/root privileges
            raw_socket = socket.socket(
                socket.AF_INET,
                socket.SOCK_RAW,
                socket.IPPROTO_TCP
            )
            print("Raw socket created for layer 3-4 access!")
            return raw_socket
        except PermissionError:
            print("Need admin privileges for raw sockets!")
            return None

    # Analyze a packet at different layers
    def analyze_packet(self, data):
        analysis = {}
        # Layer 3: the fixed IP header is 20 bytes
        if len(data) < 20:
            return analysis
        ip_version = (data[0] >> 4) & 0xF
        ip_header_length = (data[0] & 0xF) * 4
        analysis['layer_3'] = {
            'name': 'IP',
            'version': ip_version,
            'header_length': ip_header_length
        }
        # Layer 4: the TCP header starts right after the IP header
        protocol = data[9]
        if protocol == 6 and len(data) >= ip_header_length + 20:
            tcp_start = ip_header_length
            src_port = struct.unpack('!H', data[tcp_start:tcp_start+2])[0]
            dst_port = struct.unpack('!H', data[tcp_start+2:tcp_start+4])[0]
            analysis['layer_4'] = {
                'name': 'TCP',
                'source_port': src_port,
                'destination_port': dst_port
            }
        return analysis


# Pattern 2: Protocol encapsulation
class ProtocolStack:
    def __init__(self):
        self.layers = []

    # Encapsulate data: each layer prepends its own header
    def encapsulate(self, data, layer_name):
        header = f"[{layer_name} Header]"
        encapsulated = f"{header}{data}"
        self.layers.append({
            'layer': layer_name,
            'data': encapsulated
        })
        print(f"{layer_name} encapsulated: {encapsulated[:50]}...")
        return encapsulated

    # Decapsulate: strip headers from the outermost layer inward
    def decapsulate(self, data):
        for layer in reversed(self.layers):
            header = f"[{layer['layer']} Header]"
            if data.startswith(header):
                data = data[len(header):]
                print(f"{layer['layer']} decapsulated")
        return data
Practical Examples
Example 1: HTTP Protocol Analyzer
Let's build a practical HTTP protocol analyzer:
# HTTP Protocol Analyzer
import socket
from datetime import datetime


class HTTPAnalyzer:
    def __init__(self):
        self.methods = ['GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'PATCH']
        self.versions = ['HTTP/1.0', 'HTTP/1.1', 'HTTP/2']
        self.emoji_status = {
            200: '✅',
            404: '❌',
            500: '💥',
            301: '↪️',
            403: '🔒'
        }

    # Parse an HTTP request (lines must be separated by CRLF)
    def parse_request(self, request_data):
        lines = request_data.split('\r\n')
        if not lines:
            return None
        # Parse the request line: METHOD PATH VERSION
        request_line = lines[0].split(' ')
        if len(request_line) != 3:
            return None
        method, path, version = request_line
        # Parse headers until the blank line that ends the header block
        headers = {}
        for i in range(1, len(lines)):
            if lines[i] == '':
                break
            if ':' in lines[i]:
                key, value = lines[i].split(':', 1)
                headers[key.strip()] = value.strip()
        # Build the analysis
        analysis = {
            'method': method,
            'path': path,
            'version': version,
            'headers': headers,
            'timestamp': datetime.now().isoformat(),
            'emoji': '📋'
        }
        return analysis

    # Analyze an HTTP response
    def analyze_response(self, response_data):
        lines = response_data.split('\r\n')
        if not lines:
            return None
        # Parse the status line: VERSION CODE REASON
        status_line = lines[0].split(' ', 2)
        if len(status_line) < 2 or not status_line[1].isdigit():
            return None
        version = status_line[0]
        status_code = int(status_line[1])
        status_text = status_line[2] if len(status_line) > 2 else ''
        # Pick an emoji for the status code
        emoji = self.emoji_status.get(status_code, '📄')
        return {
            'version': version,
            'status_code': status_code,
            'status_text': status_text,
            'emoji': emoji,
            'meaning': self._get_status_meaning(status_code)
        }

    # Describe a status code
    def _get_status_meaning(self, code):
        meanings = {
            200: "Success! Everything worked perfectly",
            404: "Not Found: the resource doesn't exist",
            500: "Server Error: something went wrong on the server",
            301: "Moved Permanently: the resource has a new location",
            403: "Forbidden: you don't have permission"
        }
        return meanings.get(code, f"Status code {code}")

    # Simple HTTP client
    def make_request(self, host, port=80, path='/', method='GET'):
        try:
            # The with-block closes the socket even if an error occurs
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(5)
                print(f"Connecting to {host}:{port}...")
                sock.connect((host, port))
                # Build and send the request (header values stay ASCII)
                request = f"{method} {path} HTTP/1.1\r\n"
                request += f"Host: {host}\r\n"
                request += "User-Agent: Python HTTP Analyzer\r\n"
                request += "Connection: close\r\n"
                request += "\r\n"
                # sendall() keeps sending until every byte is written
                sock.sendall(request.encode())
                print("Request sent!")
                # Read until the server closes the connection
                response = b''
                while True:
                    data = sock.recv(1024)
                    if not data:
                        break
                    response += data
            # Analyze the response
            response_text = response.decode('utf-8', errors='ignore')
            analysis = self.analyze_response(response_text)
            if analysis:
                print(f"Response: {analysis['emoji']} {analysis['status_code']} - {analysis['meaning']}")
            return response_text
        except OSError as e:
            print(f"Error: {e}")
            return None


# Let's use it!
analyzer = HTTPAnalyzer()
# Analyze a sample request. HTTP lines end with CRLF, so the sample is
# built with explicit \r\n instead of a triple-quoted string.
sample_request = (
    "GET /api/users HTTP/1.1\r\n"
    "Host: api.example.com\r\n"
    "User-Agent: Mozilla/5.0\r\n"
    "Accept: application/json\r\n"
    "Authorization: Bearer token123\r\n"
    "\r\n"
)
result = analyzer.parse_request(sample_request)
if result:
    print("Parsed Request:")
    print(f"  Method: {result['method']}")
    print(f"  Path: {result['path']}")
    print(f"  Headers: {len(result['headers'])} found")
Try it yourself: Add support for parsing HTTP response bodies and handling different content types!
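If you want a starting point for that exercise, the sketch below shows one possible approach. The function names `split_http_response` and `decode_body` are hypothetical, and the code assumes a complete HTTP/1.x response held in memory; chunked transfer encoding and compression are deliberately left out.

```python
import json


def split_http_response(raw: bytes):
    """Return (status_code, headers, body) from a raw HTTP/1.x response."""
    head, separator, body = raw.partition(b"\r\n\r\n")
    if not separator:
        raise ValueError("incomplete response: no blank line after headers")
    lines = head.decode("iso-8859-1").split("\r\n")
    status_code = int(lines[0].split(" ", 2)[1])
    headers = {}
    for line in lines[1:]:
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    # Honor Content-Length when present; chunked encoding is not handled
    if "content-length" in headers:
        body = body[: int(headers["content-length"])]
    return status_code, headers, body


def decode_body(headers, body):
    """Decode JSON and text bodies; return other content types as bytes."""
    content_type = headers.get("content-type", "").split(";")[0].strip().lower()
    if content_type == "application/json":
        return json.loads(body.decode("utf-8"))
    if content_type.startswith("text/"):
        return body.decode("utf-8", errors="replace")
    return body


payload = b'{"ok": true}'
raw = (
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: application/json\r\n"
    b"Content-Length: " + str(len(payload)).encode() + b"\r\n"
    b"\r\n" + payload
)
status, headers, body = split_http_response(raw)
print(status, decode_body(headers, body))
```

Header names are lowercased on the way in because HTTP header names are case-insensitive, which keeps the later lookups simple.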
Example 2: Multi-Layer Network Monitor
Let's create a network monitor that works across layers:
# Multi-Layer Network Monitor
import socket
import struct
import time
from collections import defaultdict


class NetworkMonitor:
    def __init__(self):
        self.stats = defaultdict(lambda: {
            'packets': 0,
            'bytes': 0,
            'protocols': defaultdict(int)
        })
        self.running = False
        # IP protocol numbers mapped to display names
        self.protocol_names = {
            6: 'TCP',
            17: 'UDP',
            1: 'ICMP',
            2: 'IGMP'
        }

    # Protocol number to name
    def get_protocol_name(self, protocol_num):
        return self.protocol_names.get(protocol_num, f'Protocol {protocol_num}')

    # Parse Ethernet frame (Layer 2)
    def parse_ethernet_frame(self, data):
        if len(data) < 14:
            return None
        # The Ethernet header is 14 bytes: dst MAC, src MAC, EtherType
        eth_header = struct.unpack('!6s6sH', data[:14])
        dst_mac = ':'.join(f'{b:02x}' for b in eth_header[0])
        src_mac = ':'.join(f'{b:02x}' for b in eth_header[1])
        eth_type = eth_header[2]
        return {
            'destination_mac': dst_mac,
            'source_mac': src_mac,
            'type': eth_type,
            'payload': data[14:]
        }

    # Parse IP packet (Layer 3)
    def parse_ip_packet(self, data):
        if len(data) < 20:
            return None
        # Fixed IP header (first 20 bytes)
        ip_header = struct.unpack('!BBHHHBBH4s4s', data[:20])
        version = ip_header[0] >> 4
        header_length = (ip_header[0] & 0xF) * 4
        total_length = ip_header[2]
        protocol = ip_header[6]
        src_ip = socket.inet_ntoa(ip_header[8])
        dst_ip = socket.inet_ntoa(ip_header[9])
        return {
            'version': version,
            'header_length': header_length,
            'total_length': total_length,
            'protocol': protocol,
            'source_ip': src_ip,
            'destination_ip': dst_ip,
            'payload': data[header_length:]
        }

    # Parse TCP segment (Layer 4)
    def parse_tcp_segment(self, data):
        if len(data) < 20:
            return None
        # Fixed TCP header (first 20 bytes)
        tcp_header = struct.unpack('!HHLLHHHH', data[:20])
        src_port = tcp_header[0]
        dst_port = tcp_header[1]
        sequence = tcp_header[2]
        acknowledgment = tcp_header[3]
        offset = (tcp_header[4] >> 12) * 4
        flags = tcp_header[4] & 0x1FF
        # Decode flag bits
        flag_names = []
        if flags & 0x01:
            flag_names.append('FIN')
        if flags & 0x02:
            flag_names.append('SYN')
        if flags & 0x04:
            flag_names.append('RST')
        if flags & 0x08:
            flag_names.append('PSH')
        if flags & 0x10:
            flag_names.append('ACK')
        if flags & 0x20:
            flag_names.append('URG')
        return {
            'source_port': src_port,
            'destination_port': dst_port,
            'sequence': sequence,
            'acknowledgment': acknowledgment,
            'flags': flag_names,
            'payload': data[offset:]
        }

    # Update statistics
    def update_stats(self, src_ip, dst_ip, protocol, packet_size):
        # Update source stats
        self.stats[src_ip]['packets'] += 1
        self.stats[src_ip]['bytes'] += packet_size
        self.stats[src_ip]['protocols'][protocol] += 1
        # Update destination stats
        self.stats[dst_ip]['packets'] += 1
        self.stats[dst_ip]['bytes'] += packet_size
        self.stats[dst_ip]['protocols'][protocol] += 1

    # Display statistics for the ten busiest addresses
    def display_stats(self):
        print("\nNetwork Statistics Dashboard")
        print("=" * 60)
        for ip, data in sorted(self.stats.items(),
                               key=lambda x: x[1]['packets'],
                               reverse=True)[:10]:
            print(f"\nIP: {ip}")
            print(f"  Packets: {data['packets']:,}")
            print(f"  Bytes: {data['bytes']:,}")
            print("  Protocols:")
            for proto, count in data['protocols'].items():
                proto_name = self.get_protocol_name(proto)
                print(f"    {proto_name}: {count:,} packets")

    # Demo mode: simulate traffic
    def simulate_traffic(self):
        print("Starting network simulation...")
        # (source, destination, protocol number, packet size)
        test_ips = [
            ('192.168.1.100', '8.8.8.8', 6, 1500),       # TCP to Google DNS
            ('10.0.0.5', '93.184.216.34', 17, 512),      # UDP to example.com
            ('172.16.0.10', '1.1.1.1', 1, 64),           # ICMP to Cloudflare
            ('192.168.1.100', '140.82.114.4', 6, 8192)   # TCP to GitHub
        ]
        for _ in range(10):
            for src, dst, proto, size in test_ips:
                self.update_stats(src, dst, proto, size)
            time.sleep(0.1)
        self.display_stats()


# Run the monitor
monitor = NetworkMonitor()
monitor.simulate_traffic()
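The parsers in the monitor become easier to follow when you trace a single frame from layer 2 up to layer 4. The sketch below builds a synthetic Ethernet + IPv4 + TCP SYN frame and peels it layer by layer. The MAC and IP addresses are placeholders, the checksums are left at zero, and `peel` is a hypothetical helper name.

```python
import socket
import struct

# Build a synthetic frame: Ethernet + IPv4 + TCP SYN, no payload
eth = struct.pack("!6s6sH",
                  bytes.fromhex("ffffffffffff"),   # destination MAC
                  bytes.fromhex("020000000001"),   # source MAC
                  0x0800)                          # EtherType: IPv4
ip = struct.pack("!BBHHHBBH4s4s", 0x45, 0, 40, 1, 0, 64, 6, 0,
                 socket.inet_aton("192.0.2.1"),
                 socket.inet_aton("192.0.2.2"))
tcp = struct.pack("!HHLLHHHH", 54321, 80, 1000, 0,
                  (5 << 12) | 0x02, 8192, 0, 0)    # data offset 5, SYN flag
frame = eth + ip + tcp


def peel(frame: bytes) -> dict:
    """Walk a frame from layer 2 up to layer 4 and summarize each header."""
    summary = {}
    _dst, _src, eth_type = struct.unpack("!6s6sH", frame[:14])
    summary["eth_type"] = eth_type
    if eth_type != 0x0800:          # only IPv4 is handled here
        return summary
    packet = frame[14:]             # layer 3 starts after the Ethernet header
    ihl = (packet[0] & 0xF) * 4
    summary["protocol"] = packet[9]
    summary["src_ip"] = socket.inet_ntoa(packet[12:16])
    summary["dst_ip"] = socket.inet_ntoa(packet[16:20])
    if packet[9] != 6:              # only TCP is handled here
        return summary
    segment = packet[ihl:]          # layer 4 starts after the IP header
    src_port, dst_port = struct.unpack("!HH", segment[:4])
    flags = struct.unpack("!H", segment[12:14])[0] & 0x1FF
    summary.update(src_port=src_port, dst_port=dst_port,
                   syn=bool(flags & 0x02))
    return summary


print(peel(frame))
```

Each layer only needs to know where its own header ends; the remaining bytes are handed to the next layer up.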
Advanced Concepts
Advanced Topic 1: Raw Socket Programming
When you're ready to level up, try working with raw sockets:
# Advanced raw socket packet crafting
import socket
import struct
import random


class PacketCrafter:
    def __init__(self):
        self.packet_id = random.randint(1, 65535)

    # Calculate the Internet checksum (one's-complement sum of 16-bit words)
    def checksum(self, data):
        if len(data) % 2:
            data += b'\x00'
        s = 0
        for i in range(0, len(data), 2):
            s += (data[i] << 8) + data[i + 1]
        # Fold the carries back into the low 16 bits (end-around carry)
        while s >> 16:
            s = (s & 0xFFFF) + (s >> 16)
        return ~s & 0xFFFF

    # Craft an IPv4 header; payload_length is the size of what follows it
    def craft_ip_header(self, src_ip, dst_ip, protocol=6, payload_length=0):
        # IP header fields
        version = 4
        ihl = 5  # Header length in 32-bit words (20 bytes)
        tos = 0
        total_length = ihl * 4 + payload_length
        identification = self.packet_id
        flags = 0
        fragment_offset = 0
        ttl = 64
        # Pack the header with the checksum field set to zero
        header = struct.pack('!BBHHHBBH4s4s',
                             (version << 4) | ihl,
                             tos,
                             total_length,
                             identification,
                             (flags << 13) | fragment_offset,
                             ttl,
                             protocol,
                             0,
                             socket.inet_aton(src_ip),
                             socket.inet_aton(dst_ip))
        # Compute the checksum and write it into bytes 10-11
        header_checksum = self.checksum(header)
        return header[:10] + struct.pack('!H', header_checksum) + header[12:]

    # Craft a TCP header
    def craft_tcp_header(self, src_port, dst_port, flags='S'):
        # TCP header fields
        sequence = random.randint(0, 2**32 - 1)
        acknowledgment = 0
        data_offset = 5  # 5 words = 20 bytes
        # Convert flag letters to bits
        flag_bits = 0
        if 'F' in flags:
            flag_bits |= 0x01  # FIN
        if 'S' in flags:
            flag_bits |= 0x02  # SYN
        if 'R' in flags:
            flag_bits |= 0x04  # RST
        if 'P' in flags:
            flag_bits |= 0x08  # PSH
        if 'A' in flags:
            flag_bits |= 0x10  # ACK
        if 'U' in flags:
            flag_bits |= 0x20  # URG
        window = 8192
        # Left at zero here: a valid TCP checksum also covers a
        # pseudo-header built from the IP addresses
        tcp_checksum = 0
        urgent_pointer = 0
        # Pack the TCP header
        header = struct.pack('!HHLLHHHH',
                             src_port,
                             dst_port,
                             sequence,
                             acknowledgment,
                             (data_offset << 12) | flag_bits,
                             window,
                             tcp_checksum,
                             urgent_pointer)
        print(f"Crafted TCP packet with flags: {flags}")
        return header
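A checksum routine is easy to get subtly wrong, so it helps to test one against a header whose checksum is already known. The sketch below uses a standalone function, `internet_checksum` (a name chosen for this example), and a widely used textbook IPv4 header whose checksum is `0xb861`. It also shows the usual receiver-side check: recomputing the checksum over a header that already contains a correct checksum yields zero.

```python
def internet_checksum(data: bytes) -> int:
    """One's-complement sum of 16-bit words, with end-around carry."""
    if len(data) % 2:
        data += b"\x00"
    total = 0
    for i in range(0, len(data), 2):
        total += (data[i] << 8) | data[i + 1]
    # Fold any carry bits back into the low 16 bits
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF


# 20-byte IPv4 header with the checksum field (bytes 10-11) zeroed
header = bytes.fromhex("45000073000040004011" "0000" "c0a80001c0a800c7")
checksum = internet_checksum(header)
print(hex(checksum))  # 0xb861

# Insert the checksum and verify: a valid header sums to zero
filled = header[:10] + checksum.to_bytes(2, "big") + header[12:]
print(internet_checksum(filled))  # 0
```

The carry-folding loop matters: masking each partial sum with `0xFFFF` instead silently drops the carries and produces wrong checksums for most headers.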
Advanced Topic 2: Protocol State Machines
For the brave developers, implement protocol state machines:
# TCP State Machine Implementation
class TCPStateMachine:
    def __init__(self):
        # Each state maps to an emoji used in the printed output
        self.states = {
            'CLOSED': '🔒',
            'LISTEN': '👂',
            'SYN_SENT': '📤',
            'SYN_RECEIVED': '📥',
            'ESTABLISHED': '✅',
            'FIN_WAIT_1': '👋',
            'FIN_WAIT_2': '💤',
            'TIME_WAIT': '⏰',
            'CLOSE_WAIT': '⏳',
            'LAST_ACK': '🏁'
        }
        self.current_state = 'CLOSED'
        self.transitions = []

    # Apply an event and move to the next state if the transition is valid
    def transition(self, event):
        old_state = self.current_state
        new_state = self._get_next_state(event)
        if new_state:
            self.current_state = new_state
            emoji_old = self.states.get(old_state, '❓')
            emoji_new = self.states.get(new_state, '❓')
            self.transitions.append({
                'from': old_state,
                'to': new_state,
                'event': event,
                'emoji': f"{emoji_old} -> {emoji_new}"
            })
            print(f"State transition: {emoji_old} {old_state} -> {emoji_new} {new_state}")
            return True
        print(f"Invalid transition from {old_state} with event {event}")
        return False

    # State transition table: (current state, event) -> next state
    def _get_next_state(self, event):
        transitions = {
            ('CLOSED', 'LISTEN'): 'LISTEN',
            ('CLOSED', 'CONNECT'): 'SYN_SENT',
            ('LISTEN', 'SYN'): 'SYN_RECEIVED',
            ('SYN_SENT', 'SYN_ACK'): 'ESTABLISHED',
            ('SYN_RECEIVED', 'ACK'): 'ESTABLISHED',
            ('ESTABLISHED', 'FIN'): 'FIN_WAIT_1',
            ('FIN_WAIT_1', 'ACK'): 'FIN_WAIT_2',
            ('FIN_WAIT_2', 'FIN'): 'TIME_WAIT',
            ('ESTABLISHED', 'FIN_RECEIVED'): 'CLOSE_WAIT',
            ('CLOSE_WAIT', 'CLOSE'): 'LAST_ACK',
            ('LAST_ACK', 'ACK'): 'CLOSED',
            ('TIME_WAIT', 'TIMEOUT'): 'CLOSED'
        }
        return transitions.get((self.current_state, event))

    # Show the connection lifecycle recorded so far
    def show_lifecycle(self):
        print("\nTCP Connection Lifecycle:")
        for transition in self.transitions:
            print(f"  {transition['emoji']} on {transition['event']}")
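To see a full connection lifecycle, you can replay a sequence of events through a transition table. The sketch below is a compact, standalone variant that covers only the client-side path and reuses the event names from the class above; `TRANSITIONS` and `run` are names chosen for this example, and the table is a simplification of the full TCP state diagram.

```python
# (current state, event) -> next state, client side only
TRANSITIONS = {
    ("CLOSED", "CONNECT"): "SYN_SENT",
    ("SYN_SENT", "SYN_ACK"): "ESTABLISHED",
    ("ESTABLISHED", "FIN"): "FIN_WAIT_1",
    ("FIN_WAIT_1", "ACK"): "FIN_WAIT_2",
    ("FIN_WAIT_2", "FIN"): "TIME_WAIT",
    ("TIME_WAIT", "TIMEOUT"): "CLOSED",
}


def run(events, state="CLOSED"):
    """Replay events and return the list of states visited."""
    path = [state]
    for event in events:
        key = (state, event)
        if key not in TRANSITIONS:
            raise ValueError(f"invalid event {event!r} in state {state}")
        state = TRANSITIONS[key]
        path.append(state)
    return path


# Active open, then active close, as seen by the client
client_path = run(["CONNECT", "SYN_ACK", "FIN", "ACK", "FIN", "TIMEOUT"])
print(" -> ".join(client_path))
```

Raising on an unknown `(state, event)` pair is a design choice: it surfaces out-of-order events immediately instead of silently ignoring them.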
Common Pitfalls and Solutions
Pitfall 1: Forgetting Network Byte Order
import struct


# Wrong way: host byte order issues!
def pack_wrong():
    port = 8080
    # Native byte order: the bytes differ between little- and big-endian hosts
    packed = struct.pack('H', port)  # Missing network byte order
    return packed


# Correct way: always use network byte order!
def pack_correct():
    port = 8080
    # Use '!' for network byte order (big-endian)
    packed = struct.pack('!H', port)
    return packed


# Helper function for safety
def safe_pack_port(port):
    if not 0 <= port <= 65535:
        raise ValueError(f"Invalid port: {port}")
    return struct.pack('!H', port)
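The difference is easiest to see in the bytes themselves. This short sketch packs port 8080 (0x1F90) with explicit big-endian and little-endian formats, then shows what a receiver gets when it decodes with the wrong byte order. The variable names are illustrative.

```python
import struct

port = 8080  # 0x1F90

big = struct.pack("!H", port)      # network byte order (big-endian)
little = struct.pack("<H", port)   # little-endian, typical of x86 hosts

print(big.hex())     # 1f90
print(little.hex())  # 901f

# Decoding network-order bytes as little-endian yields a different port
misread = struct.unpack("<H", big)[0]
print(misread)       # 36895
```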
Pitfall 2: Not Handling Partial Receives
import socket


# Dangerous: might not receive all data!
def receive_wrong(sock):
    data = sock.recv(1024)  # May return fewer bytes than the peer sent
    return data


# Safe: keep reading until the expected number of bytes has arrived
def receive_all(sock, expected_size):
    buffer = b''
    bytes_received = 0
    while bytes_received < expected_size:
        chunk = sock.recv(min(expected_size - bytes_received, 4096))
        if not chunk:
            # An empty result means the peer closed the connection;
            # the caller receives whatever arrived before that
            print("Connection closed by peer!")
            break
        buffer += chunk
        bytes_received += len(chunk)
        print(f"Received {bytes_received}/{expected_size} bytes")
    return buffer


# Even better: with timeout handling
def receive_with_timeout(sock, expected_size, timeout=5):
    sock.settimeout(timeout)
    try:
        return receive_all(sock, expected_size)
    except socket.timeout:
        print("Receive timeout!")
        return None
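Knowing how many bytes to expect usually requires framing. One common convention, sketched below, is a 4-byte length prefix in network byte order. The helper names `recv_exact`, `send_message`, and `recv_message` are assumptions for this example, and `socket.socketpair()` stands in for a real client/server connection so the code runs locally.

```python
import socket
import struct


def recv_exact(sock, size):
    """Read exactly size bytes, or raise if the peer closes early."""
    chunks = []
    remaining = size
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("peer closed before the message was complete")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def send_message(sock, payload):
    """Send a 4-byte big-endian length prefix followed by the payload."""
    sock.sendall(struct.pack("!I", len(payload)) + payload)


def recv_message(sock):
    """Read the length prefix, then exactly that many payload bytes."""
    (length,) = struct.unpack("!I", recv_exact(sock, 4))
    return recv_exact(sock, length)


# Two connected sockets in one process, for demonstration only
left, right = socket.socketpair()
with left, right:
    send_message(left, b"first")
    send_message(left, b"second message")
    print(recv_message(right))  # b'first'
    print(recv_message(right))  # b'second message'
```

Because TCP is a byte stream, both messages may arrive in a single `recv()` call or be split across several; the length prefix is what restores the message boundaries.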
Best Practices
- Always Use Network Byte Order: Use struct with the '!' prefix
- Document Protocol Formats: Clear documentation saves debugging time
- Validate All Input: Never trust network data
- Use Meaningful Constants: Define protocol constants clearly
- Handle Errors Gracefully: Networks are unreliable by nature
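Two of these practices, meaningful constants and input validation, combine naturally. The sketch below names the IP protocol numbers used throughout this tutorial with an `IntEnum` and checks the packet length before reading from it. `IPProtocol`, `IPV4_MIN_HEADER`, and `protocol_of` are hypothetical names for this example.

```python
from enum import IntEnum


class IPProtocol(IntEnum):
    """IP protocol numbers used in this tutorial."""
    ICMP = 1
    TCP = 6
    UDP = 17


IPV4_MIN_HEADER = 20     # bytes in an IPv4 header without options
PROTOCOL_FIELD_OFFSET = 9


def protocol_of(packet: bytes) -> str:
    """Return the protocol name from an IPv4 packet, validating its length."""
    if len(packet) < IPV4_MIN_HEADER:
        raise ValueError(f"packet too short: {len(packet)} bytes")
    number = packet[PROTOCOL_FIELD_OFFSET]
    try:
        return IPProtocol(number).name
    except ValueError:
        return f"UNKNOWN({number})"


# A zero-filled 20-byte header with the protocol field set to 17
packet = bytes(9) + bytes([IPProtocol.UDP]) + bytes(10)
print(protocol_of(packet))  # UDP
```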
Hands-On Exercise
Challenge: Build a Protocol Analyzer
Create a multi-protocol network analyzer:
Requirements:
- Support for TCP, UDP, and ICMP protocols
- Parse headers at layers 3 and 4
- Track connections and statistics
- Log timestamps for all packets
- Display protocol information with emojis!
Bonus Points:
- Add support for HTTP parsing
- Implement connection tracking
- Create a real-time dashboard
Solution
# Multi-Protocol Network Analyzer
import socket
import struct
import time
from datetime import datetime
from collections import defaultdict


class ProtocolAnalyzer:
    def __init__(self):
        # IP protocol number -> display info and layer-4 parser
        self.protocols = {
            1: {'name': 'ICMP', 'emoji': '🏓', 'parser': self.parse_icmp},
            6: {'name': 'TCP', 'emoji': '🔌', 'parser': self.parse_tcp},
            17: {'name': 'UDP', 'emoji': '📨', 'parser': self.parse_udp}
        }
        self.connections = defaultdict(lambda: {
            'packets': 0,
            'bytes': 0,
            'start_time': datetime.now(),
            'last_seen': datetime.now()
        })
        self.stats = defaultdict(int)

    # Parse IP header
    def parse_ip_header(self, data):
        if len(data) < 20:
            return None
        header = struct.unpack('!BBHHHBBH4s4s', data[:20])
        version = header[0] >> 4
        ihl = (header[0] & 0xF) * 4
        total_length = header[2]
        protocol = header[6]
        src_ip = socket.inet_ntoa(header[8])
        dst_ip = socket.inet_ntoa(header[9])
        return {
            'version': version,
            'header_length': ihl,
            'total_length': total_length,
            'protocol': protocol,
            'source_ip': src_ip,
            'destination_ip': dst_ip,
            'payload': data[ihl:]
        }

    # Parse TCP
    def parse_tcp(self, data, ip_info):
        if len(data) < 20:
            return None
        tcp_header = struct.unpack('!HHLLHHHH', data[:20])
        src_port = tcp_header[0]
        dst_port = tcp_header[1]
        sequence = tcp_header[2]
        flags = tcp_header[4] & 0x1FF
        # Format the connection key
        conn_key = f"{ip_info['source_ip']}:{src_port} -> {ip_info['destination_ip']}:{dst_port}"
        # Update connection tracking
        self.connections[conn_key]['packets'] += 1
        self.connections[conn_key]['bytes'] += ip_info['total_length']
        self.connections[conn_key]['last_seen'] = datetime.now()
        # Decode flags
        flag_str = ""
        if flags & 0x02:
            flag_str += "S"  # SYN
        if flags & 0x10:
            flag_str += "A"  # ACK
        if flags & 0x01:
            flag_str += "F"  # FIN
        if flags & 0x04:
            flag_str += "R"  # RST
        return {
            'source_port': src_port,
            'destination_port': dst_port,
            'sequence': sequence,
            'flags': flag_str,
            'connection': conn_key
        }

    # Parse UDP
    def parse_udp(self, data, ip_info):
        if len(data) < 8:
            return None
        udp_header = struct.unpack('!HHHH', data[:8])
        src_port = udp_header[0]
        dst_port = udp_header[1]
        length = udp_header[2]
        return {
            'source_port': src_port,
            'destination_port': dst_port,
            'length': length
        }

    # Parse ICMP
    def parse_icmp(self, data, ip_info):
        if len(data) < 8:
            return None
        icmp_header = struct.unpack('!BBHHH', data[:8])
        icmp_type = icmp_header[0]
        code = icmp_header[1]
        # ICMP type meanings
        type_meanings = {
            0: 'Echo Reply',
            3: 'Destination Unreachable',
            8: 'Echo Request',
            11: 'Time Exceeded'
        }
        return {
            'type': icmp_type,
            'code': code,
            'meaning': type_meanings.get(icmp_type, f'Type {icmp_type}')
        }

    # Analyze a packet that starts at the IP header
    def analyze_packet(self, data):
        # Parse IP header
        ip_info = self.parse_ip_header(data)
        if not ip_info:
            return None
        # Look up the protocol
        protocol_info = self.protocols.get(ip_info['protocol'])
        if not protocol_info:
            return None
        # Update stats
        self.stats[protocol_info['name']] += 1
        # Parse protocol-specific data
        proto_data = protocol_info['parser'](ip_info['payload'], ip_info)
        # Build the analysis result
        result = {
            'timestamp': datetime.now().isoformat(),
            'emoji': protocol_info['emoji'],
            'protocol': protocol_info['name'],
            'source': ip_info['source_ip'],
            'destination': ip_info['destination_ip'],
            'size': ip_info['total_length'],
            'details': proto_data
        }
        return result

    # Display statistics
    def show_stats(self):
        print("\nProtocol Statistics")
        print("=" * 40)
        total_packets = sum(self.stats.values())
        if total_packets == 0:
            print("No packets captured yet!")
            return
        for protocol, count in sorted(self.stats.items()):
            percentage = (count / total_packets) * 100
            proto_info = next((p for p in self.protocols.values()
                               if p['name'] == protocol), {})
            emoji = proto_info.get('emoji', '📦')
            print(f"{emoji} {protocol}: {count:,} packets ({percentage:.1f}%)")
        print(f"\nTotal packets: {total_packets:,}")
        # Show the five busiest connections
        print("\nActive TCP Connections")
        print("=" * 40)
        for conn, info in sorted(self.connections.items(),
                                 key=lambda x: x[1]['packets'],
                                 reverse=True)[:5]:
            duration = (info['last_seen'] - info['start_time']).total_seconds()
            print(f"\n{conn}")
            print(f"  Packets: {info['packets']:,}")
            print(f"  Bytes: {info['bytes']:,}")
            print(f"  Duration: {duration:.1f}s")

    # Demo mode: updates the counters directly instead of parsing raw bytes
    def demo(self):
        print("Running Protocol Analyzer Demo...")
        # Simulate some packets
        test_packets = [
            # TCP SYN
            {'protocol': 6, 'src': '192.168.1.100', 'dst': '8.8.8.8',
             'src_port': 54321, 'dst_port': 80, 'flags': 'S'},
            # UDP DNS
            {'protocol': 17, 'src': '192.168.1.100', 'dst': '8.8.8.8',
             'src_port': 54322, 'dst_port': 53},
            # ICMP Echo
            {'protocol': 1, 'src': '192.168.1.100', 'dst': '8.8.8.8',
             'type': 8},
            # TCP SYN-ACK
            {'protocol': 6, 'src': '8.8.8.8', 'dst': '192.168.1.100',
             'src_port': 80, 'dst_port': 54321, 'flags': 'SA'}
        ]
        for packet in test_packets * 3:
            # Simulate packet analysis
            if packet['protocol'] == 6:
                conn_key = f"{packet['src']}:{packet['src_port']} -> {packet['dst']}:{packet['dst_port']}"
                self.connections[conn_key]['packets'] += 1
                self.connections[conn_key]['bytes'] += 1500
                self.connections[conn_key]['last_seen'] = datetime.now()
                self.stats['TCP'] += 1
            elif packet['protocol'] == 17:
                self.stats['UDP'] += 1
            elif packet['protocol'] == 1:
                self.stats['ICMP'] += 1
            time.sleep(0.2)
        self.show_stats()


# Run the analyzer
analyzer = ProtocolAnalyzer()
analyzer.demo()
Key Takeaways
You've learned so much! Here's what you can now do:
- Understand network protocol layers with confidence
- Work with different OSI and TCP/IP layers in Python
- Parse and analyze network protocols like a pro
- Build network monitoring tools using Python
- Handle low-level networking with raw sockets!
Remember: Network protocols are the foundation of all internet communication. Understanding them gives you superpowers!
Next Steps
Congratulations! You've mastered network protocol layers!
Here's what to do next:
- Practice with the packet analyzer exercise above
- Build a simple protocol implementation (like a chat protocol)
- Move on to our next tutorial: UDP Sockets and Connectionless Communication
- Experiment with tools like Wireshark to see real protocols in action!
Remember: Every network expert started by understanding the basics. Keep exploring, keep learning, and most importantly, have fun with networking!
Happy networking!