Prerequisites
- Basic understanding of programming concepts
- Python installation (3.8+)
- VS Code or preferred IDE
What you'll learn
- Understand the fundamentals of Netmiko and NAPALM
- Apply network automation in real projects
- Debug common issues
- Write clean, Pythonic code
Introduction
Welcome to the exciting world of network automation! In this guide, we'll explore how Netmiko and NAPALM can revolutionize the way you manage network devices using Python.
You'll discover how these powerful libraries can transform tedious manual network configurations into automated, reliable, and scalable solutions. Whether you're managing routers, switches, or firewalls, understanding network automation is essential for modern network engineering.
By the end of this tutorial, you'll feel confident automating network tasks with Python! Let's dive in!
Understanding Network Automation
What are Netmiko and NAPALM?
Network automation is like having a smart assistant for your network devices. Think of it as teaching your computer to speak the language of routers and switches!
Netmiko is like a universal translator that helps Python communicate with network devices through SSH. It's built on top of Paramiko and simplifies connecting to various vendor devices.
NAPALM (Network Automation and Programmability Abstraction Layer with Multivendor support) is like a Swiss Army knife for network automation. It provides a unified API to interact with different network device operating systems.
In Python terms, these libraries help you:
- Connect to network devices programmatically
- Execute commands and retrieve outputs
- Configure devices consistently across vendors
- Extract operational data in structured formats
Why Use Network Automation?
Here's why network engineers love automation:
- Consistency: Apply configurations uniformly across devices
- Speed: Configure hundreds of devices in minutes
- Accuracy: Eliminate human errors from manual typing
- Documentation: Code serves as living documentation
- Scalability: Manage growing networks efficiently
Real-world example: Imagine updating VLAN configurations across 100 switches. With automation, you can complete this task in minutes instead of hours!
Basic Syntax and Usage
Netmiko Basics
Let's start with connecting to a device using Netmiko:
from netmiko import ConnectHandler
# Device connection details
cisco_device = {
    'device_type': 'cisco_ios',   # Specify device type
    'host': '192.168.1.1',        # Device IP address
    'username': 'admin',          # Login username
    'password': 'secure_pass',    # Login password
    'secret': 'enable_pass'       # Enable password
}
# Connect to the device
connection = ConnectHandler(**cisco_device)
connection.enable()  # Enter enable mode
# Send a command
output = connection.send_command('show version')
print(f"Device info:\n{output}")
# Always close the connection!
connection.disconnect()
Explanation: Notice how we specify the device type! Netmiko supports many vendors including Cisco, Juniper, Arista, and more.
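Since the device type drives how Netmiko talks to each platform, it can help to build the connection dictionary in one place. The sketch below is illustrative only: `build_device` and the friendly platform names are hypothetical helpers, while the `device_type` strings on the right are the identifiers Netmiko uses for these platforms.

```python
# Hypothetical mapping from friendly names to Netmiko device_type strings.
NETMIKO_DEVICE_TYPES = {
    "cisco-ios": "cisco_ios",
    "cisco-nxos": "cisco_nxos",
    "juniper": "juniper_junos",
    "arista": "arista_eos",
}


def build_device(host, platform, username, password, secret=None):
    """Return a dict that can be unpacked into ConnectHandler(**device)."""
    try:
        device_type = NETMIKO_DEVICE_TYPES[platform]
    except KeyError:
        raise ValueError(f"Unsupported platform: {platform!r}") from None
    device = {
        "device_type": device_type,
        "host": host,
        "username": username,
        "password": password,
    }
    # Only include the enable secret when one is supplied.
    if secret is not None:
        device["secret"] = secret
    return device


arista_switch = build_device("192.168.1.10", "arista", "admin", "secure_pass")
print(arista_switch["device_type"])  # arista_eos
```

Failing early on an unknown platform name keeps a typo from turning into a confusing connection error later.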
NAPALM Basics
Here's how to use NAPALM for vendor-agnostic operations:
from napalm import get_network_driver
# Get the appropriate driver
driver = get_network_driver('ios')  # Cisco IOS driver
# Create device object
device = driver(
    hostname='192.168.1.1',
    username='admin',
    password='secure_pass',
    optional_args={'secret': 'enable_pass'}
)
# Open connection
device.open()
# Get device facts
facts = device.get_facts()
print(f"Device Model: {facts['model']}")
print(f"Serial Number: {facts['serial_number']}")
print(f"OS Version: {facts['os_version']}")
# Get interfaces
interfaces = device.get_interfaces()
for intf, details in interfaces.items():
    print(f"{intf}: {'UP' if details['is_up'] else 'DOWN'}")
# Close connection
device.close()
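Because `get_interfaces()` returns a plain dictionary keyed by interface name, the result can be post-processed with ordinary Python. The sketch below assumes the `is_up` and `is_enabled` keys that NAPALM documents for this getter; `find_down_interfaces` and the sample data are hypothetical, made up so the example runs without a device.

```python
def find_down_interfaces(interfaces):
    """Return names of interfaces that are enabled but not operationally up."""
    return sorted(
        name
        for name, details in interfaces.items()
        if details.get("is_enabled") and not details.get("is_up")
    )


# Hypothetical data shaped like the output of device.get_interfaces().
sample_interfaces = {
    "GigabitEthernet0/1": {"is_up": True, "is_enabled": True},
    "GigabitEthernet0/2": {"is_up": False, "is_enabled": True},
    "GigabitEthernet0/3": {"is_up": False, "is_enabled": False},
}

print(find_down_interfaces(sample_interfaces))  # ['GigabitEthernet0/2']
```

Administratively shut-down ports are skipped on purpose, so only unexpected outages are reported.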
Practical Examples
Example 1: Bulk Configuration Updater
Let's build a tool to update configurations across multiple devices:
from netmiko import ConnectHandler
from concurrent.futures import ThreadPoolExecutor
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class NetworkConfigurator:
    def __init__(self, devices):
        self.devices = devices  # List of device dictionaries
    def configure_device(self, device):
        """Configure a single device"""
        connection = None
        try:
            # Connect to device
            logger.info(f"Connecting to {device['host']}")
            connection = ConnectHandler(**device)
            connection.enable()
            # Configuration commands
            config_commands = [
                'ntp server 10.1.1.1',                     # NTP server
                'logging host 10.1.1.100',                 # Syslog server
                'banner motd # Authorized Use Only! #',    # Login banner
                'service timestamps debug datetime msec',  # Timestamps
            ]
            # Send configuration
            output = connection.send_config_set(config_commands)
            logger.info(f"✅ Configured {device['host']} successfully!")
            # Save configuration
            connection.save_config()
            logger.info(f"Configuration saved on {device['host']}")
            return {'device': device['host'], 'status': 'success', 'output': output}
        except Exception as e:
            logger.error(f"❌ Failed to configure {device['host']}: {str(e)}")
            return {'device': device['host'], 'status': 'failed', 'error': str(e)}
        finally:
            # Close the session even when a step above fails
            if connection is not None:
                connection.disconnect()
    def configure_all(self, max_threads=5):
        """Configure all devices in parallel"""
        results = []
        # Use ThreadPoolExecutor for parallel execution
        with ThreadPoolExecutor(max_workers=max_threads) as executor:
            futures = [executor.submit(self.configure_device, device)
                       for device in self.devices]
            # Collect results
            for future in futures:
                results.append(future.result())
        # Summary
        success_count = sum(1 for r in results if r['status'] == 'success')
        print("\nConfiguration Summary:")
        print(f"✅ Successful: {success_count}/{len(self.devices)}")
        print(f"❌ Failed: {len(self.devices) - success_count}/{len(self.devices)}")
        return results
# Let's use it!
devices = [
    {
        'device_type': 'cisco_ios',
        'host': '192.168.1.1',
        'username': 'admin',
        'password': 'pass123',
        'secret': 'enable123'
    },
    {
        'device_type': 'cisco_ios',
        'host': '192.168.1.2',
        'username': 'admin',
        'password': 'pass123',
        'secret': 'enable123'
    }
]
configurator = NetworkConfigurator(devices)
results = configurator.configure_all()
Try it yourself: Add error handling for specific configuration failures and implement rollback functionality!
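One possible starting point for the error-handling part of this exercise is to scan the text returned by `send_config_set` for the error lines Cisco IOS prints when it rejects a command. This is a minimal sketch, not a complete solution: `find_config_errors` is a hypothetical helper, the marker list covers only a few common IOS messages, and the sample output is invented for illustration.

```python
# Common Cisco IOS rejection messages (not an exhaustive list).
IOS_ERROR_MARKERS = (
    "% Invalid input",
    "% Incomplete command",
    "% Ambiguous command",
)


def find_config_errors(output):
    """Return the lines of CLI output that look like IOS error messages."""
    return [
        line.strip()
        for line in output.splitlines()
        if any(marker in line for marker in IOS_ERROR_MARKERS)
    ]


# Hypothetical session output containing one rejected command.
sample_output = """router(config)#ntp server 10.1.1.1
router(config)#loging host 10.1.1.100
            ^
% Invalid input detected at '^' marker.
router(config)#end"""

errors = find_config_errors(sample_output)
if errors:
    print(f"Configuration rejected: {errors}")
```

A caller could treat a non-empty result as a failure and skip `save_config()`, which leaves the startup configuration untouched as a simple form of rollback.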
Example 2: Network Health Monitor
Let's create a comprehensive network health monitoring system:
from napalm import get_network_driver
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class NetworkHealthMonitor:
    def __init__(self):
        self.health_data = []  # Store health metrics
    def check_device_health(self, device_info):
        """Check health of a single device"""
        hostname = device_info['connection_params']['hostname']
        driver = get_network_driver(device_info['driver'])
        device = driver(**device_info['connection_params'])
        opened = False
        try:
            device.open()
            opened = True
            health_report = {
                'timestamp': datetime.now().isoformat(),  # Current time
                'hostname': hostname,
                'checks': {}
            }
            # Check 1: Device facts
            facts = device.get_facts()
            health_report['device_info'] = {
                'model': facts['model'],
                'uptime': facts['uptime'],
                'vendor': facts['vendor']
            }
            # Check 2: Memory usage
            environment = device.get_environment()
            if 'memory' in environment:
                memory = environment['memory']
                used_percent = (memory['used_ram'] / memory['available_ram']) * 100
                health_report['checks']['memory'] = {
                    'status': '✅' if used_percent < 80 else '⚠️',
                    'used_percent': round(used_percent, 2),
                    'message': f"Memory usage: {used_percent:.1f}%"
                }
            # Check 3: Temperature. NAPALM reports sensor readings under
            # 'temperature'; the 'cpu' entries only carry '%usage'.
            temps = [sensor['temperature']
                     for sensor in environment.get('temperature', {}).values()]
            if temps:
                avg_temp = sum(temps) / len(temps)
                health_report['checks']['temperature'] = {
                    'status': '✅' if avg_temp < 70 else '🔥',
                    'avg_celsius': round(avg_temp, 1),
                    'message': f"Average temp: {avg_temp:.1f}°C"
                }
            # Check 4: Interface errors
            interfaces = device.get_interfaces_counters()
            error_interfaces = []
            for intf, counters in interfaces.items():
                if counters['rx_errors'] > 100 or counters['tx_errors'] > 100:
                    error_interfaces.append(intf)
            health_report['checks']['interfaces'] = {
                'status': '✅' if not error_interfaces else '⚠️',
                'error_count': len(error_interfaces),
                'message': f"Interfaces with errors: {len(error_interfaces)}"
            }
            # Overall health score
            health_score = self._calculate_health_score(health_report['checks'])
            health_report['health_score'] = health_score
            health_report['health_emoji'] = self._get_health_emoji(health_score)
            return health_report
        except Exception as e:
            logger.error(f"❌ Health check failed: {str(e)}")
            return {
                'hostname': hostname,
                'status': 'failed',
                'error': str(e)
            }
        finally:
            # Close the session only if it was opened successfully
            if opened:
                device.close()
    def _calculate_health_score(self, checks):
        """Calculate overall health score (0-100)"""
        scores = {
            '✅': 100,
            '⚠️': 70,
            '❌': 30,
            '🔥': 50
        }
        if not checks:
            return 0
        total_score = sum(scores.get(check['status'], 0) for check in checks.values())
        return total_score // len(checks)
    def _get_health_emoji(self, score):
        """Get emoji based on health score"""
        if score >= 90:
            return "💚"  # Excellent
        elif score >= 70:
            return "💛"  # Good
        elif score >= 50:
            return "🧡"  # Warning
        else:
            return "❤️"  # Critical
    def generate_report(self, devices):
        """Generate health report for all devices"""
        print("Network Health Check Report")
        print("=" * 50)
        for device in devices:
            report = self.check_device_health(device)
            if 'error' not in report:
                print(f"\nDevice: {report['hostname']}")
                print(f"  Health: {report['health_emoji']} {report['health_score']}%")
                for check_name, check_data in report['checks'].items():
                    print(f"  {check_data['status']} {check_name}: {check_data['message']}")
            else:
                print(f"\n❌ Device: {report['hostname']} - Check Failed!")
# Usage example
devices = [
    {
        'driver': 'ios',
        'connection_params': {
            'hostname': '192.168.1.1',
            'username': 'admin',
            'password': 'pass123',
            'optional_args': {'secret': 'enable123'}
        }
    }
]
monitor = NetworkHealthMonitor()
monitor.generate_report(devices)
Advanced Concepts
Configuration Templating with Jinja2
When you're ready to level up, combine network automation with templating:
from jinja2 import Template
from netmiko import ConnectHandler
# Create a configuration template
vlan_template = Template("""
{% for vlan in vlans %}
vlan {{ vlan.id }}
 name {{ vlan.name }}
{% if vlan.description %}
 description {{ vlan.description }}
{% endif %}
!
interface vlan {{ vlan.id }}
 description {{ vlan.name }} SVI
 ip address {{ vlan.ip }} {{ vlan.mask }}
 no shutdown
!
{% endfor %}
""")
# VLAN data
vlan_data = {
    'vlans': [
        {'id': 10, 'name': 'SALES', 'description': 'Sales Department',
         'ip': '10.1.10.1', 'mask': '255.255.255.0'},
        {'id': 20, 'name': 'IT', 'description': 'IT Department',
         'ip': '10.1.20.1', 'mask': '255.255.255.0'},
        {'id': 30, 'name': 'GUEST', 'description': 'Guest Network',
         'ip': '10.1.30.1', 'mask': '255.255.255.0'}
    ]
}
# Generate configuration
config = vlan_template.render(vlan_data)
print("Generated Configuration:")
print(config)
# Apply to device
def apply_templated_config(device_params, config_text):
    # Drop the blank lines left behind by the template's control tags
    commands = [line for line in config_text.splitlines() if line.strip()]
    connection = ConnectHandler(**device_params)
    try:
        connection.enable()
        # Send configuration
        output = connection.send_config_set(commands)
        connection.save_config()
    finally:
        connection.disconnect()
    return output
Event-Driven Automation
For the brave automators, implement reactive network automation:
import asyncio
from napalm import get_network_driver
class NetworkEventHandler:
    def __init__(self):
        self.thresholds = {
            'cpu_usage': 80,         # CPU threshold
            'memory_usage': 85,      # Memory threshold
            'interface_errors': 100  # Error threshold
        }
    def _poll_environment(self, device_info):
        """Open a session, fetch environment data, and close the session (blocking)"""
        driver = get_network_driver(device_info['driver'])
        device = driver(**device_info['connection_params'])
        device.open()
        try:
            return device.get_environment()
        finally:
            device.close()
    async def monitor_device(self, device_info):
        """Continuously monitor device and trigger actions"""
        loop = asyncio.get_running_loop()
        while True:
            try:
                # NAPALM calls block, so run them in a worker thread
                # to keep the event loop free for other devices
                env = await loop.run_in_executor(None, self._poll_environment, device_info)
                # Check CPU
                if 'cpu' in env:
                    cpu_usage = list(env['cpu'].values())[0]['%usage']
                    if cpu_usage > self.thresholds['cpu_usage']:
                        await self.handle_high_cpu(device_info, cpu_usage)
                # Check Memory
                if 'memory' in env:
                    memory = env['memory']
                    memory_usage = (memory['used_ram'] / memory['available_ram']) * 100
                    if memory_usage > self.thresholds['memory_usage']:
                        await self.handle_high_memory(device_info, memory_usage)
            except Exception as e:
                print(f"❌ Monitoring error: {str(e)}")
            # Wait before next check
            await asyncio.sleep(60)  # Check every minute
    async def handle_high_cpu(self, device_info, cpu_usage):
        """Handle high CPU usage event"""
        print(f"HIGH CPU ALERT on {device_info['connection_params']['hostname']}!")
        print(f"  CPU Usage: {cpu_usage}%")
        print("  Triggering automated response...")
        # Implement automated response (e.g., clear ARP cache, restart process)
        # This is where you'd add your remediation logic
    async def handle_high_memory(self, device_info, memory_usage):
        """Handle high memory usage event"""
        print(f"HIGH MEMORY ALERT on {device_info['connection_params']['hostname']}!")
        print(f"  Memory Usage: {memory_usage:.1f}%")
        print("  Triggering automated response...")
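The class above defines the monitoring coroutine but does not show how to start it for several devices at once. The sketch below illustrates one way to do that with `asyncio.gather`. To stay runnable without hardware it uses a hypothetical stand-in coroutine, `monitor`, that performs a fixed number of checks; in a real script each call would be `NetworkEventHandler().monitor_device(device_info)` instead.

```python
import asyncio


async def monitor(name, interval, checks, results):
    """Stand-in for monitor_device: record a fixed number of checks, then stop."""
    for check_number in range(checks):
        results.append((name, check_number))
        await asyncio.sleep(interval)  # Yield so other monitors can run


async def main():
    results = []
    # One monitoring task per device, all running concurrently
    await asyncio.gather(
        monitor("router-1", 0.01, 2, results),
        monitor("router-2", 0.01, 2, results),
    )
    return results


results = asyncio.run(main())
print(f"Completed {len(results)} checks")
```

Since the real `monitor_device` loops forever, a production script would typically run until interrupted rather than return a result list.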
Common Pitfalls and Solutions
Pitfall 1: Not Handling Connection Timeouts
# ❌ Wrong way - no timeout handling
connection = ConnectHandler(**device_params)
output = connection.send_command('show running-config')  # May stall or time out on long output!
# ✅ Correct way - set timeouts
connection = ConnectHandler(
    **device_params,
    timeout=30,          # Connection timeout
    session_timeout=60   # Session timeout
)
# Also use command-specific timeouts
output = connection.send_command(
    'show running-config',
    read_timeout=120     # Long commands need more time!
)
Pitfall 2: Not Saving Configurations
# ❌ Dangerous - changes lost on reboot!
connection.send_config_set(['interface gi0/1', 'description Important Link'])
connection.disconnect()  # Config not saved!
# ✅ Safe - always save your work!
connection.send_config_set(['interface gi0/1', 'description Important Link'])
connection.save_config()  # Save to startup-config
print("✅ Configuration saved successfully!")
connection.disconnect()
Pitfall 3: Hardcoding Credentials
# ❌ Security nightmare - never do this!
device = {
    'username': 'admin',
    'password': 'MyPassword123!'  # Exposed credential!
}
# ✅ Secure way - use environment variables or vault
import os
from getpass import getpass
device = {
    'username': os.environ.get('NETWORK_USER'),
    'password': os.environ.get('NETWORK_PASS') or getpass('Password: ')
}
# Even better - use a secrets management system!
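When scripts run unattended there is nobody to answer a password prompt, so it can be useful to fail early with a clear message if a variable is missing. The sketch below reuses the `NETWORK_USER` and `NETWORK_PASS` variable names from the snippet above; `load_credentials` is a hypothetical helper, and the `env` parameter exists only so the function can be exercised without touching the real environment.

```python
import os


def load_credentials(env=os.environ):
    """Read NETWORK_USER and NETWORK_PASS, raising if either is unset or empty."""
    required = ("NETWORK_USER", "NETWORK_PASS")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {"username": env["NETWORK_USER"], "password": env["NETWORK_PASS"]}


# Demonstration with an explicit mapping instead of the real environment.
demo_env = {"NETWORK_USER": "admin", "NETWORK_PASS": "example-only"}
credentials = load_credentials(demo_env)
print(credentials["username"])  # admin
```

The returned dictionary can be merged into a device definition, for example `{**device, **load_credentials()}`.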
Best Practices
- Use Context Managers: Always ensure connections are closed
- Log Everything: Track all changes and operations
- Implement Rollback: Have a way to undo changes
- Test in Lab First: Never test automation in production
- Use Version Control: Track your automation scripts
- Parallelize Carefully: Don't overwhelm devices with connections
- Monitor Impact: Track CPU/memory during automation
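To illustrate the first practice, here is a minimal sketch of a context manager that guarantees `disconnect()` is called even when a command raises. `managed_connection` and `FakeConnection` are hypothetical names; the fake class stands in for a real connection so the example runs without a device. With Netmiko you would pass `ConnectHandler` as the `connect` argument, or, as far as I know, use the connection object directly in a `with` statement.

```python
from contextlib import contextmanager


@contextmanager
def managed_connection(connect, **params):
    """Open a connection with connect(**params) and always disconnect it."""
    connection = connect(**params)
    try:
        yield connection
    finally:
        connection.disconnect()


class FakeConnection:
    """Hypothetical stand-in so the sketch runs without a real device."""

    def __init__(self, **params):
        self.params = params
        self.closed = False

    def send_command(self, command):
        return f"output of {command}"

    def disconnect(self):
        self.closed = True


with managed_connection(FakeConnection, host="192.168.1.1") as conn:
    output = conn.send_command("show version")

print(output, conn.closed)
```

Keeping the cleanup in one place means individual scripts no longer need their own `try`/`finally` blocks around every session.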
Hands-On Exercise
Challenge: Build a Network Compliance Checker
Create a tool that checks network devices for compliance:
Requirements:
- Check for required NTP servers
- Verify SSH is enabled and Telnet is disabled
- Ensure logging is configured
- Check for banner messages
- Generate compliance report
Bonus Points:
- Add automatic remediation for non-compliant items
- Create a web dashboard for compliance status
- Implement configuration backup before changes
Solution
from netmiko import ConnectHandler
import re
from datetime import datetime
class ComplianceChecker:
    def __init__(self):
        self.compliance_rules = {
            'ntp_servers': ['10.1.1.1', '10.1.1.2'],  # Required NTP
            'required_banner': 'Authorized',          # Banner keyword
            'syslog_server': '10.1.1.100',            # Logging host
            'ssh_enabled': True,                      # SSH required
            'telnet_disabled': True                   # No telnet
        }
    def check_device_compliance(self, device_params):
        """Check device against compliance rules"""
        results = {
            'device': device_params['host'],
            'timestamp': datetime.now().isoformat(),
            'compliant': True,
            'checks': {}
        }
        connection = None
        try:
            connection = ConnectHandler(**device_params)
            connection.enable()
            # Check NTP servers
            ntp_output = connection.send_command('show run | include ntp server')
            configured_ntp = re.findall(r'ntp server (\S+)', ntp_output)
            ntp_compliant = all(ntp in configured_ntp for ntp in self.compliance_rules['ntp_servers'])
            results['checks']['ntp'] = {
                'compliant': ntp_compliant,
                'status': '✅' if ntp_compliant else '❌',
                'message': f"NTP servers: {', '.join(configured_ntp)}"
            }
            # Check banner
            banner_output = connection.send_command('show run | include banner')
            banner_compliant = self.compliance_rules['required_banner'] in banner_output
            results['checks']['banner'] = {
                'compliant': banner_compliant,
                'status': '✅' if banner_compliant else '❌',
                'message': 'Login banner configured' if banner_compliant else 'Banner missing!'
            }
            # Check syslog
            syslog_output = connection.send_command('show run | include logging host')
            syslog_compliant = self.compliance_rules['syslog_server'] in syslog_output
            results['checks']['syslog'] = {
                'compliant': syslog_compliant,
                'status': '✅' if syslog_compliant else '❌',
                'message': f"Syslog server: {'configured' if syslog_compliant else 'not configured'}"
            }
            # Check SSH/Telnet
            ssh_output = connection.send_command('show ip ssh')
            ssh_enabled = 'SSH Enabled' in ssh_output
            vty_output = connection.send_command('show run | section line vty')
            telnet_disabled = 'transport input ssh' in vty_output or 'transport input none' in vty_output
            results['checks']['remote_access'] = {
                'compliant': ssh_enabled and telnet_disabled,
                'status': '✅' if (ssh_enabled and telnet_disabled) else '❌',
                'message': f"SSH: {'enabled' if ssh_enabled else 'disabled'}, Telnet: {'disabled' if telnet_disabled else 'enabled'}"
            }
            # Overall compliance
            results['compliant'] = all(check['compliant'] for check in results['checks'].values())
            results['compliance_score'] = sum(1 for check in results['checks'].values() if check['compliant']) / len(results['checks']) * 100
            # Suggest remediation commands for non-compliant items
            if not results['compliant']:
                results['remediation_available'] = True
                results['remediation_commands'] = self._generate_remediation(results['checks'])
        except Exception as e:
            results['error'] = str(e)
            results['compliant'] = False
        finally:
            # Close the session even if a check raised
            if connection is not None:
                connection.disconnect()
        return results
    def _generate_remediation(self, checks):
        """Generate commands to fix non-compliant items"""
        commands = []
        if not checks.get('ntp', {}).get('compliant'):
            for ntp in self.compliance_rules['ntp_servers']:
                commands.append(f'ntp server {ntp}')
        if not checks.get('banner', {}).get('compliant'):
            commands.append(f'banner motd # {self.compliance_rules["required_banner"]} Use Only! #')
        if not checks.get('syslog', {}).get('compliant'):
            commands.append(f'logging host {self.compliance_rules["syslog_server"]}')
        if not checks.get('remote_access', {}).get('compliant'):
            commands.extend([
                'ip ssh version 2',
                'line vty 0 15',
                'transport input ssh'
            ])
        return commands
    def generate_report(self, results):
        """Generate compliance report"""
        print("\nNETWORK COMPLIANCE REPORT")
        print("=" * 60)
        print(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Devices Checked: {len(results)}")
        compliant_count = sum(1 for r in results if r['compliant'])
        print("\nSummary:")
        print(f"  ✅ Compliant: {compliant_count}/{len(results)}")
        print(f"  ❌ Non-Compliant: {len(results) - compliant_count}/{len(results)}")
        print("\nDetailed Results:")
        for result in results:
            print(f"\nDevice: {result['device']}")
            if 'error' in result:
                print(f"  ❌ Error: {result['error']}")
            else:
                score_emoji = '💚' if result['compliance_score'] == 100 else '💛' if result['compliance_score'] >= 75 else '❤️'
                print(f"  Score: {score_emoji} {result['compliance_score']:.0f}%")
                for check_name, check_data in result['checks'].items():
                    print(f"  {check_data['status']} {check_name}: {check_data['message']}")
# Test it out!
devices = [
    {
        'device_type': 'cisco_ios',
        'host': '192.168.1.1',
        'username': 'admin',
        'password': 'pass123',
        'secret': 'enable123'
    }
]
checker = ComplianceChecker()
results = [checker.check_device_compliance(device) for device in devices]
checker.generate_report(results)
Key Takeaways
You've learned so much! Here's what you can now do:
- Connect to network devices programmatically with Netmiko
- Use NAPALM for vendor-agnostic operations
- Automate configurations across multiple devices
- Build monitoring tools for network health
- Implement compliance checking and remediation
Remember: Network automation is about making your life easier while improving reliability! Start small, test thoroughly, and gradually expand your automation toolkit.
Next Steps
Congratulations! You've mastered network automation basics!
Here's what to do next:
- Practice with the exercises above on lab devices
- Build an automation project for your network
- Explore advanced topics like NETCONF/RESTCONF
- Share your automation scripts with the community!
Remember: Every network automation expert started with their first script. Keep automating, keep learning, and most importantly, have fun transforming your network operations!
Happy automating!