⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 locinfomgrioprocess.java

📁 中国移动定位引擎的客户端
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package ffcs.lbp.common;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.sql.SQLException;
import java.util.Iterator;

import net.gleamynode.netty2.EventDispatcher;
import net.gleamynode.netty2.IoProcessor;
import net.gleamynode.netty2.MessageRecognizer;
import net.gleamynode.netty2.SimpleEventDispatcher;
import net.gleamynode.netty2.ThreadPooledEventDispatcher;
import ffcs.config.Config;
import ffcs.lbp.LbpMessage;
import ffcs.lbp.ProtocolException;
import ffcs.lbp.dao.LocInfoMgr;
import ffcs.lbp.dao.LoginInfoDao;
import ffcs.lbp.le.message.LeMessage;
import ffcs.lbp.le.message.Login;
import ffcs.lbp.le.message.LoginRsp;
import ffcs.lbp.common.Connection;
import ffcs.lbp.common.Error;
import ffcs.lbp.trace.TraceSmResult;
import ffcs.logging.Log;
import ffcs.logging.LogFactory;


/**
* <p>Title: 小区推送项目</p>
* <p>Description:
*  该类实现位置信息管理模块与定位前置模块之间的通信,实现以下功能 1、实现链路检测;
*   2、实时与数据库同步; 3、路由功能 4、流量控制等
* </p>
* <p>Copyright: 2008 福建福富软件技术股份有限公司 </p>
* <p>Company: 福建福富软件技术股份有限公司</p>
* @author chenxin
* @version 1.0
*/
public abstract class LocInfoMgrIOProcess extends InteIOProcess {
    
    protected static Log log = LogFactory.getLog(LocInfoMgrIOProcess.class);

	private IoProcessor ioProcessor;

	private ThreadPooledEventDispatcher eventDispatcher;
	private int ioProcessThreadSize = 2;

	private int eventProcessThreadSize = 2;

	private MessageRecognizer LocInfoMgrEncoder = null;

	private WorkThread mainThread = null;

	private FlowControl flowThread = null;

	// msp收发模块编号的最大索引号,负荷分发时,可从1到该索引号之间进行轮循
	private int maxLocInfoMgrConnIndex = 7;
	
	private SimpleHashtable LocInfoMgrConns = new SimpleHashtable(maxLocInfoMgrConnIndex);

	private volatile int currLocInfoMgrConnIndexIndex = -1;// 负荷分发的时候用

	private LoginInfoDao loginDao = null;

	private boolean start = false;
	
	//重连接时间
	private long lastReConnectTime = System.currentTimeMillis();

	private long reConnectTime=10000;//默认为10秒

   //链路状态通知时间
	private long linkNotifyTime = 60000;
	
	//模块编号,发送告警等信息的时候有用,5位编号如(20111);
	protected int moduleNo=0;
	
	protected int runIntfSeq=0;//接口运行编号,从配置文件读取该值
	
	/*流程追踪开关,true-开,false-关*/
	protected boolean traceFlag = false;
	
	/* 流程追踪 管理实例 */
//	protected TraceManager traceManager = TraceManager.getInstance();

	public LocInfoMgrIOProcess() {
		try {
			ioProcessThreadSize = Integer.parseInt(Config.getInstance()
					.getGlobalProp("IoThreadSize"));
			eventProcessThreadSize = Integer.parseInt(Config.getInstance()
					.getGlobalProp("ProcessThreadSize"));
			reConnectTime = Long.parseLong(Config.getInstance().getGlobalProp(
					LocInfoMgrConfig.CONNECT_INTERVAL_TIME));
			linkNotifyTime = Long.parseLong(Config.getInstance().getGlobalProp(
					LocInfoMgrConfig.LINK_NOTIFY_TIME));
			moduleNo = Integer.parseInt(Config.getInstance()
					.getGlobalProp("moduleNo"));
			runIntfSeq=Integer.parseInt(Config.getInstance()
					.getGlobalProp("runIntfSeq"));
			
			int temp=Integer.parseInt(Config.getInstance()
					.getGlobalProp("traceFlag"));
			traceFlag = (temp==1) ? true : false;
			
		} catch (Exception e) {
			log.error(e);
		}
		
		LocInfoMgrEncoder = this.getLocInfoMgrMessageRecognizer();
		loginDao = new LoginInfoDao();
	}

	/**
	 * 得到位置信息管理模块连接的数量
	 * @return
	 */
//	public abstract int getLocInfoMgrCount();

	/**
	 * 得到MspMessageRecognizer类,解析收发模块发来的消息
	 * @return MspMessageRecognizer
	 */
	protected abstract LocInfoMgrMessageRecognizer getLocInfoMgrMessageRecognizer();

	/**
	 * 线程隔一段时间会调用该方法一次
	 */
	protected abstract void workProcess();

	/**
	 * 程序的起动方法
	 * @return true 起动成功,false起动失败
	 * @throws IOException
	 */
	public synchronized boolean start() throws IOException {

		if (ioProcessor == null) {
			if (getLocInfoMgrCount() > 2) {
				ioProcessThreadSize += getLocInfoMgrCount() / 2;
			}
			ioProcessor = new IoProcessor();
			ioProcessor.setThreadPoolSize(ioProcessThreadSize);
			//start with the default number of I/O worker threads
			ioProcessor.start();
		}

		if (eventDispatcher == null) {
			if (getLocInfoMgrCount() > 2) {
				eventProcessThreadSize += getLocInfoMgrCount()/2;
			}

			eventDispatcher = new SimpleEventDispatcher();
			eventDispatcher.setThreadPoolSize(eventProcessThreadSize);
			eventDispatcher.start();
			
			//eventDispatcher = new LowLatencyEventDispatcher();
		}
		// 管理主线程起动
		if (mainThread == null) {
			mainThread = new WorkThread();
			mainThread.start();
		}
		start = true;
		return true;
	}

	public boolean isStart() {
		return start;
	}

	/**
	 * 检查该连接是否存在
	 * 
	 * @param OrgSystemID
	 * @return <code>true</code> 代表存在,<code>false</code>代表不存在
	 */
	public boolean checkLocInfoMgrConnExist(int OrgSystemID) {
		boolean result = false;
		/* 判断该连接是否已存在 */
		if (LocInfoMgrConns.get(OrgSystemID) != null) {
			LocInfoMgrConnection LocInfoMgrConn = (LocInfoMgrConnection) LocInfoMgrConns.get(OrgSystemID);
			// 判断是否重新登录
			if (LocInfoMgrConn.getConnStatus() == Connection.CONNECT
					|| LocInfoMgrConn.getConnStatus() == Connection.LOGIN_ON) {
				result = true;
			} else {
				LocInfoMgrConn.close();
				LocInfoMgrConns.delete(OrgSystemID);
				result = false;
			}
		}
		return result;
	}

	/**
	 * 添加新的 收发模块连接
	 * @param sc
	 */
	public void addNewLocInfoMgrConn(SocketChannel sc) {
	    log.info("新的连接:" + sc.socket().getRemoteSocketAddress());
	    Login login =  checkLogin(sc);
        if(login!=null){
        	addNewLocInfoMgrConn(sc,login);
        }
	}
	
	/**
	 * 检查收发模块登录是否正确
	 * 1、登录包是否超时
	 * 2、是否重复登录
	 * 3、用户名密码是否正确
	 * @param sc
	 * @return BindReq 如果登录成功,返回登录请求包,失败,返回空
	 */
	protected Login checkLogin(SocketChannel sc) {
		int check = -1;
		// 接收登录请求包
		ByteBuffer readBuf = receiveBindReq(sc);
		if (readBuf.position() < Login.HEADER_LENGTH) {
			log.error("接收包登录超时" + sc);
			closeSocket(sc);
			return null;
		}
		readBuf.flip();
		Login loginReq = new Login();
		try {
			loginReq.readMsg(readBuf);
		} catch (ProtocolException mpe) {
			mpe.printStackTrace();
			closeSocket(sc);
			return null;
		}

		/* 判断该连接是否已存在 */
		if (this.checkLocInfoMgrConnExist(loginReq.getOrgSystemID())) {
			log.warn("该模块连接已存在[" + loginReq.getOrgSystemID() + "]," + sc);
			LoginRsp loginRsp = new LoginRsp();
			
			loginRsp.setResult(LoginRsp.ERR_RELOGIN);
			try {
				write(sc, loginRsp);
			} catch (IOException ioe) {
				log.error(ioe);
			} finally {
				closeSocket(sc);
			}
			return null;
		}

		try {
			check = loginDao.checkUser(loginReq.getUserId(), loginReq
					.getPassword(), loginReq.getOrgSystemID(),loginReq.getDstSystemID(),sc.socket().getRemoteSocketAddress());
		} catch (SQLException sqle) {
			log.error("loginDao.checkUser", sqle);
			// 日志
		}
		// 判断用户名和密码是否正确
		if (check>0) {
			log.warn("到登录信息表验证错误[" + loginReq + "错误号"+check+"]," + sc);
			LoginRsp loginRsp = new LoginRsp();
			loginRsp.setSequenceId(loginReq.getSequenceId());
			loginRsp.setOrgSystemID(loginReq.getDstSystemID());
			loginRsp.setDstSystemID(loginReq.getOrgSystemID());
			loginRsp.setResult(check);
			try {
				write(sc, loginRsp);
			} catch (IOException ioe) {
				ioe.printStackTrace();
			} finally {
				closeSocket(sc);
			}
			return null;
		}
		return loginReq;
	}
	
	/**
	 * 添加已经经过登录包检验过的,新的收发模块连接,
	 * @param sc
	 * @param login
	 */
	protected void addNewLocInfoMgrConn(SocketChannel sc, Login login) {
		/** 登录成功,后加入到LocInfoMgrConns集合,保存起该连接 */
		LocInfoMgrConnection conn = new LocInfoMgrConnection(ioProcessor, eventDispatcher,
				LocInfoMgrEncoder, this);
		LocInfoMgr ei = new LocInfoMgr();
		ei.setEmseNo(String.valueOf(login.getOrgSystemID()));
		LocInfoMgrConfig config = new LocInfoMgrConfig();
		config.setEmseInfo(ei);
		config.setConnID(login.getOrgSystemID());
		conn.setConfig(config);
		boolean b=false;
		try {
			if (!conn.start(sc)) {
				log.error("位置信息模块模块登录时,连接失败");
			}else{
				conn.getConfig().setConnStatus(Connection.LOGIN_ON);
				LoginRsp loginRsp = new LoginRsp();
				loginRsp.setSequenceId(login.getSequenceId());
				loginRsp.setResult(LoginRsp.SUCC);
				b = conn.sendMessage(loginRsp);
				if(b){
					LocInfoMgrConns.put(login.getOrgSystemID(), conn);
					log.info("位置信息模块登录成功:" + conn);
				}
			}
		} catch (IOException ioe) {
			log.error(ioe);
		}finally{
			if(!b){
				closeSocket(sc);
				if (conn != null) {
					conn.close();
					conn=null;
				}
			}
		}
	}

	/**
	 * 把序列号转换前半个字节提取出来,转为模块编号
	 * @param sequenceNumber
	 * @return
	 */
	protected int transformModuleId(int sequenceNumber) {
		//逻辑右移,高位补0
		return (sequenceNumber >>> 28);
	}

	/**
	 * 根据模块编号得到LocInfoMgr连接
	 * @param key 模块编号
	 * @return MspConnection
	 */
	public LocInfoMgrConnection getLocInfoMgrConnection(int key) {
		return (LocInfoMgrConnection) LocInfoMgrConns.get(key);
	}

	/**
	 * 根据收发模块的编号,发送消息到收发模块
	 * @param msg 要发送的消息
	 * @param destConn 收发模块连接
	 * @param srcConn 来源实体连接
	 * @return true-消息发模块成功,false 消息发送失败
	 */
	private boolean sendToLocInfoMgr(LeMessage msg,Connection destConn) {
		
		//检查收发模块是否为空
		if (destConn == null ){
			log.error("发送消息时,收发模块连接为空,消息:" + msg);// + " 消息来源:"+srcConn );
		    return false;
		}
		//检查收发模块连接是否正常	
		if (!destConn.isConnected()) {
			// 写日志
			log.error("发送消息时,收发模块未连接:"+ destConn  + " 消息:" + msg );
			return false;
		}
		//消息是否为空
		if(msg==null){
			log.warn("向收发模块发送消息为空");//,消息来源:"+srcConn);
			return false;
		}
		
		boolean result = false;
		
		try {
			result = destConn.sendMessage(msg);
		} catch (Exception e) {
			log.error("发送消息到收发模块异常:"+destConn,e);
		}
		
		if(!result){
			log.error("发送消息到收发模块失败:"+ destConn + msg);
		}
		return result;
	}
	
	/**
	 * 
	 * @return
	 */
	public int getLocInfoMgrCount() {
		return LocInfoMgrConns.getSize();
	}

	/**
	 * 把LocInfoMgr的消息写入SC
	 * 
	 * @param sc
	 * @param msg
	 * @return
	 * @throws IOException
	 */
	protected boolean write(SocketChannel sc, LbpMessage msg)
			throws IOException {
        
		ByteBuffer writeBF = ByteBuffer.allocate(1024);
		writeBF.clear();
		msg.writeMsg(writeBF);
		writeBF.flip();
		while (writeBF.remaining() > 0) {
			if (sc.write(writeBF) == 0) {
				return false;
			}
		}
		return true;
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -