📄 connmanager.java
字号:
/* * Copyright (C) butor.com. All rights reserved. * * This software is published under the terms of the GNU Library General * Public License (GNU LGPL), a copy of which has been included with this * distribution in the LICENSE.txt file. */package org.butor.socket.nio;import java.io.IOException;import java.nio.channels.SelectableChannel;import java.util.ArrayList;import java.util.HashMap;import java.util.Iterator;import java.util.List;import org.butor.log.Log;/** * This connection manager facilitate the usage of sockets for reading, accepting * and writing. Client channel register with this manager to be notified when the * channel is ready for the appropriate operation. * This manager load-balance client channels between selectors to ensure a good * response time. * * TODO support variable INACTIVE_AONNS_ELAPSE * * @author sawanai * Mar 25, 2004 */public class ConnManager implements IConnManager, Runnable { public static final String CLASS_NAME = ConnManager.class.getName(); public static final int DEFAULT_RATIO = 15; public static final int INACTIVE_CONNS_ELAPSE = 60 *1000; // 1 minute protected boolean f_shutdown = false; protected String f_name = null; /* * Ratio is the maximum number of channel that a selector must handle. * When a selector exceed the ratio another selector are created and the exceeded * channels are transfered to the new selector. */ protected int f_ratio = DEFAULT_RATIO; /* * Handle the reference of channels to selectors */ protected HashMap f_channelsRef = new HashMap(); /* * Handle available selectors */ protected ArrayList f_selectors = new ArrayList(); /* * Handle newly created seletors do to split of channels. This is a temporary reference * that its content are transfered to the selectors reference after that all the selecors * are balanced. */ protected ArrayList f_newSelectors = new ArrayList(); /** * */ public ConnManager(String name) throws IOException { this(name, DEFAULT_RATIO); } /** * */ public ConnManager(String name, int ratio) throws IOException { super(); f_name = name; f_ratio = ratio; // start with one selector f_selectors.add(new ConnSelector(this, f_name)); Thread thread = new Thread(this, "ConnManager-" +f_name); thread.setDaemon(true); thread.start(); } public void run() { Log.logStr(this, Log.INFO, "run()", "started."); while (!f_shutdown) { synchronized (this) { try { wait(10000); } catch (InterruptedException ex) { //OK } } if (!f_shutdown) { // load balance selectors checkSelectors(); // check inactive connections checkInactiveConnections(); } } Log.logStr(this, Log.INFO, "start()", "stopped."); } protected void checkInactiveConnections() { long now = System.currentTimeMillis(); List inactiveConns = new ArrayList(); synchronized (f_channelsRef) { Iterator it = f_selectors.iterator(); while (it.hasNext()) { ConnSelector item = (ConnSelector) it.next(); List channels = item.getChannels(); Iterator it2 = channels.iterator(); while (it2.hasNext()) { ChannelInfo info = (ChannelInfo) it2.next(); if (info.getAttachment() == null) { continue; } IConnHandler handler = (IConnHandler) info.getAttachment(); long elapse = now -handler.getLastActivityTime(); if (elapse > INACTIVE_CONNS_ELAPSE) { inactiveConns.add(handler); } } } } Iterator it = inactiveConns.iterator(); while (it.hasNext()) { IConnHandler handler = (IConnHandler) it.next(); handler.inactiveConnection(); } } /** * Loadbalance channels per selector. * f_ratio is the number of channels that a selector must handle. * 1) If a selector get more channels than f_ratio, then a new selector are creted and * the half of the channels are transfered to the newly created selector. * 2) If two selectors handle few channels then one selector are shutdown and its channels * are transfered to the other selector. * 3) If a selector does not handle any channels then it will be shutdown. */ protected void checkSelectors() { if (Log.shouldLog(this, Log.LOG_LEVEL_FULL)) { Log.logStr(this, Log.INFO, "run()", "checking ConnManager " +f_name +" ... "); } f_newSelectors.clear(); synchronized (f_channelsRef) { Iterator it = f_selectors.iterator(); while (it.hasNext()) { ConnSelector item = (ConnSelector) it.next(); int channelsCount = item.getNbrOfChannels(); if (Log.shouldLog(this, Log.LOG_LEVEL_FULL)) { Log.logStr(this, Log.INFO, "run()", "checking selector with " +channelsCount +" channels."); } if (channelsCount == 0 && f_selectors.size() > 1) { item.shutdown(); it.remove(); if (Log.shouldLog(this, Log.LOG_LEVEL_FULL)) { Log.logStr(this, Log.INFO, "run()", "removed selector (0 channels)."); } continue; } } } } protected ConnSelector getSelector(SelectableChannel channel) { synchronized (f_channelsRef) { if (!f_channelsRef.containsKey(channel)) { Log.logStr(this, Log.WARN, "getSelector()", "Not registered!"); return null; } return (ConnSelector) f_channelsRef.get(channel); } } public void registerWithSelector(SelectableChannel channel, IConnHandler handler) { synchronized (f_channelsRef) { if (f_channelsRef.containsKey(channel)) { Log.logStr(this, Log.WARN, "registerWithSelector()", "Already registered!"); return; } // find and register with the lowest busy (having less channels) selector. ConnSelector cs = null; Iterator it = f_selectors.iterator(); while (it.hasNext()) { ConnSelector item = (ConnSelector) it.next(); if (cs == null || item.getNbrOfChannels() < cs.getNbrOfChannels()) { cs = item; } } if (cs != null) { if (cs.getNbrOfChannels() > f_ratio) { try { cs = new ConnSelector(this, "New " +f_name); f_selectors.add(cs); } catch (IOException e) { Log.logException(this, Log.ERROR, "loadBalanceSelectors()", e); } } cs.registerWithSelector(channel, handler); f_channelsRef.put(channel, cs); } else { Log.logStr(this, Log.ERROR, "registerWithSelector()", "Can't find selector!"); } } } public void unregisterWithSelector(SelectableChannel channel) { synchronized (f_channelsRef) { if (!f_channelsRef.containsKey(channel)) { Log.logStr(this, Log.WARN, "unregisterWithSelector()", "Not registered!"); return; } ConnSelector cs = (ConnSelector) f_channelsRef.remove(channel); cs.unregisterWithSelector(channel); } } public void wantWrite(SelectableChannel channel) { ConnSelector cs = getSelector(channel); if (cs != null) { cs.wantWrite(channel); } } public void wantRead(SelectableChannel channel) { ConnSelector cs = getSelector(channel); if (cs != null) { cs.wantRead(channel); } } public void wantAccept(SelectableChannel channel) { ConnSelector cs = getSelector(channel); if (cs != null) { cs.wantAccept(channel); } } public void removeWrite(SelectableChannel channel) { ConnSelector cs = getSelector(channel); if (cs != null) { cs.removeWrite(channel); } } public void removeAccept(SelectableChannel channel) { ConnSelector cs = getSelector(channel); if (cs != null) { cs.removeAccept(channel); } } public void removeRead(SelectableChannel channel) { ConnSelector cs = getSelector(channel); if (cs != null) { cs.removeRead(channel); } } /** * shutdown. */ public void shutdown() { Log.logStr(this, Log.INFO, "shutdown()", "..."); f_shutdown = true; synchronized (f_channelsRef) { Iterator it = f_selectors.iterator(); while (it.hasNext()) { ((ConnSelector) it.next()).shutdown(); it.remove(); } f_channelsRef.clear(); } Log.logStr(this, Log.INFO, "shutdown()", "Done."); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -