📄 connection.java
字号:
package ffcs.lbp.common;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Hashtable;
import ffcs.lbp.LbpMessage;
import ffcs.logging.Log;
import ffcs.logging.LogFactory;
import net.gleamynode.netty2.EventDispatcher;
import net.gleamynode.netty2.IoProcessor;
import net.gleamynode.netty2.Message;
import net.gleamynode.netty2.MessageParseException;
import net.gleamynode.netty2.MessageRecognizer;
import net.gleamynode.netty2.Session;
import net.gleamynode.netty2.SessionListener;
/**
* <p>Title: 小区推送LBP项目</p>
* <p>Description:
* TCP连接类,该类为抽象类,由具体的子类实现
* </p>
* <p>Copyright: 2007 福建福富软件技术股份有限公司 </p>
* <p>Company: 福建福富软件技术股份有限公司</p>
* @author chenxin
* @version $Rev:1.0 $Date: 2007-06-28
*/
public abstract class Connection implements SessionListener {
private static Log log = LogFactory.getLog(Connection.class);
public static final int CONN_TYPE_SERVER = 0;
public static final int CONN_TYPE_CLIENT = 1;
public static final int DISCONNECT = 0;
public static final int CONNECT = 1;
public static final int LOGIN_ON = 2;
public static final int DEFAULT_TIMEOUT = 10 * 1000; // 10seconds
protected Session session;// TCP连接会话
protected SessionListener sl = null;
protected MessageRecognizer msgRecognizer = null;
private ConnConfigMBean config = null;
private static final int SESSION_IDLE_TIME = 10; // seconds
// private static final int DISPATCHER_THREAD_POOL_SIZE = 3;
//
// private static final int IO_PROCESSOR_THREAD_POOL_SIZE = 3;
private static final int LOCK_MAP_SIZE = 100;
private static final int WAIT_PACKET_MAP_SIZE = 100;
private Hashtable lockMap = new Hashtable(LOCK_MAP_SIZE);
private Hashtable waitPackets = new Hashtable(WAIT_PACKET_MAP_SIZE);
private IoProcessor ioProcessor;
private EventDispatcher eventDispatcher;
private int bindMode;
private int linkFailCount;
private boolean connTypeIsServer = false;
/**
* 参考{@link #login(long timeOut)}
* @return
*/
public abstract Result login();
/**
* 登录方法,当连接作为客户端时,子类必须实现, 如果是服务端子类可以实现一个空方法,
* 通常连接建立后,必须立即登录以防止服务端超时 Usage:
* <pre>
* Result result = login(60 * 1000);
* if (result.getStatus() == 0) {
* //登录成功;
* } else {
* //错误描述
* result.getDesc();
* }消息发往
* </pre>
* @param timeOut
* 登录超时时间(接收登录应答包的时间)
* @return 返回Result 对象
* @throws Exception
*/
public abstract Result login(long timeOut);
/**
* 参考{@link #enquireLink(long timeOut)}
* @return
*/
public abstract boolean Link();
/**
* 发送链路测试功能
* @param timeOut
* 链路测试超时时间,如果超时return false
* @return <code>true</code> 链路测试成功 <code>false</code> 链路测试失败
* @throws Exception
*/
public abstract boolean Link(long timeOut);
/**
* 得到IO处理相关类,子类中实现
* @return
*/
public abstract InteIOProcess getInteIoProcess();
/**
* 起动TCP连接
* @return boolean <code>true</code>起动成功,<code>false</code>起动失败
* @throws IOException
*/
public boolean start() throws IOException {
return start(null);
}
/**
* 起动TCP连接
* @return boolean <code>true</code>起动成功,<code>false</code>起动失败
* @throws IOException
*/
public synchronized boolean start(SocketChannel sc) throws IOException {
initialize(sc);
boolean b = session.isStarted();
if (!session.isStarted()) {
session.start();
b = waitStartComplete();
if(!b){//可能是连接超时,这时可能正在连接
this.close();
}
}
return b;
}
/**
* 初始化TCP会话
* @param sc Socket如为空,则新建一个SOCKET连接,否则用已有连接
*/
private void initialize(SocketChannel sc) {
if (session != null) {
return;
}
if (config == null) {
config = new LocInfoMgrConfig();
}
if (msgRecognizer == null) {
throw new IllegalStateException(
"MessageRecognizer is not specified.");
}
if (ioProcessor == null) {
throw new IllegalStateException("IoProcessor is not specified.");
}
if (eventDispatcher == null) {
throw new IllegalStateException(
"ThreadPooledEventDispatcher is not specified.");
}
// create a client session
if(sc==null){
session = new Session(ioProcessor, new InetSocketAddress(config
.getHost(), config.getPort()), msgRecognizer,
eventDispatcher);
}else{
session = new Session(ioProcessor, sc, msgRecognizer, eventDispatcher);
}
// set configuration
session.getConfig().setConnectTimeout(
(((int) getConfig().getConnTimeOut()) / 1000));
session.getConfig().setWriteTimeout(5);//写超时5秒
session.getConfig().setIdleTime(SESSION_IDLE_TIME);
session.getConfig().setMaxQueuedWriteCount(3000);
if (sl != null) {
// suscribe and start communication
session.addSessionListener(sl);
} else {
session.addSessionListener(this);
}
}
/**
* 等待连接完成方法
*
* @return boolean <code>true</code>连接完成,<code>false</code>连接未完成
*/
protected boolean waitStartComplete() {
//判断如果socket还 未连接上,
long l = System.currentTimeMillis();
while (this.getConnStatus() != Connection.CONNECT) {
//比session的连接多500毫秒,判断超时时间
if ((System.currentTimeMillis() - l-1000) > getConfig().getConnTimeOut()) {
break;
}
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
return getConnStatus() >= Connection.CONNECT;
}
/**
* 该方法为消息同步发送方式,发送完成后等待消息应答,直到超过(waitTime)时间, Usage:
*
* <pre>
* int key = requestMsg.getMsgID();
* Message responseMsg = conn.sendRequestForRep(key, requestMsg, 1000);
* if (responseMsg == null) {
* //消息超时处理
* }
* </pre>
* @param key
* 消息唯一标识,request与response消息匹配用
* @param msg
* 要发送的消息
* @param waitTime
* 接收应答包等待时间
* @return Message 应答消息包,如果消息超时为<code>null</code>
*/
protected LbpMessage sendRequestForRep(int key, LbpMessage msg, long waitTime) {
Integer id = new Integer(key);
if (msg == null || waitTime < 0)
return null;
if (!sendMessage(msg)) {
return null;
}
Object lock = new Object();
lockMap.put(id, lock);
LbpMessage resp = removeWaitPacket(id);
if (resp == null) {
synchronized (lock) {
try {
lock.wait(waitTime);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
resp = removeWaitPacket(id);
}
lockMap.remove(id);
return resp;
}
/**
* 连接类型,
* @return boolean true 作为服务端,false-作为客户端
*/
public boolean connTypeIsServer(){
return connTypeIsServer;
}
public void setConnTypeIsServer(boolean connType){
connTypeIsServer=connType;
}
/**
* 消息同步发送方式中,接收到应答包后,通过KEY来匹配是否有对应的请求消息 配合<code>sendRequestForRep</code>方法使用
* @param key
* 消息唯一标识,request与response消息匹配用
* @param msg
* 应答消息
* @return 如果匹配成功<code>true></code>,如果没有对应的请求消息返回<code>false</code>
*/
protected boolean checkWaitPacket(int key, LbpMessage msg) {
if (msg != null && lockMap.containsKey(new Integer(key))) {
Object lock = lockMap.remove(new Integer(key));
synchronized (lock) {
if (lock != null) {
addWaitPacket(key, msg);
lock.notify();
}
}
return true;
}
return false;
}
public int getConnID() {
return config.getConnID();
}
/**
* 把消息添加到等待队列
*
* @param key
* 消息唯一标识,request与response消息匹配用
* @param msg
* 应答消息
*/
protected void addWaitPacket(int key, LbpMessage msg) {
waitPackets.put(new Integer(key), msg);
}
/**
* 发送数据时候使用,无超时时间,如果发送失败即返回
* @param msg SmMessage 要发送的消息包
* @return true 发送成功,false 发送失败
* @throws IOException
*/
public boolean sendMessage(LbpMessage msg) {
if(session==null){
log.error("发送消息失败,消息:"+ msg + " 原因:session=null "+this );
return false;
}
return session.write(new LbpMessageAdapter(msg));
}
/**
* 发送数据时候使用,无超时时间,如果发送失败即返回
* @param msg LbpMessage 要发送的消息包
* @param timeOut 超时时间,
* @return true 发送成功,false 发送失败
* @throws IOException
*/
public boolean sendMessage(LbpMessage msg,long timeOut) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -