⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 message.py

📁 实现snmp协议的agent 和manager
💻 PY
📖 第 1 页 / 共 2 页
字号:
        """        if not octetStream:            raise error.EmptyResponse('Empty SNMP message')                # Create and decode SNMP message        rsp = rfc1157.Message()        rsp.decode(octetStream)        # Check response validness        if rsp['version'].get() != self.version:            raise error.BadVersion('Unmatched SNMP versions: %d/%d' %                                   (rsp['version'].get(), self.version))        if rsp['community'].get() != self.community:            raise error.BadCommunity('Unmatched SNMP community names: %s/%s' \                                     % (rsp['community'].get(),self.community))        # Retain curios check %-/        if mtype == 'GETRESPONSE' and rsp['pdu'].has_key('get_response'):            pass        elif mtype == 'GETREQUEST' and rsp['pdu'].has_key('get_request'):            pass        elif mtype == 'SETREQUEST' and rsp['pdu'].has_key('set_request'):            pass        elif mtype == 'GETNEXTREQUEST' and \             rsp['pdu'].has_key('get_next_request'):            pass        else:            raise error.BadPDUType('Unexpected PDU type %s/%s' %                                   (rsp['pdu'].keys()[0], mtype))        # Handle SNMP errors        pdu = rsp['pdu'].values()[0]        if pdu['error_status']:            raise error.SNMPError(pdu['error_status'].get(),\                                  pdu['error_index'].get())        # Make sure request ID's matched        if pdu['request_id'] != self.request_id:            raise error.BadRequestID ('Unmatched request/response IDs: %d/%d'\                                      % (pdu['request_id'], self.request_id))        # Build encoded_oids and encoded_vals        encoded_oids = map(lambda x: x['name'].encode(), pdu['variable_bindings'])        encoded_vals = map(lambda x: x['value'].encode(), pdu['variable_bindings'])        return (encoded_oids, encoded_vals)    def decode_response(self, message, mtype='GETRESPONSE'):        """           decode_response(message[, type]) -> (encoded_oids, encoded_values)                      Decode SNMP message (string) of specified type (default is           'GETRESPONSE'), return lists of encoded Object IDs and their           values (lists of strings).        """        return self._wrapper(self._decode_response_fun, message, mtype)    # Trap stuff        def _encode_snmp_trap_pdu_fun(self, enterprise, address, generic, \                                   specific, timeticks, bindings):        """BER encode SNMP TRAP PDU        """        # Decode bindings        varBindList = rfc1157.VarBindList()        varBindList.decode(bindings)        # Create PDU and attach bindings        pdu = rfc1157.TrapPdu(variable_bindings=varBindList)        # Load options        pdu['enterprise'].set(enterprise)        pdu['agent_addr']['internet'].set(address)        pdu['generic_trap'].set(generic)        pdu['specific_trap'].set(specific)        pdu['time_stamp'].set(timeticks)                return pdu.encode()    def encode_snmp_trap_pdu(self, enterprise, address, generic,                             specific, timeticks, bindings):        """Compatibility method: BER encode SNMP TRAP PDU        """        return self._wrapper(self._encode_snmp_trap_pdu_fun, enterprise,\                             address, generic, specific, timeticks,\                             bindings)    def _decode_snmp_trap_pdu_fun(self, octetString):        """BER decode SNMP TRAP PDU        """        # Decode pdu        pdu = rfc1157.TrapPdu()        pdu.decode(octetString)        return (pdu.tagId, pdu['enterprise'].get(), \                pdu['agent_addr']['internet'].get(), \                pdu['generic_trap'].get(), pdu['specific_trap'].get(), \                pdu['time_stamp'].get(), \                pdu['variable_bindings'].encode())                    def decode_snmp_trap_pdu(self, octetString):        """Compatibility method: BER decode SNMP TRAP PDU        """        return self._wrapper(self._decode_snmp_trap_pdu_fun, octetString)    def _encode_trap_fun(self, enterprise, address, generic, specific,\                         timeticks, encoded_oids, encoded_vals):        """Compatibility method: BER encode SNMP TRAP message        """        # Decode OIDs        oids = map(lambda x: rfc1155.ObjectName(), encoded_oids)        map(lambda x, y: x.decode(y), oids, encoded_oids)        # Decode vals        vals = map(lambda x: rfc1155.ObjectSyntax(), encoded_vals)        map(lambda x, y: x.decode(y), vals, encoded_vals)        # Set defaults for missing vars        if len(oids) > len(vals):            vals.extend(map(lambda x: rfc1155.ObjectSyntax(),                            [ None ] * (len(oids) - len(vals))))        pdu = rfc1157.TrapPdu()        # Load trap options        pdu['enterprise'].set(enterprise)        pdu['agent_addr']['internet'].set(address)        pdu['generic_trap'].set(generic)        pdu['specific_trap'].set(specific)        pdu['time_stamp'].set(timeticks)                # Attach bindings        pdu['variable_bindings'] = apply(rfc1157.VarBindList,\                                         map(lambda x, y: \                                             rfc1157.VarBind(name=x, value=y),\                                             oids, vals))        # Create a message        req = rfc1157.Message(pdu=rfc1157.Pdus(somepdu=pdu))        # Update message options        req['version'].set(self.version)        req['community'].set(self.community)                return req.encode()    def encode_trap(self, enterprise, address, generic, specific,\                    timeticks, encoded_oids=[], encoded_vals=[]):        """           encode_trap(type, encoded_oids, encoded_values) -> SNMP message                      Encode Object IDs and values (lists of strings) into variables           bindings, then encode bindings into SNMP PDU (of specified type           (string)), then encode SNMP PDU into SNMP message.        """        return self._wrapper(self._encode_trap_fun, enterprise, address, \                             generic, specific, timeticks, encoded_oids, \                             encoded_vals)    def _decode_trap_fun(self, octetStream):        """Compatibility method: BER decode SNMP TRAP message        """        if not octetStream:            raise error.EmptyResponse('Empty SNMP message')                # Create and decode SNMP message        req = rfc1157.Message()        req.decode(octetStream)        # Check response validness        if req['version'].get() != self.version:            raise error.BadVersion('Unmatched SNMP versions: %d/%d' %                                   (req['version'].get(), self.version))        if req['community'].get() != self.community:            raise error.BadCommunity('Unmatched SNMP community names: %s/%s' \                                     % (req['community'].get(),self.community))        if not req['pdu'].has_key('trap'):            raise error.BadPDUType('Unexpected PDU type %s' % req['pdu'].keys()[0])        # Build encoded_oids and encoded_vals        encoded_oids = map(lambda x: x['name'].encode(), req['pdu'].values()[0]['variable_bindings'])        encoded_vals = map(lambda x: x['value'].encode(), req['pdu'].values()[0]['variable_bindings'])        return (req['pdu'].values()[0]['enterprise'].get(),                req['pdu'].values()[0]['agent_addr']['internet'].get(),                req['pdu'].values()[0]['generic_trap'].get(),                req['pdu'].values()[0]['specific_trap'].get(),                req['pdu'].values()[0]['time_stamp'].get(),                encoded_oids, encoded_vals)    def decode_trap(self, octetStream):        """           decode_trap(message) -> (enterprise, address, generic,                                    specific, timeticks, encoded_oids,                                     encoded_vals)                      Decode SNMP TRAP message (string).        """        return self._wrapper(self._decode_trap_fun, octetStream)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -