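10. Advanced Features
10.1 BPDU Guard
What BPDU Guard does:
When a PortFast-enabled port receives a BPDU, BPDU Guard immediately moves the port to the err-disabled state, so a switch plugged into an access port by mistake (or maliciously) cannot take part in the spanning tree.
# Enable BPDU Guard globally (applies to every PortFast-enabled port)
Switch(config)# spanning-tree portfast bpduguard default
# Enable it on a single interface
Switch(config)# interface fastethernet 0/1
Switch(config-if)# spanning-tree bpduguard enable
# Automatically recover ports that BPDU Guard err-disabled (interval in seconds)
Switch(config)# errdisable recovery cause bpduguard
Switch(config)# errdisable recovery interval 30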
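When many access ports need the same treatment, the commands above can be generated rather than typed. The following is a minimal sketch, not a finished tool: the function name `bpdu_guard_config` and its parameters are assumptions made for illustration, and it only builds text from the commands already shown in this section.

```python
def bpdu_guard_config(access_ports, recovery_interval=30):
    """Return IOS configuration lines that enable PortFast and BPDU Guard
    on each access port, plus automatic err-disable recovery.

    access_ports: interface names as used after the "interface" keyword.
    recovery_interval: seconds before an err-disabled port is re-enabled.
    """
    lines = []
    for port in access_ports:
        lines += [
            f"interface {port}",
            " spanning-tree portfast",
            " spanning-tree bpduguard enable",
        ]
    lines += [
        "errdisable recovery cause bpduguard",
        f"errdisable recovery interval {recovery_interval}",
    ]
    return lines


if __name__ == "__main__":
    print("\n".join(bpdu_guard_config(["fastethernet 0/1", "fastethernet 0/2"])))
```

The output is plain text, so it can be reviewed before it is pasted into a device.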
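10.2 Root Guard
Where to use Root Guard:
Enable it on designated ports that face switches which must never become the root bridge, typically the distribution-layer ports that connect down to access switches. If such a port receives a superior BPDU, it is placed in the root-inconsistent state until the superior BPDUs stop, which keeps the root bridge where it was placed. Do not enable it on a switch's own uplink (root) port, because that port is supposed to receive the root's BPDUs.
# Enable Root Guard
Switch(config)# interface gigabitethernet 0/1
Switch(config-if)# spanning-tree guard root
# List ports in an inconsistent state (including root-inconsistent)
Switch# show spanning-tree inconsistentports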
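The decision Root Guard makes can be sketched in a few lines. This is a simplified model, assuming that only the root bridge ID in the received BPDU is compared (real STP also compares path cost and sender IDs); `BridgeID` and `root_guard_state` are illustrative names.

```python
from typing import NamedTuple


class BridgeID(NamedTuple):
    priority: int
    mac: str  # same format and case on both sides, so string order matches numeric order


def root_guard_state(current_root: BridgeID, received_root: BridgeID) -> str:
    """State of a Root Guard port after a BPDU advertising received_root arrives."""
    # In STP the numerically lower bridge ID is the superior one.
    if received_root < current_root:
        return "root-inconsistent"  # superior BPDU: block instead of accepting a new root
    return "forwarding"


if __name__ == "__main__":
    root = BridgeID(4096, "00:00:00:00:00:01")
    print(root_guard_state(root, BridgeID(0, "00:00:00:00:00:99")))
    print(root_guard_state(root, BridgeID(32768, "00:00:00:00:00:02")))
```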
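10.3 Loop Guard
How Loop Guard works:
If a non-designated port (root or alternate) stops receiving BPDUs, Loop Guard places it in the loop-inconsistent state instead of letting it move to forwarding. This prevents loops caused by unidirectional links.
# Enable Loop Guard globally
Switch(config)# spanning-tree loopguard default
# Enable it on a single interface
Switch(config)# interface fastethernet 0/1
Switch(config-if)# spanning-tree guard loop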
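The effect of Loop Guard is easiest to see by comparing what happens to a silent port with and without it. The sketch below is a deliberately simplified model: it assumes the default 20-second max-age timer, skips the listening and learning stages, and uses the hypothetical name `port_state_after_silence`.

```python
MAX_AGE = 20.0  # seconds; default STP max-age timer (assumed here)


def port_state_after_silence(role: str, silent_for: float, loop_guard: bool) -> str:
    """State of a port that has heard no BPDU for silent_for seconds.

    role is "root", "alternate" or "designated".
    """
    if role == "designated":
        return "forwarding"  # designated ports send BPDUs; silence is normal
    if silent_for < MAX_AGE:
        # Stored BPDU information is still valid, so nothing changes yet.
        return "blocking" if role == "alternate" else "forwarding"
    if loop_guard:
        return "loop-inconsistent"  # stay blocked until BPDUs return
    # Without Loop Guard the port ends up designated and forwarding,
    # which creates a loop if the link is only broken in one direction.
    return "forwarding"


if __name__ == "__main__":
    print(port_state_after_silence("alternate", 25, loop_guard=True))
    print(port_state_after_silence("alternate", 25, loop_guard=False))
```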
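10.4 UDLD (Unidirectional Link Detection)
UDLD vs. Loop Guard:
UDLD is a Layer 2 protocol that works alongside Layer 1 mechanisms to detect a link that passes traffic in only one direction, while Loop Guard is an STP feature that reacts when BPDUs stop arriving. The two are complementary, and enabling both on fiber links is recommended.
# Enable UDLD globally (takes effect on fiber-optic ports)
Switch(config)# udld enable
# Set the UDLD message interval (seconds)
Switch(config)# udld message time 15
# Configure UDLD recovery (on some platforms the equivalent is "errdisable recovery cause udld")
Switch(config)# udld recovery
# Show the UDLD status of an interface
Switch# show udld gigabitethernet 0/1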
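UDLD decides whether a link is bidirectional by checking whether the neighbor echoes back the local port's identity. The sketch below models only that check, under the assumption that each received message carries a list of the port IDs the neighbor has heard; the function name and ID format are illustrative, not the protocol's wire format.

```python
from typing import Optional, Sequence


def udld_link_state(local_port_id: str, neighbor_echo: Optional[Sequence[str]]) -> str:
    """Classify a link from the echo list in the neighbor's latest UDLD message.

    neighbor_echo is None when no UDLD message has been received at all.
    """
    if neighbor_echo is None:
        return "unknown"  # neighbor may not run UDLD; no conclusion possible
    if local_port_id in neighbor_echo:
        return "bidirectional"  # the neighbor hears us and we hear the neighbor
    return "unidirectional"  # we hear the neighbor, but it does not hear us


if __name__ == "__main__":
    print(udld_link_state("SW1:Gi0/1", ["SW1:Gi0/1"]))
    print(udld_link_state("SW1:Gi0/1", []))
```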
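10.5 Flex Links
Advantages of Flex Links:
A Flex Links pair (one active interface, one standby) fails over faster than STP reconverges, typically in under a second, which suits designs that need fast switchover. STP does not run on Flex Links ports.
# Configure Flex Links
Switch(config)# interface fastethernet 0/1
Switch(config-if)# switchport mode access
Switch(config-if)# switchport access vlan 10
Switch(config-if)# switchport backup interface fastethernet 0/2
# Configure preemption (the interface with more bandwidth takes over as active)
Switch(config-if)# switchport backup interface fastethernet 0/2 preemption mode bandwidth
# Show Flex Links status
Switch# show interfaces switchport backup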
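The active/standby choice in a Flex Links pair, including bandwidth-based preemption, can be sketched as follows. This is an illustrative model only: `Link` and `choose_forwarding` are made-up names, and the preemption delay that real switches apply is ignored.

```python
from dataclasses import dataclass


@dataclass
class Link:
    name: str
    up: bool
    bandwidth_mbps: int


def choose_forwarding(current: Link, other: Link, preemption: str = "off") -> Link:
    """Return the link of the pair that should forward traffic.

    current is the link forwarding now, other is its peer.
    preemption is "off" or "bandwidth".
    """
    if not current.up:
        # Fail over if possible; if both are down nothing forwards anyway.
        return other if other.up else current
    if preemption == "bandwidth" and other.up and other.bandwidth_mbps > current.bandwidth_mbps:
        return other  # the faster link takes over once it is available
    return current


if __name__ == "__main__":
    fa = Link("Fa0/1", up=True, bandwidth_mbps=100)
    gi = Link("Gi0/2", up=True, bandwidth_mbps=1000)
    print(choose_forwarding(fa, gi).name)
    print(choose_forwarding(fa, gi, preemption="bandwidth").name)
```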
13. Reference Code
13.1 Complete STP Configuration Example
! ========================================
! Core switch configuration - root bridge
! ========================================
spanning-tree mode rapid-pvst
spanning-tree vlan 1 priority 4096
spanning-tree vlan 10 priority 4096
spanning-tree vlan 20 priority 4096
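! Core ports 0/1 - 4 (trunks to the distribution layer)
interface range gigabitethernet 0/1 - 4
description Uplink to Distribution Layer
switchport mode trunk
switchport trunk allowed vlan 1,10,20
spanning-tree link-type point-to-point
! Root Guard and Loop Guard are mutually exclusive on a port; every port of the root bridge is designated, so Root Guard is the one to keep
spanning-tree guard root
! Core downlink port configuration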
interface range gigabitethernet 0/5 - 24
description Downlink to Distribution Layer
switchport mode trunk
switchport trunk allowed vlan 1,10,20
spanning-tree cost 2000
spanning-tree guard root
! ========================================
! Distribution switch configuration - backup root bridge
! ========================================
spanning-tree mode rapid-pvst
spanning-tree vlan 1 priority 8192
spanning-tree vlan 10 priority 8192
spanning-tree vlan 20 priority 8192
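! Distribution uplink port configuration
interface range gigabitethernet 0/1 - 2
description Uplink to Core Layer
switchport mode trunk
switchport trunk allowed vlan 1,10,20
spanning-tree cost 2000
! The uplink must accept the root's BPDUs, so use Loop Guard here rather than Root Guard
spanning-tree guard loop
! Distribution downlink port configuration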
interface range gigabitethernet 0/3 - 24
description Downlink to Access Layer
switchport mode trunk
switchport trunk allowed vlan 1,10,20
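spanning-tree cost 4000
! Root Guard belongs on the ports that face the access layer
spanning-tree guard root
! ========================================
! Access switch configuration
! ========================================
spanning-tree mode rapid-pvst
! Access uplink port configuration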
interface range gigabitethernet 0/1 - 2
description Uplink to Distribution Layer
switchport mode trunk
switchport trunk allowed vlan 1,10,20
spanning-tree cost 4000
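! Access (end-device) port configuration
interface range fastethernet 0/1 - 48
description Access Port for End Devices
switchport mode access
switchport access vlan 10
spanning-tree portfast
spanning-tree bpduguard enable
! BPDU Filter is deliberately not enabled here: on an interface it suppresses BPDUs entirely, which defeats BPDU Guard and can hide a loop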
switchport port-security
switchport port-security maximum 2
switchport port-security violation shutdown
switchport port-security mac-address sticky
! ========================================
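! Global protection settings
! ========================================
spanning-tree portfast bpduguard default
! "spanning-tree portfast bpdufilter default" is omitted: combining BPDU Filter with BPDU Guard makes port behavior hard to predict
spanning-tree loopguard default
errdisable recovery cause bpduguard
errdisable recovery interval 30
! ========================================
! Monitoring configuration
! ========================================
event manager applet STP_Monitor
! Match the root-change syslog message; adjust the pattern to what your platform actually logs
event syslog pattern "SPANTREE-5-ROOTCHANGE"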
action 1.0 syslog priority warnings msg "STP Root Bridge Changed!"
action 2.0 cli command "enable"
action 3.0 cli command "show spanning-tree | redirect flash:stp_change.log"
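Bridge priorities such as 4096 and 8192 are not arbitrary: the priority must be a multiple of 4096 between 0 and 61440. A small check like the one below can catch a typo before the configuration is generated. It is a sketch with illustrative function names; the command text is the one used in the configuration above.

```python
def validate_priority(priority):
    """Return priority if it is a valid STP bridge priority, else raise ValueError."""
    if priority not in range(0, 61441, 4096):
        raise ValueError(
            f"invalid bridge priority {priority}: "
            "use a multiple of 4096 between 0 and 61440"
        )
    return priority


def priority_commands(vlans, priority):
    """Build one 'spanning-tree vlan N priority P' line per VLAN."""
    validate_priority(priority)
    return [f"spanning-tree vlan {vlan} priority {priority}" for vlan in vlans]


if __name__ == "__main__":
    print("\n".join(priority_commands([1, 10, 20], 4096)))
```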
13.2 Complete MSTP Configuration
! ========================================
! MST region configuration
! ========================================
spanning-tree mode mst
spanning-tree mst configuration
name ENTERPRISE_REGION
revision 1
instance 1 vlan 1-50
instance 2 vlan 51-100
instance 3 vlan 101-150
instance 4 vlan 151-200
! ========================================
! MSTP instance priority configuration
! ========================================
spanning-tree mst 1 priority 4096
spanning-tree mst 2 priority 4096
spanning-tree mst 3 priority 8192
spanning-tree mst 4 priority 8192
! ========================================
! Core switch MSTP configuration
! ========================================
interface range tengigabitethernet 1/0/1 - 4
description Core Uplinks
switchport mode trunk
spanning-tree mst 1 cost 2000
spanning-tree mst 2 cost 2000
spanning-tree mst 3 cost 2000
spanning-tree mst 4 cost 2000
interface range tengigabitethernet 1/0/5 - 24
description Core Downlinks
switchport mode trunk
spanning-tree mst 1 cost 4000
spanning-tree mst 2 cost 4000
spanning-tree mst 3 cost 4000
spanning-tree mst 4 cost 4000
! ========================================
! Distribution switch MSTP configuration
! ========================================
interface range gigabitethernet 0/1 - 2
description Distribution Uplinks
switchport mode trunk
spanning-tree mst 1 cost 4000
spanning-tree mst 2 cost 4000
spanning-tree mst 3 cost 4000
spanning-tree mst 4 cost 4000
interface range gigabitethernet 0/3 - 48
description Distribution Downlinks
switchport mode trunk
spanning-tree mst 1 cost 8000
spanning-tree mst 2 cost 8000
spanning-tree mst 3 cost 8000
spanning-tree mst 4 cost 8000
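The VLAN-to-instance mapping in the region configuration determines which spanning tree a frame follows. The sketch below mirrors the four ranges configured above and shows that any VLAN left unmapped falls into instance 0 (the IST). The names `MST_INSTANCES` and `instance_for_vlan` are illustrative.

```python
# Instance -> VLAN range, copied from the region configuration in this section.
MST_INSTANCES = {
    1: range(1, 51),
    2: range(51, 101),
    3: range(101, 151),
    4: range(151, 201),
}


def instance_for_vlan(vlan):
    """Return the MST instance that carries the given VLAN."""
    if not 1 <= vlan <= 4094:
        raise ValueError(f"VLAN {vlan} is outside 1-4094")
    for instance, vlans in MST_INSTANCES.items():
        if vlan in vlans:
            return instance
    return 0  # unmapped VLANs belong to instance 0, the IST


if __name__ == "__main__":
    for vlan in (1, 75, 200, 300):
        print(vlan, "->", instance_for_vlan(vlan))
```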
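13.3 eNSP Simulation Script
# ========================================
# eNSP STP test topology script
# ========================================
# Core switch
sysname SW-Core
vlan batch 10 20 30
# Configure the core switch
# "stp instance" only applies in MSTP mode, so the mode is MSTP and VLANs are mapped to instances
# (the region name ENSP_LAB is an example; region settings must be identical on every switch in the region)
stp mode mstp
stp region-configuration
region-name ENSP_LAB
instance 1 vlan 10
instance 2 vlan 20
active region-configuration
quit
stp instance 0 priority 4096
stp instance 1 priority 4096
stp instance 2 priority 8192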
# Configure the ports
interface GigabitEthernet0/0/1
port link-type trunk
port trunk allow-pass vlan 10 20 30
stp cost 2000
interface GigabitEthernet0/0/2
port link-type trunk
port trunk allow-pass vlan 10 20 30
stp cost 2000
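# Distribution switch
sysname SW-Dist1
vlan batch 10 20 30
# Configure the distribution switch
# "stp instance" requires MSTP mode; region settings must be identical on every switch in the region (ENSP_LAB is an example name)
stp mode mstp
stp region-configuration
region-name ENSP_LAB
instance 1 vlan 10
instance 2 vlan 20
active region-configuration
quit
stp instance 0 priority 8192
stp instance 1 priority 8192
stp instance 2 priority 4096
# Configure the ports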
interface GigabitEthernet0/0/1
port link-type trunk
port trunk allow-pass vlan 10 20 30
stp cost 4000
interface GigabitEthernet0/0/2
port link-type trunk
port trunk allow-pass vlan 10 20 30
stp cost 4000
# Access switch
sysname SW-Access1
vlan batch 10 20 30
# Configure the access switch
stp mode rstp
# Configure the access ports
interface Ethernet0/0/1
port link-type access
port default vlan 10
stp edged-port enable
interface Ethernet0/0/2
port link-type access
port default vlan 20
stp edged-port enable
# ========================================
# Test commands
# ========================================
# Check STP status
display stp brief
display stp instance 0
display stp interface GigabitEthernet0/0/1
# Simulate a link failure
interface GigabitEthernet0/0/1
shutdown
# Restore the link
interface GigabitEthernet0/0/1
undo shutdown
# Observe convergence
display stp topology-change
display stp tc-bpdu statistics
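Before running the lab, it helps to predict which switch should win the root election in each instance, so the `display stp` output can be checked against an expectation. The sketch below applies the election rule (lowest priority wins, lowest MAC breaks ties) to the priorities configured in this script. The MAC addresses are invented for illustration, and SW-Access1 is assumed to keep the default priority of 32768.

```python
DEFAULT_PRIORITY = 32768

# Per-instance priorities from the script; MAC addresses are hypothetical.
BRIDGES = {
    "SW-Core": {"mac": "00e0-fc00-0001", "priority": {0: 4096, 1: 4096, 2: 8192}},
    "SW-Dist1": {"mac": "00e0-fc00-0002", "priority": {0: 8192, 1: 8192, 2: 4096}},
    "SW-Access1": {"mac": "00e0-fc00-0003", "priority": {}},
}


def elect_root(instance):
    """Return the name of the bridge that becomes root for the given instance."""
    def bridge_id(name):
        info = BRIDGES[name]
        # Lower (priority, MAC) tuple wins the election.
        return (info["priority"].get(instance, DEFAULT_PRIORITY), info["mac"])

    return min(BRIDGES, key=bridge_id)


if __name__ == "__main__":
    for instance in (0, 1, 2):
        print(f"instance {instance}: root is {elect_root(instance)}")
```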
13.4 Python STP Monitoring Script
#!/usr/bin/env python3
# ========================================
# STP network monitoring script
# ========================================
import json
import logging
import time
from datetime import datetime

import paramiko


class STPMonitor:
    def __init__(self, config_file):
        self.config = self.load_config(config_file)
        self.logger = self.setup_logging()

    def load_config(self, config_file):
        """Load the JSON configuration file."""
        with open(config_file, 'r') as f:
            return json.load(f)

    def setup_logging(self):
        """Log to a file and to the console."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('stp_monitor.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)

    def connect_switch(self, host, username, password):
        """Open an SSH session to a switch; return None if it fails."""
        ssh = paramiko.SSHClient()
        # AutoAddPolicy trusts unknown host keys: fine for a lab, risky in production
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(host, username=username, password=password, timeout=10)
            return ssh
        except Exception as e:
            self.logger.error(f"Connection to {host} failed: {e}")
            return None

    def get_stp_status(self, ssh):
        """Run the STP show commands and return their raw output."""
        commands = [
            'show spanning-tree summary',
            'show spanning-tree root',
            'show spanning-tree blockedports',
            'show spanning-tree inconsistentports'
        ]
        results = {}
        for cmd in commands:
            try:
                stdin, stdout, stderr = ssh.exec_command(cmd)
                results[cmd] = stdout.read().decode()
            except Exception as e:
                self.logger.error(f"Command failed {cmd}: {e}")
                results[cmd] = f"Error: {e}"
        return results

    @staticmethod
    def data_rows(output):
        """Return the data rows of a 'show spanning-tree ...' table.

        Assumes the usual Catalyst layout, where each data row starts with the
        instance name (for example VLAN0010 or MST1) and headers do not.
        """
        rows = []
        for line in output.splitlines():
            line = line.strip()
            if line.startswith(('VLAN', 'MST')):
                rows.append(line)
        return rows

    def analyze_stp_status(self, results):
        """Summarize the show-command output."""
        analysis = {
            'timestamp': datetime.now().isoformat(),
            'root_bridge': None,
            'blocked_ports': [],
            'inconsistent_ports': [],
            'warnings': []
        }
        # Root bridge information: one row per VLAN or MST instance
        root_rows = self.data_rows(results.get('show spanning-tree root', ''))
        if root_rows:
            analysis['root_bridge'] = root_rows
        # Blocked ports
        analysis['blocked_ports'] = self.data_rows(
            results.get('show spanning-tree blockedports', ''))
        # Inconsistent ports
        analysis['inconsistent_ports'] = self.data_rows(
            results.get('show spanning-tree inconsistentports', ''))
        # Build warnings
        if analysis['blocked_ports']:
            analysis['warnings'].append(
                f"{len(analysis['blocked_ports'])} blocked port entries found")
        if analysis['inconsistent_ports']:
            analysis['warnings'].append(
                f"{len(analysis['inconsistent_ports'])} inconsistent port entries found")
        return analysis

    def monitor_switch(self, switch_info):
        """Monitor a single switch."""
        ssh = self.connect_switch(
            switch_info['host'],
            switch_info['username'],
            switch_info['password']
        )
        if not ssh:
            return None
        try:
            results = self.get_stp_status(ssh)
            analysis = self.analyze_stp_status(results)
            analysis['switch'] = switch_info['name']
            return analysis
        finally:
            ssh.close()

    def monitor_all_switches(self):
        """Monitor every switch in the configuration."""
        all_results = []
        for switch in self.config['switches']:
            self.logger.info(f"Monitoring switch: {switch['name']}")
            result = self.monitor_switch(switch)
            if result:
                all_results.append(result)
                # Report warnings
                if result['warnings']:
                    self.logger.warning(
                        f"{switch['name']} warnings: {', '.join(result['warnings'])}")
        return all_results

    def save_results(self, results):
        """Save the results to a timestamped JSON file."""
        filename = f"stp_monitor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, 'w') as f:
            json.dump(results, f, indent=2)
        self.logger.info(f"Results saved to: {filename}")

    def run_monitoring(self):
        """Run the monitoring loop until interrupted."""
        self.logger.info("Starting STP monitoring...")
        while True:
            try:
                results = self.monitor_all_switches()
                self.save_results(results)
                # Wait for the next round
                time.sleep(self.config.get('interval', 300))
            except KeyboardInterrupt:
                self.logger.info("Monitoring stopped")
                break
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
                time.sleep(60)


if __name__ == "__main__":
    # Example configuration (replace the placeholder addresses and credentials)
    config = {
        "switches": [
            {
                "name": "Core-SW1",
                "host": "192.168.1.1",
                "username": "admin",
                "password": "password"
            },
            {
                "name": "Dist-SW1",
                "host": "192.168.1.2",
                "username": "admin",
                "password": "password"
            }
        ],
        "interval": 300
    }
    # Write the configuration file
    with open('stp_monitor_config.json', 'w') as f:
        json.dump(config, f, indent=2)
    # Start monitoring
    monitor = STPMonitor('stp_monitor_config.json')
    monitor.run_monitoring()
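The monitoring loop saves a snapshot on every round, but a change between two rounds is usually the interesting event. The sketch below compares two analysis dictionaries and lists what changed. It assumes the `root_bridge`, `blocked_ports` and `inconsistent_ports` keys used by the script; `diff_snapshots` is a hypothetical helper, not part of the script itself.

```python
def diff_snapshots(previous, current):
    """Describe the differences between two STP analysis snapshots."""
    changes = []
    if previous.get("root_bridge") != current.get("root_bridge"):
        changes.append(
            f"root bridge changed: {previous.get('root_bridge')} -> {current.get('root_bridge')}"
        )
    for key in ("blocked_ports", "inconsistent_ports"):
        before = set(previous.get(key, []))
        after = set(current.get(key, []))
        for port in sorted(after - before):
            changes.append(f"{key}: added {port}")
        for port in sorted(before - after):
            changes.append(f"{key}: removed {port}")
    return changes


if __name__ == "__main__":
    old = {"root_bridge": "A", "blocked_ports": ["Gi0/2"], "inconsistent_ports": []}
    new = {"root_bridge": "B", "blocked_ports": [], "inconsistent_ports": ["Gi0/3"]}
    for change in diff_snapshots(old, new):
        print(change)
```

Logging only these differences keeps the alert volume low, since blocked ports are a normal part of a healthy spanning tree.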
13.5 STP Performance Test Script
#!/usr/bin/env python3
# ========================================
# STP performance test script
# ========================================
import json
import statistics
import subprocess
import time
from datetime import datetime


class STPPerformanceTest:
    def __init__(self, test_config):
        self.config = test_config
        self.results = []

    def ping_test(self, target, count=5):
        """Ping the target; return the first round-trip time in ms, or None on failure."""
        try:
            # "-c" is the packet-count flag of Unix-like ping (Windows uses "-n")
            result = subprocess.run(
                ['ping', '-c', str(count), target],
                capture_output=True,
                text=True,
                timeout=30
            )
            if result.returncode == 0:
                # Parse the first reply line that carries a round-trip time
                for line in result.stdout.split('\n'):
                    if 'time=' in line:
                        return float(line.split('time=')[1].split(' ')[0])
            return None
        except Exception as e:
            print(f"Ping test failed: {e}")
            return None

    def measure_convergence_time(self, switch_ip, interface):
        """Measure how long connectivity takes to return after an interface is shut down."""
        print(f"Measuring convergence for switch {switch_ip}, interface {interface}...")
        # Record the start time (monotonic clock, unaffected by wall-clock changes)
        start_time = time.monotonic()
        # Simulate a link failure
        self.shutdown_interface(switch_ip, interface)
        # Wait for connectivity to return; resolution is limited by the ping timeout and the 1 s sleep
        convergence_time = None
        test_target = self.config.get('test_target', '8.8.8.8')
        while time.monotonic() - start_time < 60:  # wait at most 60 seconds
            ping_result = self.ping_test(test_target, 1)
            if ping_result is not None:
                convergence_time = time.monotonic() - start_time
                break
            time.sleep(1)
        # Restore the interface
        self.no_shutdown_interface(switch_ip, interface)
        return convergence_time

    def shutdown_interface(self, switch_ip, interface):
        """Shut down the interface (placeholder)."""
        # Implement this for the actual device type,
        # for example by running the command over SSH.
        print(f"Shutting down interface {interface}")

    def no_shutdown_interface(self, switch_ip, interface):
        """Re-enable the interface (placeholder)."""
        print(f"Enabling interface {interface}")

    def run_convergence_test(self, iterations=5):
        """Run the convergence test for every test case."""
        print("Starting STP convergence test...")
        for i in range(iterations):
            print(f"Iteration {i+1}...")
            for test_case in self.config['test_cases']:
                switch_ip = test_case['switch_ip']
                interface = test_case['interface']
                convergence_time = self.measure_convergence_time(
                    switch_ip, interface
                )
                if convergence_time is not None:
                    result = {
                        'iteration': i + 1,
                        'switch_ip': switch_ip,
                        'interface': interface,
                        'convergence_time': convergence_time,
                        'timestamp': datetime.now().isoformat()
                    }
                    self.results.append(result)
                    print(f"Convergence time: {convergence_time:.2f} s")
                else:
                    print("No convergence within the time limit")
                # Let the network settle before the next test
                time.sleep(30)

    def analyze_results(self):
        """Print per-switch statistics."""
        if not self.results:
            print("No test results")
            return
        # Group by switch
        switch_results = {}
        for result in self.results:
            switch = result['switch_ip']
            if switch not in switch_results:
                switch_results[switch] = []
            switch_results[switch].append(result['convergence_time'])
        print("\n=== STP convergence time analysis ===")
        for switch, times in switch_results.items():
            avg_time = statistics.mean(times)
            min_time = min(times)
            max_time = max(times)
            std_dev = statistics.stdev(times) if len(times) > 1 else 0
            print(f"\nSwitch: {switch}")
            print(f"Mean convergence time: {avg_time:.2f} s")
            print(f"Minimum convergence time: {min_time:.2f} s")
            print(f"Maximum convergence time: {max_time:.2f} s")
            print(f"Standard deviation: {std_dev:.2f} s")
            print(f"Number of tests: {len(times)}")

    def save_results(self):
        """Save the test results to a timestamped JSON file."""
        filename = f"stp_performance_test_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"Test results saved to: {filename}")


# Test configuration
test_config = {
    "test_target": "8.8.8.8",
    "test_cases": [
        {
            "switch_ip": "192.168.1.1",
            "interface": "GigabitEthernet0/0/1"
        },
        {
            "switch_ip": "192.168.1.2",
            "interface": "GigabitEthernet0/0/2"
        }
    ]
}

if __name__ == "__main__":
    test = STPPerformanceTest(test_config)
    test.run_convergence_test(iterations=3)
    test.analyze_results()
    test.save_results()
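Timing the first successful ping after a shutdown can under-report the outage if a ping succeeds before the failure is noticed. An alternative, sketched below, is to record a continuous series of ping outcomes and measure the gap between the first lost probe and the next successful one. `outage_seconds` and the sample format are assumptions made for this example.

```python
def outage_seconds(samples):
    """Return the length of the first outage in a series of probe results.

    samples: iterable of (timestamp_seconds, ok) pairs in time order.
    Returns None if no probe was lost or if connectivity never came back.
    """
    first_loss = None
    for timestamp, ok in samples:
        if not ok and first_loss is None:
            first_loss = timestamp  # the outage starts here
        elif ok and first_loss is not None:
            return timestamp - first_loss  # first success after the loss
    return None


if __name__ == "__main__":
    probes = [(0.0, True), (0.5, False), (1.0, False), (1.5, True), (2.0, True)]
    print(outage_seconds(probes))
```

The accuracy of the result is bounded by the probe interval, so a shorter interval gives a tighter estimate.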
13.6 STP Automated Failover Script
#!/usr/bin/env python3
# ========================================
# STP automatic failover script
# ========================================
import json
import logging
import re
import smtplib
import time
from email.mime.text import MIMEText

import paramiko

# VLAN that this script checks and re-prioritizes
VLAN_ID = 1


class STPFailoverManager:
    def __init__(self, config_file):
        self.config = self.load_config(config_file)
        self.logger = self.setup_logging()
        self.failover_count = {}

    def load_config(self, config_file):
        """Load the JSON configuration file."""
        with open(config_file, 'r') as f:
            return json.load(f)

    def setup_logging(self):
        """Log to a file and to the console."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('stp_failover.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)

    def connect_switch(self, host, username, password):
        """Open an SSH session to a switch; return None if it fails."""
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(host, username=username, password=password, timeout=10)
            return ssh
        except Exception as e:
            self.logger.error(f"Connection to {host} failed: {e}")
            return None

    def check_stp_health(self, ssh):
        """Check STP health and return a status dictionary."""
        health_status = {
            'root_bridge_stable': True,
            'blocked_ports_normal': True,
            'convergence_ok': True,
            'bpdu_received': True,
            'issues': []
        }
        try:
            # Root bridge check ("show spanning-tree vlan N" prints the Root ID block)
            stdin, stdout, stderr = ssh.exec_command(f'show spanning-tree vlan {VLAN_ID}')
            root_info = stdout.read().decode()
            is_root = 'This bridge is the root' in root_info
            if not is_root:
                for line in root_info.split('\n'):
                    if 'Root ID' in line and 'Priority' in line:
                        root_priority = int(line.split('Priority')[1].split()[0])
                        # With the extended system ID, the displayed value is priority + VLAN ID
                        if root_priority > 4096 + VLAN_ID:
                            health_status['root_bridge_stable'] = False
                            health_status['issues'].append('Root bridge priority too high')
                        break
            # Blocked ports: count data rows (they start with the VLAN or MST instance name)
            stdin, stdout, stderr = ssh.exec_command('show spanning-tree blockedports')
            blocked_info = stdout.read().decode()
            blocked_count = sum(
                1 for line in blocked_info.splitlines()
                if line.strip().startswith(('VLAN', 'MST'))
            )
            if blocked_count > self.config.get('max_blocked_ports', 5):
                health_status['blocked_ports_normal'] = False
                health_status['issues'].append(f'Too many blocked ports: {blocked_count}')
            # BPDU reception: the detail output has a "BPDU: sent X, received Y" line per port
            stdin, stdout, stderr = ssh.exec_command(f'show spanning-tree vlan {VLAN_ID} detail')
            stats_info = stdout.read().decode()
            received = [int(n) for n in re.findall(r'BPDU: sent \d+, received (\d+)', stats_info)]
            # A root bridge legitimately receives no BPDUs, so only flag non-root switches
            if received and not is_root and sum(received) == 0:
                health_status['bpdu_received'] = False
                health_status['issues'].append('No BPDUs received')
        except Exception as e:
            self.logger.error(f"STP health check failed: {e}")
            health_status['issues'].append(str(e))
        return health_status

    def trigger_failover(self, switch_info):
        """Trigger a failover by changing this switch's bridge priority."""
        switch_name = switch_info['name']
        # Enforce the failover limit
        if switch_name in self.failover_count:
            if self.failover_count[switch_name] >= self.config.get('max_failovers', 3):
                self.logger.warning(f"{switch_name} has reached the failover limit")
                return False
        ssh = self.connect_switch(
            switch_info['host'],
            switch_info['username'],
            switch_info['password']
        )
        if not ssh:
            return False
        try:
            # Raise the priority value (lowering the bridge's priority) so another switch becomes root
            new_priority = 32768
            commands = [
                'configure terminal',
                f'spanning-tree vlan {VLAN_ID} priority {new_priority}',
                'end',
                'write memory'
            ]
            # Configuration mode only persists inside one interactive shell;
            # separate exec_command() calls would each start a fresh session.
            shell = ssh.invoke_shell()
            for cmd in commands:
                shell.send(cmd + '\n')
                time.sleep(1)
            # Update the failover counter
            self.failover_count[switch_name] = self.failover_count.get(switch_name, 0) + 1
            self.logger.info(f"{switch_name} failover complete, new priority: {new_priority}")
            return True
        except Exception as e:
            self.logger.error(f"Failover failed: {e}")
            return False
        finally:
            ssh.close()

    def send_alert(self, subject, message):
        """Send an alert email."""
        try:
            msg = MIMEText(message)
            msg['Subject'] = subject
            msg['From'] = self.config['email']['from']
            msg['To'] = self.config['email']['to']
            server = smtplib.SMTP(self.config['email']['smtp_server'], self.config['email']['smtp_port'])
            server.starttls()
            server.login(self.config['email']['username'], self.config['email']['password'])
            server.send_message(msg)
            server.quit()
            self.logger.info("Alert email sent")
        except Exception as e:
            self.logger.error(f"Sending email failed: {e}")

    def monitor_and_failover(self):
        """Monitor the switches and fail over when needed."""
        self.logger.info("Starting STP failure monitoring...")
        while True:
            try:
                for switch in self.config['switches']:
                    ssh = self.connect_switch(
                        switch['host'],
                        switch['username'],
                        switch['password']
                    )
                    if ssh:
                        health = self.check_stp_health(ssh)
                        ssh.close()
                        # Decide whether a failover is needed
                        needs_failover = False
                        alert_message = f"Switch {switch['name']} STP issues:\n"
                        for issue in health['issues']:
                            alert_message += f"- {issue}\n"
                            if 'Root bridge' in issue or 'No BPDUs' in issue:
                                needs_failover = True
                        if needs_failover:
                            self.logger.warning(f"{switch['name']} needs a failover")
                            success = self.trigger_failover(switch)
                            if success:
                                alert_message += "\nFailover was performed automatically"
                            else:
                                alert_message += "\nFailover failed, manual intervention required"
                            # Send the alert
                            self.send_alert(
                                f"STP failure alert - {switch['name']}",
                                alert_message
                            )
                # Wait for the next check
                time.sleep(self.config.get('check_interval', 60))
            except KeyboardInterrupt:
                self.logger.info("Monitoring stopped")
                break
            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
                time.sleep(30)


if __name__ == "__main__":
    # Example configuration (replace the placeholder addresses and credentials)
    config = {
        "switches": [
            {
                "name": "Core-SW1",
                "host": "192.168.1.1",
                "username": "admin",
                "password": "password"
            }
        ],
        "max_blocked_ports": 5,
        "max_failovers": 3,
        "check_interval": 60,
        "email": {
            "smtp_server": "smtp.gmail.com",
            "smtp_port": 587,
            "username": "your_email@gmail.com",
            "password": "your_password",
            "from": "your_email@gmail.com",
            "to": "admin@company.com"
        }
    }
    # Write the configuration file
    with open('stp_failover_config.json', 'w') as f:
        json.dump(config, f, indent=2)
    # Start failure monitoring
    failover_manager = STPFailoverManager('stp_failover_config.json')
    failover_manager.monitor_and_failover()
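The alert message can be built and inspected without an SMTP server, which makes the formatting easy to test. The sketch below uses the standard library's `MIMEText` class (note the capitalization). The helper name `build_alert` and the example addresses are assumptions for illustration.

```python
from email.mime.text import MIMEText


def build_alert(switch_name, issues, sender, recipient):
    """Build (but do not send) an alert email listing the STP issues."""
    body = f"Switch {switch_name} STP issues:\n"
    body += "".join(f"- {issue}\n" for issue in issues)
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = f"STP alert - {switch_name}"
    msg["From"] = sender
    msg["To"] = recipient
    return msg


if __name__ == "__main__":
    alert = build_alert(
        "Core-SW1",
        ["No BPDUs received"],
        "monitor@example.com",  # placeholder sender
        "admin@example.com",    # placeholder recipient
    )
    print(alert["Subject"])
    print(alert.get_payload(decode=True).decode("utf-8"))
```

Separating message construction from delivery also lets the same message go to another channel later, such as a chat webhook.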
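📝 Summary:
This document covers the core topics of STP: theory, protocol variants, configuration examples, troubleshooting, advanced features, and its relationship to RIP. It includes more than 2,000 lines of example code aimed at scenarios found in real networks.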