⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 v1.py

📁 实现snmp协议的agent 和manager
💻 PY
字号:
"""   Deprecated PySNMP 2.0.x compatibility interface to SNMP v.1   protocol implementation.   Copyright 1999-2002 by Ilya Etingof <ilya@glas.net>. See LICENSE for   details."""from pysnmp.proto import v1import pysnmp.proto.error, pysnmp.asn1.error, \       pysnmp.asn1.encoding.ber.errorfrom pysnmp.compat.pysnmp2x import asn1class Error(asn1.Error):    """Base class for v1 module exceptions    """    passclass TypeError(Error):    """V1 data type incompatibility    """    passclass BadArgument(Error):    """Bad V1 object value    """    passclass BadPDUType(Error):    """Bad SNMP PDU type    """    passclass BadVersion(Error):    """Bad SNMP version    """    passclass BadEncoding(Error):    """Bad BER encoding in SNMP message    """    passclass CompatBase:    """Base class for compatibility classes    """    msgProto = None        def __init__(self, **kwargs):        """Compatibility constructor        """        self._wrapper_fun(self.__init_fun__, kwargs)    def __init_fun__(self, kwdict):        """Base class constructor        """        self.msg = self.msgProto()        self.update(kwdict)            def _wrapper_fun(self, fun, *args):        """Call passed function and translate possible exceptions        """        try:             return apply(fun, args)                   # Catch snmp package exceptions        except pysnmp.proto.error.BadArgumentError, why:            raise BadArgument(why)               except pysnmp.proto.error.ProtoError, why:            raise Error(why)                # Catch ber package exceptions                except pysnmp.asn1.encoding.ber.error.BadArgumentError, why:            raise asn1.BadArgument(why)        except pysnmp.asn1.encoding.ber.error.TypeMismatchError, why:            raise asn1.UnknownTag(why)        except pysnmp.asn1.encoding.ber.error.OverFlowError, why:            raise asn1.OverFlow(why)        except pysnmp.asn1.encoding.ber.error.UnderRunError, why:            raise asn1.UnderRun(why)        except pysnmp.asn1.encoding.ber.error.BadEncodingError, why:            raise asn1.BadEncoding(why)        except pysnmp.asn1.encoding.ber.error.BerEncodingError, why:            raise asn1.Error(why)        # Catch asn1 package exceptions                except pysnmp.asn1.error.BadArgumentError, why:            raise asn1.BadArgument(why)        except pysnmp.asn1.error.ValueConstraintError, why:            raise asn1.TypeError(why)        except pysnmp.asn1.error.Asn1Error, why:            raise asn1.Error(why)    def __str__(self):        """Return native representation of instance payload        """        res = ''        for key in self.keys():            if res:                res = res + ', ' + key + '=' + str(self[key])            else:                res = key + '=' + str(self[key])        return self.__class__.__name__ + ': ' + res     def __repr__(self):        """Return native representation of instance payload        """        res = ''        for key in self.keys():            if res:                res = res + ', ' + key + '=' + repr(self[key])            else:                res = key + '=' + repr(self[key])        return self.__class__.__name__ + '(' + res + ')'    def __cmp__(self, other):        """Compatibility comparation method        """        return self._wrapper_fun(self.__cmp_fun__, other)    def __cmp_fun__(self, other):        """Compare requests        """        if not isinstance(other, CompatBase):            raise BadArgument('Can not compare %s vs %s' %                              (str(self), str(other)))                                      if self['request_id'] == other['request_id']:            return 0        return -1    def _decode_fun(self, data):        """Call decode() by wrapper        """        return self.msg.decode(data)        def decode(self, data):        """Compatibility method: decode BER octet stream into this ASN.1 object        """        try:            return self._wrapper_fun(self._decode_fun, data)        except asn1.BadEncoding, why:            raise BadEncoding(why)        except asn1.UnknownTag, why:            raise BadPDUType(why)    def _encode_fun(self, kwdict):        """Call encode() by wrapper        """        self.update(kwdict)        return self.msg.encode()    def encode(self, **kwargs):        """Compatibility method: assign a value and encode           it into BER octet stream        """        return self._wrapper_fun(self._encode_fun, kwargs)    #    # Dictionary interface    #        def _getitem_fun(self, key):        """Return message item by key        """        if key == 'encoded_oids':            return map(lambda x: x['name'].encode(), \                       self.msg['pdu'].values()[0]['variable_bindings'])        elif key == 'encoded_vals':            return map(lambda x: x['value'].encode(), \                       self.msg['pdu'].values()[0]['variable_bindings'])        elif key == 'request_id' or key == 'error_status' or \             key == 'error_index':            return self.msg['pdu'].values()[0][key].get()        elif key == 'community' or key == 'version':            return self.msg[key].get()        elif key == 'tag':            return self.__class__.__name__        raise KeyError('Non-applicible key: %s' % key)    def __getitem__(self, key):        """Call __getitem__ by wrapper        """        return self._wrapper_fun(self._getitem_fun, key)    def _setitem_fun(self, key, value):        """Set message item by key        """        if key == 'encoded_oids':            # Decode OIDs            oids = map(lambda x: v1.ObjectName(), value)            map(lambda x, y: x.decode(y), oids, value)            # Fetch vals            vals = map(lambda x: x['value'], \                       self.msg['pdu'].values()[0]['variable_bindings'])            # Wild hack to keep OIDs & vals balanced            if len(oids) > len(vals):                vals.extend(map(lambda x: v1.ObjectSyntax(),                                [ None ] * (len(oids) - len(vals))))            # Re-commit OID-value pairs            self.msg['pdu'].values()[0]['variable_bindings'] = apply(v1.VarBindList, map(lambda x, y: v1.VarBind(name=x, value=y), oids, vals))        elif key == 'encoded_vals':            # Fetch OIDs            oids = map(lambda x: x['name'], \                       self.msg['pdu'].values()[0]['variable_bindings'])            # Decode vals            vals = map(lambda x: v1.ObjectSyntax(), value)            map(lambda x, y: x.decode(y), vals, value)            # Wild hack to keep OIDs & vals balanced            if len(oids) < len(vals):                oids.extend(map(v1.ObjectName,                                ['.1.3'] * (len(vals) - len(oids))))                        # Re-commit OID-value pairs            self.msg['pdu'].values()[0]['variable_bindings'] = apply(v1.VarBindList, map(lambda x, y: v1.VarBind(name=x, value=y), oids, vals))        elif key == 'request_id' or key == 'error_status' or \             key == 'error_index':            self.msg['pdu'].values()[0][key].set(value)        elif key == 'community' or key == 'version':            self.msg[key].set(value)        elif key == 'tag':            pass        else:            raise KeyError('Non-applicible key: %s' % key)        def __setitem__(self, key, value):        """Call __setitem__ by wrapper        """        return self._wrapper_fun(self._setitem_fun, key, value)    def keys(self):        """Return keys known to compatibility API        """        return [ 'encoded_oids', 'encoded_vals', 'request_id', 'error_status',                 'error_index', 'tag', 'version', 'community' ]    def has_key(self, key):        """Return true if key exists in dictionary        """        return key in self.keys()    def get(self, key, default):        """Get values by key with default        """        if self.has_key(key):            return self[key]        return default    def update(self, args):        """Update dict with the other dict        """        for key in args.keys():            self[key] = args[key]    def clear(self):        """Clear dict payload        """        self.msg = self.msgProto()class _CompatRequest(CompatBase):    """Base class for all request classes    """    def reply(self, **kwargs):        """Build a response message from request message        """        rsp = GETRESPONSE(community=self['community'], \                          request_id=self['request_id'], \                          encoded_oids=self['encoded_oids'], \                          encoded_vals=self['encoded_vals'])        rsp.update(kwargs)        return rsp        class GETREQUEST(_CompatRequest):    """Compatibility interface to GETREQUEST    """    msgProto = v1.GetRequestclass GETNEXTREQUEST(_CompatRequest):    """Compatibility interface to GETNEXTREQUEST    """    msgProto = v1.GetNextRequestclass SETREQUEST(_CompatRequest):    """Compatibility interface to SETREQUEST    """    msgProto = v1.SetRequestclass GETRESPONSE(CompatBase):    """Compatibility interface to GETRESPONSE    """    msgProto = v1.GetResponseclass TRAPREQUEST(CompatBase):    """Compatibility interface to TRAPREQUEST    """    msgProto = v1.Trap    #    # Dictionary interface    #        def _getitem_fun(self, key):        """Return message item by key        """        if key == 'generic_trap' or key == 'specific_trap' or \           key == 'time_stamp' or key == 'enterprise':            return self.msg['pdu'].values()[0][key].get()        elif key == 'agent_address':            return self.msg['pdu'].values()[0]['agent_addr']['internet'].get()        else:            return CompatBase._getitem_fun(self, key)    def _setitem_fun(self, key, value):        """Set message item by key        """        if key == 'generic_trap' or key == 'specific_trap' or \           key == 'time_stamp' or key == 'enterprise':                        self.msg['pdu'].values()[0][key].set(value)        elif key == 'agent_address':            self.msg['pdu'].values()[0]['agent_addr']['internet'].set(value)        else:            CompatBase._setitem_fun(self, key, value)    def keys(self):        """Return keys known to compatibility API        """        return [ 'encoded_oids', 'encoded_vals', 'generic_trap',                 'specific_trap', 'agent_address', 'time_stamp',                 'enterprise', 'tag', 'version', 'community' ]    def decode(data):    """       decode(input) -> (<request-object>, rest)       Decode input octet stream (string) into a request-object and return       the rest of input (string).    """    for msgType in [ GETREQUEST, GETNEXTREQUEST, SETREQUEST, GETRESPONSE,                     TRAPREQUEST ]:        msg = msgType()        try:            rest = msg.decode(data)        except BadPDUType:            continue                return (msg, rest)        raise BadPDUType('Unsuppored SNMP PDU type')

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -