Simple Network Management Protocol - 理论基础与实践应用
基于UDP协议,开销小,响应快速
支持多种操作类型和自定义MIB
提供丰富的设备状态信息
支持几乎所有网络设备厂商
| 版本 | 发布时间 | 主要特性 | RFC文档 |
|---|---|---|---|
| SNMPv1 | 1987年 | 基础功能,简单认证 | RFC 1157 |
| SNMPv2c | 1993年 | 改进性能,批量操作 | RFC 1901-1908 |
| SNMPv3 | 1998年 | 强化安全,加密认证 | RFC 3410-3418 |
Manager ←→ Agent 通信流程: 1. Manager 发送 SNMP 请求 (Get/Set/GetNext) 2. Agent 接收并处理请求 3. Agent 从 MIB 获取/设置数据 4. Agent 返回响应给 Manager 5. Agent 可主动发送 Trap/Inform
| 级别 | 认证 | 加密 | 描述 |
|---|---|---|---|
| noAuthNoPriv | ❌ | ❌ | 无认证无加密 |
| authNoPriv | ✅ | ❌ | 认证不加密 |
| authPriv | ✅ | ✅ | 认证且加密 |
获取指定OID的值
snmpget
获取下一个OID的值
snmpgetnext
批量获取多个值
snmpbulkget
设置指定OID的值
snmpset
1. Manager构建GET请求PDU 2. 包含要查询的OID列表 3. 发送UDP数据包到Agent (端口161) 4. Agent解析请求,查询MIB 5. 构建响应PDU,包含请求的值 6. 返回响应给Manager
1. Manager构建SET请求PDU 2. 包含OID和要设置的值 3. 发送请求到Agent 4. Agent验证写入权限和值的有效性 5. 更新MIB中的对应对象 6. 返回成功/失败响应
| 错误状态 | 值 | 含义 |
|---|---|---|
| noError | 0 | 操作成功 |
| tooBig | 1 | 响应过大 |
| noSuchName | 2 | OID不存在 |
| badValue | 3 | 值无效 |
| readOnly | 4 | 只读对象 |
| genErr | 5 | 一般错误 |
iso(1) → org(3) → dod(6) → internet(1) → private(4) → enterprises(1)
↓
mgmt(2) → mib-2(1) → system(1) → sysDescr(1)
| OID前缀 | 含义 | 用途 |
|---|---|---|
| .1.3.6.1.2.1 | mgmt.mib-2 | 标准MIB-II对象 |
| .1.3.6.1.4.1 | private.enterprises | 厂商私有MIB |
| .1.3.6.1.6.3 | snmpMPDStats | SNMP协议统计 |
system组包含设备基本信息: - sysDescr (1): 系统描述 - sysObjectID (2): 设备OID - sysUpTime (3): 系统运行时间 - sysContact (4): 管理员联系信息 - sysName (5): 系统名称 - sysLocation (6): 设备位置 - sysServices (7): 提供的服务
interfaces组包含网络接口信息: - ifNumber (1): 接口数量 - ifTable (2): 接口信息表 - ifIndex: 接口索引 - ifDescr: 接口描述 - ifType: 接口类型 - ifMtu: MTU值 - ifSpeed: 接口速度 - ifAdminStatus: 管理状态 - ifOperStatus: 操作状态
ip组包含IP层信息: - ipForwarding (1): IP转发状态 - ipDefaultTTL (2): 默认TTL - ipInReceives (3): 接收数据包数 - ipOutRequests (4): 发送请求数 - ipRoutingTable (21): 路由表
| 厂商 | Enterprise ID | OID前缀 |
|---|---|---|
| Cisco | 9 | .1.3.6.1.4.1.9 |
| HP | 11 | .1.3.6.1.4.1.11 |
| Juniper | 2636 | .1.3.6.1.4.1.2636 |
| Microsoft | 311 | .1.3.6.1.4.1.311 |
-- MIB定义示例
MY-MIB DEFINITIONS ::= BEGIN
IMPORTS
MODULE-IDENTITY, OBJECT-TYPE, Integer32 FROM SNMPv2-SMI
DisplayString FROM SNMPv2-TC;
myMib MODULE-IDENTITY
LAST-UPDATED "202301010000Z"
ORGANIZATION "Example Corp"
CONTACT-INFO "support@example.com"
DESCRIPTION "Example MIB for demonstration"
::= { enterprises 99999 }
myDevice OBJECT IDENTIFIER ::= { myMib 1 }
sysStatus OBJECT-TYPE
SYNTAX INTEGER { up(1), down(2) }
MAX-ACCESS read-only
STATUS current
DESCRIPTION "Device status"
::= { myDevice 1 }
END
USM安全要素: 1. 用户名 (Username) 2. 认证协议 (Authentication Protocol) 3. 认证密码 (Authentication Key) 4. 加密协议 (Privacy Protocol) 5. 加密密码 (Privacy Key)
| 协议 | 强度 | 特点 |
|---|---|---|
| HMAC-MD5 | 中等 | 兼容性好,性能较好 |
| HMAC-SHA | 高 | 安全性更高,推荐使用 |
| 算法 | 密钥长度 | 安全性 |
|---|---|---|
| DES | 56位 | 基础加密,不推荐新部署 |
| AES | 128/256位 | 强加密,推荐使用 |
VACM组件: 1. Group (组): 用户集合 2. Security Model (安全模型): SNMPv1/v2c/v3 3. Security Level (安全级别): noAuthNoPriv, authNoPriv, authPriv 4. Context (上下文): MIB视图 5. View (视图): 可访问的OID集合 6. Access Policy (访问策略): 读写权限控制
# SNMPv3用户配置 createUser admin MD5 "authpass" AES "privpass" rouser admin authPriv # 访问控制配置 access admin "" any authPriv exact all all none none
# 基础SNMPv2c配置 [Huawei] snmp-agent [Huawei] snmp-agent sys-info version v2c [Huawei] snmp-agent community read cipher public [Huawei] snmp-agent community write cipher private [Huawei] snmp-agent sys-info contact admin@example.com [Huawei] snmp-agent sys-info location DataCenter-Rack01 [Huawei] snmp-agent target-host trap-hostname NMS address 192.168.1.100 params securityname public v2c
# SNMPv3用户配置 [Huawei] snmp-agent [Huawei] snmp-agent sys-info version v3 [Huawei] snmp-agent mib-view included iso-view 1.3.6.1 [Huawei] snmp-agent group v3 admin privacy read-view iso-view write-view iso-view [Huawei] snmp-agent usm-user v3 admin admin authentication-mode md5 cipher AuthPass123 privacy-mode aes cipher PrivPass123 [Huawei] snmp-agent target-host trap-hostname NMS address 192.168.1.100 params securityname admin v3 privacy
# Trap通知配置 [Huawei] snmp-agent trap enable [Huawei] snmp-agent trap enable standard [Huawei] snmp-agent trap source GigabitEthernet0/0/0 [Huawei] snmp-agent trap queue-size 1000 [Huawei] snmp-agent trap life 12
# SNMPv2c配置 Switch(config)# snmp-server community public RO Switch(config)# snmp-server community private RW Switch(config)# snmp-server location "DataCenter-Rack01" Switch(config)# snmp-server contact "admin@example.com" Switch(config)# snmp-server host 192.168.1.100 public Switch(config)# snmp-server enable traps # SNMPv3配置 Switch(config)# snmp-server group admin v3 priv read admin write admin Switch(config)# snmp-server user admin admin v3 auth md5 AuthPass123 priv aes 128 PrivPass123 Switch(config)# snmp-server host 192.168.1.100 version 3 priv admin
# Ubuntu/Debian sudo apt-get install snmpd snmp # CentOS/RHEL sudo yum install net-snmp net-snmp-utils # 启动服务 sudo systemctl enable snmpd sudo systemctl start snmpd
# /etc/snmp/snmpd.conf # 基础配置 sysLocation "DataCenter-Rack01" sysContact "admin@example.com" sysName "Linux-Server-01" # SNMPv2c配置 rocommunity public rwcommunity private # SNMPv3配置 createUser admin MD5 "AuthPass123" AES "PrivPass123" rouser admin authPriv # 访问控制 com2sec readonly default public com2sec readwrite default private group MyROGroup v2c readonly group MyRWGroup v2c readwrite view all included .1 80 access MyROGroup "" any noauth exact all none none access MyRWGroup "" any noauth exact all all none
# PowerShell命令 # 安装SNMP功能 Add-WindowsFeature -Name SNMP-Service # 配置SNMP服务 Set-ItemProperty -Path "HKLM:\SYSTEM\CurrentControlSet\Services\SNMP\Parameters\TrapConfiguration" -Name "public" -Value "192.168.1.100" Set-ItemProperty -Path "HKLM:\SYSTEM\CurrentControlSet\Services\SNMP\Parameters\ValidCommunities" -Name "public" -Value 16 -Type DWORD Set-ItemProperty -Path "HKLM:\SYSTEM\CurrentControlSet\Services\SNMP\Parameters\ValidCommunities" -Name "private" -Value 4 -Type DWORD
# 使用snmpwalk测试 snmpwalk -v2c -c public 192.168.1.1 .1.3.6.1.2.1.1.1.0 snmpwalk -v3 -u admin -l authPriv -a MD5 -A AuthPass123 -x AES -X PrivPass123 192.168.1.1 # 使用snmpget测试 snmpget -v2c -c public 192.168.1.1 .1.3.6.1.2.1.1.5.0 snmpget -v3 -u admin -l authPriv -a MD5 -A AuthPass123 -x AES -X PrivPass123 192.168.1.1 .1.3.6.1.2.1.1.3.0
# 启动Trap接收器 snmptrapd -f -C -c /etc/snmp/snmptrapd.conf # 发送测试Trap snmptrap -v2c -c public 192.168.1.100 "" .1.3.6.1.6.3.1.1.5.1
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pysnmp.hlapi import *
class SNMPManager:
def __init__(self, host, community='public', version=2c):
self.host = host
self.community = community
self.version = version
def get(self, oid):
"""获取单个OID值"""
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData(self.community),
UdpTransportTarget((self.host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
if error_indication:
print(f"SNMP错误: {error_indication}")
elif error_status:
print(f"SNMP错误: {error_status.prettyPrint()}")
else:
for var_bind in var_binds:
print(f"{var_bind[0]} = {var_bind[1]}")
def walk(self, oid):
"""遍历OID树"""
for (error_indication, error_status, error_index, var_binds) in nextCmd(
SnmpEngine(),
CommunityData(self.community),
UdpTransportTarget((self.host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lexicographicMode=False):
if error_indication:
print(f"SNMP错误: {error_indication}")
break
elif error_status:
print(f"SNMP错误: {error_status.prettyPrint()}")
break
else:
for var_bind in var_binds:
print(f"{var_bind[0]} = {var_bind[1]}")
# 使用示例
if __name__ == "__main__":
manager = SNMPManager('192.168.1.1')
# 获取系统描述
manager.get('1.3.6.1.2.1.1.1.0')
# 遍历接口信息
manager.walk('1.3.6.1.2.1.2.2')
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pysnmp.hlapi import *
class SNMPv3Manager:
def __init__(self, host, username, auth_key, priv_key,
auth_protocol=usmHMACMD5AuthProtocol,
priv_protocol=usmAesCfb128Protocol):
self.host = host
self.username = username
self.auth_key = auth_key
self.priv_key = priv_key
self.auth_protocol = auth_protocol
self.priv_protocol = priv_protocol
def get(self, oid):
"""SNMPv3 GET操作"""
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
UsmUserData(self.username, self.auth_key, self.priv_key,
authProtocol=self.auth_protocol,
privProtocol=self.priv_protocol),
UdpTransportTarget((self.host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
return self._handle_response(error_indication, error_status,
error_index, var_binds)
def set(self, oid, value):
"""SNMPv3 SET操作"""
error_indication, error_status, error_index, var_binds = next(
setCmd(SnmpEngine(),
UsmUserData(self.username, self.auth_key, self.priv_key,
authProtocol=self.auth_protocol,
privProtocol=self.priv_protocol),
UdpTransportTarget((self.host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid), value))
)
return self._handle_response(error_indication, error_status,
error_index, var_binds)
def _handle_response(self, error_indication, error_status,
error_index, var_binds):
"""处理SNMP响应"""
if error_indication:
return {'success': False, 'error': str(error_indication)}
elif error_status:
return {'success': False, 'error': f"{error_status.prettyPrint()} at {error_index}"}
else:
result = {}
for var_bind in var_binds:
result[str(var_bind[0])] = str(var_bind[1])
return {'success': True, 'data': result}
# 使用示例
if __name__ == "__main__":
v3_manager = SNMPv3Manager('192.168.1.1', 'admin', 'AuthPass123', 'PrivPass123')
# 获取系统信息
result = v3_manager.get('1.3.6.1.2.1.1.1.0')
if result['success']:
print(f"系统描述: {result['data']}")
else:
print(f"错误: {result['error']}")
import org.snmp4j.*;
import org.snmp4j.transport.DefaultUdpTransportMapping;
import org.snmp4j.mp.SnmpConstants;
import org.snmp4j.smi.*;
import org.snmp4j.util.*;
public class SNMPManager {
private Snmp snmp;
private String address;
public SNMPManager(String address) {
this.address = address;
}
public void start() throws Exception {
TransportMapping transport = new DefaultUdpTransportMapping();
snmp = new Snmp(transport);
transport.listen();
}
public String get(String oid) throws Exception {
ResponseEvent response = null;
PDU pdu = new PDU();
pdu.add(new VariableBinding(new OID(oid)));
pdu.setType(PDU.GET);
CommunityTarget target = new CommunityTarget();
target.setCommunity(new OctetString("public"));
target.setAddress(new UdpAddress(address + "/161"));
target.setRetries(2);
target.setTimeout(1500);
target.setVersion(SnmpConstants.version2c);
response = snmp.send(pdu, target);
if (response != null) {
PDU responsePDU = response.getResponse();
if (responsePDU != null) {
return responsePDU.getVariableBindings().toString();
}
}
return null;
}
public void walk(String oid) throws Exception {
TreeUtils treeUtils = new TreeUtils(snmp, new DefaultPDUFactory());
OID targetOID = new OID(oid);
List events = treeUtils.getSubtree(new CommunityTarget(), targetOID);
for (TreeEvent event : events) {
if (event != null) {
VariableBinding[] varBindings = event.getVariableBindings();
if (varBindings != null) {
for (VariableBinding varBinding : varBindings) {
System.out.println(varBinding.getOid() + " = " + varBinding.getVariable());
}
}
}
}
}
public static void main(String[] args) {
try {
SNMPManager manager = new SNMPManager("192.168.1.1");
manager.start();
// 获取系统描述
String result = manager.get("1.3.6.1.2.1.1.1.0");
System.out.println("系统描述: " + result);
// 遍历接口信息
manager.walk("1.3.6.1.2.1.2.2");
} catch (Exception e) {
e.printStackTrace();
}
}
}
#include <net-snmp/net-snmp-config.h>
#include <net-snmp/net-snmp-includes.h>
#include <iostream>
#include <string>
class SNMPClient {
private:
struct snmp_session session, *ss;
char *host;
public:
SNMPClient(const char* hostname) {
host = strdup(hostname);
init_snmp("snmpapp");
}
~SNMPClient() {
free(host);
}
std::string get(const char* oid_str) {
oid anOID[MAX_OID_LEN];
size_t anOID_len = MAX_OID_LEN;
snmp_sess_init(&session);
session.peername = host;
session.version = SNMP_VERSION_2c;
session.community = (u_char*)"public";
session.community_len = strlen("public");
ss = snmp_open(&session);
if (!ss) {
return "无法打开SNMP会话";
}
if (!snmp_parse_oid(oid_str, anOID, &anOID_len)) {
snmp_close(ss);
return "无效的OID";
}
struct snmp_pdu *pdu = snmp_pdu_create(SNMP_MSG_GET);
snmp_add_null_var(pdu, anOID, anOID_len);
struct snmp_pdu *response;
int status = snmp_synch_response(ss, pdu, &response);
std::string result;
if (status == STAT_SUCCESS && response->errstat == SNMP_ERR_NOERROR) {
char buffer[1024];
snprint_variable(buffer, sizeof(buffer),
response->variables->name,
response->variables->name_length,
response->variables);
result = buffer;
} else {
result = "SNMP请求失败";
}
if (response) snmp_free_pdu(response);
snmp_close(ss);
return result;
}
};
int main() {
SNMPClient client("192.168.1.1");
// 获取系统描述
std::string result = client.get("1.3.6.1.2.1.1.1.0");
std::cout << "系统描述: " << result << std::endl;
return 0;
}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pysnmp.hlapi import *
from concurrent.futures import ThreadPoolExecutor
import time
class BulkSNMPManager:
def __init__(self, hosts, community='public'):
self.hosts = hosts
self.community = community
def bulk_get(self, oids, max_threads=10):
"""批量获取多个主机的多个OID"""
results = {}
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for host in self.hosts:
for oid in oids:
future = executor.submit(self._get_single, host, oid)
futures.append((host, oid, future))
for host, oid, future in futures:
try:
result = future.result(timeout=5)
if host not in results:
results[host] = {}
results[host][oid] = result
except Exception as e:
if host not in results:
results[host] = {}
results[host][oid] = f"错误: {str(e)}"
return results
def _get_single(self, host, oid):
"""单个SNMP GET操作"""
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData(self.community),
UdpTransportTarget((host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
if error_indication:
return f"SNMP错误: {error_indication}"
elif error_status:
return f"SNMP错误: {error_status.prettyPrint()}"
else:
for var_bind in var_binds:
return str(var_bind[1])
# 使用示例
if __name__ == "__main__":
hosts = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
oids = [
'1.3.6.1.2.1.1.1.0', # 系统描述
'1.3.6.1.2.1.1.3.0', # 系统运行时间
'1.3.6.1.2.1.1.5.0' # 系统名称
]
manager = BulkSNMPManager(hosts)
start_time = time.time()
results = manager.bulk_get(oids)
end_time = time.time()
print(f"批量获取完成,耗时: {end_time - start_time:.2f}秒")
for host, data in results.items():
print(f"\n主机: {host}")
for oid, value in data.items():
print(f" {oid}: {value}")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pysnmp.entity.rfc3413 import ntforg
from pysnmp.entity import engine, config
from pysnmp.carrier.asynsock.dgram import udp
from pysnmp.smi import view, compiler
import time
class TrapReceiver:
def __init__(self, listen_port=162):
self.snmpEngine = engine.SnmpEngine()
self.listen_port = listen_port
self._setup_receiver()
def _setup_receiver(self):
"""设置Trap接收器"""
# 配置传输
config.addTransport(
self.snmpEngine,
udp.domainName + (self.listen_port,),
udp.UdpTransport().openServerMode(('0.0.0.0', self.listen_port))
)
# 配置通知接收器
self.ntfOrg = ntforg.NotificationOriginator(self.snmpEngine)
# 注册回调函数
self.ntfOrg.cbfun = self._process_trap
def _process_trap(self, sendPduHandle, errorIndication,
errorStatus, errorIndex, cbCtx):
"""处理接收到的Trap"""
if errorIndication:
print(f"Trap接收错误: {errorIndication}")
return
if errorStatus:
print(f"Trap状态错误: {errorStatus}")
return
# 解析Trap内容
pdu = cbCtx['pdu']
print(f"收到Trap - 类型: {pdu.getTagSet()}")
print(f"发送者: {cbCtx['transportAddress']}")
print(f"时间戳: {pdu.getTimeStamp()}")
# 显示变量绑定
for varBind in pdu.getVariableBindings():
print(f" {varBind[0]} = {varBind[1]}")
print("-" * 50)
def start(self):
"""启动Trap接收器"""
print(f"Trap接收器启动,监听端口: {self.listen_port}")
try:
self.snmpEngine.runDispatcher()
except KeyboardInterrupt:
print("\n停止Trap接收器")
finally:
self.snmpEngine.transportDispatcher.closeDispatcher()
# 使用示例
if __name__ == "__main__":
receiver = TrapReceiver()
receiver.start()
# 诊断步骤 1. 检查网络连通性 ping 192.168.1.1 2. 检查SNMP端口 telnet 192.168.1.1 161 nmap -p 161 192.168.1.1 3. 检查防火墙规则 iptables -L -n | grep 161 firewall-cmd --list-ports 4. 验证SNMP服务状态 show snmp-agent # 华为设备 show snmp # Cisco设备
# 认证问题排查 1. 检查community字符串配置 snmpwalk -v2c -c public 192.168.1.1 system 2. 验证SNMPv3用户配置 snmpwalk -v3 -u admin -l authPriv -a MD5 -A AuthPass123 -x AES -X PrivPass123 192.168.1.1 3. 检查访问控制列表 display acl all # 华为设备 show access-lists # Cisco设备 4. 验证用户权限 display snmp-agent trap queue # 华为设备 show snmp user # Cisco设备
# 性能优化措施 1. 使用GetBulk替代多次Get snmpbulkget -v2c -c public 192.168.1.1 system 0 10 2. 调整超时和重试参数 snmpwalk -t 10 -r 3 -v2c -c public 192.168.1.1 3. 限制查询范围 snmpwalk -v2c -c public 192.168.1.1 .1.3.6.1.2.1.2.2.1 4. 使用批量查询工具 # 实现并发查询脚本
# 使用tcpdump抓取SNMP流量 tcpdump -i eth0 -n -s 0 -w snmp.pcap udp and port 161 # 使用Wireshark分析 # 过滤条件: udp.port == 161 or udp.port == 162 # 分析要点: # 1. 检查SNMP版本是否正确 # 2. 验证community字符串 # 3. 查看OID是否有效 # 4. 检查响应状态码 # 5. 分析响应时间
# 华为设备日志 display snmp-agent packet statistics display snmp-agent trap queue display logbuffer | include SNMP # Cisco设备日志 show logging | include SNMP show snmp statistics show snmp packets # Linux SNMP日志 tail -f /var/log/syslog | grep snmp journalctl -u snmpd -f
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import time
from collections import defaultdict
from pysnmp.hlapi import *
class DistributedSNMPManager:
def __init__(self):
self.regions = {
'datacenter': ['10.0.1.0/24', '10.0.2.0/24'],
'office': ['10.1.1.0/24'],
'warehouse': ['10.2.1.0/24']
}
self.region_managers = {}
self.global_stats = defaultdict(dict)
def setup_region_manager(self, region_name, subnet):
"""设置区域管理器"""
manager = RegionSNMPManager(region_name, subnet)
self.region_managers[region_name] = manager
return manager
def start_monitoring(self):
"""启动分布式监控"""
threads = []
for region_name, subnets in self.regions.items():
for subnet in subnets:
manager = self.setup_region_manager(region_name, subnet)
thread = threading.Thread(target=manager.monitor_loop)
thread.daemon = True
thread.start()
threads.append(thread)
# 全局数据聚合线程
global_thread = threading.Thread(target=self.aggregate_data)
global_thread.daemon = True
global_thread.start()
threads.append(global_thread)
return threads
def aggregate_data(self):
"""聚合各区域数据"""
while True:
for region_name, manager in self.region_managers.items():
self.global_stats[region_name] = manager.get_stats()
print("=== 全局监控统计 ===")
for region, stats in self.global_stats.items():
print(f"{region}: {len(stats)} 设备在线")
time.sleep(60) # 每分钟聚合一次
class RegionSNMPManager:
def __init__(self, region_name, subnet):
self.region_name = region_name
self.subnet = subnet
self.devices = self._discover_devices()
self.stats = {}
def _discover_devices(self):
"""发现网络中的SNMP设备"""
# 简化的设备发现逻辑
# 实际应用中可以使用nmap或自定义扫描
devices = []
base_ip = '.'.join(self.subnet.split('.')[:-1])
for i in range(1, 255):
ip = f"{base_ip}.{i}"
if self._test_snmp(ip):
devices.append(ip)
return devices
def _test_snmp(self, ip):
"""测试SNMP连通性"""
try:
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget((ip, 161)),
ContextData(),
ObjectType(ObjectIdentity('1.3.6.1.2.1.1.1.0')),
timeout=1,
retries=0)
)
return error_indication is None and error_status == 0
except:
return False
def monitor_loop(self):
"""区域监控循环"""
while True:
for device in self.devices:
self._collect_device_stats(device)
time.sleep(30) # 30秒采集一次
def _collect_device_stats(self, device):
"""采集设备统计信息"""
oids = {
'sysDescr': '1.3.6.1.2.1.1.1.0',
'sysUpTime': '1.3.6.1.2.1.1.3.0',
'sysName': '1.3.6.1.2.1.1.5.0'
}
device_stats = {}
for name, oid in oids.items():
try:
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget((device, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
if error_indication is None and error_status == 0:
for var_bind in var_binds:
device_stats[name] = str(var_bind[1])
else:
device_stats[name] = "获取失败"
except:
device_stats[name] = "连接失败"
self.stats[device] = device_stats
def get_stats(self):
"""获取区域统计信息"""
return self.stats.copy()
# 使用示例
if __name__ == "__main__":
manager = DistributedSNMPManager()
threads = manager.start_monitoring()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n停止分布式监控")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import time
from pysnmp.hlapi import *
from datetime import datetime
class IoTSNMPMonitor:
def __init__(self):
self.iot_devices = {
'sensor_001': {'ip': '192.168.100.10', 'type': 'temperature'},
'sensor_002': {'ip': '192.168.100.11', 'type': 'humidity'},
'sensor_003': {'ip': '192.168.100.12', 'type': 'pressure'},
'gateway_001': {'ip': '192.168.100.1', 'type': 'gateway'}
}
self.alert_thresholds = {
'temperature': {'min': 18, 'max': 28},
'humidity': {'min': 40, 'max': 70},
'pressure': {'min': 980, 'max': 1050}
}
def monitor_iot_devices(self):
"""监控IoT设备"""
while True:
timestamp = datetime.now().isoformat()
for device_id, device_info in self.iot_devices.items():
data = self._collect_iot_data(device_info['ip'], device_info['type'])
# 处理数据
processed_data = self._process_iot_data(device_id, data, timestamp)
# 检查告警
self._check_alerts(device_id, processed_data)
# 存储数据
self._store_data(device_id, processed_data)
time.sleep(10) # 10秒采集间隔
def _collect_iot_data(self, ip, device_type):
"""采集IoT设备数据"""
oid_mapping = {
'temperature': '1.3.6.1.4.1.9999.1.1.0',
'humidity': '1.3.6.1.4.1.9999.1.2.0',
'pressure': '1.3.6.1.4.1.9999.1.3.0',
'gateway': '1.3.6.1.4.1.9999.2.1.0'
}
oid = oid_mapping.get(device_type, '1.3.6.1.2.1.1.1.0')
try:
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget((ip, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
if error_indication is None and error_status == 0:
for var_bind in var_binds:
return float(var_bind[1])
else:
return None
except:
return None
def _process_iot_data(self, device_id, raw_data, timestamp):
"""处理IoT数据"""
device_type = self.iot_devices[device_id]['type']
processed = {
'device_id': device_id,
'device_type': device_type,
'timestamp': timestamp,
'raw_value': raw_data,
'status': 'online' if raw_data is not None else 'offline'
}
if raw_data is not None:
# 数据平滑处理
processed['smoothed_value'] = self._smooth_data(device_id, raw_data)
# 趋势分析
processed['trend'] = self._analyze_trend(device_id, raw_data)
return processed
def _check_alerts(self, device_id, data):
"""检查告警条件"""
device_type = self.iot_devices[device_id]['type']
if device_type in self.alert_thresholds and data.get('raw_value') is not None:
value = data['raw_value']
thresholds = self.alert_thresholds[device_type]
if value < thresholds['min'] or value > thresholds['max']:
alert = {
'device_id': device_id,
'device_type': device_type,
'value': value,
'threshold': thresholds,
'timestamp': data['timestamp'],
'severity': 'warning' if abs(value - thresholds['min']) < 5 or abs(value - thresholds['max']) < 5 else 'critical'
}
self._send_alert(alert)
def _send_alert(self, alert):
"""发送告警"""
print(f"🚨 告警: {json.dumps(alert, indent=2, ensure_ascii=False)}")
# 这里可以集成邮件、短信、Webhook等告警方式
# send_email(alert)
# send_sms(alert)
# webhook_send(alert)
def _store_data(self, device_id, data):
"""存储数据"""
# 存储到时序数据库(如InfluxDB、Prometheus等)
print(f"存储数据: {device_id} -> {data}")
# 示例:存储到文件
with open(f"iot_data_{device_id}.json", "a") as f:
f.write(json.dumps(data, ensure_ascii=False) + "\n")
# 使用示例
if __name__ == "__main__":
monitor = IoTSNMPMonitor()
monitor.monitor_iot_devices()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import requests
import time
from pysnmp.hlapi import *
from datetime import datetime
class CloudSNMPBridge:
def __init__(self, cloud_config):
self.cloud_config = cloud_config
self.devices = self._load_device_config()
self.cloud_endpoint = cloud_config['endpoint']
self.api_key = cloud_config['api_key']
def _load_device_config(self):
"""加载设备配置"""
return {
'router_001': {
'ip': '10.0.1.1',
'type': 'network',
'metrics': ['cpu', 'memory', 'interface']
},
'switch_001': {
'ip': '10.0.1.2',
'type': 'network',
'metrics': ['port_status', 'traffic']
},
'server_001': {
'ip': '10.0.2.10',
'type': 'server',
'metrics': ['disk', 'load', 'network']
}
}
def bridge_to_cloud(self):
"""桥接SNMP数据到云平台"""
while True:
timestamp = datetime.now().isoformat()
for device_id, device_config in self.devices.items():
# 收集SNMP数据
snmp_data = self._collect_device_metrics(device_config)
# 转换为云平台格式
cloud_data = self._transform_to_cloud_format(
device_id, snmp_data, timestamp
)
# 发送到云平台
self._send_to_cloud(cloud_data)
time.sleep(60) # 每分钟发送一次
def _collect_device_metrics(self, device_config):
"""收集设备指标"""
metrics = {}
ip = device_config['ip']
for metric_type in device_config['metrics']:
if metric_type == 'cpu':
metrics['cpu_usage'] = self._get_cpu_usage(ip)
elif metric_type == 'memory':
metrics['memory_usage'] = self._get_memory_usage(ip)
elif metric_type == 'interface':
metrics['interface_stats'] = self._get_interface_stats(ip)
elif metric_type == 'port_status':
metrics['port_status'] = self._get_port_status(ip)
elif metric_type == 'traffic':
metrics['traffic_stats'] = self._get_traffic_stats(ip)
elif metric_type == 'disk':
metrics['disk_usage'] = self._get_disk_usage(ip)
elif metric_type == 'load':
metrics['system_load'] = self._get_system_load(ip)
elif metric_type == 'network':
metrics['network_stats'] = self._get_network_stats(ip)
return metrics
def _get_cpu_usage(self, ip):
"""获取CPU使用率"""
oid = '1.3.6.1.4.1.2021.11.9.0' # Net-SNMP CPU使用率OID
try:
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget((ip, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
if error_indication is None and error_status == 0:
for var_bind in var_binds:
return float(var_bind[1])
except:
pass
return None
def _get_memory_usage(self, ip):
"""获取内存使用率"""
# 实现内存使用率获取逻辑
return None
def _get_interface_stats(self, ip):
"""获取接口统计信息"""
interface_oids = [
'1.3.6.1.2.1.2.2.1.10', # ifInOctets
'1.3.6.1.2.1.2.2.1.16' # ifOutOctets
]
stats = {}
for oid in interface_oids:
try:
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget((ip, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
if error_indication is None and error_status == 0:
for var_bind in var_binds:
stats[oid] = int(var_bind[1])
except:
pass
return stats
def _transform_to_cloud_format(self, device_id, snmp_data, timestamp):
"""转换为云平台格式"""
cloud_data = {
'device_id': device_id,
'timestamp': timestamp,
'source': 'snmp',
'metrics': []
}
for metric_name, value in snmp_data.items():
if value is not None:
cloud_data['metrics'].append({
'name': metric_name,
'value': value,
'unit': self._get_metric_unit(metric_name),
'type': 'gauge' if isinstance(value, (int, float)) else 'string'
})
return cloud_data
def _get_metric_unit(self, metric_name):
"""获取指标单位"""
unit_mapping = {
'cpu_usage': 'percent',
'memory_usage': 'percent',
'disk_usage': 'percent',
'interface_stats': 'bytes',
'traffic_stats': 'bytes',
'port_status': 'boolean'
}
return unit_mapping.get(metric_name, 'unknown')
def _send_to_cloud(self, data):
"""发送数据到云平台"""
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {self.api_key}'
}
try:
response = requests.post(
self.cloud_endpoint,
headers=headers,
json=data,
timeout=10
)
if response.status_code == 200:
print(f"✅ 数据发送成功: {data['device_id']}")
else:
print(f"❌ 数据发送失败: {response.status_code}")
except Exception as e:
print(f"❌ 发送异常: {str(e)}")
# 云平台配置示例
cloud_config = {
'endpoint': 'https://api.monitoring.cloud.com/v1/metrics',
'api_key': 'your-api-key-here',
'region': 'us-west-2'
}
# 使用示例
if __name__ == "__main__":
bridge = CloudSNMPBridge(cloud_config)
bridge.bridge_to_cloud()
# 华为设备高级安全配置 [Huawei] snmp-agent [Huawei] snmp-agent sys-info version v3 [Huawei] snmp-agent packet-size 1500 [Huawei] snmp-agent local-engineid 800007DB030018E226011E00 # 创建安全用户组 [Huawei] snmp-agent group v3 sec-admin privacy read sec-view write sec-view [Huawei] snmp-agent group v3 sec-readonly auth read sec-view # 创建管理员用户 [Huawei] snmp-agent usm-user v3 admin admin authentication-mode sha cipher SecureAuth2023 privacy-mode aes 256 cipher SecurePriv2023 # 创建只读用户 [Huawei] snmp-agent usm-user v3 readonly readonly authentication-mode md5 cipher ReadOnlyAuth123 # 配置视图 [Huawei] snmp-agent mib-view included sec-view 1.3.6.1 [Huawei] snmp-agent mib-view excluded sec-view 1.3.6.1.4.1.2021.100 # 配置Trap安全 [Huawei] snmp-agent target-host trap-hostname SEC-NMS address 192.168.1.100 params securityname admin v3 privacy [Huawei] snmp-agent trap enable standard authentication-failure [Huawei] snmp-agent trap enable standard warmstart [Huawei] snmp-agent trap enable standard linkdown [Huawei] snmp-agent trap enable standard linkup # 访问控制 [Huawei] acl 2000 [Huawei-acl-basic-2000] rule 5 permit source 192.168.1.0 0.0.0.255 [Huawei-acl-basic-2000] rule 10 deny source any [Huawei] snmp-agent acl 2000
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pysnmp.hlapi import *
import asyncio
import time
class OptimizedSNMPManager:
def __init__(self):
self.batch_size = 50
self.max_workers = 20
self.timeout = 2
self.retries = 1
async def optimized_bulk_walk(self, host, start_oid, max_repetitions=50):
"""优化的批量遍历"""
results = {}
current_oid = start_oid
while True:
# 使用GetBulk获取批量数据
batch_results = await self._get_bulk_async(
host, current_oid, max_repetitions
)
if not batch_results:
break
# 处理批量结果
for oid, value in batch_results.items():
results[oid] = value
current_oid = oid
# 检查是否到达分支末尾
if not self._is_next_branch(start_oid, current_oid, batch_results):
break
return results
async def _get_bulk_async(self, host, oid, max_repetitions):
"""异步GetBulk操作"""
loop = asyncio.get_event_loop()
def get_bulk():
error_indication, error_status, error_index, var_binds = next(
bulkCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget((host, 161)),
ContextData(),
0, max_repetitions,
ObjectType(ObjectIdentity(oid)))
)
if error_indication:
return {}
elif error_status:
return {}
else:
results = {}
for var_bind in var_binds:
results[str(var_bind[0])] = str(var_bind[1])
return results
return await loop.run_in_executor(None, get_bulk)
def _is_next_branch(self, start_oid, current_oid, results):
"""检查是否还有下一个分支"""
# 简化的分支检查逻辑
return len(results) > 0
async def parallel_device_query(self, devices, oids):
"""并行查询多个设备"""
tasks = []
for device in devices:
for oid in oids:
task = self._query_device_async(device, oid)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# 组织结果
organized_results = {}
device_oid_pairs = [(d, o) for d in devices for o in oids]
for i, result in enumerate(results):
device, oid = device_oid_pairs[i]
if device not in organized_results:
organized_results[device] = {}
if isinstance(result, Exception):
organized_results[device][oid] = f"错误: {str(result)}"
else:
organized_results[device][oid] = result
return organized_results
async def _query_device_async(self, host, oid):
"""异步查询单个设备"""
loop = asyncio.get_event_loop()
def query():
error_indication, error_status, error_index, var_binds = next(
getCmd(SnmpEngine(),
CommunityData('public'),
UdpTransportTarget((host, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)))
)
if error_indication:
return f"SNMP错误: {error_indication}"
elif error_status:
return f"SNMP错误: {error_status.prettyPrint()}"
else:
for var_bind in var_binds:
return str(var_bind[1])
return await loop.run_in_executor(None, query)
# 使用示例
async def main():
manager = OptimizedSNMPManager()
devices = ['192.168.1.1', '192.168.1.2', '192.168.1.3']
oids = [
'1.3.6.1.2.1.1.1.0', # 系统描述
'1.3.6.1.2.1.1.3.0', # 系统运行时间
'1.3.6.1.2.1.1.5.0' # 系统名称
]
start_time = time.time()
results = await manager.parallel_device_query(devices, oids)
end_time = time.time()
print(f"并行查询完成,耗时: {end_time - start_time:.2f}秒")
for device, data in results.items():
print(f"\n设备: {device}")
for oid, value in data.items():
print(f" {oid}: {value}")
if __name__ == "__main__":
asyncio.run(main())