📄 connectionmanager.java
字号:
/*
* This program is free software; you can redistribute it and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation; either version 3 of the License,
* or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
* You should have received a copy of the GNU General Public License along with this program;
* if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package com.meidusa.amoeba.net;
import java.io.IOException;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.meidusa.amoeba.context.ProxyRuntimeContext;
import com.meidusa.amoeba.data.ConMgrStats;
import com.meidusa.amoeba.util.Initialisable;
import com.meidusa.amoeba.util.InitialisationException;
import com.meidusa.amoeba.util.LoopingThread;
import com.meidusa.amoeba.util.NameableRunner;
import com.meidusa.amoeba.util.Queue;
import com.meidusa.amoeba.util.Reporter;
import com.meidusa.amoeba.util.Tuple;
/**
*
* @author <a href=mailto:piratebase@sina.com>Struct chen</a>
*/
public class ConnectionManager extends LoopingThread implements Reporter,Initialisable {
protected static Logger logger = Logger.getLogger(ConnectionManager.class);
protected static final int SELECT_LOOP_TIME = 100;
// codes for notifyObservers()
protected static final int CONNECTION_ESTABLISHED = 0;
protected static final int CONNECTION_FAILED = 1;
protected static final int CONNECTION_CLOSED = 2;
protected static final int CONNECTION_AUTHENTICATE_SUCCESS = 3;
protected static final int CONNECTION_AUTHENTICATE_FAILD = 4;
protected Selector _selector;
private Executor executor;
private List<NetEventHandler> _handlers = new ArrayList<NetEventHandler>();
protected ArrayList<ConnectionObserver> _observers = new ArrayList<ConnectionObserver>();
/** Our current runtime stats. */
protected ConMgrStats _stats;
/** 连接已经失效或者网络断开的队列 */
protected Queue<Tuple<Connection,Exception>> _deathq = new Queue<Tuple<Connection,Exception>>();
protected Queue<Tuple<NetEventHandler,Integer>> _registerQueue = new Queue<Tuple<NetEventHandler,Integer>>();
/** Counts consecutive runtime errors in select(). */
protected int _runtimeExceptionCount;
private long idleCheckTime = 5000; //Connection idle check per 5 second
private long lastIdleCheckTime = 0;
public void setIdleCheckTime(long idleCheckTime) {
this.idleCheckTime = idleCheckTime;
}
public void appendReport(StringBuilder report, long now, long sinceLast,
boolean reset,Level level) {
report.append("* ").append(this.getName()).append("\n");
report.append("- Registed Connection size: ").append(_selector.keys().size()).append("\n");
report.append("- created Connection size: ").append(_stats.connects.get()).append("\n");
report.append("- disconnect Connection size: ").append(_stats.disconnects.get()).append("\n");
if(reset){
_stats = new ConMgrStats();
}
}
public ConnectionManager() throws IOException {
_selector = SelectorProvider.provider().openSelector();
// create our stats record
_stats = new ConMgrStats();
}
public ConnectionManager(String managerName) throws IOException {
super(managerName);
_selector = SelectorProvider.provider().openSelector();
// create our stats record
_stats = new ConMgrStats();
this.setDaemon(true);
}
/**
* Performs the select loop. This is the body of the conmgr thread.
*/
protected void iterate() {
final long iterStamp = System.currentTimeMillis();
// 关闭已经断开或者宣布死亡的Connection
Tuple<Connection,Exception> deathTuple;
while ((deathTuple = _deathq.getNonBlocking()) != null) {
deathTuple.left.close(deathTuple.right);
}
if(idleCheckTime>0 && iterStamp - lastIdleCheckTime>= idleCheckTime){
lastIdleCheckTime = iterStamp;
// 关闭空闲时间过长的连接
for (NetEventHandler handler : _handlers) {
if (handler.checkIdle(iterStamp)) {
// this will queue the connection for closure on our next tick
if(handler instanceof Connection){
closeConnection((Connection) handler,null);
}
}
}
}
//将注册的连接加入handler map中
Tuple<NetEventHandler,Integer> registerHandler = null;
while ((registerHandler = _registerQueue.getNonBlocking()) != null) {
if(registerHandler.left instanceof Connection){
Connection connection = (Connection)registerHandler.left;
this.registerConnection(connection, registerHandler.right.intValue());
_handlers.add(connection);
}else{
_handlers.add(registerHandler.left);
}
}
//检查网络事件
Set<SelectionKey> ready = null;
try {
// check for incoming network events
int ecount = _selector.select(SELECT_LOOP_TIME);
//selectorLock.lock();
//try{
ready = _selector.selectedKeys();
//}finally{
// selectorLock.unlock();
//}
if (ecount == 0) {
if (ready.size() == 0) {
return;
} else {
logger.warn("select() returned no selected sockets, but there are "
+ ready.size() + " in the ready set.");
}
}
} catch (IOException ioe) {
logger.warn("Failure select()ing.", ioe);
return;
} catch (RuntimeException re) {
// instead of looping indefinitely after things go pear-shaped, shut
// us down in an
// orderly fashion
logger.warn("Failure select()ing.", re);
if (_runtimeExceptionCount++ >= 20) {
logger.warn("Too many errors, bailing.");
shutdown();
}
return;
}
// clear the runtime error count
_runtimeExceptionCount = 0;
final CountDownLatch latch = new CountDownLatch(ready.size());
//处理事件(网络数据流交互等)
for (SelectionKey selkey : ready) {
NetEventHandler handler = null;
handler = (NetEventHandler)selkey.attachment();
if (handler == null) {
latch.countDown();
logger.warn("Received network event but have no registered handler "
+ "[selkey=" + selkey + "].");
selkey.cancel();
continue;
}
if(selkey.isWritable()){
try{
boolean finished = handler.doWrite();
if(finished){
selkey.interestOps(selkey.interestOps() & ~SelectionKey.OP_WRITE);
}
} catch (Exception e) {
logger.warn("Error processing network data: " + handler + ".", e);
if (handler != null && handler instanceof Connection) {
closeConnection((Connection) handler,e);
}
}finally{
latch.countDown();
}
}else if(selkey.isReadable() || selkey.isAcceptable()){
final NetEventHandler tmpHandler = handler;
executor.execute(new NameableRunner(){
public void run(){
try{
tmpHandler.handleEvent(iterStamp);
}finally{
latch.countDown();
}
}
public String getRunnerName() {
return ConnectionManager.this.getName()+"-Reading";
}
});
}else{
latch.countDown();
logger.error(selkey.attachment()+", isAcceptable="+selkey.isAcceptable()+",isConnectable="+selkey.isConnectable()+",isReadable="+selkey.isReadable()+",isWritable="+selkey.isWritable());
}
}
ready.clear();
try {
latch.await();
} catch (InterruptedException e) {
}
}
/**
* 采用异步方式关闭一个连接。
* 将即将关闭的连接放入deathQueue中
* @param conn
*/
void closeConnection(Connection conn,Exception exception) {
if(!conn.isClosed()){
_deathq.append(new Tuple<Connection,Exception>(conn,exception));
}
}
public void closeAll() {
synchronized(_selector){
Set<SelectionKey> keys = _selector.keys();
for(SelectionKey key: keys){
Object object = key.attachment();
if(object instanceof Connection){
Connection conn = (Connection)object;
closeConnection(conn,null);
}
}
}
}
/**
* 增加 ConnectionObserver。监听Connection 相关的网络事件
* @see ConnectionObserver
*/
public void addConnectionObserver(ConnectionObserver observer) {
synchronized (_observers) {
_observers.add(observer);
}
}
/**
* 从 Observer 列表中删除一个Observer对象
* @param observer
*/
public void removeConnectionObserver(ConnectionObserver observer) {
synchronized (_observers) {
_observers.remove(observer);
}
}
protected void notifyObservers(int code, Connection conn, Object arg1) {
synchronized (_observers) {
for (ConnectionObserver obs : _observers) {
switch (code) {
case CONNECTION_ESTABLISHED:
obs.connectionEstablished(conn);
break;
case CONNECTION_FAILED:
obs.connectionFailed(conn, (Exception) arg1);
break;
case CONNECTION_CLOSED:
obs.connectionClosed(conn);
break;
default:
throw new RuntimeException(
"Invalid code supplied to notifyObservers: " + code);
}
}
}
}
/**
* 异步注册一个NetEventHandler
* @param connection
* @param key
*/
public void postRegisterNetEventHandler(NetEventHandler handler,int key){
_registerQueue.append(new Tuple<NetEventHandler,Integer>(handler,key));
/**
* 唤醒ConnectionManager正在等待select的线程,让其能够更快速的处理registerQueue队列中的对象
*/
_selector.wakeup();
}
/**
* 往ConnectionManager 增加一个SocketChannel
* @param channel
* @param key
* @param handler
*/
public void registerConnection(Connection connection,int key){
SocketChannel channel = connection.getChannel();
if(logger.isDebugEnabled()){
logger.debug("["+this.getName()+"] registed Connection["+channel.socket().getInetAddress().getHostAddress()+":"+channel.socket().getPort()+"] connected!");
}
SelectionKey selkey = null;
try {
if (!(channel instanceof SelectableChannel)) {
try {
logger.warn("Provided with un-selectable socket as result of accept(), can't "
+ "cope [channel=" + channel + "].");
} catch (Error err) {
logger.warn("Un-selectable channel also couldn't be printed.");
}
// stick a fork in the socket
if(channel != null){
channel.socket().close();
}
return;
}
SelectableChannel selchan = (SelectableChannel) channel;
selchan.configureBlocking(false);
selkey = selchan.register(_selector,key,connection);
connection.setConnectionManager(this);
connection.setSelectionKey(selkey);
configConnection(connection);
_stats.connects.incrementAndGet();
connection.init();
_selector.wakeup();
return;
} catch (IOException ioe) {
logger.error("register connection error: " + ioe);
}
if(selkey != null){
selkey.attach(null);
selkey.cancel();
}
// make sure we don't leak a socket if something went awry
if (channel != null) {
try {
channel.socket().close();
} catch (IOException ioe) {
logger.warn("Failed closing aborted connection: " + ioe);
}
}
}
protected void configConnection(Connection connection) throws SocketException{
connection.getChannel().socket().setSendBufferSize(ProxyRuntimeContext.getInstance().getConfig().getNetBufferSize()*1024);
connection.getChannel().socket().setReceiveBufferSize(ProxyRuntimeContext.getInstance().getConfig().getNetBufferSize()*1024);
connection.getChannel().socket().setTcpNoDelay(ProxyRuntimeContext.getInstance().getConfig().isTcpNoDelay());
}
/**
* 当 Connection 关闭以后
* @param conn
*/
protected void connectionClosed(Connection conn) {
/**
* 删除即将被关闭的相关对象
*/
_handlers.remove(conn);
_stats.disconnects.incrementAndGet();
/**
* 通知所有Observer列表,连接已经关闭
*/
notifyObservers(CONNECTION_CLOSED, conn, null);
}
/**
* 当 Connection 出现异常以后
* @param conn
* @param ioe
*/
protected void connectionFailed(Connection conn, Exception ioe) {
_handlers.remove(conn);
_stats.disconnects.incrementAndGet();
/**
* 当发生连接异常时,通知所有Observers
*/
notifyObservers(CONNECTION_FAILED, conn, ioe);
}
public void invokeConnectionWriteMessage(Connection connection) {
if(connection.isClosed()) return;
try {
SelectionKey key = connection.getSelectionKey();
if(!key.isValid()){
connection.handleFailure(new java.nio.channels.CancelledKeyException());
return;
}
synchronized(key){
if(key!= null && (key.interestOps() & SelectionKey.OP_WRITE) == 0){
/**
* 发送数据,如果返回false,则表示socket send buffer 已经满了。则Selector 需要监听 Writeable event
*/
boolean finished = connection.doWrite();
if(!finished){
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
}
}
} catch (IOException ioe) {
connection.handleFailure(ioe);
}
}
public void setExecutor(Executor executor) {
this.executor = executor;
}
public void init() throws InitialisationException {
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -