📄 snmpy.py
字号:
""" Compatibility interface to the snmpy package. The degree of compatibility is mostly limited by the lack of MIB parser in PySNMP. For more information on snmpy project, please, refer to http://www.sf.net/projects/snmpy/ Copyright 1999-2002 by Ilya Etingof <ilya@glas.net>. See LICENSE for details. Initial copyrights for prototype code and API are as follows: Written by Anthony Baxter, arb@connect.com.au Copyright (C) 1994,1995,1996 Anthony Baxter. Modified extensively by Tim O'Malley, timo@bbn.com. The changes are Copyright (C) 1996 of BBN. Permission is hereby granted, free of charge, to any person obtaining a copy of this source file to use, copy, modify, merge, or publish it subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or in any new file that contains a substantial portion of this file. THE AUTHOR MAKES NO REPRESENTATIONS ABOUT THE SUITABILITY OF THE SOFTWARE FOR ANY PURPOSE. IT IS PROVIDED "AS IS" WITHOUT EXPRESS OR IMPLIED WARRANTY. THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT OF THIRD PARTY RIGHTS. IN NO EVENT SHALL THE AUTHOR BE LIABLE TO YOU OR ANY OTHER PARTY FOR ANY SPECIAL, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE, STRICT LIABILITY OR ANY OTHER ACTION ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE."""from exceptions import Exceptionfrom time import timefrom types import ListTypefrom pysnmp.proto import v1from pysnmp.mapping.udp import rolefrom pysnmp.asn1 import baseimport pysnmp.error, pysnmp.mapping.udp.error# Module exceptionsclass SnmpError(Exception): passclass SnmpTimeout(Exception): pass# Module constantsversion = 0.73SNMP_TRAP_PORT = 162class CompatBase: """Base class for compatibility classes """ def _wrapper_fun(self, fun, *args): """ """ try: return apply(fun, args) # Catch transport-speciic, timeout exceptions except pysnmp.mapping.udp.error.NoResponseError, why: raise SnmpTimeout(why) except pysnmp.mapping.udp.error.IdleTimeoutError, why: raise SnmpTimeout(why) # Render the rest of PySNMP exceptions to one except pysnmp.error.PySnmpError, why: raise SnmpError(why)class snmpmibnode(CompatBase): """A single MIB object (variable-binding) """ def __init__(self, parent, name, extension): """Class constructor: initialize object internals from passed OID """ self._wrapper_fun(self._init_fun, parent, name, extension) def _init_fun(self, parent, name, extension): """The backend method for class constructor """ oid = '' try: if name[0] != '.': oid = '.1.3.6.1.2.1' oid = '%s.%s' % (oid, name) if extension: oid = '%s.%s' % (oid, extension) except: raise SnmpError('Malformed OID name/extension: %s/%s' % (name, extension)) # Keep a real OID object around self._oid = v1.ObjectIdentifier(oid) self._value = None self._session = parent def __getattr__(self, attr): """Translate some attributes """ return self._wrapper_fun(self._getattr_fun, attr) def _getattr_fun(self, attr): """Backend for __getattr__ method """ if attr == 'oid': return self._oid.get() elif attr == 'name': return self.oid elif attr == 'index': return 0L elif attr == 'value': if self._value is None: return None else: self._value.get() elif attr == 'type': # More types to add? if self._value is None: return 0 else: if isinstance(self._value, v1.ObjectIdentifier): return 1 elif isinstance(self._value, v1.OctetString): return 2 elif isinstance(self._value, v1.Integer): return 3 elif isinstance(self._value, v1.NetworkAddress): return 4 elif isinstance(self._value, v1.IpAddress): return 5 elif isinstance(self._value, v1.Counter): return 6 elif isinstance(self._value, v1.Gauge): return 7 elif isinstance(self._value, v1.TimeTicks): return 8 elif isinstance(self._value, v1.Opaque): return 9 elif isinstance(self._value, v1.Null): return 10 elif isinstance(self._value, v1.Counter64): return 11 # XXX may be continued else: raise SnmpError('Unsupported value type: %s' % self._value) elif attr == 'session': return self._session elif attr == 'description': return '' elif attr == 'enums': raise SnmpError('Feature not implemented') raise AttributeError, attr def __setattr__(self, attr, val): """Set certain attributes """ return self._wrapper_fun(self._setattr_fun, attr, val) def _setattr_fun(self, attr, val): """Backend for __setattr__ method """ if attr in ['oid', 'name', 'index', 'type', 'description', 'enums']: raise SnmpError('Read-only attribute: %s' % attr) elif attr == 'value': if not isinstance(val, base.SimpleAsn1Object): raise SnmpError('Non ASN1 object given for value: %s' \ % str(val)) self._value = val self.__dict__[attr] = val def __repr__(self): """Try to look similar to original implementation """ return self._wrapper_fun(self._repr_fun) def _repr_fun(self): """Backend for __repr__ method """ return '<SNMP_mibnode %s, %u, %s>' % (self.name, self.type, self.value) # Mapping object API def __len__(self): """Returns number of sub-OIDs in SNMP table (can't be implemented w/o MIB parser) """ return 0 def __getitem__(self, key): """Support creating child OIDs by subscription """ return self._wrapper_fun(self._getitem_fun, key) def _getitem_fun(self, key): """Backend for __getitem__ method """ try: key = '%u' % key except: pass return snmpmibnode(self._session, self.oid, key) def __setitem__(self, key, value): """Support creating child OIDs by subscription """ return self._wrapper_fun(self._setitem_fun, key, value) def _setitem_fun(self, key, value): """Backend for __setitem__ method """ try: key = '%u' % key except: pass self._session.set(self[key], value) # Indirect access to session object def get(self): """Indirect GET request access to parent session object """ return self._session.get(self.oid) def getnext(self): """Indirect GETNEXT request access to parent session object """ return self._session.getnext(self.oid) def set(self, val): """Indirect SET request access to parent session object """ return self._session.set(self.oid, val) # Misc ops on OID object def oidlist(self): """Return a list of sub-OIDs """ return list(self._oid) def nodes(self): """Return child sub-nodes """ raise SnmpError('Feature not implemented') def enums(self): """Not clear what this does so far """ raise SnmpError('Feature not implemented')class snmptrap(CompatBase): """A SNMP TRAP request object """ def __init__(self, parent, addr, name, trap_type, specific_type, uptime, varbind): """Class constructor: initialize object internals from passed params """ self._wrapper_fun(self._init_fun, parent, addr, name, trap_type, specific_type, uptime, varbind) def _init_fun(self, parent, addr, name, trap_type, specific_type, uptime, varbind): """Backend for __init__ method """ # Keep a real Trap object around self.req = v1.Trap() self.req['pdu']['trap']['agent_addr']['internet'].set(addr) self.req['pdu']['trap']['enterprise'].set(name) self.req['pdu']['trap']['generic_trap'].set(trap_type) self.req['pdu']['trap']['specific_trap'].set(specific_type) self.req['pdu']['trap']['time_stamp'].set(uptime) self.req['pdu']['trap']['variable_bindings'] = varbind # Non-masked public attributes self._session = parent def __getattr__(self, attr): """Translate some attributes """ return self._wrapper_fun(self._getattr_fun, attr) def _getattr_fun(self, attr): """Backend for __getattr__ method """ if attr == 'addr': return self.req['pdu']['trap']['agent_addr']['internet'].get() elif attr == 'oid': return self.req['pdu']['trap']['enterprise'].get() elif attr == 'name': return self.oid elif attr == 'type': return self.req['pdu']['trap']['generic_trap'].get() elif attr == 'specific_type': return self.req['pdu']['trap']['specific_trap'].get() elif attr == 'uptime': return self.req['pdu']['trap']['time_stamp'].get() elif attr == 'session': return self._session elif attr == 'variables': # Fetch Object ID's and associated values oids = map(lambda x: x['name'].get(), \ self.req['pdu'].values()[0]['variable_bindings']) vals = map(lambda x: x['value'].values()[0].values()[0], \ self.req['pdu'].values()[0]['variable_bindings']) miboids = [] for oid, val in map(None, oids, vals): miboid = snmpmibnode(self, oid, None) miboid.value = val miboids.append(miboid) if len(miboids) == 1: return miboids[0] else: return miboids raise AttributeError, attr def __setattr__(self, attr, val): """Set certain attributes """ return self._wrapper_fun(self._setattr_fun, attr, val)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -