📄 connselector.java
字号:
/* * Copyright (C) butor.com. All rights reserved. * * This software is published under the terms of the GNU Library General * Public License (GNU LGPL), a copy of which has been included with this * distribution in the LICENSE.txt file. */package org.butor.socket.nio;import java.io.IOException;import java.nio.channels.CancelledKeyException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectableChannel;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.Set;import org.butor.helper.ReaderWriterLock;import org.butor.log.Log;/** * Selector manager. This is used by the ConnManger that delegate client requests * to an instance of this class. This class use a select on regitered channels and * notify channel's owner for IO operation when the channel is ready for given opration. * * @author sawanai * Mar 25, 2004 */public class ConnSelector implements Runnable { public static final String CLASS_NAME = ConnSelector.class.getName(); public final static int NOT_WRITE = ~SelectionKey.OP_WRITE; public final static int NOT_READ = ~SelectionKey.OP_READ; public final static int NOT_ACCEPT = ~SelectionKey.OP_ACCEPT; protected Selector f_selector = null; protected ReaderWriterLock f_selectLock = new ReaderWriterLock(); protected boolean f_shutdown = false; protected String f_name = null; protected IConnManager f_connManager; protected int f_nbrOfChannels = 0; /** * */ public ConnSelector(IConnManager connManager, String name) throws IOException { super(); f_selector = Selector.open(); f_connManager = connManager; f_name = name; Thread thread = new Thread(this, "ConnSelector-" +f_name); thread.start(); Log.logStr(this, Log.INFO, "ConnSelector", "Created new selector."); } public void run() { Log.logStr(this, Log.INFO, "run()", "started."); while (!f_shutdown) { /* * Prevent this object from entering the select * while clients are trying to access the selector * * A quick get/release lock does the job */ f_selectLock.beforeWrite(); f_selectLock.afterWrite(); try { int count = f_selector.select(); if (count == 0) { continue; } } catch (IOException e) { Log.logException(this, Log.ERROR, "run()", e); shutdown(); return; } try { Set selectedKeys = f_selector.selectedKeys(); Iterator keys = selectedKeys.iterator(); while(!f_shutdown && keys.hasNext()) { SelectionKey key = (SelectionKey) keys.next(); if (!key.isValid()) { keys.remove(); continue; } Object att = key.attachment(); if (att == null) { Log.logStr(this, Log.WARN, "run()", "Got null handler for Read/Write!"); key.cancel(); continue; } IConnHandler handler = (IConnHandler) att; if (key.isValid() && key.isReadable()) { if (!f_shutdown && !handler.read()) { key.cancel(); handler.close(); } } if (key.isValid() && key.isWritable()) { if (!f_shutdown && !handler.write()) { key.cancel(); handler.close(); } } // new Connection? if (key.isValid() && key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); try { SocketChannel clientChannel = server.accept(); if (clientChannel != null) { clientChannel.configureBlocking(false); // to avoid java.nio.channels.IllegalBlockingModeException with select handler.accept(f_connManager, clientChannel); } } catch (IOException e) { Log.logException(this, Log.WARN, "run()", e); handler.close(); } } if (key.isValid()) { keys.remove(); } } } catch (Throwable e) { Log.logException(this, Log.ERROR, "run()", new Exception(e)); } } Log.logStr(this, Log.INFO, "start()", "stopped."); } public void registerWithSelector(SelectableChannel channel, IConnHandler handler) { // Register server socket channel with the selector try { f_selectLock.beforeRead(); f_selector.wakeup(); if (handler != null) { channel.register(f_selector, 0, handler); } else { channel.register(f_selector, 0); } } catch (ClosedChannelException e) { //OK } finally { f_selectLock.afterRead(); } } public void unregisterWithSelector(SelectableChannel channel) { SelectionKey key = channel.keyFor(f_selector); if (key == null) { // Not supposed ... Log.logStr(this, Log.INFO, "unregisterWithSelector()", "Got NULL key!"); } else { try { f_selectLock.beforeRead(); f_selector.wakeup(); key.interestOps(0); key.cancel(); } catch (CancelledKeyException e) { //OK } finally { f_selectLock.afterRead(); } } } public void wantWrite(SelectableChannel channel) { SelectionKey key = channel.keyFor(f_selector); if (key == null) { // closing channel return; } else { try { f_selectLock.beforeRead(); f_selector.wakeup(); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } catch (CancelledKeyException e) { //OK } finally { f_selectLock.afterRead(); } } } public void wantRead(SelectableChannel channel) { SelectionKey key = channel.keyFor(f_selector); if (key == null) { // closing channel return; } else { try { f_selectLock.beforeRead(); f_selector.wakeup(); key.interestOps(key.interestOps() | SelectionKey.OP_READ); } catch (CancelledKeyException e) { //OK } finally { f_selectLock.afterRead(); } } } public void wantAccept(SelectableChannel channel) { SelectionKey key = channel.keyFor(f_selector); if (key == null) { // closing channel return; } else { try { f_selectLock.beforeRead(); f_selector.wakeup(); key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT); } catch (CancelledKeyException e) { //OK } finally { f_selectLock.afterRead(); } } } public void removeWrite(SelectableChannel channel) { SelectionKey key = channel.keyFor(f_selector); if (key == null) { // Not supposed ... Log.logStr(this, Log.INFO, "removeWrite()", "Got NULL key!"); } else { try { f_selectLock.beforeRead(); f_selector.wakeup(); key.interestOps(key.interestOps() & NOT_WRITE); } catch (CancelledKeyException e) { //OK } finally { f_selectLock.afterRead(); } } } public void removeAccept(SelectableChannel channel) { SelectionKey key = channel.keyFor(f_selector); if (key == null) { // Not supposed ... Log.logStr(this, Log.INFO, "removeAccept()", "Got NULL key!"); } else { try { f_selectLock.beforeRead(); f_selector.wakeup(); key.interestOps(key.interestOps() & NOT_ACCEPT); } catch (CancelledKeyException e) { //OK } finally { f_selectLock.afterRead(); } } } public void removeRead(SelectableChannel channel) { SelectionKey key = channel.keyFor(f_selector); if (key == null) { // Not supposed ... Log.logStr(this, Log.INFO, "removeRead()", "Got NULL key!"); } else { try { f_selectLock.beforeRead(); f_selector.wakeup(); key.interestOps(key.interestOps() & NOT_READ); } catch (CancelledKeyException e) { //OK } finally { f_selectLock.afterRead(); } } } /** * shutdown */ public void shutdown() { Log.logStr(this, Log.INFO, "shutdown()", "..."); f_shutdown = true; try { f_selectLock.beforeWrite(); f_selector.wakeup(); } catch (CancelledKeyException e) { //OK } finally { f_selectLock.afterWrite(); } } public int getNbrOfChannels() { return f_selector.keys().size(); } public List getChannels() { ArrayList list = new ArrayList(); Iterator it = f_selector.keys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); ChannelInfo info = new ChannelInfo(); info.setInterestOps(key.interestOps()); info.setChannel(key.channel()); info.setAttachment(key.attachment()); list.add(info); } return list; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -