⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 message.py

📁 实现snmp协议的agent 和manager
💻 PY
📖 第 1 页 / 共 2 页
字号:
"""   Deprecated PySNMP 1.x compatibility interface to SNMP v.1   protocol implementation.   Copyright 1999-2002 by Ilya Etingof <ilya@glas.net>. See LICENSE for   details."""from pysnmp.proto import rfc1155, rfc1157from pysnmp.compat.pysnmp1x import ber, errorclass message(ber.ber):    """Depricated PySNMP 1.x compatibility SNMP message processing class    """    def __init__ (self, community='public', version=0):        """Compatibility constructor        """        self.community = community        self.version = version        self.request_id = 0            def _encode_bindings_fun(self, encoded_oids, encoded_vals):        """BER encode oids & values        """        # Decode OIDs        oids = map(lambda x: rfc1155.ObjectName(), encoded_oids)        map(lambda x, y: x.decode(y), oids, encoded_oids)        # Decode vals        vals = map(lambda x: rfc1155.ObjectSyntax(), encoded_vals)        map(lambda x, y: x.decode(y), vals, encoded_vals)        # Set defaults for missing vars        if len(oids) > len(vals):            vals.extend(map(lambda x: rfc1155.ObjectSyntax(),                            [ None ] * (len(oids) - len(vals))))        # Encode bindings        return apply(rfc1157.VarBindList, \                     map(lambda x, y: rfc1157.VarBind(name=x, value=y), \                         oids, vals)).encode()    def encode_bindings(self, encoded_oids, encoded_vals):        """Compatibility method: BER encode oids & values        """        return self._wrapper(self._encode_bindings_fun, \                             encoded_oids, encoded_vals)    def _decode_bindings_fun(self, octetString):        """BER decode bindings up to oids & values        """        # Decode bindings        varBindList = rfc1157.VarBindList()        varBindList.decode(octetString)        # Build encoded_oids and encoded_vals        encoded_oids = map(lambda x: x['name'].encode(), varBindList)        encoded_vals = map(lambda x: x['value'].encode(), varBindList)        return (encoded_oids, encoded_vals)    def decode_bindings(self, octetString):        """Compatibility method: BER decode bindings up to oids & values        """        return self._wrapper(self._decode_bindings_fun, octetString)    def _encode_snmp_pdu_fun(self, pduType, bindings):        """BER encode SNMP PDU        """        # Decode bindings        varBindList = rfc1157.VarBindList()        varBindList.decode(bindings)        if pduType == 'GETREQUEST':            pdu = rfc1157.GetRequestPdu()        elif pduType == 'SETREQUEST':            pdu = rfc1157.SetRequestPdu()        elif pduType == 'GETNEXTREQUEST':                        pdu = rfc1157.GetNextRequestPdu()        elif pduType == 'GETRESPONSE':                        pdu = rfc1157.GetResponsePdu()        else:            raise error.BadPDUType('Unrecognized PDU: %s' % pduType)        pdu['variable_bindings'] = varBindList                return pdu.encode()        def encode_snmp_pdu(self, pduType, bindings):        """Compatibility method: BER encode SNMP PDU        """        return self._wrapper(self._encode_snmp_pdu_fun, \                             pduType, bindings)    def _decode_snmp_pdu_fun(self, octetString):        """BER decode SNMP PDU        """        # Decode pdu        pdu = rfc1157.Pdus()        pdu.decode(octetString)        pdu = pdu.values()[0]        return (pdu.tagId, pdu['request_id'].get(), pdu['error_status'].get(),                pdu['error_index'].get(), pdu['variable_bindings'].encode())                    def decode_snmp_pdu(self, octetString):        """Compatibility method: BER decode SNMP PDU        """        return self._wrapper(self._decode_snmp_pdu_fun, octetString)    def _encode_request_sequence_fun(self, pdu):        """BER encode SNMP request sequence        """        # Decode pdu        requestPdu = rfc1157.Pdus()        requestPdu.decode(pdu)        # Create SNMP message and attach PDU        msg = rfc1157.Message(pdu=requestPdu)        # Update message options        msg['version'].set(self.version)        msg['community'].set(self.community)                return msg.encode()        def encode_request_sequence(self, pdu):        """Compatibility method: BER encode SNMP request sequence        """        return self._wrapper(self._encode_request_sequence_fun, pdu)    def _decode_response_sequence_fun(self, octetStream):        """BER decode SNMP response sequence        """        # Create and decode SNMP message        msg = rfc1157.Message()        msg.decode(octetStream)        return (msg['version'].get(), msg['community'].get(),                msg['pdu'].encode())        def decode_response_sequence(self, octetStream):        """Compatibility method: BER decode SNMP response sequence        """        return self._wrapper(self._decode_response_sequence_fun,                             octetStream)    def _encode_request_fun(self, pduType, encoded_oids, encoded_vals):        """Compatibility method: BER encode SNMP request message        """        # Decode OIDs        oids = map(lambda x: rfc1155.ObjectName(), encoded_oids)        map(lambda x, y: x.decode(y), oids, encoded_oids)        # Decode vals        vals = map(lambda x: rfc1155.ObjectSyntax(), encoded_vals)        map(lambda x, y: x.decode(y), vals, encoded_vals)        # Set defaults for missing vars        if len(oids) > len(vals):            vals.extend(map(lambda x: rfc1155.ObjectSyntax(),                            [ None ] * (len(oids) - len(vals))))        if pduType == 'GETREQUEST':            pdu = rfc1157.GetRequestPdu()        elif pduType == 'SETREQUEST':            pdu = rfc1157.SetRequestPdu()        elif pduType == 'GETNEXTREQUEST':                        pdu = rfc1157.GetNextRequestPdu()        else:            raise error.BadPDUType('Unrecognized request type: %s' % pduType)        # Store request ID for later verification        self.request_id = pdu['request_id']                # Attach bindings        pdu['variable_bindings'] = apply(rfc1157.VarBindList, \                                         map(lambda x, y: \                                             rfc1157.VarBind(name=x, value=y),\                                             oids, vals))        # Create a message        req = rfc1157.Message(pdu=rfc1157.Pdus(somepdu=pdu))        # Update message options        req['version'].set(self.version)        req['community'].set(self.community)                return req.encode()        def encode_request(self, pduType, encoded_oids, encoded_values):        """           encode_request(type, encoded_oids, encoded_values) -> SNMP message                      Encode Object IDs and values (lists of strings) into variables           bindings, then encode bindings into SNMP PDU (of specified type           (string)), then encode SNMP PDU into SNMP message.        """        return self._wrapper(self._encode_request_fun, \                             pduType, encoded_oids, encoded_values)    def _decode_response_fun(self, octetStream, mtype):        """Compatibility method: BER decode SNMP response message

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -