📄 nioreceiver.java
字号:
// }
// }
// else
if ( key.interestOps() == 0 ) {
//check for keys that didn't make it in.
ObjectReader ka = (ObjectReader) key.attachment();
if ( ka != null ) {
long delta = now - ka.getLastAccess();
if (delta > (long) getTimeout() && (!ka.isAccessed())) {
log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess()));
// System.out.println("Interest:"+key.interestOps());
// System.out.println("Ready Ops:"+key.readyOps());
// System.out.println("Valid:"+key.isValid());
ka.setLastAccess(now);
//key.interestOps(SelectionKey.OP_READ);
}//end if
} else {
cancelledKey(key);
}//end if
}//end if
}catch ( CancelledKeyException ckx ) {
cancelledKey(key);
}
}
}
/**
 * Runs the selector loop of this receiver until listening is switched off
 * or the selector field has been cleared.
 * <p>
 * Each iteration calls {@code events()} and {@code socketTimeouts()}, waits on
 * the selector for at most {@code getTcpSelectorTimeout()}, accepts pending
 * connections (configuring each socket and registering it for
 * {@code OP_READ} with a new {@code ObjectReader} attachment) and passes
 * readable keys to {@code readDataFromSocket(SelectionKey)}. Once the loop
 * ends, the server channel and the selector are closed.
 * <p>
 * If listening is already active the method logs a warning and returns
 * without doing anything.
 *
 * @throws Exception if closing the server channel or the selector fails;
 *         failures inside the loop are logged and do not terminate it
 */
protected void listen() throws Exception {
    if (doListen()) {
        log.warn("ServerSocketChannel already started");
        return;
    }
    setListen(true);
    while (doListen()) {
        // Read the field once per iteration. stopListening() sets it to null
        // (presumably from another thread), so re-reading it after the null
        // check could otherwise end in a NullPointerException.
        final Selector sel = selector;
        if (sel == null) {
            break;
        }
        try {
            events();
            socketTimeouts();
            // this may block for a long time, upon return the
            // selected set contains keys of the ready channels
            int n = sel.select(getTcpSelectorTimeout());
            if (n == 0) {
                // Nothing is ready: either the timeout elapsed or somebody
                // called wakeup() on the selector. Nothing to do this round.
                continue;
            }
            // look at each key in the selected set
            Iterator it = sel.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                // Is a new connection coming in?
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    // accept() returns null on a non-blocking server channel
                    // when no connection is pending; registerChannel() already
                    // tolerates null, so the socket setup must do the same.
                    if (channel != null) {
                        channel.socket().setReceiveBufferSize(getRxBufSize());
                        channel.socket().setSendBufferSize(getTxBufSize());
                        channel.socket().setTcpNoDelay(getTcpNoDelay());
                        channel.socket().setKeepAlive(getSoKeepAlive());
                        channel.socket().setOOBInline(getOoBInline());
                        channel.socket().setReuseAddress(getSoReuseAddress());
                        channel.socket().setSoLinger(getSoLingerOn(), getSoLingerTime());
                        channel.socket().setTrafficClass(getSoTrafficClass());
                        channel.socket().setSoTimeout(getTimeout());
                        Object attach = new ObjectReader(channel);
                        registerChannel(sel,
                                        channel,
                                        SelectionKey.OP_READ,
                                        attach);
                    }
                }
                // is there data to read on this channel?
                if (key.isReadable()) {
                    readDataFromSocket(key);
                } else {
                    // not readable: make sure OP_WRITE is not left in the interest set
                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                }
                // remove key from selected set, it's been handled
                it.remove();
            }
        } catch (java.nio.channels.ClosedSelectorException cse) {
            // ignore is normal at shutdown or stop listen socket
        } catch (java.nio.channels.CancelledKeyException nx) {
            log.warn("Replication client disconnected, error when polling key. Ignoring client.");
        } catch (Throwable x) {
            try {
                log.error("Unable to process request in NioReceiver", x);
            } catch (Throwable tx) {
                // in case of an out of memory error, the logging framework
                // is affected as well, so fall back to stderr
                tx.printStackTrace();
            }
        }
    }
    try {
        serverChannel.close();
    } finally {
        // Close the selector even if closing the server channel failed.
        // The field is read once so a concurrent reset to null cannot
        // slip in between the check and the close() call.
        final Selector toClose = selector;
        if (toClose != null) {
            toClose.close();
        }
    }
}
/**
 * Switches listening off and releases the selector.
 * <p>
 * The listen flag is cleared first. If a selector is present it is woken up
 * and closed; a failure while doing so is logged, and the selector field is
 * reset to {@code null} in every case.
 *
 * @see org.apache.catalina.tribes.transport.ClusterReceiverBase#stopListening()
 */
protected void stopListening() {
    setListen(false);
    if (selector == null) {
        // nothing to release
        return;
    }
    try {
        // wake up a pending select() before closing the selector
        selector.wakeup();
        selector.close();
    } catch (Exception closeFailure) {
        log.error("Unable to close cluster receiver selector.", closeFailure);
    } finally {
        selector = null;
    }
}
// ----------------------------------------------------------
/**
 * Puts the given channel into non-blocking mode and registers it with the
 * given selector. A {@code null} channel is silently ignored.
 *
 * @param selector the selector to register with
 * @param channel  the channel to register; may be {@code null}
 * @param ops      the interest set for the new registration
 * @param attach   the attachment for the resulting key
 * @throws Exception if configuring or registering the channel fails
 */
protected void registerChannel(Selector selector,
                               SelectableChannel channel,
                               int ops,
                               Object attach) throws Exception {
    if (channel != null) {
        // a channel must be non-blocking before it can be registered
        channel.configureBlocking(false);
        channel.register(selector, ops, attach);
    }
}
/**
 * Thread entry point: runs {@code listen()} and logs any exception that
 * escapes from it instead of letting it propagate.
 */
public void run() {
    try {
        listen();
    } catch (Exception listenFailure) {
        log.error("Unable to run replication listener.", listenFailure);
    }
}
// ----------------------------------------------------------
/**
 * Hands a readable key over to a worker task.
 * <p>
 * A task is requested from the task pool. If one is available, it is given
 * the key via {@code serviceChannel(key)} and submitted to the executor.
 * If none is available, nothing is done apart from an optional debug log
 * entry; the selection loop is expected to call this method again for the
 * same key until a task becomes free.
 *
 * @param key the selection key reported as ready for reading
 * @throws Exception if obtaining or submitting the task fails
 */
protected void readDataFromSocket(SelectionKey key) throws Exception {
    NioReplicationTask worker = (NioReplicationTask) getTaskPool().getRxTask();
    if (worker != null) {
        // bind the key to the worker, then queue the worker for execution
        worker.serviceChannel(key);
        getExecutor().execute(worker);
        return;
    }
    // No task available: do not wait here, the thread pool itself has a
    // waiting mechanism and the selection loop will come back to this key.
    if (log.isDebugEnabled()) {
        log.debug("No TcpReplicationThread available");
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -