📄 defaultudptransportmapping.java
字号:
/*_############################################################################
_##
_## SNMP4J - DefaultUdpTransportMapping.java
_##
_## Copyright 2003-2005 Frank Fock and Jochen Katz (SNMP4J.org)
_##
_## Licensed under the Apache License, Version 2.0 (the "License");
_## you may not use this file except in compliance with the License.
_## You may obtain a copy of the License at
_##
_## http://www.apache.org/licenses/LICENSE-2.0
_##
_## Unless required by applicable law or agreed to in writing, software
_## distributed under the License is distributed on an "AS IS" BASIS,
_## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
_## See the License for the specific language governing permissions and
_## limitations under the License.
_##
_##########################################################################*/
package org.snmp4j.transport;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import org.snmp4j.log.*;
import org.snmp4j.MessageDispatcher;
import org.snmp4j.asn1.BERInputStream;
import org.snmp4j.smi.*;
/**
* The <code>DefaultUdpTransportMapping</code> implements a UDP transport
* mapping based on Java standard IO and using an internal thread for
* listening on the inbound socket.
*
* @author Frank Fock
* @version 1.5
*/
public class DefaultUdpTransportMapping extends UdpTransportMapping {
private static final LogAdapter logger =
LogFactory.getLogger(DefaultUdpTransportMapping.class);
protected DatagramSocket socket = null;
protected ListenThread listener;
private int socketTimeout = 1000;
private int receiveBufferSize = 0; // not set by default
public DefaultUdpTransportMapping() throws IOException {
super(new UdpAddress(InetAddress.getLocalHost(), 0));
socket = new DatagramSocket(udpAddress.getPort());
}
public DefaultUdpTransportMapping(UdpAddress udpAddress) throws IOException {
super(udpAddress);
socket = new DatagramSocket(udpAddress.getPort(),
udpAddress.getInetAddress());
}
public void sendMessage(Address targetAddress, byte[] message)
throws java.io.IOException
{
/**@todo is synchronization needed here? Queuing? */
InetSocketAddress targetSocketAddress =
new InetSocketAddress(((UdpAddress)targetAddress).getInetAddress(),
((UdpAddress)targetAddress).getPort());
if (logger.isDebugEnabled()) {
logger.debug("Sending message to "+targetAddress+" with length "+
message.length+": "+
new OctetString(message).toHexString());
}
socket.send(new DatagramPacket(message, message.length,
targetSocketAddress));
}
/**
* Closes the socket and stops the listener thread.
*
* @throws IOException
*/
public void close() throws IOException {
if (listener != null) {
listener.close();
listener.interrupt();
if (socketTimeout > 0) {
try {
listener.join();
}
catch (InterruptedException ex) {
logger.warn(ex);
}
}
listener = null;
}
if (!socket.isClosed()) {
socket.close();
}
}
/**
* Starts the listener thread that accepts incoming messages. The thread is
* started in daemon mode and thus it will not block application terminated.
* Nevertheless, the {@link #close()} method should be called to stop the
* listen thread gracefully and free associated ressources.
*
* @throws IOException
*/
public void listen() throws IOException {
if (listener != null) {
throw new SocketException("Port already listening");
}
listener = new ListenThread();
// set daemon mode
listener.setDaemon(true);
listener.start();
}
/**
* Changes the priority of the listen thread for this UDP transport mapping.
* This method has no effect, if called before {@link #listen()} has been
* called for this transport mapping.
*
* @param newPriority
* the new priority.
* @see Thread#setPriority
* @since 1.2.2
*/
public void setPriority(int newPriority) {
if (listener != null) {
listener.setPriority(newPriority);
}
}
/**
* Returns the priority of the internal listen thread.
* @return
* a value between {@link Thread#MIN_PRIORITY} and
* {@link Thread#MAX_PRIORITY}.
* @since 1.2.2
*/
public int getPriority() {
if (listener != null) {
return listener.getPriority();
}
else {
return Thread.NORM_PRIORITY;
}
}
public void setMaxInboundMessageSize(int maxInboundMessageSize) {
this.maxInboundMessageSize = maxInboundMessageSize;
}
public int getSocketTimeout() {
return socketTimeout;
}
/**
* Gets the requested receive buffer size for the underlying UDP socket.
* This size might not reflect the actual size of the receive buffer, which
* is implementation specific.
* @return
* <=0 if the default buffer size of the OS is used, or a value >0 if the
* user specified a buffer size.
*/
public int getReceiveBufferSize() {
return receiveBufferSize;
}
/**
* Sets the receive buffer size, which should be > the maximum inbound message
* size. This method has to be called before {@link #listen()} to be
* effective.
* @param receiveBufferSize
* an integer value >0 and > {@link #getMaxInboundMessageSize()}.
*/
public void setReceiveBufferSize(int receiveBufferSize) {
if (receiveBufferSize <= 0) {
throw new IllegalArgumentException("Receive buffer size must be > 0");
}
this.receiveBufferSize = receiveBufferSize;
}
public void setSocketTimeout(int socketTimeout) {
this.socketTimeout = socketTimeout;
}
public boolean isListening() {
return (listener != null);
}
class ListenThread extends Thread {
private byte[] buf;
private volatile boolean stop = false;
public ListenThread() throws SocketException {
buf = new byte[getMaxInboundMessageSize()];
}
public void run() {
try {
socket.setSoTimeout(getSocketTimeout());
if (receiveBufferSize > 0) {
socket.setReceiveBufferSize(Math.max(receiveBufferSize,
maxInboundMessageSize));
}
if (logger.isInfoEnabled()) {
logger.info("UDP receive buffer size for socket " +
getAddress() + " is set to: " +
socket.getReceiveBufferSize());
}
}
catch (SocketException ex) {
logger.error(ex);
setSocketTimeout(0);
}
while (!stop) {
DatagramPacket packet = new DatagramPacket(buf, buf.length,
udpAddress.getInetAddress(),
udpAddress.getPort());
try {
socket.receive(packet);
if (logger.isDebugEnabled()) {
logger.debug("Received message from "+packet.getAddress()+"/"+
packet.getPort()+
" with length "+packet.getLength()+": "+
new OctetString(packet.getData(), 0,
packet.getLength()).toHexString());
}
for (int i=0; i<messageDispatcher.size(); i++) {
MessageDispatcher dispatcher;
synchronized (DefaultUdpTransportMapping.this) {
dispatcher = (MessageDispatcher) messageDispatcher.get(i);
}
ByteBuffer bis;
// If messages are processed asynchronously (i.e. multi-threaded)
// then we have to copy the buffer's content here!
if (isAsyncMsgProcessingSupported()) {
byte[] bytes = new byte[packet.getLength()];
System.arraycopy(packet.getData(), 0, bytes, 0, bytes.length);
bis = ByteBuffer.wrap(bytes);
}
else {
bis = ByteBuffer.wrap(packet.getData());
}
dispatcher.processMessage(DefaultUdpTransportMapping.this,
new UdpAddress(packet.getAddress(),
packet.getPort()),
new BERInputStream(bis));
}
}
catch (SocketTimeoutException stex) {
// ignore
}
catch (PortUnreachableException purex) {
listener = null;
logger.error(purex);
if (logger.isDebugEnabled()) {
purex.printStackTrace();
}
/**@todo better error handling */
}
catch (IOException iox) {
logger.warn(iox);
if (logger.isDebugEnabled()) {
iox.printStackTrace();
}
}
}
socket.close();
}
public void close() {
stop = true;
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -