📄 centralselector.java
字号:
} else if (!Server.srv.USE_CENTRAL_REQUESTQUEUE
&& !RequestReader.processRequest(sk)) {
implCloseChannel (sk);
if (nextUnavailableMessage >= System.currentTimeMillis())
return;
cb.logError ("No available requestreader");
Server.log (this, "readIn: No availabel requestreader to process request", Server.MSG_ERROR, Server.LVL_MAJOR);
nextUnavailableMessage += 1000;
}
}
private void implCloseChannel (SelectionKey sk) {
try {
ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
if (cb != null) {
cb.invalidate();
User u = cb.getUser();
if (u!=null && sk.equals(u.getKey()) && !u.isRemoving() && !u.isLoggedOut()) {
StringBuffer sb = new StringBuffer ("implCloseChannel: droped key for user ").append (u.getName ());
Server.log ("static CentralSelector", sb.toString (), Server.MSG_STATE, Server.LVL_VERBOSE);
u.scheduleToRemove();
}
}
SocketChannel sc = (SocketChannel) sk.channel();
Responder.res.dropChannel(sc);
synchronized (sc) {
Socket s = sc.socket();
s.close();
sc.close();
}
sk.cancel();
} catch (Exception e) {
Server.debug (this, "closeChannel: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
sk.cancel();
}
}
public static void dropKey (SelectionKey sk) {
if (sk == null) return;
ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
if (cb != null) {
cb.invalidate();
}
addToDropKeys (sk);
}
public static void dropChannel (SocketChannel sc) {
SelectionKey sk = sc.keyFor(cSel.sel);
if (sk == null) {
try {
sc.close();
} catch (IOException e) {
Server.debug ("static CentralSelector", "dropChannle:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
}
return;
}
ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
if (cb != null) {
cb.invalidate();
}
addToDropKeys (sk);
}
private static void addToDropKeys (SelectionKey sk) {
long now = System.currentTimeMillis();
long stop = now + 5000;
synchronized (cSel.dropKeys) {
boolean success=cSel.dropKeys.put(sk);
while (!success && stop > now) {
try {
now = System.currentTimeMillis();
long waitTime = stop - now;
if (waitTime > 32)
cSel.dropKeys.wait(stop - now);
} catch (InterruptedException ie) { }
success=cSel.dropKeys.put(sk);
}
if (!success)
Server.log("static CentralSelector", "dropKey: unable to add dropkey", Server.MSG_ERROR, Server.LVL_MAJOR);
cSel.dropKeys.notify();
}
}
public static boolean isSkValid (SelectionKey sk) {
if (!chkSk(sk)) {
if (sk != null && cSel.equals(sk.selector()))
dropKey (sk);
return false;
}
return true;
}
private static boolean chkSk (SelectionKey sk) {
if (sk == null)
return false;
try {
ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
if (cb == null || !cb.isValid())
return false;
if (!sk.isValid() || !sk.channel().isOpen()) {
cb.invalidate();
return false;
}
Socket s = ((SocketChannel) sk.channel()).socket();
if (s.isInputShutdown() || s.isOutputShutdown()) {
cb.invalidate();
return false;
}
if (cb != null) {
if (!cb.isValid())
return false;
}
} catch (Exception e) {
Server.debug ("static CentralSelector", "SelectionKey-Check:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
return false;
}
return true;
}
private boolean addRequestToQueue(SelectionKey sk) {
long stop = System.currentTimeMillis() + 1000;
boolean success=false;
try {
synchronized (this.reqQueue) {
if (reqQueue.contains(sk))
return true;
success = reqQueue.put(sk);
while (!success
&& stop > System.currentTimeMillis()) {
this.reqQueue.wait(stop - System.currentTimeMillis());
success = reqQueue.put(sk);
}
if (success)
this.reqQueue.notify();
}
} catch (Exception e) {
Server.debug (this, "addRequestToQueue caused exception:", e, Server.MSG_ERROR, Server.LVL_MAJOR);
}
if (reqQueue.size() > ((this.reqQueue.capacity()/1.5)/RequestReader.activeReaders()))
RequestReader.startRequestReader(false);
return success;
}
public boolean equals (Object o) {
return o instanceof Selector && o.equals(sel);
}
public String toString() {
return "[CentralSelector]";
}
private class KeepAliveTimeoutChecker implements Runnable {
private short loglvl = Server.LVL_VERY_VERBOSE;
KeepAliveTimeoutChecker() { }
public void run () {
long nextCheck = 0;
while (Server.srv.isRunning()) {
long now = System.currentTimeMillis();
if (nextCheck>now) {
long diff = Math.max(nextCheck - now, 33);
Server.log("KeepAliveCheck", "sleeping for " + diff + " millis", Server.MSG_STATE, loglvl);
try {
Thread.sleep(diff);
} catch (InterruptedException ie) { /* ok */ }
now = System.currentTimeMillis();
}
nextCheck = now + Server.srv.KEEP_ALIVE_TIMEOUT;
SelectionKey[] checkArr;
Server.log("KeepAliveCheck", "sync on selector", Server.MSG_STATE, loglvl);
synchronized (CentralSelector.cSel.sel) {
if (!CentralSelector.cSel.sel.isOpen()) {
Server.log ("KeepAliveTimeoutChecker", "Selector closed. Shutting down KeepAliveTimeoutChecker", Server.MSG_STATE, Server.LVL_MINOR);
return;
}
Set keyset = CentralSelector.cSel.sel.keys();
Server.log("KeepAliveCheck", "sync on selectors keyset", Server.MSG_STATE, loglvl);
synchronized (keyset) {
checkArr = (SelectionKey[]) keyset.toArray(new SelectionKey[0]);
}
}
Server.log("KeepAliveCheck", "processing " + checkArr.length + "keys", Server.MSG_STATE, loglvl);
for (int i = 0; i < checkArr.length; i++) {
SelectionKey sk = checkArr[i];
if (!sk.isValid() || !sk.channel().isOpen())
continue;
ConnectionBuffer cb = (ConnectionBuffer) sk.attachment();
synchronized (cb) {
long kato = cb.getKeepAliveTimeout(now);
if (kato < 0) // no timeout
continue;
if (kato <= now) {
Server.log("KeepAliveCheck", "closing connection to " + cb.conn, Server.MSG_STATE, loglvl);
CentralSelector.dropKey(sk);
} else if (kato < nextCheck)
nextCheck = kato;
}
}
Server.log("KeepAliveCheck", "checking took me " + (System.currentTimeMillis()-now) + " millis", Server.MSG_STATE, loglvl);
}
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -