📄 snmpy.py
字号:
def _setattr_fun(self, attr, val): """Backend for __setattr__ method """ if attr in ['addr', 'oid', 'name', 'type', 'specific_type', 'uptime', 'session', 'variables']: raise SnmpError('Read-only attribute: %s' % attr) self.__dict__[attr] = val def __repr__(self): """Try to look similar to original implementation """ return self._wrapper_fun(self._repr_fun) def _repr_fun(self): """Backend for __repr__ method """ return '<SNMP_trap %s, %s, type=%d, stype=%d, uptime=%d>' % \ (self.addr, self.name, self.type, \ self.specific_type, self.uptime)# A repository of pending async sessionspendingSessions = {}class SNMP_Session(CompatBase): """SNMP session object """ def __init__(self, hostname, community='public', remote_port=0, local_port=0, retries=0, timeout=0, callback=None): """Initialize transport layer, store passed args """ return self._wrapper_fun(self._init_fun, hostname, community, remote_port, local_port, retries, timeout, callback) def _init_fun(self, hostname, community, remote_port, local_port, retries, timeout, callback): """Backend for __init__ method """ # Set defaults if hostname is None: hostname = '0.0.0.0' if community is None: community = 'public' if remote_port == 0: remote_port = 161 if retries == 0: retries = 3 if timeout == 0: timeout = 3 if callback is not None and not callable(callback): raise SnmpError('Non-callable callback function') if local_port == 0: # Create SNMP manager object self.mgr = role.manager((hostname, remote_port)) self.mgr.open() self.mgr.retries = retries self.mgr.timeout = timeout self.agt = None else: if callback is None: raise SnmpError('Agent role requires callback function') # Create SNMP agent object self.agt = role.agent((callback, None), [(hostname, local_port)]) self.agt.open() self.mgr = None # Expose public attributes self.addr = hostname self.community = community self.timeout = timeout self.remote_port = remote_port self.local_port = local_port self.retries = retries self.callback = callback # A repository of pending SNMP reqs (for async use) self.pendingReqs = {} def _request(self, oid, req, val=[v1.ObjectSyntax()]): """Performs a get request """ if self.mgr is None: raise SnmpError('Agent role is active') # Convert to list if not done yet if type(oid) != ListType: oid = [ oid ] for idx in range(len(oid)): if isinstance(oid[idx], snmpmibnode): oid[idx] = oid[idx].oid # Add community name req['community'].set(self.community) # Build and set Object ID & variables bindings req['pdu'].values()[0]['variable_bindings'].extend( map(lambda x, y, self=self: v1.VarBind( name=v1.ObjectName(x), value=y), oid, val)) # Encode SNMP request message, send it to SNMP agent if self.callback is None: # ...and receive a response (answer, src) = self.mgr.send_and_receive(req.encode()) else: # Register pending request self.pendingReqs[req] = time() + self.timeout # Register pending session if not pendingSessions.has_key(self): pendingSessions[self] = 1 # Send request message return self.mgr.send(req.encode()) # Decode SNMP response rsp = v1.GetResponse(); rsp.decode(answer) # Make sure response matches request if not req.match(rsp): raise GetError('Unmatched response: %s vs %s' % (req, rsp)) # Fetch Object ID's and associated values oids = map(lambda x: x['name'].get(), \ rsp['pdu'].values()[0]['variable_bindings']) vals = map(lambda x: x['value'].values()[0].values()[0], \ rsp['pdu'].values()[0]['variable_bindings']) # Check for remote SNMP agent failure if rsp['pdu'].values()[0]['error_status']: raise SnmpError(str(rsp['pdu'].values()[0]['error_status']) + \ ' at ' + str(oids[rsp['pdu'].values()[0] ['error_index'].get()-1])) miboids = [] for oid, val in map(None, oids, vals): miboid = snmpmibnode(self, oid, None) miboid.value = val miboids.append(miboid) if len(miboids) == 1: return miboids[0] else: return miboids def get(self, oid): """Perform SNMP GET request """ return self._wrapper_fun(self._get_fun, oid) def _get_fun(self, oid): """Backend for get method """ return self._request(oid, v1.GetRequest()) def getnext(self, oid): """Perform SNMP GETNEXT request """ return self._wrapper_fun(self._getnext_fun, oid) def _getnext_fun(self, oid): """Backend for getnext method """ return self._request(oid, v1.GetNextRequest()) # A strange allias thepowerfulgetnext = getnext def set(self, oid, val, valType='string'): """Perform SNMP SET request """ return self._wrapper_fun(self._set_fun, oid, val, valType) def _set_fun(self, oid, val, valType): """Backend for set method """ valObj = None for syntax in [ v1.SimpleSyntax, v1.ApplicationSyntax]: for (objName, objType) in map(None, syntax.choiceNames, syntax.choiceComponents): if valType == objName: valObj = v1.ObjectSyntax(syntax=syntax(value=objType(val))) break if valObj is None: raise SnmpError('Unknown ASN.1 value type name: %s' % valType) return self._request(oid, v1.SetRequest(), [ valObj ]) def mibnode(self, name, extension=None): """Return a MIBNODE object """ return snmpmibnode(self, name, extension) def fileno(self): """Return transport fileno """ return self._wrapper_fun(self._fileno_fun) def _fileno_fun(self): """Backend for fileno method """ if self.mgr is not None: return self.mgr.get_socket().fileno() if self.agt is not None: return self.agt.get_socket().fileno() raise SnmpError('Transport level not initialized') def read(self): """Read SNMP response from transport level, decode it and invoke the callback routine """ return self._wrapper_fun(self._read_fun) def _read_fun(self): """Backend for read method """ if self.mgr is not None: (answer, src) = self.mgr.read() elif self.agt is not None: (answer, src) = self.agt.read() rsp = v1.Trap(); rsp.decode(answer) return self.callback( snmptrap(self, rsp['pdu']['trap']\ ['agent_addr']['internet'].get(), rsp['pdu']['trap']['enterprise'].get(), rsp['pdu']['trap']['generic_trap'].get(), rsp['pdu']['trap']['specific_trap'].get(), rsp['pdu']['trap']['time_stamp'].get(), rsp['pdu']['trap']['variable_bindings'])) else: raise SnmpError('Transport level not initialized') rsp = v1.GetResponse(); rsp.decode(answer) # Make sure response matches any of pending requests for req in self.pendingReqs.keys(): if req.match(rsp): del self.pendingReqs[req] if len(self.pendingReqs) == 0: del pendingSessions[self] break else: raise SnmpError('Unmatched response: %s' % rsp) # Fetch Object ID's and associated values oids = map(lambda x: x['name'].get(), \ rsp['pdu'].values()[0]['variable_bindings']) vals = map(lambda x: x['value'].values()[0].values()[0], \ rsp['pdu'].values()[0]['variable_bindings']) # Check for remote SNMP agent failure if rsp['pdu'].values()[0]['error_status']: raise SnmpError(str(rsp['pdu'].values()[0]['error_status']) + \ ' at ' + str(oids[rsp['pdu'].values()[0] ['error_index'].get()-1])) miboids = [] for oid, val in map(None, oids, vals): miboid = snmpmibnode(self, oid, None) miboid.value = val miboids.append(miboid) if len(miboids) == 1: return self.callback(miboids[0]) else: return self.callback(miboids) # Module functionsdef session(hostname, community, remote_port, local_port, retries, timeout, callback=None): """Return SNMP session object """ return SNMP_Session(hostname, community, remote_port, local_port, retries, timeout, callback) def checkoid(oid): """Return true if given OID is syntaxically valid XXX """ try: v1.ObjectIdentifier(oid) except: return 0 return 1def issuboid(miboid1, miboid2): """Compare two MIB oids, return 1 if equal """ if isinstance(miboid1, snmpmibnode): miboid1 = miboid1.oid if isinstance(miboid2, snmpmibnode): miboid2 = miboid2.oid try: if v1.ObjectIdentifier(miboid1) == v1.ObjectIdentifier(miboid2): return 1 else: return 0 except pysnmp.error.PySnmpError, why: raise SnmpError(why)def merge(filename): """Load and parse MIB file """ raise NotImplementedError('merge: function not implemented')def addmibdir(): """Add a path to MIB search path """ raise NotImplementedError('addmibdir: function not implemented')def timeout(): """Return the estimated time till timeout """ raise NotImplementedError('timeout: function not implemented')def handletimeout(): """Expire long pending SNMP requests """ now = time() # Walk over pending sessions & reqs for ses in pendingSessions.keys(): for req in ses.pendingReqs.keys(): if ses.pendingReqs[req] < now: del ses.pendingReqs[req] if len(ses.pendingReqs) == 0: del pendingSessions[ses] raise SnmpTimeout(req)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -