⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connection.java

📁 中国移动定位引擎的客户端
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package ffcs.lbp.common;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Hashtable;

import ffcs.lbp.LbpMessage;
import ffcs.logging.Log;
import ffcs.logging.LogFactory;

import net.gleamynode.netty2.EventDispatcher;
import net.gleamynode.netty2.IoProcessor;
import net.gleamynode.netty2.Message;
import net.gleamynode.netty2.MessageParseException;
import net.gleamynode.netty2.MessageRecognizer;
import net.gleamynode.netty2.Session;
import net.gleamynode.netty2.SessionListener;

/**
* <p>Title: 小区推送LBP项目</p>
* <p>Description:
*  TCP连接类,该类为抽象类,由具体的子类实现
* </p>
* <p>Copyright: 2007 福建福富软件技术股份有限公司 </p>
* <p>Company: 福建福富软件技术股份有限公司</p>
* @author chenxin
* @version $Rev:1.0 $Date: 2007-06-28
*/
public abstract class Connection implements SessionListener {
    
    private static Log log = LogFactory.getLog(Connection.class);
    
    public static final int CONN_TYPE_SERVER = 0;

	public static final int CONN_TYPE_CLIENT = 1;
	
	public static final int DISCONNECT = 0;

	public static final int CONNECT = 1;

	public static final int LOGIN_ON = 2;

	public static final int DEFAULT_TIMEOUT = 10 * 1000; // 10seconds

	protected Session session;// TCP连接会话

	protected SessionListener sl = null;

	protected MessageRecognizer msgRecognizer = null;

	private ConnConfigMBean config = null;

	private static final int SESSION_IDLE_TIME = 10; // seconds

//	private static final int DISPATCHER_THREAD_POOL_SIZE = 3;
//
//	private static final int IO_PROCESSOR_THREAD_POOL_SIZE = 3;

	private static final int LOCK_MAP_SIZE = 100;

	private static final int WAIT_PACKET_MAP_SIZE = 100;

	private Hashtable lockMap = new Hashtable(LOCK_MAP_SIZE);

	private Hashtable waitPackets = new Hashtable(WAIT_PACKET_MAP_SIZE);

	private IoProcessor ioProcessor;

	private EventDispatcher eventDispatcher;

	private int bindMode;

	private int linkFailCount;
	
	private boolean connTypeIsServer = false;

	/**
	 * 参考{@link #login(long timeOut)}
	 * @return
	 */
	public abstract Result login();
	
	/**
	 * 登录方法,当连接作为客户端时,子类必须实现, 如果是服务端子类可以实现一个空方法,
	 * 通常连接建立后,必须立即登录以防止服务端超时 Usage:
	 * <pre>
	 * Result result = login(60 * 1000);
	 * if (result.getStatus() == 0) {
	 * 	//登录成功;           
	 * } else {
	 * 	//错误描述
	 * 	result.getDesc();
	 * }消息发往
	 * </pre>
	 * @param timeOut
	 *            登录超时时间(接收登录应答包的时间)
	 * @return 返回Result 对象
	 * @throws Exception
	 */
	public abstract Result login(long timeOut);
	/**
	 * 参考{@link #enquireLink(long timeOut)}
	 * @return
	 */
	public abstract boolean Link();
	
	/**
	 * 发送链路测试功能
	 * @param timeOut
	 *            链路测试超时时间,如果超时return false
	 * @return <code>true</code> 链路测试成功 <code>false</code> 链路测试失败
	 * @throws Exception
	 */
	public abstract boolean Link(long timeOut);

	/**
	 * 得到IO处理相关类,子类中实现
	 * @return
	 */
	public abstract InteIOProcess getInteIoProcess();

	/**
	 * 起动TCP连接
     * @return boolean <code>true</code>起动成功,<code>false</code>起动失败
	 * @throws IOException
	 */
	public  boolean start() throws IOException {
		return start(null);
	}
	
	/**
	 * 起动TCP连接
     * @return boolean <code>true</code>起动成功,<code>false</code>起动失败
	 * @throws IOException
	 */
	public synchronized boolean start(SocketChannel sc) throws IOException {
		initialize(sc);
		boolean b = session.isStarted();	
		if (!session.isStarted()) {
			session.start();
			b = waitStartComplete();
		    if(!b){//可能是连接超时,这时可能正在连接
		    	this.close();
		    }
		}
		return b;
	}
	/**
	 * 初始化TCP会话
	 * @param sc Socket如为空,则新建一个SOCKET连接,否则用已有连接
	 */
	 private void initialize(SocketChannel sc) {
		if (session != null) {
			return;
		}
		if (config == null) {
			config = new LocInfoMgrConfig();
		}
		if (msgRecognizer == null) {
			throw new IllegalStateException(
					"MessageRecognizer is not specified.");
		}
		if (ioProcessor == null) {
			throw new IllegalStateException("IoProcessor is not specified.");
		}

		if (eventDispatcher == null) {
			throw new IllegalStateException(
					"ThreadPooledEventDispatcher is not specified.");
		}
		// create a client session
		if(sc==null){
			session = new Session(ioProcessor, new InetSocketAddress(config
					.getHost(), config.getPort()), msgRecognizer,
					eventDispatcher);
		}else{
			session = new Session(ioProcessor, sc, msgRecognizer, eventDispatcher);
		}
		// set configuration
		session.getConfig().setConnectTimeout(
				(((int) getConfig().getConnTimeOut()) / 1000));
		session.getConfig().setWriteTimeout(5);//写超时5秒
		session.getConfig().setIdleTime(SESSION_IDLE_TIME);
		session.getConfig().setMaxQueuedWriteCount(3000);
		if (sl != null) {
			// suscribe and start communication
			session.addSessionListener(sl);
		} else {
			session.addSessionListener(this);
		}

	}
	/**
	 * 等待连接完成方法
	 * 
	 * @return boolean <code>true</code>连接完成,<code>false</code>连接未完成
	 */
	protected boolean waitStartComplete() {
		//判断如果socket还 未连接上,
		long l = System.currentTimeMillis();
		while (this.getConnStatus() != Connection.CONNECT) {
			//比session的连接多500毫秒,判断超时时间
			if ((System.currentTimeMillis() - l-1000) > getConfig().getConnTimeOut()) {
				break;
			}
			try {
				Thread.sleep(10);
			} catch (InterruptedException ex) {
				ex.printStackTrace();
			}
		}

		return getConnStatus() >= Connection.CONNECT;
	}

	/**
	 * 该方法为消息同步发送方式,发送完成后等待消息应答,直到超过(waitTime)时间, Usage:
	 * 
	 * <pre>
	 * int key = requestMsg.getMsgID();
	 * Message responseMsg = conn.sendRequestForRep(key, requestMsg, 1000);
	 * if (responseMsg == null) {
	 * 	//消息超时处理
	 * }
	 * </pre>
	 * @param key
	 *            消息唯一标识,request与response消息匹配用
	 * @param msg
	 *            要发送的消息
	 * @param waitTime
	 *            接收应答包等待时间
	 * @return Message 应答消息包,如果消息超时为<code>null</code>
	 */
	protected LbpMessage sendRequestForRep(int key, LbpMessage msg, long waitTime) {
		Integer id = new Integer(key);
		if (msg == null || waitTime < 0)
			return null;

		if (!sendMessage(msg)) {
			return null;
		}
		Object lock = new Object();
		lockMap.put(id, lock);
		LbpMessage resp = removeWaitPacket(id);
		if (resp == null) {
			synchronized (lock) {
				try {
					lock.wait(waitTime);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			}
			resp = removeWaitPacket(id);
		}
		lockMap.remove(id);
		return resp;
	}
	/**
	 * 连接类型,
	 * @return boolean true 作为服务端,false-作为客户端 
	 */
	public boolean connTypeIsServer(){
		return connTypeIsServer;
	}
	public void setConnTypeIsServer(boolean connType){
		connTypeIsServer=connType;
	}
	/**
	 * 消息同步发送方式中,接收到应答包后,通过KEY来匹配是否有对应的请求消息 配合<code>sendRequestForRep</code>方法使用
	 * @param key
	 *            消息唯一标识,request与response消息匹配用
	 * @param msg
	 *            应答消息
	 * @return 如果匹配成功<code>true></code>,如果没有对应的请求消息返回<code>false</code>
	 */
	protected boolean checkWaitPacket(int key, LbpMessage msg) {
		if (msg != null && lockMap.containsKey(new Integer(key))) {
			Object lock = lockMap.remove(new Integer(key));
			synchronized (lock) {
				if (lock != null) {
					addWaitPacket(key, msg);
					lock.notify();
				}
			}
			return true;
		}
		return false;
	}

	public int getConnID() {
		return config.getConnID();
	}

	/**
	 * 把消息添加到等待队列
	 * 
	 * @param key
	 *            消息唯一标识,request与response消息匹配用
	 * @param msg
	 *            应答消息
	 */
	protected void addWaitPacket(int key, LbpMessage msg) {
		waitPackets.put(new Integer(key), msg);
	}
	/**
	 * 发送数据时候使用,无超时时间,如果发送失败即返回
	 * @param msg SmMessage 要发送的消息包
	 * @return true 发送成功,false 发送失败
	 * @throws IOException
	 */
	public boolean sendMessage(LbpMessage msg) {
		if(session==null){
			log.error("发送消息失败,消息:"+ msg + " 原因:session=null "+this );
			return false;
		}
		return session.write(new LbpMessageAdapter(msg));
	}
	/**
	 * 发送数据时候使用,无超时时间,如果发送失败即返回
	 * @param msg LbpMessage 要发送的消息包
	 * @param timeOut 超时时间,
	 * @return true 发送成功,false 发送失败
	 * @throws IOException
	 */
	public boolean sendMessage(LbpMessage msg,long timeOut) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -