Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand port scanning fundamentals
- Apply network discovery in real projects
- Debug common networking issues
- Write clean, Pythonic network code
Introduction
Welcome to this tutorial on port scanning and network discovery! In this guide, we'll explore how to discover services and open ports on network hosts using Python.
Port scanning is a core skill for network administration and security testing. Whether you're monitoring network infrastructure, performing security audits, or troubleshooting connectivity issues, you need to know which ports on a host accept connections.
By the end of this tutorial, you should feel confident implementing network discovery tools in your own projects. Let's dive in!
Understanding Port Scanning
What is Port Scanning?
Port scanning is like checking which doors and windows are open in a building. It is a network reconnaissance technique that helps you discover which services are available on a host.
In networking terms, port scanning means systematically probing a server or host for open ports. This lets you:
- Discover running services
- Map network topology
- Identify potential vulnerabilities
Why Use Port Scanning?
Here's why network professionals rely on port scanning:
- Network Discovery: Find active hosts and services
- Security Auditing: Identify exposed services
- Troubleshooting: Diagnose connectivity issues
- Service Monitoring: Track service availability
Real-world example: Imagine managing a corporate network. With port scanning, you can quickly identify unauthorized services or verify that critical services are accessible.
Basic Syntax and Usage
Simple Port Check
Let's start with a simple example:
# Hello, Network Discovery!
import socket

def check_port(host, port):
    """
    Check if a port is open on a host
    """
    # Create a socket object; the with block closes it on every path
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)  # 1 second timeout
        try:
            # Attempt to connect; connect_ex returns an error code instead of raising
            result = sock.connect_ex((host, port))
            # 0 means the port is open
            return result == 0
        except socket.gaierror:
            # Hostname couldn't be resolved
            print(f"Hostname {host} could not be resolved")
            return False
        except OSError:
            # Couldn't connect to server
            print(f"Could not connect to {host}")
            return False

# Let's test it!
if check_port("google.com", 80):
    print("Port 80 is open on google.com!")
else:
    print("Port 80 is closed or unreachable on google.com")
Explanation: We use a TCP socket to attempt a connection. The connect_ex() method returns 0 if the connection succeeds and a nonzero error code (an errno value) otherwise.
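The nonzero codes from connect_ex() carry information too. The sketch below is one possible way to read them, assuming a hypothetical helper named probe and the labels "open", "closed", and "filtered" (borrowed from common scanner vocabulary, not from this tutorial's code). A refused connection usually means the host answered and nothing is listening, while a timeout usually means a firewall dropped the packet or the host is down.

```python
import errno
import socket

def probe(host, port, timeout=1.0):
    """Classify a TCP port as 'open', 'closed', or 'filtered' (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        code = sock.connect_ex((host, port))
    if code == 0:
        return "open"        # handshake completed
    if code == errno.ECONNREFUSED:
        return "closed"      # the host replied with a reset: nothing is listening
    return "filtered"        # timeout or another error: no clear answer

if __name__ == "__main__":
    print(probe("localhost", 80))
```

Treating every other error as "filtered" is a simplification; a real tool might log the exact code for later inspection.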
Common Port Scanning Patterns
Here are patterns you'll use frequently:
# Pattern 1: Multiple port scanning
import concurrent.futures
import socket

def scan_port(host, port):
    """Scan a single port; return the port if open, else None"""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(0.5)
            result = sock.connect_ex((host, port))
    except OSError:
        # Name resolution or socket errors count as "not open"
        return None
    return port if result == 0 else None

def scan_ports(host, start_port, end_port, max_threads=100):
    """
    Scan multiple ports concurrently
    """
    open_ports = []
    # Use thread pool for concurrent scanning
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit all port scans
        futures = {
            executor.submit(scan_port, host, port): port
            for port in range(start_port, end_port + 1)
        }
        # Collect results as they finish
        for future in concurrent.futures.as_completed(futures):
            port = future.result()
            if port is not None:
                open_ports.append(port)
                print(f"Found open port: {port}")
    return sorted(open_ports)

# Pattern 2: Service identification
def identify_service(port):
    """Identify common services by port number"""
    services = {
        21: "FTP",
        22: "SSH",
        23: "Telnet",
        25: "SMTP",
        53: "DNS",
        80: "HTTP",
        110: "POP3",
        443: "HTTPS",
        3306: "MySQL",
        3389: "RDP",
        8080: "HTTP-Proxy"
    }
    return services.get(port, "Unknown")
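A hand-written port table only covers the entries you typed in. As a possible complement, the standard library can consult the operating system's own services database through socket.getservbyport(). The helper name lookup_service and the "unknown" fallback below are assumptions for illustration, and the names returned depend on the local system.

```python
import socket

def lookup_service(port, protocol="tcp"):
    """Return the system's registered service name for a port, or 'unknown'."""
    try:
        return socket.getservbyport(port, protocol)
    except (OSError, OverflowError):
        # OSError: no entry for this port; OverflowError: port outside 0-65535
        return "unknown"

if __name__ == "__main__":
    for port in (22, 80, 443):
        print(port, lookup_service(port))
```

Either lookup only reports the conventional assignment for a port number. It says nothing about what is actually listening there.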
Practical Examples
Example 1: Network Service Scanner
Let's build a more complete network scanner:
# Advanced network service scanner
# Note: relies on identify_service() from Pattern 2 above
import socket
import threading
from datetime import datetime
import queue

class NetworkScanner:
    def __init__(self, host, port_range=(1, 1000), threads=50):
        self.host = host
        self.port_range = port_range
        self.threads = threads
        self.open_ports = []
        self.lock = threading.Lock()

    def scan_port(self, port):
        """
        Scan a single port
        """
        try:
            # Create socket with timeout
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                # Try to connect
                result = sock.connect_ex((self.host, port))
            if result == 0:
                # Port is open!
                banner = self.get_service_banner(port)
                with self.lock:
                    self.open_ports.append({
                        'port': port,
                        'service': identify_service(port),
                        'banner': banner
                    })
                    print(f"Port {port}: OPEN - {identify_service(port)}")
        except OSError:
            pass  # Silently skip network errors

    def get_service_banner(self, port):
        """
        Try to grab service banner
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                sock.connect((self.host, port))
                # Send a probe
                sock.send(b'Hello\r\n')
                banner = sock.recv(1024).decode('utf-8', errors='ignore').strip()
            return banner[:50] if banner else ""
        except OSError:
            return ""

    def worker(self, port_queue):
        """
        Worker thread for scanning
        """
        while True:
            port = port_queue.get()
            if port is None:
                break
            try:
                self.scan_port(port)
            finally:
                # Always mark the item done so port_queue.join() can return
                port_queue.task_done()

    def scan(self):
        """
        Start the scanning process
        """
        print(f"Starting scan on {self.host}")
        print(f"Port range: {self.port_range[0]}-{self.port_range[1]}")
        print(f"Using {self.threads} threads\n")
        start_time = datetime.now()
        # Create work queue
        port_queue = queue.Queue()
        # Create and start threads
        threads = []
        for _ in range(self.threads):
            t = threading.Thread(target=self.worker, args=(port_queue,))
            t.start()
            threads.append(t)
        # Add ports to queue
        for port in range(self.port_range[0], self.port_range[1] + 1):
            port_queue.put(port)
        # Wait for completion
        port_queue.join()
        # Stop workers: one None sentinel per thread
        for _ in range(self.threads):
            port_queue.put(None)
        for t in threads:
            t.join()
        # Display results
        duration = datetime.now() - start_time
        self.display_results(duration)

    def display_results(self, duration):
        """
        Display scan results
        """
        print(f"\n{'='*50}")
        print("Scan Complete!")
        print(f"Duration: {duration}")
        print(f"Host: {self.host}")
        print(f"Open ports found: {len(self.open_ports)}")
        print(f"{'='*50}\n")
        if self.open_ports:
            print("Open Ports Summary:")
            print(f"{'Port':<10} {'Service':<20} {'Banner':<30}")
            print("-" * 60)
            for port_info in sorted(self.open_ports, key=lambda x: x['port']):
                print(f"{port_info['port']:<10} {port_info['service']:<20} {port_info['banner']:<30}")
        else:
            print("No open ports found in the specified range")

# Let's use it!
scanner = NetworkScanner("scanme.nmap.org", port_range=(1, 100))
scanner.scan()
Try it yourself: Add UDP port scanning capability and OS fingerprinting features!
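As a starting point for the UDP part of that exercise, here is a minimal sketch. It assumes a hypothetical helper named udp_probe and a one-byte payload that many real services will simply ignore. UDP has no handshake, so silence is ambiguous: the port may be open but quiet, or a firewall may have dropped the probe. Only a reply, or an ICMP "port unreachable" error surfaced by the OS, gives a clear answer.

```python
import socket

def udp_probe(host, port, payload=b"\x00", timeout=1.0):
    """Classify a UDP port as 'open', 'closed', or 'open|filtered' (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)
        try:
            # A connected UDP socket lets the OS report ICMP errors back to us
            sock.connect((host, port))
            sock.send(payload)
            sock.recv(1024)
            return "open"            # any reply means something is listening
        except ConnectionRefusedError:
            return "closed"          # ICMP port unreachable came back
        except OSError:
            return "open|filtered"   # timeout or other error: cannot tell

if __name__ == "__main__":
    print(udp_probe("localhost", 53))
```

Whether the ICMP error is actually delivered to the socket varies by platform and firewall configuration, so "open|filtered" will be the most common result in practice.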
Example 2: Subnet Discovery Tool
Let's scan an entire subnet:
# Subnet discovery tool
import ipaddress
import concurrent.futures
import socket
import platform
import subprocess

class SubnetScanner:
    def __init__(self, subnet):
        self.subnet = ipaddress.ip_network(subnet, strict=False)
        # hosts() already leaves out the network and broadcast addresses
        self.hosts = list(self.subnet.hosts())
        self.active_hosts = []

    def ping_host(self, ip):
        """
        Ping a host to check if it's alive
        """
        ip_str = str(ip)
        # Platform-specific ping command
        if platform.system().lower() == "windows":
            command = ["ping", "-n", "1", "-w", "1000", ip_str]
        else:
            command = ["ping", "-c", "1", "-W", "1", ip_str]
        try:
            # Execute ping; the timeout guards against a hung process
            result = subprocess.run(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=5
            )
            if result.returncode == 0:
                return ip_str
        except (OSError, subprocess.SubprocessError):
            pass
        return None

    def tcp_scan_host(self, ip):
        """
        TCP scan to check if host is alive
        """
        common_ports = [80, 443, 22, 21, 25, 3389, 445, 139]
        ip_str = str(ip)
        for port in common_ports:
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(0.5)
                    result = sock.connect_ex((ip_str, port))
                if result == 0:
                    return ip_str
            except OSError:
                continue
        return None

    def scan_subnet(self, method="tcp"):
        """
        Scan entire subnet for active hosts
        """
        print(f"Scanning subnet: {self.subnet}")
        print(f"Total hosts to scan: {len(self.hosts)}")
        print(f"Method: {method.upper()}\n")
        # Choose scanning method
        scan_func = self.ping_host if method == "ping" else self.tcp_scan_host
        # Concurrent scanning
        with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
            # Submit all scans
            future_to_ip = {
                executor.submit(scan_func, ip): ip
                for ip in self.hosts
            }
            # Process results
            for future in concurrent.futures.as_completed(future_to_ip):
                result = future.result()
                if result:
                    self.active_hosts.append(result)
                    print(f"Found active host: {result}")
        # Display summary
        self.display_summary()

    def display_summary(self):
        """
        Display scanning summary
        """
        print(f"\n{'='*50}")
        print("Subnet Scan Complete!")
        print(f"Subnet: {self.subnet}")
        print(f"Active hosts: {len(self.active_hosts)}")
        print(f"Inactive hosts: {len(self.hosts) - len(self.active_hosts)}")
        print(f"{'='*50}\n")
        if self.active_hosts:
            print("Active Hosts:")
            for host in sorted(self.active_hosts, key=ipaddress.ip_address):
                print(f"  {host}")

# Let's discover a network!
scanner = SubnetScanner("192.168.1.0/24")
scanner.scan_subnet(method="tcp")
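When reporting how many hosts a subnet scan covers, it can help to count what hosts() actually yields. Subtracting 2 from num_addresses happens to match for common prefixes such as /24, but not for point-to-point /31 networks, where both addresses are usable. The helper name usable_host_count is an assumption for this sketch.

```python
import ipaddress

def usable_host_count(cidr):
    """Count the addresses that ip_network(cidr).hosts() yields."""
    network = ipaddress.ip_network(cidr, strict=False)
    return sum(1 for _ in network.hosts())

if __name__ == "__main__":
    for cidr in ("192.168.1.0/24", "10.0.0.0/30", "10.0.0.0/31"):
        network = ipaddress.ip_network(cidr)
        # Second number: hosts() count; third: the num_addresses - 2 shortcut
        print(cidr, usable_host_count(cidr), network.num_addresses - 2)
```

For the /31 case the two numbers disagree (2 versus 0), which is why the host count is better derived from the iterator itself.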
Advanced Concepts
SYN Stealth Scanning
When you're ready to level up, try stealth scanning:
# Advanced SYN stealth scanner (requires root/admin)
import logging
# Suppress scapy warnings (set before importing scapy so import-time warnings are covered)
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import IP, TCP, sr1, send

class StealthScanner:
    def __init__(self, target, ports):
        self.target = target
        self.ports = ports
        self.open_ports = []

    def syn_scan(self, port):
        """
        Perform SYN stealth scan
        """
        # Craft SYN packet
        syn_packet = IP(dst=self.target) / TCP(dport=port, flags="S")
        # Send packet and wait for response
        response = sr1(syn_packet, timeout=1, verbose=0)
        if response is not None and response.haslayer(TCP):
            if response[TCP].flags == 0x12:  # SYN-ACK: SYN (0x02) + ACK (0x10) = 18
                # Port is open!
                self.open_ports.append(port)
                # Send RST to tear down the half-open connection; reuse our
                # source port and the sequence number the target expects
                rst_packet = IP(dst=self.target) / TCP(
                    sport=response[TCP].dport,
                    dport=port,
                    flags="R",
                    seq=response[TCP].ack
                )
                send(rst_packet, verbose=0)
                return True
        return False

    def scan(self):
        """
        Execute stealth scan
        """
        print(f"Starting SYN stealth scan on {self.target}")
        for port in self.ports:
            if self.syn_scan(port):
                print(f"Port {port}: OPEN (Stealth)")
        print("\nStealth scan complete!")
        print(f"Open ports: {self.open_ports}")

# Note: Requires root/admin privileges!
# scanner = StealthScanner("scanme.nmap.org", [80, 443, 22, 21])
# scanner.scan()
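The magic number 18 in the SYN-ACK check is just two flag bits added together. The sketch below decodes a TCP flags byte in plain Python, without scapy, so it runs without root privileges. The names TCP_FLAGS, decode_flags, and classify_syn_reply are assumptions for illustration.

```python
# Bit values of the six classic TCP control flags
TCP_FLAGS = {
    "FIN": 0x01,
    "SYN": 0x02,
    "RST": 0x04,
    "PSH": 0x08,
    "ACK": 0x10,
    "URG": 0x20,
}

def decode_flags(value):
    """Return the names of the flags set in a TCP flags byte."""
    return [name for name, bit in TCP_FLAGS.items() if value & bit]

def classify_syn_reply(flags):
    """Interpret the reply to a SYN probe (hypothetical helper)."""
    if flags & 0x12 == 0x12:
        return "open"      # SYN and ACK both set: the port accepted the probe
    if flags & 0x04:
        return "closed"    # RST set: the port refused the probe
    return "unknown"

if __name__ == "__main__":
    print(decode_flags(18), classify_syn_reply(18))      # ['SYN', 'ACK'] open
    print(decode_flags(0x14), classify_syn_reply(0x14))  # ['RST', 'ACK'] closed
```

Masking with & instead of comparing with == keeps the check working when a reply carries extra bits alongside SYN and ACK.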
Advanced Service Detection
For the brave network explorers:
# Advanced service fingerprinting
import socket
import ssl

class ServiceDetector:
    def __init__(self, host, port):
        self.host = host
        self.port = port

    def detect_http(self):
        """
        Detect HTTP/HTTPS service
        """
        request = b"HEAD / HTTP/1.1\r\nHost: " + self.host.encode() + b"\r\n\r\n"
        try:
            # Try HTTPS first
            context = ssl.create_default_context()
            with socket.create_connection((self.host, self.port), timeout=3) as sock:
                with context.wrap_socket(sock, server_hostname=self.host) as ssock:
                    ssock.send(request)
                    response = ssock.recv(1024).decode('utf-8', errors='ignore')
                    if "HTTP" in response:
                        # Split outside the f-string: a backslash inside an f-string
                        # expression is a syntax error before Python 3.12
                        status_line = response.split('\r\n')[0]
                        return f"HTTPS - {status_line}"
        except OSError:
            # TLS failed (ssl.SSLError is an OSError); try plain HTTP
            try:
                with socket.create_connection((self.host, self.port), timeout=3) as sock:
                    sock.send(request)
                    response = sock.recv(1024).decode('utf-8', errors='ignore')
                if "HTTP" in response:
                    status_line = response.split('\r\n')[0]
                    return f"HTTP - {status_line}"
            except OSError:
                pass
        return None

    def detect_ssh(self):
        """
        Detect SSH service
        """
        try:
            with socket.create_connection((self.host, self.port), timeout=3) as sock:
                # SSH servers send their banner without being asked
                banner = sock.recv(1024).decode('utf-8', errors='ignore')
            if "SSH" in banner:
                return f"SSH - {banner.strip()}"
        except OSError:
            pass
        return None

    def detect_smtp(self):
        """
        Detect SMTP service
        """
        try:
            with socket.create_connection((self.host, self.port), timeout=3) as sock:
                banner = sock.recv(1024).decode('utf-8', errors='ignore')
                if "220" in banner or "SMTP" in banner.upper():
                    # Send EHLO and read the reply to exercise the SMTP dialogue
                    sock.send(b"EHLO test\r\n")
                    sock.recv(1024)
                    greeting = banner.split('\r\n')[0]
                    return f"SMTP - {greeting}"
        except OSError:
            pass
        return None

    def detect_service(self):
        """
        Auto-detect service type
        """
        detectors = [
            self.detect_http,
            self.detect_ssh,
            self.detect_smtp
        ]
        for detector in detectors:
            result = detector()
            if result:
                return result
        return "Unknown Service"

# Test service detection
detector = ServiceDetector("scanme.nmap.org", 22)
print(f"Service detected: {detector.detect_service()}")
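Banner matching can be separated from the network code and tested on plain strings. The sketch below assumes a hypothetical classify_banner function and made-up sample banners. It matches on how each protocol's first line begins rather than on a substring anywhere in the text. A 220 greeting on its own is ambiguous, because both SMTP and FTP servers open with that reply code.

```python
def classify_banner(banner):
    """Guess a protocol from the first line a service sends (hypothetical helper)."""
    text = banner.strip()
    if text.startswith("SSH-"):
        return "SSH"            # SSH identification strings begin with "SSH-"
    if text.startswith("HTTP/"):
        return "HTTP"           # HTTP status lines begin with "HTTP/"
    if text.startswith("220"):
        return "SMTP or FTP"    # both protocols greet with reply code 220
    return "Unknown"

if __name__ == "__main__":
    # Sample banners below are invented for illustration
    for sample in ("SSH-2.0-ExampleSSH_1.0", "HTTP/1.1 200 OK", "220 mail.example.com ESMTP"):
        print(f"{sample!r} -> {classify_banner(sample)}")
```

Keeping the classifier pure makes it easy to extend with more patterns and to unit-test without touching a live host.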
Common Pitfalls and Solutions
Pitfall 1: Too Aggressive Scanning
# Wrong way - too fast, might trigger IDS!
import socket
import time
import random

def bad_scanner(host):
    for port in range(1, 65536):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(0.1)  # Too short!
        sock.connect_ex((host, port))  # No delay!
        sock.close()

# Correct way - controlled scanning!
def good_scanner(host, delay=0.1):
    open_ports = []
    for port in range(1, 1025):  # Reasonable range
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)  # Reasonable timeout
            if sock.connect_ex((host, port)) == 0:
                open_ports.append(port)
        # Add random delay
        time.sleep(delay + random.uniform(0, 0.1))
    return open_ports

Pitfall 2: Ignoring Legal/Ethical Concerns
# Dangerous - scanning without permission!
def unethical_scan(random_ip):
    scan_ports(random_ip, 1, 65535)  # Unauthorized scanning is illegal in many jurisdictions!

# Safe - always get permission!
def ethical_scan(host):
    """
    IMPORTANT: Only scan systems you own or have
    written permission to test!
    """
    print("Security Notice:")
    print("- Only scan your own systems")
    print("- Get written permission for others")
    print("- Respect rate limits")
    print("- Follow responsible disclosure")
    # Use designated test hosts
    safe_hosts = [
        "scanme.nmap.org",  # Public test host
        "localhost",        # Your machine
        "192.168.1.1"       # Your router (if you own it)
    ]
    # Note: a 192.168.x.x address is private, not automatically yours
    if host in safe_hosts or host.startswith("192.168."):
        return True
    else:
        print("Please ensure you have permission!")
        return False
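A string prefix check such as startswith("192.168.") also accepts hostnames that merely begin with those characters. As one possible tightening, the sketch below parses the value with the ipaddress module first. The helper name is_private_address is an assumption, and a private address still says nothing about whether you are authorized to scan it.

```python
import ipaddress

def is_private_address(host):
    """Return True only if host parses as an IP address in a private range."""
    try:
        return ipaddress.ip_address(host).is_private
    except ValueError:
        # Not an IP literal at all (for example, a hostname)
        return False

if __name__ == "__main__":
    for candidate in ("192.168.1.10", "8.8.8.8", "192.168.example.com"):
        print(candidate, is_private_address(candidate))
```

The third candidate passes a prefix check but is rejected here, because it is a hostname that could resolve to any address.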
Best Practices
- Always Get Permission: Only scan systems you own or have written authorization to test!
- Log Everything: Keep detailed logs of your scanning activities
- Use Rate Limiting: Don't overwhelm target systems with requests
- Be Stealthy When Needed: SYN scanning never completes the TCP handshake, so it leaves a smaller footprint, though intrusion detection systems can still notice it
- Verify Results: Double-check findings with different methods
Hands-On Exercise
Challenge: Build a Network Mapper
Create a comprehensive network mapping tool:
Requirements:
- Discover active hosts in a subnet
- Identify open ports on each host
- Detect running services
- Generate a network map report
- Support different scanning techniques
Bonus Points:
- Add OS fingerprinting
- Implement vulnerability detection
- Create a GUI interface
Solution
# Comprehensive network mapper!
# Note: relies on identify_service() from the Common Port Scanning Patterns section
import socket
import threading
import ipaddress
from datetime import datetime
import json

class NetworkMapper:
    def __init__(self, network):
        self.network = ipaddress.ip_network(network, strict=False)
        self.results = {}
        self.lock = threading.Lock()

    def scan_host(self, ip):
        """
        Scan individual host
        """
        ip_str = str(ip)
        host_info = {
            'ip': ip_str,
            'hostname': None,
            'open_ports': [],
            'services': {},
            'scan_time': datetime.now().isoformat()
        }
        # Try to get hostname via reverse DNS
        try:
            host_info['hostname'] = socket.gethostbyaddr(ip_str)[0]
        except OSError:
            host_info['hostname'] = 'Unknown'
        # Scan common ports
        common_ports = [21, 22, 23, 25, 53, 80, 110, 443, 445, 3306, 3389, 8080]
        for port in common_ports:
            if self.check_port(ip_str, port):
                host_info['open_ports'].append(port)
                # Identify service
                service_info = self.identify_service_advanced(ip_str, port)
                host_info['services'][port] = service_info
        # Store results only for hosts with at least one open port
        if host_info['open_ports']:
            with self.lock:
                self.results[ip_str] = host_info
                print(f"Mapped: {ip_str} ({host_info['hostname']})")
                print(f"  Open ports: {host_info['open_ports']}")

    def check_port(self, host, port):
        """
        Check if port is open
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(1)
                return sock.connect_ex((host, port)) == 0
        except OSError:
            return False

    def identify_service_advanced(self, host, port):
        """
        Advanced service identification
        """
        service_info = {
            'name': identify_service(port),
            'banner': '',
            'version': ''
        }
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                sock.connect((host, port))
                # Send probe based on port
                if port == 80 or port == 8080:
                    sock.send(b"GET / HTTP/1.0\r\n\r\n")
                elif port == 22:
                    pass  # SSH sends banner automatically
                else:
                    sock.send(b"\r\n")
                # Receive response
                banner = sock.recv(1024).decode('utf-8', errors='ignore')
            service_info['banner'] = banner[:100].strip()
            # Extract version info
            if "SSH" in banner:
                service_info['version'] = banner.split()[0]
            elif "HTTP" in banner:
                for line in banner.split('\r\n'):
                    if 'Server:' in line:
                        service_info['version'] = line.split(':', 1)[1].strip()
                        break
        except OSError:
            pass
        return service_info

    def map_network(self, max_threads=50):
        """
        Map entire network
        """
        hosts = list(self.network.hosts())
        print(f"Starting network mapping for {self.network}")
        print(f"Hosts to scan: {len(hosts)}\n")
        start_time = datetime.now()
        # Scan hosts in batches of max_threads threads
        for i in range(0, len(hosts), max_threads):
            batch = hosts[i:i + max_threads]
            batch_threads = []
            for host in batch:
                t = threading.Thread(target=self.scan_host, args=(host,))
                t.start()
                batch_threads.append(t)
            # Wait for batch to complete
            for t in batch_threads:
                t.join()
        # Generate report
        duration = datetime.now() - start_time
        self.generate_report(duration)

    def generate_report(self, duration):
        """
        Generate network map report
        """
        print(f"\n{'='*60}")
        print("Network Mapping Complete!")
        print(f"Duration: {duration}")
        print(f"Network: {self.network}")
        print(f"Active hosts: {len(self.results)}")
        print(f"{'='*60}\n")
        # Summary by service
        service_count = {}
        for host_info in self.results.values():
            for port, service in host_info['services'].items():
                service_name = service['name']
                service_count[service_name] = service_count.get(service_name, 0) + 1
        print("Services Summary:")
        for service, count in sorted(service_count.items(), key=lambda x: x[1], reverse=True):
            print(f"  {service}: {count} host(s)")
        # Save detailed report
        report_file = f"network_map_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nDetailed report saved to: {report_file}")

# Map your network!
mapper = NetworkMapper("192.168.1.0/24")
mapper.map_network()
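One detail to watch when reading a saved report back: JSON object keys are always strings, so the integer port keys in each host's services dictionary come back as "22" rather than 22. The sketch below shows the round trip and one way to undo it. The function name restore_port_keys and the sample data are assumptions for illustration.

```python
import json

def restore_port_keys(report):
    """Convert the string port keys produced by JSON back into integers."""
    for host_info in report.values():
        host_info["services"] = {
            int(port): info for port, info in host_info["services"].items()
        }
    return report

if __name__ == "__main__":
    # Made-up sample in the same shape as the mapper's results
    sample = {"192.168.1.10": {"open_ports": [22], "services": {22: {"name": "SSH"}}}}
    loaded = json.loads(json.dumps(sample))
    print(list(loaded["192.168.1.10"]["services"]))    # ['22']
    restored = restore_port_keys(loaded)
    print(list(restored["192.168.1.10"]["services"]))  # [22]
```

The values in open_ports are unaffected, since JSON preserves integers inside lists.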
Key Takeaways
You've learned so much! Here's what you can now do:
- Perform port scanning with confidence
- Discover network services like a pro
- Apply security best practices in your scans
- Debug network issues efficiently
- Build network discovery tools with Python!
Remember: With great power comes great responsibility! Always scan ethically and legally.
Next Steps
Congratulations! You've mastered port scanning and network discovery!
Here's what to do next:
- Practice with the exercises above on authorized systems
- Build a network monitoring dashboard
- Move on to our next tutorial: UDP Sockets and Connectionless Communication
- Share your network discoveries responsibly!
Remember: Every network security expert started by learning the fundamentals. Keep exploring, keep learning, and most importantly, stay ethical!
Happy network discovering!