⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 v2c.py

📁 实现snmp协议的agent 和manager
💻 PY
字号:
"""   Deprecated PySNMP 2.0.x compatibility interface to SNMP v.2c   protocol implementation.   Copyright 1999-2002 by Ilya Etingof <ilya@glas.net>. See LICENSE for   details."""from pysnmp.proto import v2cfrom pysnmp.compat.pysnmp2x import v1, asn1class Error(v1.Error):    """Base class for v2c module exceptions    """    passclass BadArgument(Error):    """Bad v2c object value    """    passclass BadPDUType(Error):    """Bad SNMP PDU type    """    passclass BadVersion(Error):    """Bad SNMP version    """    passclass CompatBase(v1.CompatBase):    """Base class for compatibility classes    """    def _wrapper_fun(self, fun, *args):        """Call passed function and translate possible exceptions        """        try:            return apply(v1.CompatBase._wrapper_fun, [self, fun ] + list(args))        except v1.BadPDUType, why:            raise BadPDUType(why)        except v1.BadVersion, why:            raise BadVersion(why)        except v1.BadEncoding, why:            raise BadEncoding(why)    def _setitem_fun(self, key, value):        """Set message item by key        """        if key == 'encoded_oids':            # Decode OIDs            oids = map(lambda x: v2c.ObjectName(), value)            map(lambda x, y: x.decode(y), oids, value)            # Fetch vals            vals = map(lambda x: x['value'].values()[0], \                       self.msg['pdu'].values()[0]['variable_bindings'])            # Wild hack to keep OIDs & vals balanced            if len(oids) > len(vals):                vals.extend(map(v2c.Null,                                [ None ] * (len(oids) - len(vals))))            # Re-commit OID-value pairs            self.msg['pdu'].values()[0]['variable_bindings'] = apply(v2c.VarBindList, map(lambda x, y: v2c.VarBind(name=x, value=v2c.BindValue(value=y)), oids, vals))        elif key == 'encoded_vals':            # Fetch OIDs            oids = map(lambda x: x['name'], \                       self.msg['pdu'].values()[0]['variable_bindings'])            # Decode vals            vals = map(lambda x: v2c.BindValue(), value)            map(lambda x, y: x.decode(y), vals, value)            vals = map(lambda x: x.values()[0], vals)                        # Wild hack to keep OIDs & vals balanced            if len(oids) < len(vals):                oids.extend(map(v2c.ObjectName,                                ['.1.3'] * (len(vals) - len(oids))))                        # Re-commit OID-value pairs            self.msg['pdu'].values()[0]['variable_bindings'] = apply(v2c.VarBindList, map(lambda x, y: v2c.VarBind(name=x, value=v2c.BindValue(value=y)), oids, vals))        else:            v1.CompatBase._setitem_fun(self, key, value)    def decode(self, data):        """Compatibility method: decode BER octet stream into this ASN.1 object        """        try:            return v1.CompatBase.decode(self, data)        except v1.BadEncoding, why:            raise BadEncoding(why)        except v1.BadPDUType, why:            raise BadPDUType(why)class _CompatRequest(CompatBase):    """Base class for all request classes    """    def reply(self, **kwargs):        """Build a response message from request message        """        rsp = RESPONSE(community=self['community'], \                       request_id=self['request_id'], \                       encoded_oids=self['encoded_oids'], \                       encoded_vals=self['encoded_vals'])        rsp.update(kwargs)        return rspclass GETREQUEST(_CompatRequest):    """Compatibility interface to GETREQUEST    """    msgProto = v2c.GetRequestclass GETNEXTREQUEST(_CompatRequest):    """Compatibility interface to GETNEXTREQUEST    """    msgProto = v2c.GetNextRequestclass SETREQUEST(_CompatRequest):    """Compatibility interface to SETREQUEST    """    msgProto = v2c.SetRequestclass TRAP(_CompatRequest):    """Compatibility interface to TRAP message    """    msgProto = v2c.Trapclass INFORMREQUEST(_CompatRequest):    """Compatibility interface to INFORMREQUEST    """    msgProto = v2c.InformRequestclass REPORT(_CompatRequest):    """Compatibility interface to REPORT message    """    msgProto = v2c.Reportclass RESPONSE(CompatBase):    """Compatibility interface to RESPONSE message    """    msgProto = v2c.Response# An alias to RESPONSE classGETRESPONSE = RESPONSEclass GETBULKREQUEST(_CompatRequest):    """Compatibility interface to GETBULKREQUEST    """    msgProto = v2c.GetBulkRequest        #    # Dictionary interface    #        def _getitem_fun(self, key):        """Return message item by key        """        if key == 'non_repeaters' or \           key == 'max_repetitions':            return self.msg['pdu'].values()[0][key].get()        else:            return _CompatRequest._getitem_fun(self, key)    def _setitem_fun(self, key, value):        """Set message item by key        """        if key == 'non_repeaters' or \           key == 'max_repetitions':            return self.msg['pdu'].values()[0][key].set(value)        else:            return _CompatRequest._setitem_fun(self, key, value)    def keys(self):        """Return keys known to compatibility API        """        return [ 'encoded_oids', 'encoded_vals', 'request_id',                 'non_repeaters', 'max_repetitions', 'tag', 'version',                 'community' ]def decode(data):    """       decode(input) -> (<request-object>, rest)       Decode input octet stream (string) into a request-object and return       the rest of input (string).    """    for msgType in [ GETREQUEST, GETNEXTREQUEST, SETREQUEST, RESPONSE,                     INFORMREQUEST, TRAP, REPORT, GETBULKREQUEST]:        msg = msgType()        try:            rest = msg.decode(data)        except BadPDUType:            continue        except asn1.TypeError:            break                return (msg, rest)    return v1.decode(data)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -