📄 niosender.java
字号:
/* * Copyright 1999,2004 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.transport.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Arrays;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.io.XByteBuffer;import org.apache.catalina.tribes.transport.AbstractSender;import org.apache.catalina.tribes.transport.DataSender;/** * This class is NOT thread safe and should never be used with more than one thread at a time * * This is a state machine, handled by the process method * States are: * - NOT_CONNECTED -> connect() -> CONNECTED * - CONNECTED -> setMessage() -> READY TO WRITE * - READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ * - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE * - TRANSFER_COMPLETE -> CONNECTED * * @author Filip Hanik * @version 1.0 */public class NioSender extends AbstractSender implements DataSender{ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioSender.class); protected Selector selector; protected SocketChannel socketChannel; /* * STATE VARIABLES * */ protected ByteBuffer readbuf = null; protected ByteBuffer writebuf = null; protected byte[] current = null; protected XByteBuffer ackbuf = new XByteBuffer(128,true); protected int remaining = 0; protected boolean complete; protected boolean connecting = false; public NioSender(Member destination) throws UnknownHostException { super(destination); } /** * State machine to send data * @param key SelectionKey * @return boolean * @throws IOException */ public boolean process(SelectionKey key, boolean waitForAck) throws IOException { int ops = key.readyOps(); key.interestOps(key.interestOps() & ~ops); //in case disconnect has been called if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key."); if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled."); if ( key.isConnectable() ) { if ( socketChannel.finishConnect() ) { //we connected, register ourselves for writing setConnected(true); connecting = false; setRequestCount(0); setConnectTime(System.currentTimeMillis()); socketChannel.socket().setSendBufferSize(getTxBufSize()); socketChannel.socket().setReceiveBufferSize(getRxBufSize()); socketChannel.socket().setSoTimeout((int)getTimeout()); socketChannel.socket().setSoLinger(false,0); socketChannel.socket().setTcpNoDelay(getTcpNoDelay()); socketChannel.socket().setKeepAlive(getSoKeepAlive()); socketChannel.socket().setReuseAddress(getSoReuseAddress()); socketChannel.socket().setOOBInline(getOoBInline()); socketChannel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); socketChannel.socket().setTrafficClass(getSoTrafficClass()); if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); return false; } else { //wait for the connection to finish key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT); return false; }//end if } else if ( key.isWritable() ) { boolean writecomplete = write(key); if ( writecomplete ) { //we are completed, should we read an ack? if ( waitForAck ) { //register to read the ack key.interestOps(key.interestOps() | SelectionKey.OP_READ); } else { //if not, we are ready, setMessage will reregister us for another write interest //do a health check, we have no way of verify a disconnected //socket since we don't register for OP_READ on waitForAck=false read(key);//this causes overhead setRequestCount(getRequestCount()+1); return true; } } else { //we are not complete, lets write some more key.interestOps(key.interestOps()|SelectionKey.OP_WRITE); }//end if } else if ( key.isReadable() ) { boolean readcomplete = read(key); if ( readcomplete ) { setRequestCount(getRequestCount()+1); return true; } else { key.interestOps(key.interestOps() | SelectionKey.OP_READ); }//end if } else { //unknown state, should never happen log.warn("Data is in unknown state. readyOps="+ops); throw new IOException("Data is in unknown state. readyOps="+ops); }//end if return false; } protected boolean read(SelectionKey key) throws IOException { //if there is no message here, we are done if ( current == null ) return true; int read = socketChannel.read(readbuf); //end of stream if ( read == -1 ) throw new IOException("Unable to receive an ack message. EOF on socket channel has been reached."); //no data read else if ( read == 0 ) return false; readbuf.flip(); ackbuf.append(readbuf,read); readbuf.clear(); if (ackbuf.doesPackageExist() ) { boolean result = Arrays.equals(ackbuf.extractDataPackage(true),org.apache.catalina.tribes.transport.Constants.ACK_DATA); return result; } else { return false; } } protected boolean write(SelectionKey key) throws IOException { if ( (!isConnected()) || (this.socketChannel==null)) { throw new IOException("NioSender is not connected, this should not occur."); } if ( current != null ) { if ( remaining > 0 ) { //weve written everything, or we are starting a new package //protect against buffer overwrite int byteswritten = socketChannel.write(writebuf); remaining -= byteswritten; //if the entire message was written from the buffer //reset the position counter if ( remaining < 0 ) { remaining = 0; } } return (remaining==0); } //no message to send, we can consider that complete return true; } /** * connect - blocking in this operation * * @throws IOException * @todo Implement this org.apache.catalina.tribes.transport.IDataSender method */ public synchronized void connect() throws IOException { if ( connecting ) return; connecting = true; if ( isConnected() ) throw new IOException("NioSender is already in connected state."); if ( readbuf == null ) { readbuf = getReadBuffer(); } else { readbuf.clear(); } if ( writebuf == null ) { writebuf = getWriteBuffer(); } else { writebuf.clear(); } InetSocketAddress addr = new InetSocketAddress(getAddress(),getPort()); if ( socketChannel != null ) throw new IOException("Socket channel has already been established. Connection might be in progress."); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(addr); socketChannel.register(getSelector(),SelectionKey.OP_CONNECT,this); } /** * disconnect * * @todo Implement this org.apache.catalina.tribes.transport.IDataSender method */ public void disconnect() { try { connecting = false; setConnected(false); if ( socketChannel != null ) { try { try {socketChannel.socket().close();}catch ( Exception x){} //error free close, all the way //try {socket.shutdownOutput();}catch ( Exception x){} //try {socket.shutdownInput();}catch ( Exception x){} //try {socket.close();}catch ( Exception x){} try {socketChannel.close();}catch ( Exception x){} }finally { socketChannel = null; } } } catch ( Exception x ) { log.error("Unable to disconnect NioSender. msg="+x.getMessage()); if ( log.isDebugEnabled() ) log.debug("Unable to disconnect NioSender. msg="+x.getMessage(),x); } finally { } } public void reset() { if ( isConnected() && readbuf == null) { readbuf = getReadBuffer(); } if ( readbuf != null ) readbuf.clear(); if ( writebuf != null ) writebuf.clear(); current = null; ackbuf.clear(); remaining = 0; complete = false; setAttempt(0); setRequestCount(0); setConnectTime(-1); } private ByteBuffer getReadBuffer() { return getBuffer(getRxBufSize()); } private ByteBuffer getWriteBuffer() { return getBuffer(getTxBufSize()); } private ByteBuffer getBuffer(int size) { return (getDirectBuffer()?ByteBuffer.allocateDirect(size):ByteBuffer.allocate(size)); } /** * sendMessage * * @param data ChannelMessage * @throws IOException * @todo Implement this org.apache.catalina.tribes.transport.IDataSender method */ public synchronized void setMessage(byte[] data) throws IOException { setMessage(data,0,data.length); } public synchronized void setMessage(byte[] data,int offset, int length) throws IOException { if ( data != null ) { current = data; remaining = length; ackbuf.clear(); if ( writebuf != null ) writebuf.clear(); else writebuf = getBuffer(length); if ( writebuf.capacity() < length ) writebuf = getBuffer(length); writebuf.put(data,offset,length); writebuf.rewind(); //set the limit so that we don't write non wanted data writebuf.limit(length); if (isConnected()) { socketChannel.register(getSelector(), SelectionKey.OP_WRITE, this); } } } public byte[] getMessage() { return current; } public boolean isComplete() { return complete; } public Selector getSelector() { return selector; } public void setSelector(Selector selector) { this.selector = selector; } public void setComplete(boolean complete) { this.complete = complete; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -