⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 snmp.py

📁 Network Administration Visualized 网络管理可视化源码
💻 PY
字号:
# -*- coding: ISO8859-1 -*-## Copyright 2003 Norwegian University of Science and Technology# Copyright 2006 UNINETT AS## This file is part of Network Administration Visualized (NAV)## NAV is free software; you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation; either version 2 of the License, or# (at your option) any later version.## NAV is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the# GNU General Public License for more details.## You should have received a copy of the GNU General Public License# along with NAV; if not, write to the Free Software# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA## Authors: Sigurd Gartmann#          Morten Vold <morten.vold@uninett.no>#"""This module is a higher level interface to SNMP query functionalityfor NAV, as pysnmp2 is quite low-level and tedious to work with.The module uses the version 2 branch of pysnmp."""import reimport exceptionsimport pysnmp # Version 2from pysnmp import role, v1, v2c, asn1# Ugly hack to escape inconsistencies in pysnmp2try:    v1.RESPONSEexcept:    v1.RESPONSE = v1.GETRESPONSE    class TimeOutException(exceptions.Exception):    def __init__(self, err_msg=None):        exceptions.Exception.__init__(self)        if err_msg is not None:            self.err_msg = str(err_msg)        else:            self.err_msg = ''    def __str__(self):        return self.err_msg    def __repr__(self):        return self.__class__.__name__ + '(' + self.err_msg + ')'    class NameResolverException(exceptions.Exception):    def __init__(self, err_msg=None):        exceptions.Exception.__init__(self)        if err_msg is not None:            self.err_msg = str(err_msg)        else:            self.err_msg = ''    def __str__(self):        return self.err_msg    def __repr__(self):        return self.__class__.__name__ + '(' + self.err_msg + ')'    class Snmp(object):    """Simple class that provides snmpget, snmpwalk and snmpjog(tm)    functionality.  Snmpget returns the result as one string.    Snmpwalk returns the subtree as an array containing OID,    value-pairs.  Snmpjog returns the result as snmpwalk does, the    difference is that snmpjog chops off (each line) the OID-prefix    used in the query.    """    def __init__(self, host, community="public", version="1", port=161,                 retries=3, timeout=1, reporttype=None):        """        Makes a new Snmp-object.        host: hostname        community: community (password), defaults to "public"        port: port, defaults to "161"        """                self.host = host        self.community = community        self.version = str(version)        self.port = int(port)        self.retries = retries        self.timeout = timeout        self.reporttype = reporttype        self.handle = role.manager((host,port))    def get(self,query = "1.3.6.1.2.1.1.1.0"):        """        Does snmpget query on the host.        query: OID to use in the query        returns the result as a string.        """        if not query.startswith("."):            query = "." + query        # Choose protocol version specific module        try:            snmp = eval('v' + self.version)        except (NameError, AttributeError):            raise 'Unsupported SNMP protocol version: %s' % (self.version)        objectid = asn1.OBJECTID()        oid = objectid.encode(query)        # Create SNMP GET request        req = snmp.GETREQUEST()        req['community'] = self.community        req['encoded_oids'] = [oid]        # Create SNMP response message framework        rsp = snmp.RESPONSE()        # Encode SNMP request message and try to send it to SNMP agent and        # receive a response        try:            (answer, src) = self.handle.send_and_receive(req.encode())        except role.NoResponse, e:            #timeout            raise TimeOutException(e)        except role.NetworkError, n:            #dns            raise NameResolverException(n)        # Decode raw response/answer        rsp.decode(answer)        # Fetch Object ID's and associated values        oids = [objectid.decode(o)[0] for o in rsp['encoded_oids']]        values = [asn1.decode(v)[0] for v in rsp['encoded_vals']]        # Check for remote SNMP agent failure        if rsp['error_status']:            raise "Snmp error %s at %s (%s, %s)" % \                  (rsp['error_status'],                   rsp['error_index'],                   oids[rsp['error_index']-1],                   values[rsp['error_index']-1])        # Since we're only asking for one value, only return the first        # result, decoded to its correct Python data type        return values[0]()    def walk(self,query = "1.3.6.1.2.1.1.1.0"):        """        Does snmpwalk on the host.        query: OID to use in the query        returns an array containing key-value-pairs, where the        returned OID is the key.        """        if not query.startswith("."):            query = "." + query                try:            snmp = eval('v' + self.version)        except (NameError, AttributeError):            raise 'Unsupported SNMP protocol version: %s' % (self.version)        result = []        root_oid = asn1.OBJECTID()        root_oid.encode(query)        # Create SNMP GET/GETNEXT request        req = snmp.GETREQUEST()        nextReq = snmp.GETNEXTREQUEST()        for r in (req, nextReq):            r['community'] = self.community            r['encoded_oids'] = [root_oid.encode()]        # Create a response message framework        rsp = snmp.RESPONSE()        # Traverse agent MIB        while 1:            # Encode SNMP request message and try to send it to SNMP agent and            # receive a response            try:                (answer, src) = self.handle.send_and_receive(req.encode())            except role.NoResponse, e:                raise TimeOutException(e)            except role.NetworkError, n:                raise NameResolverException(n)            # Decode raw response/answer            rsp.decode(answer)            # Fetch Object ID's and associated values            oids = [asn1.OBJECTID().decode(o)[0] for o in rsp['encoded_oids']]            values = [asn1.decode(v)[0] for v in rsp['encoded_vals']]            # Check for remote SNMP agent failure            if rsp['error_status']:                # SNMP agent reports 'no such name' when walk is over                if rsp['error_status'] == 2:                    # Switch to using GETNEXT requests if GET request failed                    if not (req is nextReq):                        req = nextReq                        continue                    else:                        # If GETNEXT also failed, we return whatever we got                        return result                else:                    raise "Snmp error %s at %s (%s, %s)" % \                          (rsp['error_status'],                           rsp['error_index'],                           oids[rsp['error_index']-1],                           values[rsp['error_index']-1])            # If the current GETNEXT response came from outside the            # tree we are traversing, get the hell out of here, we're            # done walking the subtree.            if not root_oid.isaprefix(oids[0]):                return result            else:                result.append((oids[0], values[0]()))            # Update request ID            req['request_id'] += 1            # Switch over GETNEXT PDU for if not done            if not (req is nextReq):                req = nextReq            # Load the next request with the OID received in the last response            req['encoded_oids'] = rsp['encoded_oids']        def jog(self,query = "1.3.6.1.2.1.1.1.0"):        """Does a modified snmpwalk on the host. The query OID is        chopped off the returned OID for each line in the result.        query: OID to use in the query        returns an array containing key-value-pairs, where the key is        the returned OID minus the OID in the query, i.e query: 1.2.3,        snmpwalk returned oid:1.2.3.4.5, snmpjog returned key: 4.5        """        walked = self.walk(query)        result = []        if walked:            for oid,value in walked:                #found = re.search(query,oid)                key = re.sub('\.?' + query + '\.?','',oid)                result.append((key,value))                return result

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -