📄 defaulttcptransportmapping.java
字号:
/**
 * Stores the flag that decides whether a listening server socket is set up.
 * The <code>ServerThread</code> constructor consults this flag: only when it
 * is <code>true</code> does it open, bind and register a
 * <code>ServerSocketChannel</code> for accepting connections.
 * @param serverEnabled
 *    <code>true</code> to have a server socket channel created,
 *    <code>false</code> otherwise.
 */
public void setServerEnabled(boolean serverEnabled) {
  this.serverEnabled = serverEnabled;
}
/**
 * Sets the message length decoder. Default message length decoder is the
 * {@link SnmpMesssageLengthDecoder}. The message length decoder must be
 * able to decode the total length of a message for the protocol(s) carried
 * by this transport mapping.
 * @param messageLengthDecoder
 *    a <code>MessageLengthDecoder</code> instance, must not be
 *    <code>null</code>.
 * @throws NullPointerException
 *    if <code>messageLengthDecoder</code> is <code>null</code>.
 */
public void setMessageLengthDecoder(MessageLengthDecoder messageLengthDecoder) {
  if (messageLengthDecoder == null) {
    // Same exception type as before, but with a message so the failing
    // argument can be identified from the stack trace alone.
    throw new NullPointerException("messageLengthDecoder must not be null");
  }
  this.messageLengthDecoder = messageLengthDecoder;
}
/**
 * Returns the maximum size of the inbound buffer used for incoming
 * requests. SNMP packets longer than this limit are silently dropped and
 * the connection they arrived on is closed.
 * <p>
 * The value is obtained from the superclass implementation.
 * @return
 *    the maximum inbound buffer size in bytes.
 */
public int getMaxInboundMessageSize() {
  final int inboundLimit = super.getMaxInboundMessageSize();
  return inboundLimit;
}
/**
 * Defines the maximum size of the inbound buffer used for incoming
 * requests. SNMP packets longer than this limit are silently dropped and
 * the connection they arrived on is closed.
 * <p>
 * The value is stored as given; no range check is performed here.
 * @param maxInboundMessageSize
 *    the length of the inbound buffer in bytes.
 */
public void setMaxInboundMessageSize(int maxInboundMessageSize) {
  this.maxInboundMessageSize = maxInboundMessageSize;
}
/**
 * Schedules a {@link SocketTimeout} task for the given entry on the socket
 * cleaner, to run after <code>connectionTimeout</code> has elapsed.
 * Nothing is scheduled when <code>connectionTimeout</code> is zero or
 * negative.
 * @param entry
 *    the socket entry to be checked for inactivity.
 */
private synchronized void timeoutSocket(SocketEntry entry) {
  if (connectionTimeout <= 0) {
    // Timeouts are switched off.
    return;
  }
  socketCleaner.schedule(new SocketTimeout(entry), connectionTimeout);
}
/**
 * Tells whether this transport mapping currently holds a server reference.
 * @return
 *    <code>true</code> if the <code>server</code> field is set,
 *    <code>false</code> if it is <code>null</code>.
 */
public boolean isListening() {
  return server != null;
}
/**
 * Bookkeeping record for a single TCP connection: the socket, the peer
 * address it is connected to, the time it was last used, a FIFO queue of
 * outgoing messages and an optional buffer for a partially read message.
 * <p>
 * The message queue is guarded by this object's monitor. The last-use
 * timestamp is <code>volatile</code> because it is read by
 * {@link SocketTimeout}, which runs as a <code>TimerTask</code>.
 */
class SocketEntry {
  private Socket socket;
  private TcpAddress peerAddress;
  // volatile: gives cross-thread visibility and atomic access to the 64-bit
  // value, which a plain long does not guarantee.
  private volatile long lastUse;
  private LinkedList message = new LinkedList();
  private ByteBuffer readBuffer = null;

  /**
   * Creates an entry for the given peer and socket and marks it as used
   * at the current time.
   * @param address
   *    the address of the remote peer.
   * @param socket
   *    the socket associated with that peer.
   */
  public SocketEntry(TcpAddress address, Socket socket) {
    this.peerAddress = address;
    this.socket = socket;
    this.lastUse = System.currentTimeMillis();
  }

  /**
   * @return
   *    the time of last use as returned by
   *    <code>System.currentTimeMillis()</code>.
   */
  public long getLastUse() {
    return lastUse;
  }

  /**
   * Records that the entry has been used right now.
   */
  public void used() {
    lastUse = System.currentTimeMillis();
  }

  public Socket getSocket() {
    return socket;
  }

  public TcpAddress getPeerAddress() {
    return peerAddress;
  }

  /**
   * Appends a message to the end of the outgoing queue.
   * @param message
   *    the encoded message to queue.
   */
  public synchronized void addMessage(byte[] message) {
    this.message.add(message);
  }

  /**
   * Removes and returns the oldest queued message.
   * <p>
   * Synchronized on the same monitor as {@link #addMessage(byte[])} so that
   * the non-thread-safe <code>LinkedList</code> is never read and modified
   * concurrently.
   * @return
   *    the next message, or <code>null</code> if the queue is empty.
   */
  public synchronized byte[] nextMessage() {
    if (this.message.isEmpty()) {
      return null;
    }
    return (byte[])this.message.removeFirst();
  }

  public void setReadBuffer(ByteBuffer byteBuffer) {
    this.readBuffer = byteBuffer;
  }

  /**
   * @return
   *    the buffer set by {@link #setReadBuffer(ByteBuffer)}, or
   *    <code>null</code> if none has been set.
   */
  public ByteBuffer getReadBuffer() {
    return readBuffer;
  }

  public String toString() {
    return "SocketEntry[peerAddress="+peerAddress+
        ",socket="+socket+",lastUse="+new Date(lastUse)+"]";
  }
}
/**
 * Message length decoder that reads a BER header from the start of the
 * buffer to determine the length of a message.
 */
public static class SnmpMesssageLengthDecoder implements MessageLengthDecoder {

  /**
   * @return
   *    the <code>MIN_SNMP_HEADER_LENGTH</code> constant.
   */
  public int getMinHeaderLength() {
    return MIN_SNMP_HEADER_LENGTH;
  }

  /**
   * Decodes the BER header found in the supplied buffer.
   * @param buf
   *    the buffer positioned at the start of a message.
   * @return
   *    a <code>MessageLength</code> built from the stream position reached
   *    after decoding the header and the length value that
   *    <code>BER.decodeHeader</code> returned.
   * @throws IOException
   *    if the header cannot be decoded.
   */
  public MessageLength getMessageLength(ByteBuffer buf) throws IOException {
    MutableByte tag = new MutableByte();
    BERInputStream berStream = new BERInputStream(buf);
    // The header must be decoded first: the stream position is only
    // meaningful as the header length once the header has been consumed.
    int decodedLength = BER.decodeHeader(berStream, tag);
    int headerLength = (int)berStream.getPosition();
    return new MessageLength(headerLength, decodedLength);
  }
}
/**
 * Timer task that closes a connection once it has been idle for at least
 * <code>connectionTimeout</code>, or re-schedules itself for the remaining
 * time if the connection has been used in the meantime.
 */
class SocketTimeout extends TimerTask {
  private SocketEntry entry;

  /**
   * @param entry
   *    the socket entry to supervise.
   */
  public SocketTimeout(SocketEntry entry) {
    this.entry = entry;
  }

  /**
   * Closes the supervised socket if the socket cleaner is gone or the entry
   * has been idle for <code>connectionTimeout</code> or longer; otherwise
   * schedules a new check for the moment the timeout would expire.
   */
  public void run() {
    long now = System.currentTimeMillis();
    // Read lastUse exactly once so the decision, the logged value and the
    // scheduled delay are all derived from the same timestamp.
    long lastUse = entry.getLastUse();
    long idle = now - lastUse;
    if ((socketCleaner == null) || (idle >= connectionTimeout)) {
      if (logger.isDebugEnabled()) {
        // Values come from System.currentTimeMillis(), i.e. milliseconds.
        logger.debug("Socket has not been used for "+
                     idle+
                     " milliseconds, closing it");
      }
      // NOTE(review): this removes whatever entry is currently mapped to
      // the peer address; confirm it cannot be a newer entry for the same
      // peer than the one supervised here.
      sockets.remove(entry.getPeerAddress());
      try {
        synchronized (entry) {
          entry.getSocket().close();
        }
        logger.info("Socket to "+entry.getPeerAddress()+
                    " closed due to timeout");
      }
      catch (IOException ex) {
        logger.error(ex);
      }
    }
    else {
      // Time left until the entry would reach the idle limit; positive
      // here because idle < connectionTimeout.
      long remaining = (lastUse + connectionTimeout) - now;
      if (logger.isDebugEnabled()) {
        logger.debug("Scheduling " + remaining);
      }
      socketCleaner.schedule(new SocketTimeout(entry), remaining);
    }
  }
}
class ServerThread extends Thread {
private byte[] buf;
private volatile boolean stop = false;
private Throwable lastError = null;
private ServerSocketChannel ssc;
private Selector selector;
private LinkedList pending = new LinkedList();
/**
 * Creates the server thread: names it after the transport address,
 * allocates the receive buffer with the current maximum inbound message
 * size and opens the selector. If the server is enabled, a non-blocking
 * server socket channel is additionally opened, bound to the configured
 * TCP address and registered with the selector for accept operations.
 * <p>
 * If setting up the server socket fails, the channel and the selector
 * opened so far are closed again before the exception is propagated, so
 * a failed construction (for example because the port is already in use)
 * does not leak them.
 * @throws IOException
 *    if the selector or the server socket channel cannot be opened,
 *    configured, bound or registered.
 */
public ServerThread() throws IOException {
  setName("DefaultTCPTransportMapping_"+getAddress());
  buf = new byte[getMaxInboundMessageSize()];
  // Selector for incoming requests
  selector = Selector.open();
  if (serverEnabled) {
    boolean listening = false;
    try {
      // Create a new server socket and set to non blocking mode
      ssc = ServerSocketChannel.open();
      ssc.configureBlocking(false);

      // Bind the server socket
      InetSocketAddress isa = new InetSocketAddress(tcpAddress.getInetAddress(),
          tcpAddress.getPort());
      ssc.socket().bind(isa);
      // Register accepts on the server socket with the selector. This
      // step tells the selector that the socket wants to be put on the
      // ready list when accept operations occur, so allowing multiplexed
      // non-blocking I/O to take place.
      ssc.register(selector, SelectionKey.OP_ACCEPT);
      listening = true;
    }
    finally {
      if (!listening) {
        // Construction is about to fail: nobody will ever get a reference
        // to this thread, so release what was opened. Close errors are
        // only logged to keep the original exception as the one thrown.
        if (ssc != null) {
          try {
            ssc.close();
          }
          catch (IOException ex) {
            logger.error(ex);
          }
        }
        try {
          selector.close();
        }
        catch (IOException ex) {
          logger.error(ex);
        }
      }
    }
  }
}
/**
 * Registers the channels of pending socket entries with the selector.
 * While holding the lock on <code>pending</code>, the loop runs once per
 * element currently in the list. A channel whose socket is already
 * connected is registered for <code>OP_WRITE</code>, otherwise for
 * <code>OP_CONNECT</code>.
 * <p>
 * If registration fails with an <code>IOException</code>, the error is
 * logged, the channel is closed, a <code>STATE_CLOSED</code> transport
 * state event is fired (only if closing succeeded), and the exception is
 * stored as the last error. It is re-thrown wrapped in a
 * <code>RuntimeException</code> when
 * <code>SNMP4JSettings.isFowardRuntimeExceptions()</code> returns true.
 */
private void processPending() {
synchronized (pending) {
for (int i=0; i<pending.size(); i++) {
// NOTE(review): getFirst() does not remove the element and ignores the
// loop index, so every iteration sees the same first entry and this
// method itself never shrinks the list. Presumably the list is drained
// elsewhere - verify, otherwise later entries are never registered.
SocketEntry entry = (SocketEntry)pending.getFirst();
try {
// Register the channel with the selector, indicating
// interest in connection completion and attaching the
// target object so that we can get the target back
// after the key is added to the selector's
// selected-key set
// NOTE(review): the register calls below pass no attachment, contrary
// to what the comment above describes.
if (entry.getSocket().isConnected()) {
entry.getSocket().getChannel().register(selector,
SelectionKey.OP_WRITE);
}
else {
entry.getSocket().getChannel().register(selector,
SelectionKey.OP_CONNECT);
}
}
catch (IOException iox) {
logger.error(iox);
// Something went wrong, so close the channel and
// record the failure
try {
entry.getSocket().getChannel().close();
TransportStateEvent e =
new TransportStateEvent(DefaultTcpTransportMapping.this,
entry.getPeerAddress(),
TransportStateEvent.STATE_CLOSED,
iox);
fireConnectionStateChanged(e);
}
catch (IOException ex) {
// Closing failed as well; the state event above is skipped in this case.
logger.error(ex);
}
lastError = iox;
if (SNMP4JSettings.isFowardRuntimeExceptions()) {
// Thrown from inside the synchronized block; aborts the loop.
throw new RuntimeException(iox);
}
}
}
}
}
/**
 * Returns the error most recently recorded by this thread.
 * @return
 *    the stored <code>Throwable</code>, or <code>null</code> if none has
 *    been recorded.
 */
public Throwable getLastError() {
  return this.lastError;
}
/**
 * Queues a message for the given destination and wakes up the selector.
 * <p>
 * If an open, connected socket is already known for the address, the
 * message is appended to that entry's queue. Otherwise a new non-blocking
 * channel is opened, a connect is initiated, and a new socket entry
 * holding the message is stored in the socket table. In both cases the
 * entry is added to the pending list.
 * <p>
 * If opening or connecting the new channel fails, the channel is closed
 * again before the exception is propagated so that it is not leaked.
 * @param address
 *    the destination; cast to <code>TcpAddress</code> when a new
 *    connection has to be opened.
 * @param message
 *    the encoded message to send.
 * @throws java.io.IOException
 *    if a new channel cannot be opened, configured or connected.
 */
public void sendMessage(Address address, byte[] message)
    throws java.io.IOException
{
  Socket s = null;
  SocketEntry entry = (SocketEntry) sockets.get(address);
  if (logger.isDebugEnabled()) {
    logger.debug("Looking up connection for destination '"+address+
                 "' returned: "+entry);
    logger.debug(sockets.toString());
  }
  if (entry != null) {
    s = entry.getSocket();
  }
  if ((s == null) || (s.isClosed()) || (!s.isConnected())) {
    if (logger.isDebugEnabled()) {
      logger.debug("Socket for address '"+address+
                   "' is closed, opening it...");
    }
    SocketChannel sc = null;
    // Set once the entry owning the channel has been handed over to the
    // socket table and the pending list.
    boolean queued = false;
    try {
      // Open the channel, set it to non-blocking, initiate connect
      sc = SocketChannel.open();
      sc.configureBlocking(false);
      sc.connect(new InetSocketAddress(((TcpAddress)address).getInetAddress(),
                                       ((TcpAddress)address).getPort()));
      s = sc.socket();
      entry = new SocketEntry((TcpAddress)address, s);
      entry.addMessage(message);
      sockets.put(address, entry);
      synchronized (pending) {
        pending.add(entry);
      }
      queued = true;
      selector.wakeup();
      logger.debug("Trying to connect to "+address);
    }
    catch (IOException iox) {
      logger.error(iox);
      throw iox;
    }
    finally {
      if (!queued && (sc != null)) {
        // The channel was opened but never handed over; close it here
        // because nothing else holds a reference to it. A failure to
        // close is only logged so the original exception is kept.
        try {
          sc.close();
        }
        catch (IOException closeEx) {
          logger.error(closeEx);
        }
      }
    }
  }
  else {
    entry.addMessage(message);
    synchronized (pending) {
      pending.add(entry);
    }
    selector.wakeup();
  }
}
public void run() {
// Here's where everything happens. The select method will
// return when any operations registered above have occurred, the
// thread has been interrupted, etc.
try {
while (!stop) {
try {
if (selector.select() > 0) {
if (stop) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -