⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 locinfomgrioprocess.java

📁 中国移动定位引擎的客户端
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
	/**
	 * Closes the given socket channel on a best-effort basis.
	 * <p>
	 * The channel is closed whenever it is still open, not only when it is
	 * connected, so a channel that was opened but never connected (or whose
	 * connection is still pending) is released as well. A {@code null} or
	 * already-closed channel is ignored.
	 *
	 * @param sc the channel to close; may be {@code null}
	 */
	public static void closeSocket(SocketChannel sc) {
		if (sc == null || !sc.isOpen()) {
			return;
		}
		try {
			sc.close();
		} catch (IOException ioe) {
			// Best-effort close: the failure is reported but needs no handling.
			ioe.printStackTrace();
		}
	}

	/**
	 * Registers a connection table with the flow-control thread.
	 * <p>
	 * On first use the flow-control thread is created with room for 5 tables
	 * and started; afterwards the existing thread is reused.
	 *
	 * @param LocInfoMgrConns the connection table to put under flow control
	 */
	protected void addFlowLocInfoMgrConn(SimpleHashtable LocInfoMgrConns) {
		boolean flowThreadMissing = (flowThread == null);
		if (flowThreadMissing) {
			// Lazily create and start the shared flow-control thread.
			flowThread = new FlowControl(5);
			flowThread.start();
		}

		// Hand the table over to the flow-control thread.
		flowThread.add(LocInfoMgrConns);
	}

	/**
	 * Reports the link status of every connection stored in the given table
	 * by calling {@code notifyLinkStatusToLocInfoMgr} for each non-null entry.
	 *
	 * @param sh connection table keyed by {@code Integer}; nothing is done
	 *           when it is {@code null}
	 */
	protected void notifyLinkStatusToAllLocInfoMgr(SimpleHashtable sh){
		if (sh != null) {
			for (Iterator keys = sh.keyIterator(); keys.hasNext(); ) {
				Integer connKey = (Integer) keys.next();
				Connection entry = (Connection) sh.get(connKey.intValue());
				if (entry == null) {
					// Empty slot: nothing to report.
					continue;
				}
				notifyLinkStatusToLocInfoMgr(entry);
			}
		}
	}
	/**
	 * Intended to notify the transceiver modules of a connection's link status.
	 * <p>
	 * The real implementation is currently disabled (it is kept below as a
	 * block comment), so this method performs no work and always reports
	 * success.
	 *
	 * @param conn the connection whose link status would be reported;
	 *             currently unused
	 * @return always {@code true}
	 */
	protected boolean notifyLinkStatusToLocInfoMgr(Connection conn) {
		// Disabled implementation, preserved as-is: it built a LinkStatusReq
		// (CONNECT/DISCONNECT) for the connection and broadcast it via sendAllMsp.
/*		if(conn==null){
			return false;
		}
		if(LocInfoMgrConns==null || LocInfoMgrConns.getSize()==0){
			log.warn("发送链路通知消息到收发模块,收发模块不存在"  );
			return false;
		}
		LinkStatusReq linkStatus = new LinkStatusReq();
		linkStatus.setEntityId(conn.getConnID());
		if(conn.isConnected()){
			linkStatus.setStatus(LinkStatusReq.CONNECT);
		}else{
			linkStatus.setStatus(LinkStatusReq.DISCONNECT);
		}
		
		if(log.isDebugEnabled()){
			log.debug("发送链路通知消息到收发模块:" +linkStatus );
	    }
		int result = sendAllMsp(linkStatus);
		
		return (result>0) ? true : false;*/
		return true;
	}
	
	/**
	 * Checks the link state of every connection in {@code LocInfoMgrConns}.
	 * <p>
	 * Each connection for which {@code checkIsConnect} returns {@code false}
	 * is closed through {@code processDisconnect} and removed from the table.
	 * Removals are collected during the scan and applied afterwards, so the
	 * table is never modified while its key iterator is still in use.
	 */
	private void checkLocInfoMgrConnsStatus() {
		if (LocInfoMgrConns == null || LocInfoMgrConns.getSize() == 0) {
			return;
		}
		// NOTE(review): SimpleHashtable's iterator semantics are not visible
		// here; deferring the deletes avoids depending on whether the iterator
		// tolerates concurrent removal.
		java.util.List deadKeys = new java.util.ArrayList();
		try {
			Iterator key = LocInfoMgrConns.keyIterator();
			while (key.hasNext()) {
				Integer keyIndex = (Integer) key.next();
				Connection conn = (Connection) LocInfoMgrConns.get(keyIndex.intValue());
				if (conn == null) {
					continue;
				}
				if (!checkIsConnect(conn)) {
					// Updating the database status is currently disabled:
//					updateDBConnStatus(conn);
					processDisconnect(conn);
					deadKeys.add(keyIndex);
				}
			}
		} finally {
			// Applied even if the scan aborts, so connections that were already
			// closed do not linger in the table.
			for (int i = 0; i < deadKeys.size(); i++) {
				LocInfoMgrConns.delete(((Integer) deadKeys.get(i)).intValue());
			}
		}
	}
	/**
	 * Sends a message to every connected entry of {@code LocInfoMgrConns}.
	 * <p>
	 * Before each send the message's pack flag is set to the id of the target
	 * connection, so the same message object is re-tagged for every
	 * connection. Entries that are {@code null} or not connected are skipped.
	 *
	 * @param LeMsg the message to send
	 * @return the number of connections whose {@code sendMessage} call
	 *         returned {@code true}; 0 when the connection table has not been
	 *         set
	 */
	protected int sendAllLocInfoMgr(LeMessage LeMsg){
		// Same guard as the other users of the table: it may not exist yet.
		if (LocInfoMgrConns == null) {
			return 0;
		}
		int result = 0;
		Iterator key = LocInfoMgrConns.keyIterator();
		while (key.hasNext()) {
			int index = ((Integer) key.next()).intValue();
			Connection conn = (Connection) LocInfoMgrConns.get(index);
			if (conn != null && conn.isConnected()) {
				LeMsg.setPackFlag(conn.getConnID());
				if (log.isDebugEnabled()) {
					log.debug("发送消息到收发模块:"+conn+LeMsg);
				}
				if (conn.sendMessage(LeMsg)) {
					result++;
				}
			}
		}
		return result;
	}
	/**
	 * Checks whether a connection's link is still considered alive.
	 * <p>
	 * A {@code null} or disconnected connection is reported as not connected
	 * straight away. Otherwise {@code processEnquireLink} is invoked for the
	 * connection and the result of {@code checkMaxLinkFailCount()} decides
	 * the outcome; a positive result is logged as an error.
	 *
	 * @param conn the connection to check; may be {@code null}
	 * @return {@code true} if the connection is connected and
	 *         {@code checkMaxLinkFailCount()} returned {@code false}
	 */
	protected boolean checkIsConnect(Connection conn) {
		// Check the entity's connection state first.
		if (conn == null || !conn.isConnected()) {
			return false;
		}
		processEnquireLink(conn);

		// Evaluated once so that the logged state and the returned state cannot
		// disagree if the counter changes between two reads.
		// NOTE(review): assumes checkMaxLinkFailCount() is a side-effect-free
		// query - confirm against Connection.
		boolean linkCheckFailed = conn.checkMaxLinkFailCount();
		if (linkCheckFailed) {
			log.error("链路检测超时:"+conn);
		}
		return !linkCheckFailed;
	}
	
	/**
	 * Closes a connection whose link is considered broken, logging a warning
	 * first when warn-level logging is enabled.
	 *
	 * @param conn the connection to close
	 */
	protected void processDisconnect(Connection conn) {
		boolean warnEnabled = log.isWarnEnabled();
		if (warnEnabled) {
			String closingNotice = "关闭链路:"+conn;
			log.warn(closingNotice);
		}
		conn.close();
	}
	
	/**
	 * Attempts to reconnect a connection, but no more often than once per
	 * {@code reConnectTime} milliseconds.
	 * <p>
	 * When the interval since {@code lastReConnectTime} has not yet elapsed
	 * nothing is done. Otherwise the timestamp is refreshed, a warning is
	 * logged and {@code processConnect} is invoked.
	 *
	 * @param conn the connection to reconnect
	 * @return {@code false} if the attempt was skipped; otherwise the result
	 *         of {@code processConnect(conn)}
	 */
	protected boolean processReConnect(Connection conn) {
		long sinceLastAttempt = System.currentTimeMillis() - lastReConnectTime;
		boolean intervalElapsed = !(sinceLastAttempt < reConnectTime);
		if (intervalElapsed) {
			lastReConnectTime = System.currentTimeMillis();
			log.warn("链路断开,重连接:"+conn);
			return processConnect(conn);
		}
		// Too soon since the previous attempt.
		return false;
	}
	/**
	 * (Re)establishes a connection and logs in on it.
	 * <p>
	 * A connection that is still connected is closed first. The connection is
	 * then started and, if that succeeds, a login is performed. When the login
	 * yields no result or a non-zero status, the connection is closed again.
	 *
	 * @param conn the connection to (re)connect
	 * @return {@code true} if the login returned a result with status 0;
	 *         {@code false} if starting failed, the login threw, or the login
	 *         status was non-zero
	 */
	protected boolean processConnect(Connection conn) {
		// Drop a link that is still up before starting over.
		if (conn.isConnected()) {
			log.info("processEmseDiconnect :close connection "+conn.getConfig().getConnID());
			conn.close();
		}

		boolean started;
		try {
			started = conn.start();
		} catch (IOException startFailure) {
			log.error("重登录异常:"+conn,startFailure);
			return false;
		}
		if (!started) {
			log.error("连接不成功:" + conn);
			return false;
		}

		Result loginResult = null;
		try {
			loginResult = conn.login();
		} catch (Exception loginFailure) {
			log.error("登录异常:"+conn,loginFailure);
		} finally {
			// Runs on every path out of the login attempt, including a failure.
			if (loginResult == null || loginResult.getStatus() != 0) {
				conn.close();
				log.error("登录到实体失败:" + loginResult);
			} else {
				log.info("登录到实体成功:" + conn);
			}
		}

		return loginResult != null && loginResult.getStatus() == 0;
	}

	/**
	 * Callback invoked when a connection has been closed.
	 * <p>
	 * For connections that are not {@code LocInfoMgrConnection} instances the
	 * link status notification is triggered. In every case an info/alert log
	 * entry naming the connection type and id is emitted.
	 *
	 * @param conn the connection that was closed
	 */
	public void connectionClosed(Connection conn){
		boolean internalLink = conn instanceof LocInfoMgrConnection;
		if (!internalLink) {
			// An external entity dropped: pass the status on.
			notifyLinkStatusToLocInfoMgr(conn);
		}

		String connType;
		if (internalLink) {
			connType = "内部收发模块";
		} else {
			connType = "外部实体";
		}
		String alertText = connType+"连接已关闭"+conn.getConnID();
		log.infoAndAlert(alertText,null,
				moduleNo,Error.ERR_INFO,Constant.UDP_ALERT_IP,Constant.UDP_ALERT_PORT);
	}
	
	/**
	 * Callback invoked when a connection has been established.
	 * <p>
	 * For connections that are not {@code LocInfoMgrConnection} instances the
	 * link status notification is triggered. In every case an info/alert log
	 * entry naming the connection type and id is emitted.
	 *
	 * @param conn the connection that was established
	 */
	public void connectionEstablished(Connection conn){
		boolean internalLink = conn instanceof LocInfoMgrConnection;
		if (!internalLink) {
			// An external entity came up: pass the status on.
			notifyLinkStatusToLocInfoMgr(conn);
		}

		String connType;
		if (internalLink) {
			connType = "内部收发模块";
		} else {
			connType = "外部实体";
		}
		String alertText = connType+"连接已建立,编号:"+conn.getConnID();
		log.infoAndAlert(alertText,null,
				moduleNo,Error.ERR_INFO,Constant.UDP_ALERT_IP,Constant.UDP_ALERT_PORT);
	}
	/**
	 * Sends a UDP trace message without an explicit command id.
	 * <p>
	 * Convenience overload that delegates to the eight-argument
	 * {@code sendTraceUDP} with a command id of {@code -1}.
	 *
	 * @param mspSm      the message being traced
	 * @param entityID   id of the entity the trace refers to
	 * @param entityName name of that entity
	 * @param dataCoding data coding of the short message
	 * @param shortMsg   short message content
	 * @param srcMsisdn  source MSISDN
	 * @param destMsisdn destination MSISDN
	 * @return the result of the eight-argument overload
	 */
	public boolean sendTraceUDP(LeMessage mspSm,int entityID,
			String entityName,byte dataCoding,byte[] shortMsg,
			String srcMsisdn,String destMsisdn){
		final int noCommandId = -1;
		boolean sent = sendTraceUDP(
				mspSm,
				entityID,
				entityName,
				dataCoding,
				shortMsg,
				srcMsisdn,
				destMsisdn,
				noCommandId);
		return sent;
	}
	/**
	 * Intended to send a UDP trace message for the given message.
	 * <p>
	 * The real implementation is currently disabled (it is kept below as a
	 * block comment), so no trace is sent, every argument is ignored and the
	 * method always returns {@code false}.
	 *
	 * @param mspSm      the message being traced
	 * @param entityID   id of the entity the trace refers to
	 * @param entityName name of that entity
	 * @param dataCoding data coding of the short message
	 * @param shortMsg   short message content
	 * @param srcMsisdn  source MSISDN; the disabled code fell back to the
	 *                   message's own value when this was {@code null}
	 * @param destMsisdn destination MSISDN; same fallback as above
	 * @param commandID  message number to report; the disabled code used the
	 *                   message's own type when this was not positive
	 * @return always {@code false} while the implementation is disabled
	 */
	public boolean sendTraceUDP(LbpMessage mspSm,int entityID,
			String entityName,byte dataCoding,byte[] shortMsg,
			String srcMsisdn,String destMsisdn,int commandID){
		boolean result=false;
		// Disabled implementation, preserved as-is: it filled a TraceSmResult
		// and passed it to traceManager.sendTraceResult.
	/*	try{
			TraceSmResult tsr = new TraceSmResult();
 			tsr.setModuleType(moduleNo);
 			tsr.setModuleNo(runIntfSeq);
 			tsr.setEntitySrc(entityID);
 			tsr.setEntityName(entityName);
 			tsr.setTraceFlag(mspSm.getTraceFlag());
 			tsr.setTraceSeqNo(mspSm.getTraceSeqNo());
 			tsr.setTraceMsisdnSrc(mspSm.getSrcMsisdn());
 			tsr.setTraceMsisdnDest(mspSm.getDestMsisdn());
 			tsr.setTraceStepNo(mspSm.getTraceStepNo());
 			if(commandID>0){
 				tsr.setMsgNo(commandID);
 			}else {
 				tsr.setMsgNo(mspSm.getMsgType());
 			}
 			 send the original message packet
 			LbpMessage sm = mspSm.getSmMessage();
 			if(sm!=null){
 				ByteBuffer bf = ByteBuffer.allocate(sm.getPackLocInfoMgrngth());
 				sm.writeMsg(bf);
 				tsr.setMsgText(bf.array());
 				bf.clear();
 				bf=null;
 			}
 			
 			if(srcMsisdn==null){
 				tsr.setMsisdnSrc(mspSm.getSrcMsisdn());
 			}else{
 				tsr.setMsisdnSrc(srcMsisdn);
 			}
 			if(destMsisdn==null){
 				tsr.setMsisdnDest(mspSm.getDestMsisdn());
 			}else{
 				tsr.setMsisdnDest(destMsisdn);
 			}
 			tsr.setDataCoding(dataCoding);
            tsr.setShorttMsg(shortMsg);
            
            traceManager.sendTraceResult(tsr);
            result=true;
		}catch(Exception e){
			log.error(e);	
		}*/
		return result;
	}
	/**
	 * One maintenance pass of the WorkThread: checks the state of the
	 * connections in {@code LocInfoMgrConns} when the table is non-empty, then
	 * runs {@code workProcess()}.
	 */
	private void workThreadRun() {
		boolean hasConnections =
				LocInfoMgrConns != null && LocInfoMgrConns.getSize() > 0;
		if (hasConnections) {
			// Check the link state of the registered connections.
			checkLocInfoMgrConnsStatus();
		}

		workProcess();
	}

	
	/**
	 * Flow-control thread: about once per second it resets the receive and
	 * send flux counters of every connection in the registered tables to 0.
	 * <p>
	 * Tables are registered through {@link #add(SimpleHashtable)}; at most the
	 * number given to the constructor are kept, further ones are ignored. The
	 * thread is a daemon and runs until {@link #stopThread()} is called.
	 *
	 * @author chenxin
	 */
	class FlowControl extends Thread {
		/** Loop flag; volatile so that stopThread() called from another thread is seen by run(). */
		volatile boolean runFlag = true;

		/** Registered connection tables; slots [0, shSize) are in use. */
		SimpleHashtable[] shArray = null;

		/** Number of used slots in shArray; only changed inside the synchronized add(). */
		int shSize = 0;

		/**
		 * @param maxSh maximum number of connection tables that can be registered
		 */
		FlowControl(int maxSh) {
			shArray = new SimpleHashtable[maxSh];
			setDaemon(true);
		}

		/**
		 * Registers a connection table. The table is silently ignored once
		 * the capacity given to the constructor has been reached.
		 *
		 * @param sh the table whose connections should have their counters reset
		 */
		public synchronized void add(SimpleHashtable sh) {
			if (shSize < shArray.length) {
				shArray[shSize++] = sh;
			}
		}

		/** Asks the thread to leave its loop after the current pass. */
		public void stopThread() {
			runFlag = false;
		}

		/**
		 * Resets all counters, sleeps one second, and repeats until stopped.
		 * Errors are logged and the loop continues.
		 */
		public void run() {
			System.out.println("流量控制线程启动");
			while (runFlag) {
				try {
					resetFluxCounters();
				} catch (Throwable e) {
					log.error("流量控制出错",e);
				}
				// The pause is outside the try above so that a failing reset
				// pass cannot turn the loop into a busy spin.
				try {
					sleep(1000);
				} catch (InterruptedException e) {
					log.error("流量控制出错",e);
				}
			}
		}

		/** Sets the receive and send flux of every registered connection to 0. */
		private void resetFluxCounters() {
			int tableCount;
			// Read under the same lock add() writes under, so the tables
			// registered so far are fully visible to this thread.
			synchronized (this) {
				tableCount = shSize;
			}
			for (int i = 0; i < tableCount; i++) {
				if (shArray[i] == null) {
					continue;
				}
				Iterator key = shArray[i].keyIterator();
				while (key.hasNext()) {
					int index = ((Integer) key.next()).intValue();
					Connection conn = (Connection) shArray[i].get(index);
					if (conn != null) {
						conn.setReceiveFlux(0);
						conn.setSendFlux(0);
					}
				}
			}
		}
	}

	/**
	 * Maintenance thread: every 2 seconds it calls {@code workThreadRun()},
	 * which performs the link checks and the periodic work processing.
	 * <p>
	 * The thread is a daemon and runs until {@link #stopThread()} is called.
	 * Errors raised by a pass are logged and the loop continues.
	 *
	 * @author chenxin
	 */
	class WorkThread extends Thread {
		/**
		 * Loop flag; volatile so that stopThread() called from another thread
		 * is seen by run(), and isStartd() reflects the current state.
		 */
		volatile boolean runFlag = false;

		public WorkThread() {
			setDaemon(true);
		}

		/**
		 * @return {@code true} once run() has begun and until stopThread() is called
		 */
		public boolean isStartd() {
			return runFlag;
		}

		/**
		 * Asks the thread to leave its loop after the current pass.
		 * NOTE(review): a call made before run() has begun is overwritten when
		 * run() sets the flag to true.
		 */
		public void stopThread() {
			runFlag = false;
		}

		public void run() {
			runFlag = true;
			System.out
					.println("WorkThread 线程起动,定时2秒执行一次[workProcess]方法进行相关维护,包括链路检测");
			while (runFlag) {
				try {
					// Sleep first, so a failing pass is still followed by a pause.
					sleep(2000);
					workThreadRun();
				} catch (Throwable e) {
					log.error("WorkThread.run",e);
				}
			}
		}
	}

	/**
	 * @return the I/O processor held in the {@code ioProcessor} field
	 */
	protected IoProcessor getIoProcessor() {
		return ioProcessor;
	}

	/**
	 * @return the event dispatcher held in the {@code eventDispatcher} field
	 */
	protected EventDispatcher getEventDispatcher() {
		return eventDispatcher;
	}

	/**
	 * @return the message recognizer held in the {@code LocInfoMgrEncoder} field
	 */
	public MessageRecognizer getMspEncoder() {
		return LocInfoMgrEncoder;
	}

	/**
	 * @return the value of the {@code linkNotifyTime} field
	 */
	public long getLinkNotifyTime() {
		return linkNotifyTime ;
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -