一、摘要
本文介绍了一个基于SNMP协议的网络设备批量巡检脚本,该脚本使用pysnmp库实现了对网络设备接口状态的自动采集和分析。简要介绍了SNMP协议并对比了不同网络设备管理方式的优缺点,讲解了pysnmp库的使用方法,最后对脚本的功能、使用场景和局限性进行了分析。
二、练习环境
- 使用华三HCL模拟器进行设备模拟,确认本地电脑可ping通模拟设备。具体可参考我的文章[网络自动化学习笔记-H3C 模拟器(HCL)基础环境配置]。
- 辅助工具
[ManageEngine MibBrowser]
通过工具,使用SNMP查询设备信息,辅助验证脚本功能。
- Python环境
- Python 3.14
- pysnmp,pysmi 库
- VS code(可选)
三、SNMP协议简介
SNMP(Simple Network Management Protocol)是一种用于网络管理的标准协议,它允许网络管理员监控网络设备的状态、配置和性能。
提醒
只使用SNMP进行巡检和性能分析,不建议使用SNMP进行配置,尤其是实际网络中尽量不要使用配置功能。
可能因空格,缩进,标识符等原因,导致CLI界面对应配置项不可用。
3.1 SNMP在网络设备管理中的作用
- 实时监控:可以实时获取设备的运行状态,如接口状态、CPU使用率、内存使用情况等
- 故障诊断:当网络出现问题时,可以通过SNMP快速定位故障点
- 性能分析:通过收集历史数据,可以分析设备的性能趋势,预测潜在问题
- - 配置管理:【不推荐,不建议】可以远程配置设备参数,减少人工操作
四、CLI,SNMP,Netconf的不同方式对比
4.1 Paramiko CLI方式
优势:
- 可以执行任意CLI命令,功能全面
- 易操作易理解,因为多数工程师才能熟练使用CLI命令行。
- 可以获取设备特有的信息
- 对设备的控制能力强
劣势:
- 依赖设备的CLI接口,不同设备命令格式不同
- 解析命令输出复杂,容易出错
- 需要回读屏显获取数据,不方便
- 效率低,不适合大规模设备管理
- 安全性依赖于SSH配置
4.2 SNMP方式
优势:
- 标准化,跨设备兼容性好
- 效率高,适合批量操作
- 实时性好,响应速度快
- 实现简单,开发成本低
劣势:
- 功能相对有限,某些设备特有信息无法获取,或需要依赖厂家私有MIB。
- 安全性较差(v1/v2c)
- 依赖设备的SNMP实现质量
4.3 NETconf方式
优势:
- 基于XML/JSON,数据结构清晰
- 支持配置和状态的获取与修改
- 安全性高,支持多种认证方式
- 功能强大,可扩展性好
劣势:
- 实现复杂,开发成本高
- 对设备的支持程度不如SNMP广泛
- 占用带宽较大,不适合带宽有限的环境
- 学习曲线较陡峭
五、Pysnmp库介绍
Pysnmp是一个纯Python实现的SNMP库,支持SNMP v1、v2c和v3版本。它提供了丰富的API,使得开发者可以方便地实现SNMP管理功能。
5.1 版本区别
Pysnmp有两个主要版本:
- 传统版本(v4.x):使用同步API,导入方式为from pysnmp.hlapi import *
- 最新版本(v5.x):支持异步API,导入方式为from pysnmp.hlapi.v3arch.asyncio import *
本文使用的是最新版本的pysnmp,它支持异步操作,更适合处理大量设备的并发查询。
5.2 常见Pysnmp函数示例
以下示例均来自官方网站【GET Operation - PySNMP 7.1 Documentation】,并稍了做修改。
5.2.1 get操作(获取单个OID的值)
- import asyncio
- from pysnmp.hlapi.v3arch.asyncio import *
- async def run():
- errorIndication, errorStatus, errorIndex, varBinds = await get_cmd(
- SnmpEngine(),
- CommunityData('readtest', mpModel=1),
- await UdpTransportTarget.create(('10.6.6.6', 161)),
- ContextData(),
- ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysName', 0)),
- ObjectType(ObjectIdentity('IF-MIB', 'ifName', 2))
- )
- for varBind in varBinds:
- print("-----------------------------")
- print(f"查询到的内容是{varBind}")
- print(f"sysName的OID值是{varBind[0]}")
- print(f"ifName的OID值是{varBind[1]}")
- print(f"sysName的可读化输出值是{varBind[0].prettyPrint()}")
- print(f"ifName的可读化输出值是{varBind[1].prettyPrint()}")
- print(f"{varBind[0].prettyPrint()} = {varBind[1].prettyPrint()}")
- print("-----------------------------")
- asyncio.run(run())
复制代码 示例运行截图如下
5.2.2 next操作(获取下一个OID的值)
- import asyncio
- from pysnmp.hlapi.v3arch.asyncio import *
- async def run():
- errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
- SnmpEngine(),
- CommunityData('readtest', mpModel=1),
- await UdpTransportTarget.create(('10.3.3.3', 161)),
- ContextData(),
- ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysContact', 0)), #会返回sysContact的下一个OID值
- ObjectType(ObjectIdentity('IF-MIB', 'ifName', 2)), #会返回ifName.2的下一个OID值
- lexicographicMode=False
- )
- for varBind in varBinds:
- print("-----------------------------")
- print(varBind[1].prettyPrint())
- print("-----------------------------")
-
- asyncio.run(run())
复制代码 next操作会获取下一个值,如下图所示
示例运行截图如下
脚本查询sysContact,实际返回值是sysName的值,查询ifName.2,实际返回值是ifName.3的值。
5.2.3 bulk操作(批量获取多个OID的值)
- import asyncio
- from pysnmp.hlapi.v3arch.asyncio import *
- async def run():
- errorIndication, errorStatus, errorIndex, varBinds = await bulk_cmd(
- SnmpEngine(),
- CommunityData('readtest', mpModel=1),
- await UdpTransportTarget.create(('10.3.3.3', 161)),
- ContextData(),
- 0, 100, #循环查询100次。
- ObjectType(ObjectIdentity('IF-MIB', 'ifName'))
- )
- for varBind in varBinds:
- print(varBind)
-
- asyncio.run(run())
复制代码 示例运行结果如下
因为是循环取值,结果显示范围远远超出了ifName的边界。
六、脚本介绍
6.1 功能概述
本次练习的脚本是一个基于SNMP v2c的模拟批量设备巡检工具,主要功能包括:
- 设备IP管理:从devices.txt文件读取设备IP列表,支持空行、注释行和重复IP的自动处理
- SNMP查询:使用异步方式批量查询设备的接口信息,包括接口索引、描述、名称、管理状态和运行状态
- 结果处理:将查询结果保存到CSV文件,并生成详细的日志文件
- 错误处理:对查询过程中的错误进行捕获和记录
6.2 脚本内容
- 导入模块
- 构建日志配置函数,IP文件读取函数,SNMP查询配置,配置格式化函数,查询函数,结果保存函数
- 主函数调用构建的功能函数,脚本入口。
- import asyncio
- import csv
- import logging
- import time
- from datetime import datetime
- from pysnmp.hlapi.v3arch.asyncio import *
- # 配置日志函数
- def setup_logger(log_file):
- """设置日志配置"""
- logger = logging.getLogger(log_file)
- logger.setLevel(logging.INFO)
-
- # 清除之前的处理器
- for handler in logger.handlers[:]:
- logger.removeHandler(handler)
-
- # 创建文件处理器
- file_handler = logging.FileHandler(log_file, encoding='utf-8')
- file_handler.setLevel(logging.INFO)
- file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
- logger.addHandler(file_handler)
-
- # 创建控制台处理器
- stream_handler = logging.StreamHandler()
- stream_handler.setLevel(logging.INFO)
- stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
- logger.addHandler(stream_handler)
-
- return logger
- # 读取设备IP列表
- def read_device_ips(file_path, logger=None):
- ips = []
- try:
- with open(file_path, 'r', encoding='utf-8') as f:
- for line in f:
- stripped_line = line.strip()
- if not stripped_line or stripped_line.startswith("#"):
- continue #跳过空行和注释行
- ips.append(stripped_line)
-
- set_ip = set(ips)
- if len(set_ip) == len(ips):
- message = f"读取到的IP地址列表,共{len(ips)}个IP地址,没有重复的IP地址"
- print(message)
- if logger:
- logger.info(message)
- else:
- count_duplicates = len(ips) - len(set_ip)
- message = f"读取到的IP地址列表,共{len(ips)}个IP地址,有{count_duplicates}个重复的IP地址,已自动去重"
- print(message)
- if logger:
- logger.info(message)
- ips = list(set_ip)
-
- message = f"成功读取{len(ips)}个有效设备IP"
- print(message)
- if logger:
- logger.info(message)
- return ips
- except Exception as e:
- message = f"读取设备IP文件失败: {e}"
- print(message)
- if logger:
- logger.error(message)
- return []
- # 定义SNMP查询配置
- PORT_CONFIG = [
- ("IF-MIB", "ifIndex","端口索引"),
- ("IF-MIB", "ifDescr","端口描述"),
- ("IF-MIB", "ifName","端口名称"),
- ("IF-MIB", "ifAdminStatus","物理状态"),
- ("IF-MIB", "ifOperStatus","协商状态")
- ]
- #构建OID路径,并返回格式化的MIB列表、 表格映射、 起始OID路径
- def format_config(config=PORT_CONFIG):
- format_mib = []
- format_table = {}
- start_name = ""
- for item in config:
- if len(item) == 3:
- format_mib.append(ObjectType(ObjectIdentity(item[0], item[1])))
- format_table[item[1]] = item[2] # 端口描述
- start_name = f"{item[0]}::{item[1]}"
- else:
- format_mib.append(ObjectType(ObjectIdentity(item[0], item[1])))
- format_table[item[1]] = item[1]
- start_name = f"{item[0]}::{item[1]}"
- print(f"format_table is [{format_table}]")
- print(f"start_name is {start_name}")
- print(f"format_mib is {format_mib}")
- return format_mib, format_table, start_name
- async def get_query(ip, community, config=PORT_CONFIG, logger=None):
- """
- 使用next_cmd自动遍历所有接口的信息,直到没有更多数据为止。
- 通过分析返回的OID,判断是否继续查询下一个接口。
- """
- try:
- snmpEngine = SnmpEngine()
- transport = await UdpTransportTarget.create((ip, 161))
-
- # 存储结果的字典
- interface_details = {}
- hostname = ""
- message = "开始遍历所有接口..."
- print(message)
- if logger:
- logger.info(f"{ip} - {message}")
-
- # 首先查询sysName
- errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
- snmpEngine,
- CommunityData(community, mpModel=1),
- transport,
- ContextData(),
- ObjectType(ObjectIdentity("SNMPv2-MIB", "sysName")),
- lexicographicMode=False
- )
-
- # 处理sysName
- if not errorIndication and not errorStatus and varBinds:
- for varBind in varBinds:
- try:
- if isinstance(varBind, ObjectType):
- oid = varBind[0]
- val = varBind[1]
- elif isinstance(varBind, tuple) and len(varBind) >= 2:
- oid, val = varBind[0], varBind[1]
- else:
- continue
-
- oid_str = oid.prettyPrint()
- val_str = val.prettyPrint()
-
- if oid_str == 'SNMPv2-MIB::sysName.0':
- hostname = val_str
- message = f"系统名称: {hostname}"
- print(message)
- if logger:
- logger.info(f"{ip} - {message}")
- except Exception as e:
- message = f"处理sysName异常: {e}"
- print(message)
- if logger:
- logger.error(f"{ip} - {message}")
- continue
-
- # 遍历config中的每个OID进行查询
- for item in config:
- mib, oid_name, desc = item
- message = f"\n开始查询 {mib}::{oid_name}..."
- print(message)
- if logger:
- logger.info(f"{ip} - 开始查询 {mib}::{oid_name}...")
-
- # 统计当前OID查询到的条目数
- item_count = 0
- non_expected_oid = None
-
- # 执行初始的next_cmd查询
- errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
- snmpEngine,
- CommunityData(community, mpModel=1),
- transport,
- ContextData(),
- ObjectType(ObjectIdentity(mib, oid_name)),
- lexicographicMode=False
- )
-
- while not errorIndication and not errorStatus and varBinds:
- # 处理当前批次的数据
- last_oid = None
-
- for varBind in varBinds:
- try:
- # 解包varBind
- if isinstance(varBind, ObjectType):
- oid = varBind[0]
- val = varBind[1]
- elif isinstance(varBind, tuple) and len(varBind) >= 2:
- oid, val = varBind[0], varBind[1]
- else:
- continue
-
- oid_str = oid.prettyPrint()
- val_str = val.prettyPrint()
-
- # 检查是否是当前OID的延续
- if oid_str.startswith(f"{mib}::{oid_name}."):
- # 提取索引
- index = str(oid[-1]) # 直接从OID对象获取最后一个数字
-
- # 初始化接口信息
- if index not in interface_details:
- interface_details[index] = {}
- for config_item in config:
- interface_details[index][config_item[1]] = None
-
- # 更新接口信息
- interface_details[index][oid_name] = val_str
- last_oid = oid
- item_count += 1
- else:
- non_expected_oid = oid_str
- break
- except Exception as e:
- message = f"处理数据异常: {e}"
- print(message)
- if logger:
- logger.error(f"{ip} - {message}")
- continue
-
- # 继续下一次查询
- if non_expected_oid:
- break
- else:
- errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
- snmpEngine,
- CommunityData(community, mpModel=1),
- transport,
- ContextData(),
- ObjectType(ObjectIdentity(mib, oid_name, index)), # 从上次的最后一个值继续
- lexicographicMode=False # 只遍历当前OID子树
- )
-
- # 输出当前OID查询结果
- message = f"获取到: {mib}::{oid_name} 条目共计 {item_count} 个。"
- print(message)
- if logger:
- logger.info(f"{ip} - {message}")
- if non_expected_oid:
- message = f"非预期OID: {non_expected_oid}"
- print(message)
- if logger:
- logger.info(f"{ip} - {message}")
-
- # 输出总体结果
- interface_count = len(interface_details)
- config_count = len(config)
- message = f"\n共找到 {interface_count} 个接口,查询了它们 {config_count} 项信息。"
- print(message)
- if logger:
- logger.info(f"{ip} - {message}")
-
- # 按索引排序输出前10个接口示例
- print("前10个接口示例:")
- if logger:
- logger.info(f"{ip} - 前10个接口示例:")
- for idx, details in sorted(interface_details.items(), key=lambda x: int(x[0]))[:10]:
- interface_message = f" ifIndex.{idx}: {details.get('ifName', 'N/A')}"
- print(interface_message)
- if logger:
- logger.info(f"{ip} - {interface_message}")
-
- snmpEngine.close_dispatcher()
- return interface_details, hostname
-
- except Exception as e:
- print(f"异常: {e}")
- return {}, ""
- # 保存查询结果到CSV文件
- def save_results(interface_details, hostname, ip, file_path, config=PORT_CONFIG):
- try:
-
- with open(file_path, 'a', newline='', encoding='utf-8-sig') as f:
- writer = csv.writer(f)
- print(f"正在保存结果到 {file_path}...")
-
- # 写入表头
- writer.writerow(["=" * 80])
- line = f"主机名={hostname};设备IP={ip};检查时间:{time.strftime('%Y-%m-%d %H:%M:%S')}"
- writer.writerow([line])
- writer.writerow(["=" * 80])
-
- # 动态生成表头
- headers = [item[2] for item in config]
- writer.writerow(headers)
-
- # 写入数据
- for interface in interface_details.values():
- row = []
- for item in config:
- row.append(interface.get(item[1], ''))
- writer.writerow(row)
- print(f"成功保存查询结果到{file_path}")
- except Exception as e:
- print(f"保存查询结果失败: {e}")
- async def main():
- # 配置参数
- device_file = 'devices.txt' # 设备IP文件
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- port_result_file = f'snmp_port_results_{timestamp}.csv' # 端口结果文件
- port_log_file = f'snmp_port_results_log_{timestamp}.log' # 端口日志文件
- community = 'readtest' # SNMP团体名
-
- # 设置日志
- port_logger = setup_logger(port_log_file)
-
- # 读取设备IP
- device_ips = read_device_ips(device_file, logger=port_logger)
- if not device_ips:
- port_logger.error("没有设备IP可查询")
- return
-
- # 批量查询设备信息
- for ip in device_ips:
- port_logger.info(f"开始查询设备: {ip}")
-
- # 获取端口信息和主机名
- interface_details, hostname = await get_query(ip, community, config=PORT_CONFIG, logger=port_logger)
-
- # 保存端口结果
- if interface_details:
- save_results(interface_details, hostname, ip, port_result_file, config=PORT_CONFIG)
- port_logger.info(f"{ip} - 成功保存端口结果到 {port_result_file}")
-
- # 避免查询过快,添加短暂延迟
- await asyncio.sleep(1)
- port_logger.info("批量查询完成")
- if __name__ == "__main__":
- asyncio.run(main())
复制代码 6.3 脚本输出结果保存
6.4 脚本运行日志保存
6.5 脚本局限性
- 仅支持SNMP v2c:没有对SNMP v3进行适配,安全性较差
- 功能有限:目前只支持接口状态的查询,无法获取CPU、内存、温度等系统资源信息
- 可扩展性不足:添加新的查询项需要修改代码,不够灵活
- 错误处理简单:对于SNMP查询失败的情况,处理方式较为简单
- 配置固定:SNMP团体名、查询参数等配置硬编码在脚本中,不够灵活
6.6 后续练习中可能尝试的功能
- 添加系统资源监控:增加对CPU使用率、内存使用情况、设备温度等信息的查询
- 支持SNMP v3:添加对SNMP v3的支持,提高安全性
- 配置文件化:将配置参数移到配置文件中,方便用户修改
- 结果可视化:添加简单的结果可视化功能,如生成状态图表,或日志格式化提升可读性
- 告警机制:添加异常状态的告警功能,如接口down时发送通知
七、结语
SNMP作为一种成熟的网络管理协议,在网络设备监控和管理方面发挥着重要作用。本文介绍的批量巡检脚本利用pysnmp库实现了对网络设备接口状态的自动采集和分析,为网络管理员提供了一种高效、便捷的设备管理工具。
虽然该脚本目前存在一些局限性,但其基本功能已经能够满足日常网络巡检的需求。通过进一步的扩展和优化,它可以成为网络管理工作中的得力助手。
在实际应用中,网络管理员可以根据具体需求对脚本进行定制和扩展,例如添加更多的监控指标、实现更复杂的告警机制等,从而构建一个更加完善的网络监控系统。
SNMP协议虽然有其局限性,但在网络设备管理领域仍然是一种不可替代的工具。结合其他管理协议和工具,如NETconf、REST API等,可以构建一个更加全面、高效的网络管理解决方案。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |