⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 snmpy.py

📁 实现 SNMP 协议的 agent 和 manager
💻 PY
📖 第 1 页 / 共 2 页
字号:
    def _setattr_fun(self, attr, val):        """Backend for __setattr__ method        """        if attr in ['addr', 'oid', 'name', 'type', 'specific_type',                    'uptime', 'session', 'variables']:            raise SnmpError('Read-only attribute: %s' % attr)        self.__dict__[attr] = val    def __repr__(self):        """Try to look similar to original implementation        """        return self._wrapper_fun(self._repr_fun)            def _repr_fun(self):        """Backend for __repr__ method        """        return '<SNMP_trap %s, %s, type=%d, stype=%d, uptime=%d>' % \               (self.addr, self.name, self.type, \                self.specific_type, self.uptime)# A repository of pending async sessionspendingSessions = {}class SNMP_Session(CompatBase):    """SNMP session object    """    def __init__(self, hostname, community='public', remote_port=0,                 local_port=0, retries=0, timeout=0, callback=None):        """Initialize transport layer, store passed args        """        return self._wrapper_fun(self._init_fun, hostname, community,                                 remote_port, local_port, retries, timeout,                                 callback)    def _init_fun(self, hostname, community, remote_port, local_port,                  retries, timeout, callback):        """Backend for __init__ method        """        # Set defaults        if hostname is None: hostname = '0.0.0.0'        if community is None: community = 'public'        if remote_port == 0: remote_port = 161        if retries == 0: retries = 3        if timeout == 0: timeout = 3        if callback is not None and not callable(callback):            raise SnmpError('Non-callable callback function')         if local_port == 0:            # Create SNMP manager object            self.mgr = role.manager((hostname, remote_port))            self.mgr.open()            self.mgr.retries = retries            self.mgr.timeout = timeout            self.agt = None        else:  
          if callback is None:                raise SnmpError('Agent role requires callback function')            # Create SNMP agent object            self.agt = role.agent((callback, None), [(hostname, local_port)])            self.agt.open()            self.mgr = None        # Expose public attributes        self.addr = hostname        self.community = community        self.timeout = timeout        self.remote_port = remote_port        self.local_port = local_port        self.retries = retries        self.callback = callback        # A repository of pending SNMP reqs (for async use)        self.pendingReqs = {}    def _request(self, oid, req, val=[v1.ObjectSyntax()]):        """Performs a get request        """        if self.mgr is None:            raise SnmpError('Agent role is active')        # Convert to list if not done yet        if type(oid) != ListType:            oid = [ oid ]               for idx in range(len(oid)):            if isinstance(oid[idx], snmpmibnode):                oid[idx] = oid[idx].oid         # Add community name        req['community'].set(self.community)        # Build and set Object ID & variables bindings        req['pdu'].values()[0]['variable_bindings'].extend(            map(lambda x, y, self=self: v1.VarBind(            name=v1.ObjectName(x), value=y), oid, val))        # Encode SNMP request message, send it to SNMP agent                if self.callback is None:            # ...and receive a response            (answer, src) = self.mgr.send_and_receive(req.encode())        else:            # Register pending request            self.pendingReqs[req] = time() + self.timeout            # Register pending session            if not pendingSessions.has_key(self):                pendingSessions[self] = 1            # Send request message            return self.mgr.send(req.encode())        # Decode SNMP response        rsp = v1.GetResponse(); rsp.decode(answer)        # Make sure response matches request        if not 
req.match(rsp):            raise GetError('Unmatched response: %s vs %s' % (req, rsp))        # Fetch Object ID's and associated values        oids = map(lambda x: x['name'].get(), \                   rsp['pdu'].values()[0]['variable_bindings'])        vals = map(lambda x: x['value'].values()[0].values()[0], \                   rsp['pdu'].values()[0]['variable_bindings'])        # Check for remote SNMP agent failure        if rsp['pdu'].values()[0]['error_status']:            raise SnmpError(str(rsp['pdu'].values()[0]['error_status']) + \                            ' at ' + str(oids[rsp['pdu'].values()[0]                                              ['error_index'].get()-1]))        miboids = []        for oid, val in map(None, oids, vals):            miboid = snmpmibnode(self, oid, None)            miboid.value = val            miboids.append(miboid)        if len(miboids) == 1:            return miboids[0]        else:                    return miboids    def get(self, oid):        """Perform SNMP GET request        """        return self._wrapper_fun(self._get_fun, oid)    def _get_fun(self, oid):        """Backend for get method        """        return self._request(oid, v1.GetRequest())    def getnext(self, oid):        """Perform SNMP GETNEXT request        """        return self._wrapper_fun(self._getnext_fun, oid)    def _getnext_fun(self, oid):        """Backend for getnext method        """        return self._request(oid, v1.GetNextRequest())    # A strange allias    thepowerfulgetnext = getnext    def set(self, oid, val, valType='string'):        """Perform SNMP SET request        """        return self._wrapper_fun(self._set_fun, oid, val, valType)    def _set_fun(self, oid, val, valType):        """Backend for set method        """        valObj = None        for syntax in [ v1.SimpleSyntax, v1.ApplicationSyntax]:            for (objName, objType) in map(None, syntax.choiceNames,                                          syntax.choiceComponents):      
          if valType == objName:                    valObj = v1.ObjectSyntax(syntax=syntax(value=objType(val)))                    break                        if valObj is None:            raise SnmpError('Unknown ASN.1 value type name: %s' % valType)                return self._request(oid, v1.SetRequest(), [ valObj ])    def mibnode(self, name, extension=None):        """Return a MIBNODE object        """        return snmpmibnode(self, name, extension)    def fileno(self):        """Return transport fileno        """        return self._wrapper_fun(self._fileno_fun)    def _fileno_fun(self):        """Backend for fileno method        """        if self.mgr is not None:            return self.mgr.get_socket().fileno()        if self.agt is not None:            return self.agt.get_socket().fileno()        raise SnmpError('Transport level not initialized')          def read(self):        """Read SNMP response from transport level, decode it and           invoke the callback routine        """        return self._wrapper_fun(self._read_fun)    def _read_fun(self):        """Backend for read method        """        if self.mgr is not None:            (answer, src) = self.mgr.read()        elif self.agt is not None:            (answer, src) = self.agt.read()            rsp = v1.Trap(); rsp.decode(answer)            return self.callback(                snmptrap(self, rsp['pdu']['trap']\                         ['agent_addr']['internet'].get(),                         rsp['pdu']['trap']['enterprise'].get(),                         rsp['pdu']['trap']['generic_trap'].get(),                         rsp['pdu']['trap']['specific_trap'].get(),                         rsp['pdu']['trap']['time_stamp'].get(),                         rsp['pdu']['trap']['variable_bindings']))        else:            raise SnmpError('Transport level not initialized')        rsp = v1.GetResponse(); rsp.decode(answer)        # Make sure response matches any of pending requests        for req in 
self.pendingReqs.keys():            if req.match(rsp):                del self.pendingReqs[req]                if len(self.pendingReqs) == 0:                    del pendingSessions[self]                break        else:            raise SnmpError('Unmatched response: %s' % rsp)        # Fetch Object ID's and associated values        oids = map(lambda x: x['name'].get(), \                   rsp['pdu'].values()[0]['variable_bindings'])        vals = map(lambda x: x['value'].values()[0].values()[0], \                   rsp['pdu'].values()[0]['variable_bindings'])        # Check for remote SNMP agent failure        if rsp['pdu'].values()[0]['error_status']:            raise SnmpError(str(rsp['pdu'].values()[0]['error_status']) + \                            ' at ' + str(oids[rsp['pdu'].values()[0]                                              ['error_index'].get()-1]))        miboids = []        for oid, val in map(None, oids, vals):            miboid = snmpmibnode(self, oid, None)            miboid.value = val            miboids.append(miboid)        if len(miboids) == 1:            return self.callback(miboids[0])        else:                    return self.callback(miboids)        # Module functionsdef session(hostname, community, remote_port, local_port, retries,            timeout, callback=None):    """Return SNMP session object    """    return SNMP_Session(hostname, community, remote_port, local_port,                        retries, timeout, callback)    def checkoid(oid):    """Return true if given OID is syntaxically valid XXX    """    try:        v1.ObjectIdentifier(oid)            except:        return 0    return 1def issuboid(miboid1, miboid2):    """Compare two MIB oids, return 1 if equal    """    if isinstance(miboid1, snmpmibnode):        miboid1 = miboid1.oid    if isinstance(miboid2, snmpmibnode):        miboid2 = miboid2.oid    try:        if v1.ObjectIdentifier(miboid1) == v1.ObjectIdentifier(miboid2):            return 1        else:            
return 0    except pysnmp.error.PySnmpError, why:        raise SnmpError(why)def merge(filename):    """Load and parse MIB file    """    raise NotImplementedError('merge: function not implemented')def addmibdir():    """Add a path to MIB search path    """    raise NotImplementedError('addmibdir: function not implemented')def timeout():    """Return the estimated time till timeout    """    raise NotImplementedError('timeout: function not implemented')def handletimeout():    """Expire long pending SNMP requests    """    now = time()    # Walk over pending sessions & reqs    for ses in pendingSessions.keys():        for req in ses.pendingReqs.keys():            if ses.pendingReqs[req] < now:                del ses.pendingReqs[req]                if len(ses.pendingReqs) == 0:                    del pendingSessions[ses]                raise SnmpTimeout(req)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -