📄 message.py
字号:
""" Deprecated PySNMP 1.x compatibility interface to SNMP v.1 protocol implementation. Copyright 1999-2002 by Ilya Etingof <ilya@glas.net>. See LICENSE for details."""from pysnmp.proto import rfc1155, rfc1157from pysnmp.compat.pysnmp1x import ber, errorclass message(ber.ber): """Depricated PySNMP 1.x compatibility SNMP message processing class """ def __init__ (self, community='public', version=0): """Compatibility constructor """ self.community = community self.version = version self.request_id = 0 def _encode_bindings_fun(self, encoded_oids, encoded_vals): """BER encode oids & values """ # Decode OIDs oids = map(lambda x: rfc1155.ObjectName(), encoded_oids) map(lambda x, y: x.decode(y), oids, encoded_oids) # Decode vals vals = map(lambda x: rfc1155.ObjectSyntax(), encoded_vals) map(lambda x, y: x.decode(y), vals, encoded_vals) # Set defaults for missing vars if len(oids) > len(vals): vals.extend(map(lambda x: rfc1155.ObjectSyntax(), [ None ] * (len(oids) - len(vals)))) # Encode bindings return apply(rfc1157.VarBindList, \ map(lambda x, y: rfc1157.VarBind(name=x, value=y), \ oids, vals)).encode() def encode_bindings(self, encoded_oids, encoded_vals): """Compatibility method: BER encode oids & values """ return self._wrapper(self._encode_bindings_fun, \ encoded_oids, encoded_vals) def _decode_bindings_fun(self, octetString): """BER decode bindings up to oids & values """ # Decode bindings varBindList = rfc1157.VarBindList() varBindList.decode(octetString) # Build encoded_oids and encoded_vals encoded_oids = map(lambda x: x['name'].encode(), varBindList) encoded_vals = map(lambda x: x['value'].encode(), varBindList) return (encoded_oids, encoded_vals) def decode_bindings(self, octetString): """Compatibility method: BER decode bindings up to oids & values """ return self._wrapper(self._decode_bindings_fun, octetString) def _encode_snmp_pdu_fun(self, pduType, bindings): """BER encode SNMP PDU """ # Decode bindings varBindList = rfc1157.VarBindList() varBindList.decode(bindings) if pduType == 'GETREQUEST': pdu = rfc1157.GetRequestPdu() elif pduType == 'SETREQUEST': pdu = rfc1157.SetRequestPdu() elif pduType == 'GETNEXTREQUEST': pdu = rfc1157.GetNextRequestPdu() elif pduType == 'GETRESPONSE': pdu = rfc1157.GetResponsePdu() else: raise error.BadPDUType('Unrecognized PDU: %s' % pduType) pdu['variable_bindings'] = varBindList return pdu.encode() def encode_snmp_pdu(self, pduType, bindings): """Compatibility method: BER encode SNMP PDU """ return self._wrapper(self._encode_snmp_pdu_fun, \ pduType, bindings) def _decode_snmp_pdu_fun(self, octetString): """BER decode SNMP PDU """ # Decode pdu pdu = rfc1157.Pdus() pdu.decode(octetString) pdu = pdu.values()[0] return (pdu.tagId, pdu['request_id'].get(), pdu['error_status'].get(), pdu['error_index'].get(), pdu['variable_bindings'].encode()) def decode_snmp_pdu(self, octetString): """Compatibility method: BER decode SNMP PDU """ return self._wrapper(self._decode_snmp_pdu_fun, octetString) def _encode_request_sequence_fun(self, pdu): """BER encode SNMP request sequence """ # Decode pdu requestPdu = rfc1157.Pdus() requestPdu.decode(pdu) # Create SNMP message and attach PDU msg = rfc1157.Message(pdu=requestPdu) # Update message options msg['version'].set(self.version) msg['community'].set(self.community) return msg.encode() def encode_request_sequence(self, pdu): """Compatibility method: BER encode SNMP request sequence """ return self._wrapper(self._encode_request_sequence_fun, pdu) def _decode_response_sequence_fun(self, octetStream): """BER decode SNMP response sequence """ # Create and decode SNMP message msg = rfc1157.Message() msg.decode(octetStream) return (msg['version'].get(), msg['community'].get(), msg['pdu'].encode()) def decode_response_sequence(self, octetStream): """Compatibility method: BER decode SNMP response sequence """ return self._wrapper(self._decode_response_sequence_fun, octetStream) def _encode_request_fun(self, pduType, encoded_oids, encoded_vals): """Compatibility method: BER encode SNMP request message """ # Decode OIDs oids = map(lambda x: rfc1155.ObjectName(), encoded_oids) map(lambda x, y: x.decode(y), oids, encoded_oids) # Decode vals vals = map(lambda x: rfc1155.ObjectSyntax(), encoded_vals) map(lambda x, y: x.decode(y), vals, encoded_vals) # Set defaults for missing vars if len(oids) > len(vals): vals.extend(map(lambda x: rfc1155.ObjectSyntax(), [ None ] * (len(oids) - len(vals)))) if pduType == 'GETREQUEST': pdu = rfc1157.GetRequestPdu() elif pduType == 'SETREQUEST': pdu = rfc1157.SetRequestPdu() elif pduType == 'GETNEXTREQUEST': pdu = rfc1157.GetNextRequestPdu() else: raise error.BadPDUType('Unrecognized request type: %s' % pduType) # Store request ID for later verification self.request_id = pdu['request_id'] # Attach bindings pdu['variable_bindings'] = apply(rfc1157.VarBindList, \ map(lambda x, y: \ rfc1157.VarBind(name=x, value=y),\ oids, vals)) # Create a message req = rfc1157.Message(pdu=rfc1157.Pdus(somepdu=pdu)) # Update message options req['version'].set(self.version) req['community'].set(self.community) return req.encode() def encode_request(self, pduType, encoded_oids, encoded_values): """ encode_request(type, encoded_oids, encoded_values) -> SNMP message Encode Object IDs and values (lists of strings) into variables bindings, then encode bindings into SNMP PDU (of specified type (string)), then encode SNMP PDU into SNMP message. """ return self._wrapper(self._encode_request_fun, \ pduType, encoded_oids, encoded_values) def _decode_response_fun(self, octetStream, mtype): """Compatibility method: BER decode SNMP response message
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -