📄 centralselector.java
字号:
/**
* Copyright (C) 2003 Manfred Andres
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
/**
* The CentralSelector does the actual IO. All connections get registered
* and will be served when they are ready for action.
*
* When a connection comes in, it gets registered with the CentralSelector.
* If a connection has content, the content will be automatically directed to
* one RequestReader's io-queue.
* If content must be written, the ConnectionBuffer's write-queue attached to
* the connection will store it until the connection is ready for write-action.
*/
package freecs.core;
import freecs.Server;
import freecs.util.ObjectBuffer;
import java.util.Set;
import java.util.Iterator;
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CancelledKeyException;
/**
 * Singleton thread that owns the NIO {@link Selector} used for reading from
 * client connections.
 *
 * Channels are registered for read-readiness via {@link #registerSC}. The
 * thread's {@link #run} loop polls the selector, reads incoming bytes into the
 * {@link ConnectionBuffer} attached to each key and hands the key over to
 * {@link RequestReader#processRequest}. Keys that should be closed are queued
 * in {@link #dropKeys} and closed by the selector thread on its next pass.
 */
public class CentralSelector extends Thread {
    /**
     * Set to <code>true</code> once {@link #run} has finished. Declared
     * volatile because it is written by the selector thread, so any reader
     * on a different thread needs the visibility guarantee.
     */
    public static volatile boolean stopped = false;

    /** The one and only instance; created when the class is initialized. */
    public static final CentralSelector cSel = new CentralSelector();

    /** Selector all client channels are registered with. */
    private Selector sel = null;

    /**
     * Queue of SelectionKeys waiting to be closed by the selector thread.
     * Every access in this class is guarded by synchronizing on the buffer.
     */
    public ObjectBuffer dropKeys;

    /** Creates the drop queue and opens the selector; logs if that fails. */
    private CentralSelector () {
        dropKeys = new ObjectBuffer (1000);
        if (!initCsel ())
            Server.log ("CentralSelector.construct: unable to init Csel", Server.MSG_ERROR, Server.LVL_HALT);
    }

    /**
     * Opens the selector if there is none yet or the current one is closed.
     *
     * @return true if an open selector is available afterwards
     */
    private boolean initCsel () {
        if (sel == null || !sel.isOpen ()) {
            try {
                sel = SelectorProvider.provider ().openSelector ();
            } catch (IOException ioe) {
                Server.debug ("CentralSelector.initCsel:", ioe, Server.MSG_ERROR, Server.LVL_HALT);
                return false;
            }
        }
        return sel != null && sel.isOpen ();
    }

    /**
     * Starts the selector thread if it is not alive and raises its priority
     * to the maximum.
     *
     * Note that a thread which has already terminated is not alive either,
     * and {@link Thread#start} rejects a second start with an
     * IllegalThreadStateException; this method does not guard against that.
     */
    public static void startCentralSelector () {
        if (!cSel.isAlive ())
            cSel.start ();
        cSel.setPriority (MAX_PRIORITY);
    }

    /**
     * @return the number of keys currently registered with the selector
     */
    public int keyCount () {
        return sel.keys ().size ();
    }

    /**
     * Switches the channel to non-blocking mode and registers it for
     * read-readiness with a fresh ConnectionBuffer as attachment.
     *
     * @param sc the channel to register; <code>null</code> is ignored
     * @param reqType request type handed to the ConnectionBuffer constructor
     * @throws IOException if the channel can not be configured
     * @throws ClosedChannelException if the channel is already closed
     */
    public void registerSC (SocketChannel sc, int reqType) throws IOException, ClosedChannelException {
        if (sc == null) return;
        sc.configureBlocking (false);
        ConnectionBuffer cb = new ConnectionBuffer (reqType);
        // readIn() synchronizes on the same buffer, so holding the monitor
        // here keeps the selector thread from handling the new key before
        // the buffer has been told about it.
        synchronized (cb) {
            SelectionKey sk = sc.register (sel, SelectionKey.OP_READ, cb);
            cb.setKey (sk);
            cb.notifyAll ();
        }
    }

    /**
     * Selector loop. Runs while the server is running or keys are still
     * registered; once the server has stopped, at most 500 further passes
     * are made. Each pass closes the queued keys, polls the selector and
     * dispatches the ready keys. Closing the selector and setting
     * {@link #stopped} happen in a finally block so they are not skipped
     * when the loop is left by an unexpected exception or error.
     */
    public void run () {
        Server.log ("CentralSelector: starting up", Server.MSG_STATE, Server.LVL_MINOR);
        try {
            int sdc = 500;      // passes left after the server stopped running
            long lastMsg = 0;   // time of the last "loop start" log entry
            while (Server.srv.isRunning () || sel.keys ().size () > 0) {
                try {
                    if (!Server.srv.isRunning ()) {
                        sdc--;
                        if (sdc <= 0) break;
                    }
                    // heartbeat log entry, at most every five seconds
                    if ((System.currentTimeMillis () - lastMsg) > 5000) {
                        lastMsg = System.currentTimeMillis ();
                        Server.log ("CentralSelector.run: loop start", Server.MSG_STATE, Server.LVL_MAJOR);
                    }
                    drainDropKeys ();

                    boolean idle = false;
                    try {
                        idle = sel.selectNow () < 1;
                    } catch (Exception e) {
                        Server.debug ("CentralSelector.run (select):", e, Server.MSG_ERROR , Server.LVL_MAJOR);
                    }
                    if (idle) {
                        try {
                            Thread.sleep (33);
                            continue;
                        } catch (InterruptedException ignored) {
                            // Deliberately not re-interrupting: a set interrupt
                            // flag would make every later sleep return at once
                            // and turn this loop into a busy spin.
                        }
                    }

                    processSelectedKeys ();

                    try {
                        Thread.sleep (33);
                    } catch (InterruptedException ignored) {
                        // see above: the loop condition decides when to stop
                    }
                } catch (Exception e) {
                    Server.debug ("CentralSelector (outer loop): ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
                }
            }
        } finally {
            try {
                if (sel != null) {
                    try {
                        Server.log ("CentralSelector closing down selector", Server.MSG_ERROR, Server.LVL_MAJOR);
                        sel.close ();
                    } catch (Exception e) {
                        Server.debug ("CentralSelector shutting down: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
                    }
                }
                Server.log ("CentralSelectro suspended", Server.MSG_ERROR, Server.LVL_MINOR);
            } finally {
                stopped = true;
            }
        }
    }

    /** Closes every key currently queued in {@link #dropKeys}. */
    private void drainDropKeys () {
        synchronized (dropKeys) {
            while (!dropKeys.isEmpty ())
                closeChannel ((SelectionKey) dropKeys.pop ());
        }
    }

    /**
     * Walks the selector's selected-key set once. Invalid keys and keys of
     * closed channels are closed, readable keys are passed to readIn().
     * Only read-readiness is dispatched, which matches the interest set
     * used by registerSC().
     */
    private void processSelectedKeys () {
        Set keys = sel.selectedKeys ();
        if (keys == null || keys.isEmpty ())
            return;
        for (Iterator i = keys.iterator (); i.hasNext (); ) {
            SelectionKey ck = (SelectionKey) i.next ();
            i.remove ();
            try {
                if (!ck.isValid () || !ck.channel ().isOpen ()) {
                    closeChannel (ck);
                    continue;
                }
                if (ck.isReadable ())
                    readIn (ck);
            } catch (CancelledKeyException cke) {
                // the key was cancelled between the validity check and its
                // use; there is nothing left to do for it
            } catch (Exception e) {
                // Keep one failing key from aborting the pass: the keys not
                // yet visited would otherwise stay in the selected set and
                // wait until another key becomes ready.
                Server.debug ("CentralSelector.run (key):", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            }
        }
    }

    /**
     * Reads available bytes from the key's channel into the read buffer of
     * the attached ConnectionBuffer and passes the key on to
     * RequestReader.processRequest(). The channel is closed on end-of-stream,
     * on an IOException, when the key has no ConnectionBuffer and when no
     * request reader accepts the key.
     *
     * @param sk the readable key
     */
    private void readIn (SelectionKey sk) {
        ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
        if (cb == null) {
            Server.log ("Key without ConnectionBuffer!", Server.MSG_ERROR, Server.LVL_MAJOR);
            closeChannel (sk);
            return;
        }
        synchronized (cb) {
            try {
                if (!sk.isValid () || !sk.channel ().isOpen ()) {
                    closeChannel (sk);
                    cb.notifyAll ();
                    return;
                }
                int bytesRead = ((SocketChannel) sk.channel ()).read (cb.rBuf);
                if (bytesRead < 0) {
                    // end of stream: the peer has closed the connection
                    Server.log ("CentralSelector.readIn: droped key", Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
                    closeChannel (sk);
                    cb.notifyAll ();
                    return;
                } else if (bytesRead == 0) {
                    Server.log ("CentralSelector.readIn: no data from socket", Server.MSG_STATE, Server.LVL_VERBOSE);
                    cb.notifyAll ();
                    return;
                }
                cb.updateCloseWhen ();
            } catch (IOException ioe) {
                Server.debug ("CentralSelector.readIn: droped key (IOException)", ioe, Server.MSG_ERROR, Server.LVL_VERY_VERBOSE);
                closeChannel (sk);
                cb.invalidate ();
                cb.logError (ioe.getMessage ());
                cb.notifyAll ();
                return;
            } catch (Exception e) {
                // NOTE(review): unlike the IOException path the channel is
                // not closed here - confirm that this is intended.
                Server.debug ("CentralSelector.readIn: Exception encountered while reading: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
                cb.invalidate ();
                cb.logError (e.getMessage ());
                cb.notifyAll ();
                return;
            }
            if (!RequestReader.processRequest (sk)) {
                closeChannel (sk);
                cb.invalidate ();
                cb.logError ("No available requestreader");
                Server.log ("CentralSelector.readIn: No availabel requestreader to process request", Server.MSG_ERROR, Server.LVL_MAJOR);
                cb.notifyAll ();
                return;
            }
        }
    }

    /**
     * Schedules the user attached to the key (if any) for removal, cancels
     * the key and closes its channel. Cancelling and closing are done even
     * if the user-related step fails, so the connection is not leaked.
     * Exceptions are logged, never propagated.
     *
     * @param sk the key to close; <code>null</code> is ignored
     */
    public void closeChannel (SelectionKey sk) {
        if (sk == null) return;
        try {
            ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
            if (cb != null) {
                User u = cb.getUser ();
                if (u != null)
                    u.scheduleToRemove ();
            }
        } catch (Exception e) {
            Server.debug ("CentralSelector.closeChannel: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
        } finally {
            sk.cancel ();
            try {
                sk.channel ().close ();
            } catch (Exception e) {
                Server.debug ("CentralSelector.closeChannel: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            }
        }
    }

    /**
     * Schedules the key's user (if any) for removal, tells the Responder to
     * drop the channel and queues the key in {@link #dropKeys}, from where
     * the selector loop picks it up and closes it.
     *
     * @param sk the key to drop; <code>null</code> is ignored
     */
    public static void dropKey (SelectionKey sk) {
        if (sk == null) return;
        ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
        if (cb != null) {
            User u = cb.getUser ();
            if (u != null) {
                StringBuffer sb = new StringBuffer ("CentralSelector.dropKey: droped key for user ").append (u.getName ());
                Server.log (sb.toString (), Server.MSG_STATE, Server.LVL_VERBOSE);
                u.scheduleToRemove ();
            }
        }
        Responder.res.dropChannel ((SocketChannel) sk.channel ());
        synchronized (cSel.dropKeys) {
            cSel.dropKeys.put (sk);
        }
    }

    /**
     * Drops the key the channel is registered under with this selector.
     * Does nothing if the channel is <code>null</code> or not registered.
     *
     * @param sc the channel whose key should be dropped
     */
    public static void dropChannel (SocketChannel sc) {
        if (sc == null) return;
        SelectionKey sk = sc.keyFor (cSel.sel);
        if (sk == null) return;
        dropKey (sk);
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -