📄 messagedispatcherimpl.java
字号:
/*_############################################################################
_##
_## SNMP4J - MessageDispatcherImpl.java
_##
_## Copyright 2003-2007 Frank Fock and Jochen Katz (SNMP4J.org)
_##
_## Licensed under the Apache License, Version 2.0 (the "License");
_## you may not use this file except in compliance with the License.
_## You may obtain a copy of the License at
_##
_## http://www.apache.org/licenses/LICENSE-2.0
_##
_## Unless required by applicable law or agreed to in writing, software
_## distributed under the License is distributed on an "AS IS" BASIS,
_## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
_## See the License for the specific language governing permissions and
_## limitations under the License.
_##
_##########################################################################*/
package org.snmp4j;
import java.io.IOException;
import java.util.*;
import org.snmp4j.asn1.*;
import org.snmp4j.event.*;
import org.snmp4j.log.*;
import org.snmp4j.mp.*;
import org.snmp4j.smi.*;
import java.nio.ByteBuffer;
import org.snmp4j.transport.UnsupportedAddressClassException;
/**
* The <code>MessageDispatcherImpl</code> decodes and dispatches incoming
* messages using {@link MessageProcessingModel} instances and encodes
* and sends outgoing messages using appropriate {@link TransportMapping}
* instances.
* <p>
* The method {@link #processMessage} will be called from a
* <code>TransportMapping</code> whereas the method {@link #sendPdu} will be
* called by the application.
*
* @see Snmp
* @see TransportMapping
* @see MessageProcessingModel
* @see MPv1
* @see MPv2c
* @see MPv3
*
* @author Frank Fock
* @version 1.8
*/
public class MessageDispatcherImpl implements MessageDispatcher {
// Logger shared by all instances of this class.
private static final LogAdapter logger =
LogFactory.getLogger(MessageDispatcherImpl.class);
// Registered message processing models. The index of an entry is the ID of
// the model stored there (see addMessageProcessingModel); slots without a
// registered model hold null.
private Vector mpm = new Vector(3);
// Registered transport mappings, keyed by the address class each mapping
// reports through getSupportedAddressClass().
private Hashtable transportMappings = new Hashtable(5);
// Next ID handed out by getNextRequestID(); seeded with a random value
// between 1 and Integer.MAX_VALUE-2 (both inclusive).
private int nextTransactionID = new Random().nextInt(Integer.MAX_VALUE-2)+1;
// Listener lists. NOTE(review): presumably filled by add...Listener methods
// and read by the fire... methods outside this view -- confirm.
transient private Vector commandResponderListeners;
private transient Vector counterListeners;
private transient Vector authenticationFailureListeners;
// NOTE(review): not referenced in the visible part of the file; by its name
// it toggles a check of outgoing messages -- confirm against sendPdu.
private boolean checkOutgoingMsg = true;
/**
 * Creates a message dispatcher that starts out with no message processing
 * models and no transport mappings registered. Use
 * {@link #addMessageProcessingModel} and {@link #addTransportMapping} to
 * configure the new instance.
 */
public MessageDispatcherImpl() {
  super();
}
/**
 * Registers a message processing model with this dispatcher. The model is
 * stored under its ID; if another model is already registered for that ID,
 * the existing registration is kept and the supplied model is ignored. Call
 * {@link #removeMessageProcessingModel} first to replace a model.
 * @param model
 *    the MessageProcessingModel instance to register.
 */
public synchronized void addMessageProcessingModel(MessageProcessingModel model) {
  final int modelID = model.getID();
  // Pad the table with empty slots until modelID is a valid index.
  for (int size = mpm.size(); size <= modelID; size++) {
    mpm.add(null);
  }
  // First registration wins: never overwrite an occupied slot.
  if (mpm.get(modelID) != null) {
    return;
  }
  mpm.set(modelID, model);
}
/**
 * Removes a message processing model from this message dispatcher by
 * clearing the slot that belongs to the model's ID. If the ID lies outside
 * the range of slots created so far, no model with that ID can be
 * registered and the call has no effect.
 * @param model
 *    a previously added MessageProcessingModel instance.
 */
public synchronized void removeMessageProcessingModel(MessageProcessingModel model) {
  int modelID = model.getID();
  // Vector.set throws ArrayIndexOutOfBoundsException for an index outside
  // [0, size()), which would happen for a model that has never been added.
  if ((modelID >= 0) && (modelID < mpm.size())) {
    mpm.set(modelID, null);
  }
}
/**
 * Registers a transport mapping under the address class it supports. The
 * registration only takes place if no other transport mapping is registered
 * for that address class yet.
 * @param transport
 *    a TransportMapping instance. If a transport mapping for the same
 *    address class is already registered, <code>transport</code> is not
 *    registered. To make sure that <code>transport</code> is registered in
 *    any case, call
 *    {@link #removeTransportMapping(TransportMapping transport)} before.
 */
public void addTransportMapping(TransportMapping transport) {
  Class addressClass = transport.getSupportedAddressClass();
  // A Hashtable cannot hold null values, so a missing key is the only way
  // a lookup for this class can come back empty.
  if (!transportMappings.containsKey(addressClass)) {
    transportMappings.put(addressClass, transport);
  }
}
/**
 * Removes the transport mapping that is registered for the address class
 * supported by the supplied transport mapping.
 * @param transport
 *    a TransportMapping instance whose supported address class identifies
 *    the registration to remove.
 * @return
 *    the TransportMapping that was registered for that address class and
 *    has been removed, or <code>null</code> if there was no registration.
 */
public TransportMapping removeTransportMapping(TransportMapping transport) {
  Class addressClass = transport.getSupportedAddressClass();
  Object removed = transportMappings.remove(addressClass);
  return (TransportMapping) removed;
}
/**
 * Gets a collection of all registered transport mappings.
 * <p>
 * The returned collection is a snapshot taken at the time of the call.
 * Changing it does not alter the registrations of this dispatcher, and
 * iterating over it is not disturbed by transport mappings that are added
 * or removed afterwards.
 * @return
 *    a Collection instance containing the registered TransportMapping
 *    instances.
 */
public Collection getTransportMappings() {
  // Copy instead of exposing the live values() view of the internal table.
  return new ArrayList(transportMappings.values());
}
/**
 * Returns the next request ID and advances the internal counter. IDs are
 * always positive: once the counter has overflowed to a non-positive value
 * the sequence restarts at 1.
 * @return
 *    a request ID greater than zero.
 */
public synchronized int getNextRequestID() {
  int id = nextTransactionID;
  nextTransactionID = id + 1;
  if (id > 0) {
    return id;
  }
  // The counter wrapped around: restart the sequence at 1.
  nextTransactionID = 2;
  return 1;
}
/**
 * Creates a PDU handle that carries a freshly allocated request ID.
 * @return
 *    a new PduHandle built from {@link #getNextRequestID()}.
 */
protected PduHandle createPduHandle() {
  int requestID = getNextRequestID();
  return new PduHandle(requestID);
}
/**
 * Sends an encoded message to the given address through the supplied
 * transport mapping. A {@link GenericAddress} is replaced by the address it
 * wraps before the message is handed to the transport.
 *
 * @param transport
 *    the transport mapping to be used to send the message.
 * @param destAddress
 *    the transport address where to send the message. The
 *    <code>destAddress</code> must be compatible with the supplied
 *    <code>transport</code>.
 * @param message
 *    the SNMP message to send.
 * @throws IOException
 *    if an I/O error occurred while sending the message or if
 *    <code>transport</code> is <code>null</code>.
 */
protected void sendMessage(TransportMapping transport,
                           Address destAddress, byte[] message)
    throws IOException
{
  Address target = destAddress;
  if (target instanceof GenericAddress) {
    target = ((GenericAddress) target).getAddress();
  }
  if (transport == null) {
    // Without a transport the message cannot be sent: log and report.
    String txt = "No transport mapping for address class: "+
        target.getClass().getName()+"="+target;
    logger.error(txt);
    throw new IOException(txt);
  }
  transport.sendMessage(target, message);
}
/**
 * Returns a transport mapping that can handle the supplied address.
 * <p>
 * The lookup uses the class of <code>destAddress</code>. If nothing is
 * registered for that class and <code>destAddress</code> is a
 * {@link GenericAddress}, the class of the wrapped address is tried as
 * well. This matches {@link #sendMessage}, which sends to the wrapped
 * address of a <code>GenericAddress</code>.
 * @param destAddress
 *    an Address instance.
 * @return
 *    a <code>TransportMapping</code> instance that can be used to send
 *    a SNMP message to <code>destAddress</code> or <code>null</code> if
 *    such a transport mapping does not exist.
 * @since 1.6
 */
public TransportMapping getTransport(Address destAddress) {
  // Exact class first, so existing registrations resolve as before.
  TransportMapping transport =
      (TransportMapping) transportMappings.get(destAddress.getClass());
  if ((transport == null) && (destAddress instanceof GenericAddress)) {
    Address wrapped = ((GenericAddress) destAddress).getAddress();
    if (wrapped != null) {
      transport =
          (TransportMapping) transportMappings.get(wrapped.getClass());
    }
  }
  return transport;
}
/**
 * Decodes an incoming SNMP message with the supplied message processing
 * model and dispatches the result.
 * <p>
 * When the message processing model reports success, a
 * {@link CommandResponderEvent} describing the decoded PDU is passed to
 * <code>fireProcessPdu</code>. For any other status the status information
 * is logged as a warning; for the statuses unsupported security model,
 * wrong user name and USM error an {@link AuthenticationFailureEvent} is
 * fired before that.
 *
 * @param sourceTransport
 *    a <code>TransportMapping</code> that matches the incomingAddress type.
 * @param mp
 *    a <code>MessageProcessingModel</code> to process the message.
 * @param incomingAddress
 *    the <code>Address</code> from the entity that sent this message.
 * @param wholeMessage
 *    the <code>BERInputStream</code> containing the SNMP message.
 * @throws IOException
 *    if the message cannot be decoded.
 */
protected void dispatchMessage(TransportMapping sourceTransport,
                               MessageProcessingModel mp,
                               Address incomingAddress,
                               BERInputStream wholeMessage) throws IOException {
  // Holders that the message processing model fills while decoding.
  MutablePDU decodedPdu = new MutablePDU();
  Integer32 mpModelID = new Integer32();
  Integer32 secModel = new Integer32();
  OctetString secName = new OctetString();
  Integer32 secLevel = new Integer32();
  PduHandle pduHandle = createPduHandle();
  Integer32 maxResponseSize =
      new Integer32(sourceTransport.getMaxInboundMessageSize());
  StatusInformation statusInformation = new StatusInformation();
  MutableStateReference stateRefHolder = new MutableStateReference();

  // Hand transport and origin to the MP through the state reference so
  // that REPORTs can be returned on the same interface/port the message
  // had been received.
  StateReference initialState = new StateReference();
  initialState.setTransportMapping(sourceTransport);
  initialState.setAddress(incomingAddress);
  stateRefHolder.setStateReference(initialState);

  int result = mp.prepareDataElements(this, incomingAddress, wholeMessage,
                                      mpModelID, secModel,
                                      secName, secLevel, decodedPdu,
                                      pduHandle, maxResponseSize,
                                      statusInformation, stateRefHolder);

  // The holder's state reference is read again after the MP call, so the
  // transport mapping is set on whatever reference it holds now.
  if (stateRefHolder.getStateReference() != null) {
    stateRefHolder.getStateReference().setTransportMapping(sourceTransport);
  }

  if (result == SnmpConstants.SNMP_ERROR_SUCCESS) {
    fireProcessPdu(new CommandResponderEvent(this,
                                             sourceTransport,
                                             incomingAddress,
                                             mpModelID.getValue(),
                                             secModel.getValue(),
                                             secName.getValue(),
                                             secLevel.getValue(),
                                             pduHandle,
                                             decodedPdu.getPdu(),
                                             maxResponseSize.getValue(),
                                             stateRefHolder.getStateReference()));
    return;
  }

  // Failure path: security related statuses are reported as
  // authentication failures before the status is logged.
  boolean authenticationFailed =
      (result == SnmpConstants.SNMP_MP_UNSUPPORTED_SECURITY_MODEL) ||
      (result == SnmpConstants.SNMP_MP_WRONG_USER_NAME) ||
      (result == SnmpConstants.SNMP_MP_USM_ERROR);
  if (authenticationFailed) {
    fireAuthenticationFailure(
        new AuthenticationFailureEvent(this, incomingAddress,
                                       sourceTransport, result,
                                       wholeMessage));
  }
  logger.warn(statusInformation.toString());
}
/**
 * Processes an incoming message supplied as a byte buffer by wrapping the
 * buffer into a {@link BERInputStream} and delegating to
 * {@link #processMessage(TransportMapping, Address, BERInputStream)}.
 * @param sourceTransport
 *    the transport mapping the message was received on.
 * @param incomingAddress
 *    the address of the entity that sent the message.
 * @param wholeMessage
 *    the buffer holding the complete encoded message.
 */
public void processMessage(TransportMapping sourceTransport,
                           Address incomingAddress,
                           ByteBuffer wholeMessage) {
  BERInputStream berStream = new BERInputStream(wholeMessage);
  processMessage(sourceTransport, incomingAddress, berStream);
}
/**
 * Processes an incoming message: counts it, reads the outer header and the
 * SNMP version from the start of the stream, selects the message processing
 * model registered for that version and dispatches the message to it.
 * <p>
 * A message whose outer type is not a SEQUENCE is counted as an ASN.1 parse
 * error and discarded without further processing. A message with a version
 * for which no message processing model is registered is counted as a bad
 * version and discarded.
 * <p>
 * Exceptions raised during processing are logged. They are rethrown,
 * wrapped in a <code>RuntimeException</code>, only if
 * <code>SNMP4JSettings.isFowardRuntimeExceptions()</code> returns
 * <code>true</code>; the same setting decides whether an
 * <code>OutOfMemoryError</code> is rethrown.
 * @param sourceTransport
 *    the transport mapping the message was received on.
 * @param incomingAddress
 *    the address of the entity that sent the message.
 * @param wholeMessage
 *    the stream holding the complete encoded message. It must support
 *    marks.
 * @throws IllegalArgumentException
 *    if <code>wholeMessage</code> does not support marks.
 */
public void processMessage(TransportMapping sourceTransport,
                           Address incomingAddress,
                           BERInputStream wholeMessage) {
  fireIncrementCounter(new CounterEvent(this, SnmpConstants.snmpInPkts));
  if (!wholeMessage.markSupported()) {
    String txt = "Message stream must support marks";
    logger.error(txt);
    throw new IllegalArgumentException(txt);
  }
  try {
    // Remember the start of the message so it can be re-read in full by
    // the message processing model after the version has been peeked.
    wholeMessage.mark(16);
    BER.MutableByte type = new BER.MutableByte();
    // decode header but do not check length here, because we do only decode
    // the first 16 bytes.
    BER.decodeHeader(wholeMessage, type, false);
    if (type.getValue() != BER.SEQUENCE) {
      logger.error("ASN.1 parse error (message is not a sequence)");
      CounterEvent event = new CounterEvent(this,
                                            SnmpConstants.snmpInASNParseErrs);
      fireIncrementCounter(event);
      // Not an SNMP message: discard it instead of decoding a version
      // from it and dispatching it.
      return;
    }
    Integer32 version = new Integer32();
    version.decodeBER(wholeMessage);
    MessageProcessingModel mp = getMessageProcessingModel(version.getValue());
    if (mp == null) {
      logger.warn("SNMP version "+version+" is not supported");
      CounterEvent event = new CounterEvent(this,
                                            SnmpConstants.snmpInBadVersions);
      fireIncrementCounter(event);
    }
    else {
      // reset it
      wholeMessage.reset();
      // dispatch it
      dispatchMessage(sourceTransport, mp, incomingAddress, wholeMessage);
    }
  }
  catch (Exception ex) {
    logger.error(ex);
    if (logger.isDebugEnabled()) {
      ex.printStackTrace();
    }
    if (SNMP4JSettings.isFowardRuntimeExceptions()) {
      throw new RuntimeException(ex);
    }
  }
  catch (OutOfMemoryError oex) {
    logger.error(oex);
    if (SNMP4JSettings.isFowardRuntimeExceptions()) {
      throw oex;
    }
  }
}
/**
 * Sends a PDU without naming a transport mapping. The call is forwarded to
 * the <code>sendPdu</code> overload that takes a transport mapping as its
 * first argument, with <code>null</code> supplied for it.
 * @param transportAddress
 *    the address to send the PDU to.
 * @param messageProcessingModel
 *    the ID of the message processing model to use.
 * @param securityModel
 *    the security model to use.
 * @param securityName
 *    the security name to use.
 * @param securityLevel
 *    the security level to use.
 * @param pdu
 *    the PDU to send.
 * @param expectResponse
 *    whether a response to the PDU is expected.
 * @return
 *    the PduHandle returned by the overload the call is forwarded to.
 * @throws MessageException
 *    if the overload the call is forwarded to throws it.
 */
public PduHandle sendPdu(Address transportAddress,
                         int messageProcessingModel,
                         int securityModel,
                         byte[] securityName,
                         int securityLevel,
                         PDU pdu,
                         boolean expectResponse) throws MessageException {
  PduHandle handle = sendPdu(null, transportAddress, messageProcessingModel,
                             securityModel, securityName, securityLevel,
                             pdu, expectResponse);
  return handle;
}
public PduHandle sendPdu(TransportMapping transport,
Address transportAddress,
int messageProcessingModel,
int securityModel,
byte[] securityName,
int securityLevel,
PDU pdu,
boolean expectResponse,
PduHandleCallback pduHandleCallback)
throws MessageException
{
try {
MessageProcessingModel mp =
getMessageProcessingModel(messageProcessingModel);
if (mp == null) {
throw new MessageException("Unsupported message processing model: "
+ messageProcessingModel);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -