⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 snmpy.py

📁 实现snmp协议的agent 和manager
💻 PY
📖 第 1 页 / 共 2 页
字号:
"""   Compatibility interface to the snmpy package. The degree of compatibility   is mostly limited by the lack of MIB parser in PySNMP.   For more information on snmpy project, please, refer to   http://www.sf.net/projects/snmpy/   Copyright 1999-2002 by Ilya Etingof <ilya@glas.net>. See LICENSE for   details.   Initial copyrights for prototype code and API are as follows:   Written by Anthony Baxter, arb@connect.com.au   Copyright (C) 1994,1995,1996 Anthony Baxter.   Modified extensively by Tim O'Malley, timo@bbn.com.   The changes are Copyright (C) 1996 of BBN.    Permission is hereby granted, free of charge, to any person obtaining   a copy of this source file to use, copy, modify, merge, or publish it   subject to the following conditions:   The above copyright notice and this permission notice shall be included   in all copies or in any new file that contains a substantial portion of   this file.   THE  AUTHOR  MAKES  NO  REPRESENTATIONS ABOUT  THE  SUITABILITY  OF   THE  SOFTWARE FOR  ANY  PURPOSE.  IT IS  PROVIDED  "AS IS"  WITHOUT   EXPRESS OR  IMPLIED WARRANTY.  THE AUTHOR DISCLAIMS  ALL WARRANTIES   WITH  REGARD TO  THIS  SOFTWARE, INCLUDING  ALL IMPLIED  WARRANTIES   OF   MERCHANTABILITY,  FITNESS   FOR  A   PARTICULAR  PURPOSE   AND   NON-INFRINGEMENT  OF THIRD  PARTY  RIGHTS. IN  NO  EVENT SHALL  THE   AUTHOR  BE LIABLE  TO  YOU  OR ANY  OTHER  PARTY  FOR ANY  SPECIAL,   INDIRECT,  OR  CONSEQUENTIAL  DAMAGES  OR  ANY  DAMAGES  WHATSOEVER   WHETHER IN AN  ACTION OF CONTRACT, NEGLIGENCE,  STRICT LIABILITY OR   ANY OTHER  ACTION ARISING OUT OF  OR IN CONNECTION WITH  THE USE OR   PERFORMANCE OF THIS SOFTWARE."""from exceptions import Exceptionfrom time import timefrom types import ListTypefrom pysnmp.proto import v1from pysnmp.mapping.udp import rolefrom pysnmp.asn1 import baseimport pysnmp.error, pysnmp.mapping.udp.error# Module exceptionsclass SnmpError(Exception): passclass SnmpTimeout(Exception): pass# Module constantsversion = 0.73SNMP_TRAP_PORT = 162class CompatBase:    """Base class for compatibility classes    """    def _wrapper_fun(self, fun, *args):        """        """        try:            return apply(fun, args)        # Catch transport-speciic, timeout exceptions        except pysnmp.mapping.udp.error.NoResponseError, why:            raise SnmpTimeout(why)        except pysnmp.mapping.udp.error.IdleTimeoutError, why:            raise SnmpTimeout(why)        # Render the rest of PySNMP exceptions to one        except pysnmp.error.PySnmpError, why:            raise SnmpError(why)class snmpmibnode(CompatBase):    """A single MIB object (variable-binding)    """    def __init__(self, parent, name, extension):        """Class constructor: initialize object internals from passed OID        """        self._wrapper_fun(self._init_fun, parent, name, extension)    def _init_fun(self, parent, name, extension):        """The backend method for class constructor        """        oid = ''        try:            if name[0] != '.':                oid = '.1.3.6.1.2.1'            oid = '%s.%s' % (oid, name)            if extension:                oid = '%s.%s' % (oid, extension)        except:            raise SnmpError('Malformed OID name/extension: %s/%s' %                            (name, extension))        # Keep a real OID object around        self._oid = v1.ObjectIdentifier(oid)        self._value = None        self._session = parent    def __getattr__(self, attr):        """Translate some attributes        """        return self._wrapper_fun(self._getattr_fun, attr)    def _getattr_fun(self, attr):        """Backend for __getattr__ method        """        if attr == 'oid':            return self._oid.get()        elif attr == 'name':            return self.oid        elif attr == 'index':            return 0L        elif attr == 'value':            if self._value is None:                return None            else:                self._value.get()        elif attr == 'type':            # More types to add?            if self._value is None:                return 0            else:                if isinstance(self._value, v1.ObjectIdentifier):                    return 1                elif isinstance(self._value, v1.OctetString):                    return 2                elif isinstance(self._value, v1.Integer):                    return 3                elif isinstance(self._value, v1.NetworkAddress):                    return 4                elif isinstance(self._value, v1.IpAddress):                    return 5                elif isinstance(self._value, v1.Counter):                    return 6                elif isinstance(self._value, v1.Gauge):                    return 7                elif isinstance(self._value, v1.TimeTicks):                    return 8                elif isinstance(self._value, v1.Opaque):                    return 9                elif isinstance(self._value, v1.Null):                    return 10                elif isinstance(self._value, v1.Counter64):                    return 11                # XXX may be continued                else:                    raise SnmpError('Unsupported value type: %s' %                                    self._value)        elif attr == 'session':            return self._session        elif attr == 'description':            return ''        elif attr == 'enums':            raise SnmpError('Feature not implemented')        raise AttributeError, attr    def __setattr__(self, attr, val):        """Set certain attributes        """        return self._wrapper_fun(self._setattr_fun, attr, val)        def _setattr_fun(self, attr, val):        """Backend for __setattr__ method         """        if attr in ['oid', 'name', 'index', 'type', 'description', 'enums']:            raise SnmpError('Read-only attribute: %s' % attr)        elif attr == 'value':            if not isinstance(val, base.SimpleAsn1Object):                raise SnmpError('Non ASN1 object given for value: %s' \                                % str(val))            self._value = val        self.__dict__[attr] = val    def __repr__(self):        """Try to look similar to original implementation        """        return self._wrapper_fun(self._repr_fun)            def _repr_fun(self):        """Backend for __repr__ method        """        return '<SNMP_mibnode %s, %u, %s>' % (self.name, self.type, self.value)    # Mapping object API    def __len__(self):        """Returns number of sub-OIDs in SNMP table (can't be implemented           w/o MIB parser)        """        return 0    def __getitem__(self, key):        """Support creating child OIDs by subscription        """        return self._wrapper_fun(self._getitem_fun, key)            def _getitem_fun(self, key):        """Backend for __getitem__ method        """        try:            key = '%u' % key        except:            pass        return snmpmibnode(self._session, self.oid, key)    def __setitem__(self, key, value):        """Support creating child OIDs by subscription        """        return self._wrapper_fun(self._setitem_fun, key, value)            def _setitem_fun(self, key, value):        """Backend for __setitem__ method        """        try:            key = '%u' % key        except:            pass        self._session.set(self[key], value)    # Indirect access to session object    def get(self):        """Indirect GET request access to parent session object        """        return self._session.get(self.oid)    def getnext(self):        """Indirect GETNEXT request access to parent session object        """        return self._session.getnext(self.oid)    def set(self, val):        """Indirect SET request access to parent session object        """        return self._session.set(self.oid, val)    # Misc ops on OID object        def oidlist(self):        """Return a list of sub-OIDs        """        return list(self._oid)    def nodes(self):        """Return child sub-nodes        """        raise SnmpError('Feature not implemented')    def enums(self):        """Not clear what this does so far        """        raise SnmpError('Feature not implemented')class snmptrap(CompatBase):    """A SNMP TRAP request object    """    def __init__(self, parent, addr, name, trap_type, specific_type,                 uptime, varbind):        """Class constructor: initialize object internals from passed params        """                self._wrapper_fun(self._init_fun, parent, addr, name, trap_type,                          specific_type, uptime, varbind)            def _init_fun(self, parent, addr, name, trap_type, specific_type,                  uptime, varbind):        """Backend for __init__ method        """        # Keep a real Trap object around        self.req = v1.Trap()        self.req['pdu']['trap']['agent_addr']['internet'].set(addr)        self.req['pdu']['trap']['enterprise'].set(name)        self.req['pdu']['trap']['generic_trap'].set(trap_type)        self.req['pdu']['trap']['specific_trap'].set(specific_type)        self.req['pdu']['trap']['time_stamp'].set(uptime)        self.req['pdu']['trap']['variable_bindings'] = varbind        # Non-masked public attributes        self._session = parent    def __getattr__(self, attr):        """Translate some attributes        """        return self._wrapper_fun(self._getattr_fun, attr)        def _getattr_fun(self, attr):        """Backend for __getattr__ method        """        if attr == 'addr':            return self.req['pdu']['trap']['agent_addr']['internet'].get()        elif attr == 'oid':            return self.req['pdu']['trap']['enterprise'].get()        elif attr == 'name':            return self.oid        elif attr == 'type':            return self.req['pdu']['trap']['generic_trap'].get()        elif attr == 'specific_type':            return self.req['pdu']['trap']['specific_trap'].get()        elif attr == 'uptime':            return self.req['pdu']['trap']['time_stamp'].get()        elif attr == 'session':            return self._session        elif attr == 'variables':            # Fetch Object ID's and associated values            oids = map(lambda x: x['name'].get(), \                       self.req['pdu'].values()[0]['variable_bindings'])            vals = map(lambda x: x['value'].values()[0].values()[0], \                       self.req['pdu'].values()[0]['variable_bindings'])            miboids = []            for oid, val in map(None, oids, vals):                miboid = snmpmibnode(self, oid, None)                miboid.value = val                miboids.append(miboid)            if len(miboids) == 1:                return miboids[0]            else:                        return miboids        raise AttributeError, attr    def __setattr__(self, attr, val):        """Set certain attributes        """        return self._wrapper_fun(self._setattr_fun, attr, val)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -