📄 nioendpoint.java
字号:
return workerThread;
}
/**
* Recycle the specified Processor so that it can be used again.
*
* @param workerThread The processor to be recycled
*/
protected void recycleWorkerThread(Worker workerThread) {
synchronized (workers) {
workers.push(workerThread);
curThreadsBusy--;
workers.notify();
}
}
protected boolean processSocket(SocketChannel socket) {
try {
if (executor == null) {
getWorkerThread().assign(socket);
} else {
executor.execute(new SocketOptionsProcessor(socket));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
/**
* Process given socket.
*/
protected boolean processSocket(NioChannel socket) {
try {
if (executor == null) {
getWorkerThread().assign(socket);
} else {
executor.execute(new SocketProcessor(socket));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
/**
* Process given socket for an event.
*/
protected boolean processSocket(NioChannel socket, SocketStatus status) {
try {
if (executor == null) {
getWorkerThread().assign(socket, status);
} else {
executor.execute(new SocketEventProcessor(socket, status));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
// --------------------------------------------------- Acceptor Inner Class
/**
* Server socket acceptor thread.
*/
protected class Acceptor implements Runnable {
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
public void run() {
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
try {
// Accept the next incoming connection from the server socket
SocketChannel socket = serverSock.accept();
// Hand this socket off to an appropriate processor
if ( running && (!paused) && socket != null ) processSocket(socket);
} catch (Throwable t) {
log.error(sm.getString("endpoint.accept.fail"), t);
}
// The processor will recycle itself when it finishes
}
}
}
// ----------------------------------------------------- Poller Inner Classes
/**
*
* PollerEvent, cacheable object for poller events to avoid GC
*/
public class PollerEvent implements Runnable {
protected NioChannel socket;
protected int interestOps;
protected KeyAttachment key;
public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
reset(ch, k, intOps);
}
public void reset(NioChannel ch, KeyAttachment k, int intOps) {
socket = ch;
interestOps = intOps;
key = k;
}
public void reset() {
reset(null, null, 0);
}
public void run() {
if ( interestOps == OP_REGISTER ) {
try {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception x) {
log.error("", x);
}
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
final KeyAttachment att = (KeyAttachment) key.attachment();
try {
if (key != null) {
key.interestOps(interestOps);
att.interestOps(interestOps);
}
}
catch (CancelledKeyException ckx) {
try {
if (key != null && key.attachment() != null) {
KeyAttachment ka = (KeyAttachment) key.attachment();
ka.setError(true); //set to collect this socket immediately
}
try {
socket.close();
}
catch (Exception ignore) {}
if (socket.isOpen())
socket.close(true);
}
catch (Exception ignore) {}
}
}//end if
}//run
public String toString() {
return super.toString()+"[intOps="+this.interestOps+"]";
}
}
/**
* Poller class.
*/
public class Poller implements Runnable {
protected Selector selector;
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
protected boolean close = false;
protected long nextExpiration = 0;//optimize expiration handling
protected int keepAliveCount = 0;
public int getKeepAliveCount() { return keepAliveCount; }
protected AtomicLong wakeupCounter = new AtomicLong(0l);
public Poller() throws IOException {
this.selector = Selector.open();
}
public Selector getSelector() { return selector;}
/**
* Create the poller. With some versions of APR, the maximum poller size will
* be 62 (reocmpiling APR is necessary to remove this limitation).
*/
protected void init() {
keepAliveCount = 0;
}
/**
* Destroy the poller.
*/
protected void destroy() {
// Wait for polltime before doing anything, so that the poller threads
// exit, otherwise parallel descturction of sockets which are still
// in the poller can cause problems
close = true;
events.clear();
selector.wakeup();
}
public void addEvent(Runnable event) {
events.offer(event);
if ( wakeupCounter.incrementAndGet() < 3 ) selector.wakeup();
}
/**
* Add specified socket and associated pool to the poller. The socket will
* be added to a temporary array, and polled first after a maximum amount
* of time equal to pollTime (in most cases, latency will be much lower,
* however).
*
* @param socket to add to the poller
*/
public void add(final NioChannel socket) {
add(socket,SelectionKey.OP_READ);
}
public void add(final NioChannel socket, final int interestOps) {
PollerEvent r = eventCache.poll();
if ( r==null) r = new PollerEvent(socket,null,interestOps);
else r.reset(socket,null,interestOps);
addEvent(r);
}
public boolean events() {
boolean result = false;
//synchronized (events) {
Runnable r = null;
result = (events.size() > 0);
while ( (r = (Runnable)events.poll()) != null ) {
try {
r.run();
if ( r instanceof PollerEvent ) {
((PollerEvent)r).reset();
eventCache.offer((PollerEvent)r);
}
} catch ( Exception x ) {
log.error("",x);
}
}
//events.clear();
//}
return result;
}
public void register(final NioChannel socket)
{
socket.setPoller(this);
KeyAttachment key = keyCache.poll();
final KeyAttachment ka = key!=null?key:new KeyAttachment();
ka.reset(this,socket);
PollerEvent r = eventCache.poll();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
public void cancelledKey(SelectionKey key, SocketStatus status) {
try {
KeyAttachment ka = (KeyAttachment) key.attachment();
if (ka != null && ka.getComet()) {
//the comet event takes care of clean up
processSocket(ka.getChannel(), status);
}else {
if (key.isValid()) key.cancel();
if (key.channel().isOpen()) key.channel().close();
key.attach(null);
}
} catch (Throwable e) {
if ( log.isDebugEnabled() ) log.error("",e);
// Ignore
}
}
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
public void run() {
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
boolean hasEvents = false;
hasEvents = (hasEvents | events());
// Time to terminate?
if (close) return;
int keyCount = 0;
try {
keyCount = selector.select(selectorTimeout);
wakeupCounter.set(0);
if ( close ) { selector.close(); return; }
} catch ( NullPointerException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( wakeupCounter == null || selector == null ) throw x;
continue;
} catch ( CancelledKeyException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( wakeupCounter == null || selector == null ) throw x;
continue;
} catch (Throwable x) {
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = (SelectionKey) iterator.next();
iterator.remove();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
try {
if ( sk.isValid() && attachment != null ) {
attachment.access();
sk.attach(attachment);
sk.interestOps(0); //this is a must, so that we don't have multiple threads messing with the socket
attachment.interestOps(0);
NioChannel channel = attachment.getChannel();
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getComet() ) {
if (!processSocket(channel, SocketStatus.OPEN))
processSocket(channel, SocketStatus.DISCONNECT);
} else {
boolean close = (!processSocket(channel));
if ( close ) {
channel.close();
channel.getIOChannel().socket().close();
}
}
}
} else {
//invalid key
cancelledKey(sk, SocketStatus.ERROR);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk, SocketStatus.ERROR);
} catch (Throwable t) {
log.error("",t);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
synchronized (this) {
this.notifyAll();
}
}
protected void timeout(int keyCount, boolean hasEvents) {
long now = System.currentTimeMillis();
//don't process timeouts too frequently, but if the selector simply timed out
//then we can check timeouts to avoid gaps
if ( (now < nextExpiration) && (keyCount>0 || hasEvents) ) return;
nextExpiration = now + (long)socketProperties.getSoTimeout();
//timeout
Set<SelectionKey> keys = selector.keys();
for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) {
SelectionKey key = iter.next();
try {
KeyAttachment ka = (KeyAttachment) key.attachment();
if ( ka == null ) {
cancelledKey(key, SocketStatus.ERROR); //we don't support any keys without attachments
} else if ( ka.getError() ) {
cancelledKey(key, SocketStatus.DISCONNECT);
}else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//only timeout sockets that we are waiting for a read from
long delta = now - ka.getLastAccess();
long timeout = (ka.getTimeout()==-1)?((long) socketProperties.getSoTimeout()):(ka.getTimeout());
boolean isTimedout = delta > timeout;
if (isTimedout) {
key.interestOps(0);
ka.interestOps(0); //avoid duplicate timeout calls
cancelledKey(key, SocketStatus.TIMEOUT);
} else {
long nextTime = now+(timeout-delta);
nextExpiration = (nextTime < nextExpiration)?nextTime:nextExpiration;
}
}//end if
}catch ( CancelledKeyException ckx ) {
cancelledKey(key, SocketStatus.ERROR);
}
}//for
}
}
public static class KeyAttachment {
public KeyAttachment() {
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -