找回密码
 立即注册
首页 业界区 安全 网络自动化学习-基于PySNMP的批量巡检(练习版) ...

网络自动化学习-基于PySNMP的批量巡检(练习版)

筒濂 昨天 10:20
一、摘要

本文介绍了一个基于SNMP协议的网络设备批量巡检脚本,该脚本使用pysnmp库实现了对网络设备接口状态的自动采集和分析。简要介绍了SNMP协议并对比了不同网络设备管理方式的优缺点,讲解了pysnmp库的使用方法,最后对脚本的功能、使用场景和局限性进行了分析。
二、练习环境


  • 使用华三HCL模拟器进行设备模拟,确认本地电脑可ping通模拟设备。具体可参考我的文章[网络自动化学习笔记-H3C 模拟器(HCL)基础环境配置]。
1.png


  • 辅助工具
    [ManageEngine MibBrowser]
    通过工具,使用SNMP查询设备信息,辅助验证脚本功能。
  • Python环境


  • Python 3.14
  • pysnmp,pysmi 库
  • VS code(可选)
三、SNMP协议简介

SNMP(Simple Network Management Protocol)是一种用于网络管理的标准协议,它允许网络管理员监控网络设备的状态、配置和性能。
提醒

只使用SNMP进行巡检和性能分析,不建议使用SNMP进行配置,尤其是实际网络中尽量不要使用配置功能
可能因空格,缩进,标识符等原因,导致CLI界面对应配置项不可用。
3.1 SNMP在网络设备管理中的作用


  • 实时监控:可以实时获取设备的运行状态,如接口状态、CPU使用率、内存使用情况等
  • 故障诊断:当网络出现问题时,可以通过SNMP快速定位故障点
  • 性能分析:通过收集历史数据,可以分析设备的性能趋势,预测潜在问题
  • - 配置管理:【不推荐,不建议】可以远程配置设备参数,减少人工操作
四、CLI,SNMP,Netconf的不同方式对比

4.1 Paramiko CLI方式

优势

  • 可以执行任意CLI命令,功能全面
  • 易操作易理解,因为多数工程师才能熟练使用CLI命令行。
  • 可以获取设备特有的信息
  • 对设备的控制能力强
劣势

  • 依赖设备的CLI接口,不同设备命令格式不同
  • 解析命令输出复杂,容易出错
  • 需要回读屏显获取数据,不方便
  • 效率低,不适合大规模设备管理
  • 安全性依赖于SSH配置
4.2 SNMP方式

优势

  • 标准化,跨设备兼容性好
  • 效率高,适合批量操作
  • 实时性好,响应速度快
  • 实现简单,开发成本低
劣势

  • 功能相对有限,某些设备特有信息无法获取,或需要依赖厂家私有MIB。
  • 安全性较差(v1/v2c)
  • 依赖设备的SNMP实现质量
4.3 NETconf方式

优势

  • 基于XML/JSON,数据结构清晰
  • 支持配置和状态的获取与修改
  • 安全性高,支持多种认证方式
  • 功能强大,可扩展性好
劣势

  • 实现复杂,开发成本高
  • 对设备的支持程度不如SNMP广泛
  • 占用带宽较大,不适合带宽有限的环境
  • 学习曲线较陡峭
五、Pysnmp库介绍

Pysnmp是一个纯Python实现的SNMP库,支持SNMP v1、v2c和v3版本。它提供了丰富的API,使得开发者可以方便地实现SNMP管理功能。
5.1 版本区别

Pysnmp有两个主要版本:

  • 传统版本(v4.x):使用同步API,导入方式为from pysnmp.hlapi import *
  • 最新版本(v5.x):支持异步API,导入方式为from pysnmp.hlapi.v3arch.asyncio import *
本文使用的是最新版本的pysnmp,它支持异步操作,更适合处理大量设备的并发查询。
5.2 常见Pysnmp函数示例

以下示例均来自官方网站【GET Operation - PySNMP 7.1 Documentation】,并稍了做修改。
5.2.1 get操作(获取单个OID的值)
  1. import asyncio
  2. from pysnmp.hlapi.v3arch.asyncio import *
  3. async def run():
  4. errorIndication, errorStatus, errorIndex, varBinds = await get_cmd(
  5. SnmpEngine(),
  6. CommunityData('readtest', mpModel=1),
  7. await UdpTransportTarget.create(('10.6.6.6', 161)),
  8. ContextData(),
  9. ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysName', 0)),
  10. ObjectType(ObjectIdentity('IF-MIB', 'ifName', 2))
  11. )
  12. for varBind in varBinds:
  13. print("-----------------------------")
  14. print(f"查询到的内容是{varBind}")
  15. print(f"sysName的OID值是{varBind[0]}")
  16. print(f"ifName的OID值是{varBind[1]}")
  17. print(f"sysName的可读化输出值是{varBind[0].prettyPrint()}")
  18. print(f"ifName的可读化输出值是{varBind[1].prettyPrint()}")
  19. print(f"{varBind[0].prettyPrint()} = {varBind[1].prettyPrint()}")
  20. print("-----------------------------")
  21. asyncio.run(run())
复制代码
示例运行截图如下
2.jpeg

5.2.2 next操作(获取下一个OID的值)
  1. import asyncio
  2. from pysnmp.hlapi.v3arch.asyncio import *
  3. async def run():
  4.     errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
  5.         SnmpEngine(),
  6.         CommunityData('readtest', mpModel=1),
  7.         await UdpTransportTarget.create(('10.3.3.3', 161)),
  8.         ContextData(),
  9.         ObjectType(ObjectIdentity('SNMPv2-MIB', 'sysContact', 0)),      #会返回sysContact的下一个OID值
  10.         ObjectType(ObjectIdentity('IF-MIB', 'ifName', 2)),          #会返回ifName.2的下一个OID值
  11.         lexicographicMode=False
  12.     )
  13.     for varBind in varBinds:
  14.         print("-----------------------------")
  15.         print(varBind[1].prettyPrint())
  16.         print("-----------------------------")
  17.    
  18. asyncio.run(run())
复制代码
next操作会获取下一个值,如下图所示
3.jpeg

4.jpeg

示例运行截图如下
5.jpeg

脚本查询sysContact,实际返回值是sysName的值,查询ifName.2,实际返回值是ifName.3的值。
5.2.3 bulk操作(批量获取多个OID的值)
  1. import asyncio
  2. from pysnmp.hlapi.v3arch.asyncio import *
  3. async def run():
  4.     errorIndication, errorStatus, errorIndex, varBinds = await bulk_cmd(
  5.         SnmpEngine(),
  6.         CommunityData('readtest', mpModel=1),
  7.         await UdpTransportTarget.create(('10.3.3.3', 161)),
  8.         ContextData(),
  9.         0, 100,                                #循环查询100次。
  10.         ObjectType(ObjectIdentity('IF-MIB', 'ifName'))
  11.     )
  12.     for varBind in varBinds:
  13.         print(varBind)
  14.    
  15. asyncio.run(run())
复制代码
示例运行结果如下
6.jpeg

因为是循环取值,结果显示范围远远超出了ifName的边界。
六、脚本介绍

6.1 功能概述

本次练习的脚本是一个基于SNMP v2c的模拟批量设备巡检工具,主要功能包括:

  • 设备IP管理:从devices.txt文件读取设备IP列表,支持空行、注释行和重复IP的自动处理
  • SNMP查询:使用异步方式批量查询设备的接口信息,包括接口索引、描述、名称、管理状态和运行状态
  • 结果处理:将查询结果保存到CSV文件,并生成详细的日志文件
  • 错误处理:对查询过程中的错误进行捕获和记录
6.2 脚本内容


  • 导入模块
  • 构建日志配置函数,IP文件读取函数,SNMP查询配置,配置格式化函数,查询函数,结果保存函数
  • 主函数调用构建的功能函数,脚本入口。
  1. import asyncio
  2. import csv
  3. import logging
  4. import time
  5. from datetime import datetime
  6. from pysnmp.hlapi.v3arch.asyncio import *
  7. # 配置日志函数
  8. def setup_logger(log_file):
  9.     """设置日志配置"""
  10.     logger = logging.getLogger(log_file)
  11.     logger.setLevel(logging.INFO)
  12.    
  13.     # 清除之前的处理器
  14.     for handler in logger.handlers[:]:
  15.         logger.removeHandler(handler)
  16.    
  17.     # 创建文件处理器
  18.     file_handler = logging.FileHandler(log_file, encoding='utf-8')
  19.     file_handler.setLevel(logging.INFO)
  20.     file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
  21.     logger.addHandler(file_handler)
  22.    
  23.     # 创建控制台处理器
  24.     stream_handler = logging.StreamHandler()
  25.     stream_handler.setLevel(logging.INFO)
  26.     stream_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
  27.     logger.addHandler(stream_handler)
  28.    
  29.     return logger
  30. # 读取设备IP列表
  31. def read_device_ips(file_path, logger=None):
  32.     ips = []
  33.     try:
  34.         with open(file_path, 'r', encoding='utf-8') as f:
  35.             for line in f:
  36.                 stripped_line = line.strip()
  37.                 if not stripped_line or stripped_line.startswith("#"):
  38.                     continue    #跳过空行和注释行
  39.                 ips.append(stripped_line)
  40.         
  41.         set_ip = set(ips)
  42.         if len(set_ip) == len(ips):
  43.             message = f"读取到的IP地址列表,共{len(ips)}个IP地址,没有重复的IP地址"
  44.             print(message)
  45.             if logger:
  46.                 logger.info(message)
  47.         else:
  48.             count_duplicates = len(ips) - len(set_ip)
  49.             message = f"读取到的IP地址列表,共{len(ips)}个IP地址,有{count_duplicates}个重复的IP地址,已自动去重"
  50.             print(message)
  51.             if logger:
  52.                 logger.info(message)
  53.             ips = list(set_ip)
  54.             
  55.         message = f"成功读取{len(ips)}个有效设备IP"
  56.         print(message)
  57.         if logger:
  58.             logger.info(message)
  59.         return ips
  60.     except Exception as e:
  61.         message = f"读取设备IP文件失败: {e}"
  62.         print(message)
  63.         if logger:
  64.             logger.error(message)
  65.         return []
  66. # 定义SNMP查询配置
  67. PORT_CONFIG = [
  68.     ("IF-MIB", "ifIndex","端口索引"),
  69.     ("IF-MIB", "ifDescr","端口描述"),
  70.     ("IF-MIB", "ifName","端口名称"),
  71.     ("IF-MIB", "ifAdminStatus","物理状态"),
  72.     ("IF-MIB", "ifOperStatus","协商状态")
  73. ]
  74. #构建OID路径,并返回格式化的MIB列表、 表格映射、 起始OID路径
  75. def format_config(config=PORT_CONFIG):
  76.     format_mib = []
  77.     format_table = {}
  78.     start_name = ""
  79.     for item in config:
  80.         if len(item) == 3:
  81.             format_mib.append(ObjectType(ObjectIdentity(item[0], item[1])))
  82.             format_table[item[1]] = item[2] # 端口描述
  83.             start_name = f"{item[0]}::{item[1]}"
  84.         else:
  85.             format_mib.append(ObjectType(ObjectIdentity(item[0], item[1])))
  86.             format_table[item[1]] = item[1]
  87.             start_name = f"{item[0]}::{item[1]}"
  88.         print(f"format_table is [{format_table}]")
  89.         print(f"start_name is {start_name}")
  90.     print(f"format_mib is {format_mib}")
  91.     return format_mib, format_table, start_name
  92. async def get_query(ip, community, config=PORT_CONFIG, logger=None):
  93.     """
  94.     使用next_cmd自动遍历所有接口的信息,直到没有更多数据为止。
  95.     通过分析返回的OID,判断是否继续查询下一个接口。
  96.     """
  97.     try:
  98.         snmpEngine = SnmpEngine()
  99.         transport = await UdpTransportTarget.create((ip, 161))
  100.         
  101.         # 存储结果的字典
  102.         interface_details = {}
  103.         hostname = ""
  104.         message = "开始遍历所有接口..."
  105.         print(message)
  106.         if logger:
  107.             logger.info(f"{ip} - {message}")
  108.         
  109.         # 首先查询sysName
  110.         errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
  111.             snmpEngine,
  112.             CommunityData(community, mpModel=1),
  113.             transport,
  114.             ContextData(),
  115.             ObjectType(ObjectIdentity("SNMPv2-MIB", "sysName")),
  116.             lexicographicMode=False
  117.         )
  118.         
  119.         # 处理sysName
  120.         if not errorIndication and not errorStatus and varBinds:
  121.             for varBind in varBinds:
  122.                 try:
  123.                     if isinstance(varBind, ObjectType):
  124.                         oid = varBind[0]
  125.                         val = varBind[1]
  126.                     elif isinstance(varBind, tuple) and len(varBind) >= 2:
  127.                         oid, val = varBind[0], varBind[1]
  128.                     else:
  129.                         continue
  130.                     
  131.                     oid_str = oid.prettyPrint()
  132.                     val_str = val.prettyPrint()
  133.                     
  134.                     if oid_str == 'SNMPv2-MIB::sysName.0':
  135.                         hostname = val_str
  136.                         message = f"系统名称: {hostname}"
  137.                         print(message)
  138.                         if logger:
  139.                             logger.info(f"{ip} - {message}")
  140.                 except Exception as e:
  141.                     message = f"处理sysName异常: {e}"
  142.                     print(message)
  143.                     if logger:
  144.                         logger.error(f"{ip} - {message}")
  145.                     continue
  146.         
  147.         # 遍历config中的每个OID进行查询
  148.         for item in config:
  149.             mib, oid_name, desc = item
  150.             message = f"\n开始查询 {mib}::{oid_name}..."
  151.             print(message)
  152.             if logger:
  153.                 logger.info(f"{ip} - 开始查询 {mib}::{oid_name}...")
  154.             
  155.             # 统计当前OID查询到的条目数
  156.             item_count = 0
  157.             non_expected_oid = None
  158.             
  159.             # 执行初始的next_cmd查询
  160.             errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
  161.                 snmpEngine,
  162.                 CommunityData(community, mpModel=1),
  163.                 transport,
  164.                 ContextData(),
  165.                 ObjectType(ObjectIdentity(mib, oid_name)),
  166.                 lexicographicMode=False
  167.             )
  168.             
  169.             while not errorIndication and not errorStatus and varBinds:
  170.                 # 处理当前批次的数据
  171.                 last_oid = None
  172.                
  173.                 for varBind in varBinds:
  174.                     try:
  175.                         # 解包varBind
  176.                         if isinstance(varBind, ObjectType):
  177.                             oid = varBind[0]
  178.                             val = varBind[1]
  179.                         elif isinstance(varBind, tuple) and len(varBind) >= 2:
  180.                             oid, val = varBind[0], varBind[1]
  181.                         else:
  182.                             continue
  183.                         
  184.                         oid_str = oid.prettyPrint()
  185.                         val_str = val.prettyPrint()
  186.                         
  187.                         # 检查是否是当前OID的延续
  188.                         if oid_str.startswith(f"{mib}::{oid_name}."):
  189.                             # 提取索引
  190.                             index = str(oid[-1])  # 直接从OID对象获取最后一个数字
  191.                            
  192.                             # 初始化接口信息
  193.                             if index not in interface_details:
  194.                                 interface_details[index] = {}
  195.                                 for config_item in config:
  196.                                     interface_details[index][config_item[1]] = None
  197.                            
  198.                             # 更新接口信息
  199.                             interface_details[index][oid_name] = val_str
  200.                             last_oid = oid
  201.                             item_count += 1
  202.                         else:
  203.                             non_expected_oid = oid_str
  204.                             break
  205.                     except Exception as e:
  206.                         message = f"处理数据异常: {e}"
  207.                         print(message)
  208.                         if logger:
  209.                             logger.error(f"{ip} - {message}")
  210.                         continue
  211.                
  212.                 # 继续下一次查询
  213.                 if non_expected_oid:
  214.                     break
  215.                 else:
  216.                     errorIndication, errorStatus, errorIndex, varBinds = await next_cmd(
  217.                         snmpEngine,
  218.                         CommunityData(community, mpModel=1),
  219.                         transport,
  220.                         ContextData(),
  221.                         ObjectType(ObjectIdentity(mib, oid_name, index)),  # 从上次的最后一个值继续
  222.                         lexicographicMode=False  # 只遍历当前OID子树
  223.                     )
  224.             
  225.             # 输出当前OID查询结果
  226.             message = f"获取到: {mib}::{oid_name} 条目共计 {item_count} 个。"
  227.             print(message)
  228.             if logger:
  229.                 logger.info(f"{ip} - {message}")
  230.             if non_expected_oid:
  231.                 message = f"非预期OID: {non_expected_oid}"
  232.                 print(message)
  233.                 if logger:
  234.                     logger.info(f"{ip} - {message}")
  235.         
  236.         # 输出总体结果
  237.         interface_count = len(interface_details)
  238.         config_count = len(config)
  239.         message = f"\n共找到 {interface_count} 个接口,查询了它们 {config_count} 项信息。"
  240.         print(message)
  241.         if logger:
  242.             logger.info(f"{ip} - {message}")
  243.         
  244.         # 按索引排序输出前10个接口示例
  245.         print("前10个接口示例:")
  246.         if logger:
  247.             logger.info(f"{ip} - 前10个接口示例:")
  248.         for idx, details in sorted(interface_details.items(), key=lambda x: int(x[0]))[:10]:
  249.             interface_message = f"  ifIndex.{idx}: {details.get('ifName', 'N/A')}"
  250.             print(interface_message)
  251.             if logger:
  252.                 logger.info(f"{ip} - {interface_message}")
  253.         
  254.         snmpEngine.close_dispatcher()
  255.         return interface_details, hostname
  256.         
  257.     except Exception as e:
  258.         print(f"异常: {e}")
  259.         return {}, ""
  260. # 保存查询结果到CSV文件
  261. def save_results(interface_details, hostname, ip, file_path, config=PORT_CONFIG):
  262.     try:
  263.         
  264.         with open(file_path, 'a', newline='', encoding='utf-8-sig') as f:
  265.             writer = csv.writer(f)
  266.             print(f"正在保存结果到 {file_path}...")
  267.             
  268.             # 写入表头
  269.             writer.writerow(["=" * 80])
  270.             line = f"主机名={hostname};设备IP={ip};检查时间:{time.strftime('%Y-%m-%d %H:%M:%S')}"
  271.             writer.writerow([line])
  272.             writer.writerow(["=" * 80])
  273.             
  274.             # 动态生成表头
  275.             headers = [item[2] for item in config]
  276.             writer.writerow(headers)
  277.             
  278.             # 写入数据
  279.             for interface in interface_details.values():
  280.                 row = []
  281.                 for item in config:
  282.                     row.append(interface.get(item[1], ''))
  283.                 writer.writerow(row)
  284.         print(f"成功保存查询结果到{file_path}")
  285.     except Exception as e:
  286.         print(f"保存查询结果失败: {e}")
  287. async def main():
  288.     # 配置参数
  289.     device_file = 'devices.txt'  # 设备IP文件
  290.     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  291.     port_result_file = f'snmp_port_results_{timestamp}.csv'  # 端口结果文件
  292.     port_log_file = f'snmp_port_results_log_{timestamp}.log'  # 端口日志文件
  293.     community = 'readtest'  # SNMP团体名
  294.    
  295.     # 设置日志
  296.     port_logger = setup_logger(port_log_file)
  297.    
  298.     # 读取设备IP
  299.     device_ips = read_device_ips(device_file, logger=port_logger)
  300.     if not device_ips:
  301.         port_logger.error("没有设备IP可查询")
  302.         return
  303.    
  304.     # 批量查询设备信息
  305.     for ip in device_ips:
  306.         port_logger.info(f"开始查询设备: {ip}")
  307.         
  308.         # 获取端口信息和主机名
  309.         interface_details, hostname = await get_query(ip, community, config=PORT_CONFIG, logger=port_logger)
  310.         
  311.         # 保存端口结果
  312.         if interface_details:
  313.             save_results(interface_details, hostname, ip, port_result_file, config=PORT_CONFIG)
  314.             port_logger.info(f"{ip} - 成功保存端口结果到 {port_result_file}")
  315.         
  316.         # 避免查询过快,添加短暂延迟
  317.         await asyncio.sleep(1)
  318.     port_logger.info("批量查询完成")
  319. if __name__ == "__main__":
  320.     asyncio.run(main())
复制代码
6.3 脚本输出结果保存

7.jpeg

6.4 脚本运行日志保存

8.jpeg

6.5 脚本局限性


  • 仅支持SNMP v2c:没有对SNMP v3进行适配,安全性较差
  • 功能有限:目前只支持接口状态的查询,无法获取CPU、内存、温度等系统资源信息
  • 可扩展性不足:添加新的查询项需要修改代码,不够灵活
  • 错误处理简单:对于SNMP查询失败的情况,处理方式较为简单
  • 配置固定:SNMP团体名、查询参数等配置硬编码在脚本中,不够灵活
6.6 后续练习中可能尝试的功能


  • 添加系统资源监控:增加对CPU使用率、内存使用情况、设备温度等信息的查询
  • 支持SNMP v3:添加对SNMP v3的支持,提高安全性
  • 配置文件化:将配置参数移到配置文件中,方便用户修改
  • 结果可视化:添加简单的结果可视化功能,如生成状态图表,或日志格式化提升可读性
  • 告警机制:添加异常状态的告警功能,如接口down时发送通知
七、结语

SNMP作为一种成熟的网络管理协议,在网络设备监控和管理方面发挥着重要作用。本文介绍的批量巡检脚本利用pysnmp库实现了对网络设备接口状态的自动采集和分析,为网络管理员提供了一种高效、便捷的设备管理工具。
虽然该脚本目前存在一些局限性,但其基本功能已经能够满足日常网络巡检的需求。通过进一步的扩展和优化,它可以成为网络管理工作中的得力助手。
在实际应用中,网络管理员可以根据具体需求对脚本进行定制和扩展,例如添加更多的监控指标、实现更复杂的告警机制等,从而构建一个更加完善的网络监控系统。
SNMP协议虽然有其局限性,但在网络设备管理领域仍然是一种不可替代的工具。结合其他管理协议和工具,如NETconf、REST API等,可以构建一个更加全面、高效的网络管理解决方案。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册