⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 centralselector.java

📁 使用工具jublider开发的一个聊天室实现基本功能,
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/**
 * Copyright (C) 2003  Manfred Andres
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */

/**
 * The CentralSelector does the actual IO. All connections get Registered
 * and will be served when they are ready for action.
 * 
 * When a connection comes in, it get's registered with the CentralSelector.
 * If a connectino has content, the content will be automatically directed to
 *    one RequestReader's io-queue
 * If content must be written, the ConnectinoBuffer's write-queue attached to
 *    the connection will store it, untill the connection is ready for write-action
 */
package freecs.core;

import freecs.Server;
import freecs.util.ObjectBuffer;
import java.util.Set;
import java.util.Iterator;
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.Selector;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CancelledKeyException;

public class CentralSelector extends Thread {
   public static boolean stopped  = false;
   public static final CentralSelector cSel = new CentralSelector();
   private Selector     sel      = null;
   private long         rqLastChecked, nextUnavailableMessage=0;
   public  ObjectBuffer dropKeys;
   
   public ObjectBuffer reqQueue = new ObjectBuffer (Server.srv.MAX_READERS*10);

   private CentralSelector () {
   	  dropKeys = new ObjectBuffer (10000);
      if (!initCsel ())
         Server.log (this, "construct: unable to init Csel", Server.MSG_ERROR, Server.LVL_HALT);
   }

   private boolean initCsel () {
		if (sel == null || !sel.isOpen ()) try {
			sel = SelectorProvider.provider ().openSelector ();
		} catch (IOException ioe) {
			Server.debug (this, "initCsel:", ioe, Server.MSG_ERROR, Server.LVL_HALT);
			return false;
		}
		if (sel != null && sel.isOpen ())
	 		return true;
		return false;
   }

	public static void startCentralSelector () {
        cSel.setName("CentralSelector");
		if (!cSel.isAlive())
			cSel.start();
		// cSel.setPriority(MAX_PRIORITY-2);
	}

   public int keyCount () {
      Set keys = sel.keys ();
      return keys.size ();
   }
   
	public void registerSC (SocketChannel sc, int reqType) throws IOException, ClosedChannelException {
		if (sc == null) return;
		sc.configureBlocking (false);
		ConnectionBuffer cb = new ConnectionBuffer (reqType);
        cb.setKey (sc.register (sel, SelectionKey.OP_READ, cb));
	}

    public void run () {
        Server.log (this, "starting up", Server.MSG_STATE, Server.LVL_MINOR);
        int sdc = 500;
        long lastMessage = 0;
        Thread katc = new Thread(new KeepAliveTimeoutChecker());
        katc.start();
        while (Server.srv.isRunning () || sel.keys().size() > 0) try {
            if (!Server.srv.isRunning ()) {
                sdc--;
                if (sdc <= 0) break;
            }
            if (Server.DEBUG || lastMessage + 5000 > System.currentTimeMillis()) {
                Server.log (this, "loopstart: known sockets=" + sel.keys().size(), Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
                lastMessage = System.currentTimeMillis();
            }
            while (!dropKeys.isEmpty()) {
                SelectionKey sc;
                synchronized (dropKeys) {
                    sc = (SelectionKey) dropKeys.pop();
                    dropKeys.notify();
                }
                implCloseChannel (sc);
            }
            long now = System.currentTimeMillis();
            try {
                if (sel.selectNow() < 1) {
                    try {
                        Thread.sleep (33);
                    } catch (InterruptedException ie) { }
                    continue;
                }
            } catch (Exception e) {
                Server.debug (this, "run (select):", e, Server.MSG_ERROR , Server.LVL_MAJOR);
            }
            Set keys = sel.selectedKeys();
            if (keys!=null && !keys.isEmpty()) {
                for (Iterator i = keys.iterator (); i.hasNext (); ) {
                    SelectionKey ck = (SelectionKey) i.next ();
                    i.remove();
                    try {
                        if (!CentralSelector.isSkValid(ck)) {
                            Server.log (this, "run: current key is invalid", Server.MSG_STATE, Server.LVL_VERBOSE);
                            continue;
                        }
                        if (ck.isReadable ()) {
                            readIn (ck);
                        }
                    } catch (CancelledKeyException cke) { }
                }
            }
            try {
                Thread.sleep (33);
            } catch (InterruptedException ie) { }
        } catch (Exception e) {
            Server.debug (this, "(outer loop): ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
        }
        katc.interrupt();
        if (sel != null) try {
            Server.log (this, "closing down", Server.MSG_ERROR, Server.LVL_MAJOR);
            sel.close ();
        } catch (Exception e) {
            Server.debug (this, "shutting down: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
        }
        Server.log (this, "suspended", Server.MSG_ERROR, Server.LVL_MINOR);
        stopped = true;
        // cSelList.remove (this);
        // Server.log (cSelList.size () + " CentralSelectors in cSelList", Server.MSG_STATE, Server.LVL_MINOR);
    }

	private void readIn (SelectionKey sk) {
        if (!CentralSelector.isSkValid(sk)) {
            Server.log (this, "readIn: current request has invalid key", Server.MSG_STATE, Server.LVL_VERBOSE);
            return;
        }
		ConnectionBuffer cb = (ConnectionBuffer) sk.attachment ();
        int bytesRead;
		try {
            synchronized (cb) {
                SocketChannel sc = (SocketChannel) sk.channel ();
                bytesRead = sc.read (cb.rBuf);
                if (bytesRead < 0) {
                    // Server.log ("CentralSelector.readIn: droped key", Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
                    implCloseChannel (sk);
                    return;
                } else if (bytesRead == 0) {
                    Server.log (this, "readIn: no data from socket", Server.MSG_STATE, Server.LVL_VERBOSE);
                    return;
                }
                cb.updateKeepAliveTimeout();
                cb.currentRequest = cb.append();
                if (cb.currentRequest != null) {
                    addRequest(sk, cb);
                }
                return;
            }
		} catch (IOException ioe) {
			Server.debug (this, "readIn: droped key (IOException)", ioe, Server.MSG_ERROR, Server.LVL_VERY_VERBOSE);
            implCloseChannel (sk);
			cb.logError (ioe.getMessage());
		} catch (Exception e) {
			Server.debug (this, "readIn: Exception encountered while reading: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
            implCloseChannel (sk);
			cb.logError (e.getMessage());
		}
    }
    
    public void addRequest(SelectionKey sk, ConnectionBuffer cb) {
        if (Server.srv.USE_CENTRAL_REQUESTQUEUE 
                && !this.addRequestToQueue (sk)) {
            implCloseChannel (sk);
            if (nextUnavailableMessage >= System.currentTimeMillis())
                return;
            cb.logError ("RequestQueue is full");
            Server.log (this, "readIn: RequestQueue is full", Server.MSG_ERROR, Server.LVL_MAJOR);
            nextUnavailableMessage += 1000;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -