📄 defaulttcptransportmapping.java
字号:
/*_############################################################################
_##
_## SNMP4J - DefaultTcpTransportMapping.java
_##
_## Copyright 2003-2005 Frank Fock and Jochen Katz (SNMP4J.org)
_##
_## Licensed under the Apache License, Version 2.0 (the "License");
_## you may not use this file except in compliance with the License.
_## You may obtain a copy of the License at
_##
_## http://www.apache.org/licenses/LICENSE-2.0
_##
_## Unless required by applicable law or agreed to in writing, software
_## distributed under the License is distributed on an "AS IS" BASIS,
_## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
_## See the License for the specific language governing permissions and
_## limitations under the License.
_##
_##########################################################################*/
package org.snmp4j.transport;
import org.snmp4j.*;
import org.snmp4j.smi.*;
import org.snmp4j.log.*;
import java.util.Hashtable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.util.Set;
import java.util.Iterator;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.net.SocketException;
import java.util.TimerTask;
import java.util.Timer;
import java.util.LinkedList;
import org.snmp4j.transport.DefaultTcpTransportMapping.SocketEntry;
import org.snmp4j.asn1.BER;
import org.snmp4j.asn1.BER.MutableByte;
import org.snmp4j.asn1.BERInputStream;
/**
* The <code>DefaultTcpTransportMapping</code> implements a TCP transport
* mapping with the Java 1.4 new IO API.
* <p>
* It uses a single thread for processing incoming and outgoing messages.
* The thread is started when the <code>listen</code> method is called, or
* when an outgoing request is sent using the <code>sendMessage</code> method.
*
*
* @author Frank Fock
* @version 1.5
*/
public class DefaultTcpTransportMapping extends TcpTransportMapping {
private static final LogAdapter logger =
LogFactory.getLogger(DefaultTcpTransportMapping.class);
private Hashtable sockets = new Hashtable();
private ServerThread server;
private Timer socketCleaner;
// 1 minute default timeout
private long connectionTimeout = 60000;
private boolean serverEnabled = false;
/**
* Creates a default TCP transport mapping with the server for incoming
* messages disabled.
* @throws UnknownHostException
* @throws IOException
* on failure of binding a local port.
*/
public DefaultTcpTransportMapping() throws UnknownHostException, IOException {
super(new TcpAddress(InetAddress.getLocalHost(), 0));
}
/**
* Creates a default TCP transport mapping that binds to the given address
* (interface) on the local host.
*
* @param serverAddress
* the TcpAddress instance that describes the server address to listen
* on incoming connection requests.
* @throws UnknownHostException
* if the specified interface does not exist.
* @throws IOException
* if the given address cannot be bound.
*/
public DefaultTcpTransportMapping(TcpAddress serverAddress)
throws UnknownHostException, IOException
{
super(serverAddress);
this.serverEnabled = true;
}
/**
* Listen for incoming and outgoing requests. If the <code>serverEnabled</code>
* member is <code>false</code> the server for incoming requests is not
* started. This starts the internal server thread that processes messages.
* @throws SocketException
* when the transport is already listening for incoming/outgoing messages.
* @throws IOException
*/
public void listen() throws java.io.IOException {
if (server != null) {
throw new SocketException("Port already listening");
}
server = new ServerThread();
socketCleaner = new Timer(true); // run as daemon
server.setDaemon(true);
server.start();
}
/**
* Changes the priority of the server thread for this TCP transport mapping.
* This method has no effect, if called before {@link #listen()} has been
* called for this transport mapping.
*
* @param newPriority
* the new priority.
* @see Thread#setPriority
* @since 1.2.2
*/
public void setPriority(int newPriority) {
if (server != null) {
server.setPriority(newPriority);
}
}
/**
* Returns the priority of the internal listen thread.
* @return
* a value between {@link Thread#MIN_PRIORITY} and
* {@link Thread#MAX_PRIORITY}.
* @since 1.2.2
*/
public int getPriority() {
if (server != null) {
return server.getPriority();
}
else {
return Thread.NORM_PRIORITY;
}
}
/**
* Closes all open sockets and stops the internal server thread that
* processes messages.
*/
public synchronized void close() {
if (server != null) {
server.close();
try {
server.join();
}
catch (InterruptedException ex) {
logger.warn(ex);
}
server = null;
for (Iterator it = sockets.values().iterator(); it.hasNext(); ) {
SocketEntry entry = (SocketEntry)it.next();
try {
synchronized (entry) {
entry.getSocket().close();
}
logger.debug("Socket to "+entry.getPeerAddress()+" closed");
}
catch (IOException iox) {
// ingore
logger.debug(iox);
}
}
if (socketCleaner != null) {
socketCleaner.cancel();
}
socketCleaner = null;
}
}
/**
* Sends a SNMP message to the supplied address.
* @param address
* an <code>TcpAddress</code>. A <code>ClassCastException</code> is thrown
* if <code>address</code> is not a <code>TcpAddress</code> instance.
* @param message byte[]
* the message to sent.
* @throws IOException
*/
public void sendMessage(Address address, byte[] message)
throws java.io.IOException
{
if (server == null) {
listen();
}
server.sendMessage(address, message);
}
/**
* Gets the connection timeout. This timeout specifies the time a connection
* may be idle before it is closed.
* @return long
* the idle timeout in milliseconds.
*/
public long getConnectionTimeout() {
return connectionTimeout;
}
/**
* Sets the connection timeout. This timeout specifies the time a connection
* may be idle before it is closed.
* @param connectionTimeout
* the idle timeout in milliseconds.
*/
public void setConnectionTimeout(long connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
/**
* Checks whether a server for incoming requests is enabled.
* @return boolean
*/
public boolean isServerEnabled() {
return serverEnabled;
}
/**
* Sets whether a server for incoming requests should be created when
* the transport is set into listen state. Setting this value has no effect
* until the {@link #listen()} method is called (if the transport is already
* listening, {@link #close()} has to be called before).
* @param serverEnabled
* if <code>true</code> if the transport will listens for incoming
* requests after {@link #listen()} has been called.
*/
public void setServerEnabled(boolean serverEnabled) {
this.serverEnabled = serverEnabled;
}
/**
* Gets the inbound buffer size for incoming requests. When SNMP packets are
* received that are longer than this maximum size, the messages will be
* silently dropped and the connection will be closed.
* @return
* the maximum inbound buffer size in bytes.
*/
public int getMaxInboundMessageSize() {
return super.getMaxInboundMessageSize();
}
/**
* Sets the maximum buffer size for incoming requests. When SNMP packets are
* received that are longer than this maximum size, the messages will be
* silently dropped and the connection will be closed.
* @param maxInboundMessageSize
* the length of the inbound buffer in bytes.
*/
public void setMaxInboundMessageSize(int maxInboundMessageSize) {
this.maxInboundMessageSize = maxInboundMessageSize;
}
private synchronized void timeoutSocket(SocketEntry entry) {
socketCleaner.schedule(new SocketTimeout(entry), connectionTimeout);
}
public boolean isListening() {
return (server != null);
}
class SocketEntry {
private Socket socket;
private TcpAddress peerAddress;
private long lastUse;
private LinkedList message = new LinkedList();
private ByteBuffer readBuffer = null;
public SocketEntry(TcpAddress address, Socket socket) {
this.peerAddress = address;
this.socket = socket;
this.lastUse = System.currentTimeMillis();
}
public long getLastUse() {
return lastUse;
}
public void used() {
lastUse = System.currentTimeMillis();
}
public Socket getSocket() {
return socket;
}
public TcpAddress getPeerAddress() {
return peerAddress;
}
public synchronized void addMessage(byte[] message) {
this.message.add(message);
}
public byte[] nextMessage() {
if (this.message.size() > 0) {
return (byte[])this.message.removeFirst();
}
return null;
}
public void setReadBuffer(ByteBuffer byteBuffer) {
this.readBuffer = byteBuffer;
}
public ByteBuffer getReadBuffer() {
return readBuffer;
}
}
class SocketTimeout extends TimerTask {
private SocketEntry entry;
public SocketTimeout(SocketEntry entry) {
this.entry = entry;
}
/**
* run
*/
public void run() {
long now = System.currentTimeMillis();
if ((socketCleaner == null) ||
(now - entry.getLastUse() >= connectionTimeout)) {
if (logger.isDebugEnabled()) {
logger.debug("Socket has not been used for "+
(now - entry.getLastUse())+
" micro seconds, closing it");
}
sockets.remove(entry.getPeerAddress());
try {
synchronized (entry) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -