📄 locinfomgrioprocess.java
字号:
package ffcs.lbp.common;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.sql.SQLException;
import java.util.Iterator;
import net.gleamynode.netty2.EventDispatcher;
import net.gleamynode.netty2.IoProcessor;
import net.gleamynode.netty2.MessageRecognizer;
import net.gleamynode.netty2.SimpleEventDispatcher;
import net.gleamynode.netty2.ThreadPooledEventDispatcher;
import ffcs.config.Config;
import ffcs.lbp.LbpMessage;
import ffcs.lbp.ProtocolException;
import ffcs.lbp.dao.LocInfoMgr;
import ffcs.lbp.dao.LoginInfoDao;
import ffcs.lbp.le.message.LeMessage;
import ffcs.lbp.le.message.Login;
import ffcs.lbp.le.message.LoginRsp;
import ffcs.lbp.common.Connection;
import ffcs.lbp.common.Error;
import ffcs.lbp.trace.TraceSmResult;
import ffcs.logging.Log;
import ffcs.logging.LogFactory;
/**
* <p>Title: 小区推送项目</p>
* <p>Description:
* 该类实现位置信息管理模块与定位前置模块之间的通信,实现以下功能 1、实现链路检测;
* 2、实时与数据库同步; 3、路由功能 4、流量控制等
* </p>
* <p>Copyright: 2008 福建福富软件技术股份有限公司 </p>
* <p>Company: 福建福富软件技术股份有限公司</p>
* @author chenxin
* @version 1.0
*/
public abstract class LocInfoMgrIOProcess extends InteIOProcess {
protected static Log log = LogFactory.getLog(LocInfoMgrIOProcess.class);
private IoProcessor ioProcessor;
private ThreadPooledEventDispatcher eventDispatcher;
private int ioProcessThreadSize = 2;
private int eventProcessThreadSize = 2;
private MessageRecognizer LocInfoMgrEncoder = null;
private WorkThread mainThread = null;
private FlowControl flowThread = null;
// msp收发模块编号的最大索引号,负荷分发时,可从1到该索引号之间进行轮循
private int maxLocInfoMgrConnIndex = 7;
private SimpleHashtable LocInfoMgrConns = new SimpleHashtable(maxLocInfoMgrConnIndex);
private volatile int currLocInfoMgrConnIndexIndex = -1;// 负荷分发的时候用
private LoginInfoDao loginDao = null;
private boolean start = false;
//重连接时间
private long lastReConnectTime = System.currentTimeMillis();
private long reConnectTime=10000;//默认为10秒
//链路状态通知时间
private long linkNotifyTime = 60000;
//模块编号,发送告警等信息的时候有用,5位编号如(20111);
protected int moduleNo=0;
protected int runIntfSeq=0;//接口运行编号,从配置文件读取该值
/*流程追踪开关,true-开,false-关*/
protected boolean traceFlag = false;
/* 流程追踪 管理实例 */
// protected TraceManager traceManager = TraceManager.getInstance();
public LocInfoMgrIOProcess() {
try {
ioProcessThreadSize = Integer.parseInt(Config.getInstance()
.getGlobalProp("IoThreadSize"));
eventProcessThreadSize = Integer.parseInt(Config.getInstance()
.getGlobalProp("ProcessThreadSize"));
reConnectTime = Long.parseLong(Config.getInstance().getGlobalProp(
LocInfoMgrConfig.CONNECT_INTERVAL_TIME));
linkNotifyTime = Long.parseLong(Config.getInstance().getGlobalProp(
LocInfoMgrConfig.LINK_NOTIFY_TIME));
moduleNo = Integer.parseInt(Config.getInstance()
.getGlobalProp("moduleNo"));
runIntfSeq=Integer.parseInt(Config.getInstance()
.getGlobalProp("runIntfSeq"));
int temp=Integer.parseInt(Config.getInstance()
.getGlobalProp("traceFlag"));
traceFlag = (temp==1) ? true : false;
} catch (Exception e) {
log.error(e);
}
LocInfoMgrEncoder = this.getLocInfoMgrMessageRecognizer();
loginDao = new LoginInfoDao();
}
/**
* 得到位置信息管理模块连接的数量
* @return
*/
// public abstract int getLocInfoMgrCount();
/**
* 得到MspMessageRecognizer类,解析收发模块发来的消息
* @return MspMessageRecognizer
*/
protected abstract LocInfoMgrMessageRecognizer getLocInfoMgrMessageRecognizer();
/**
* 线程隔一段时间会调用该方法一次
*/
protected abstract void workProcess();
/**
* 程序的起动方法
* @return true 起动成功,false起动失败
* @throws IOException
*/
public synchronized boolean start() throws IOException {
if (ioProcessor == null) {
if (getLocInfoMgrCount() > 2) {
ioProcessThreadSize += getLocInfoMgrCount() / 2;
}
ioProcessor = new IoProcessor();
ioProcessor.setThreadPoolSize(ioProcessThreadSize);
//start with the default number of I/O worker threads
ioProcessor.start();
}
if (eventDispatcher == null) {
if (getLocInfoMgrCount() > 2) {
eventProcessThreadSize += getLocInfoMgrCount()/2;
}
eventDispatcher = new SimpleEventDispatcher();
eventDispatcher.setThreadPoolSize(eventProcessThreadSize);
eventDispatcher.start();
//eventDispatcher = new LowLatencyEventDispatcher();
}
// 管理主线程起动
if (mainThread == null) {
mainThread = new WorkThread();
mainThread.start();
}
start = true;
return true;
}
public boolean isStart() {
return start;
}
/**
* 检查该连接是否存在
*
* @param OrgSystemID
* @return <code>true</code> 代表存在,<code>false</code>代表不存在
*/
public boolean checkLocInfoMgrConnExist(int OrgSystemID) {
boolean result = false;
/* 判断该连接是否已存在 */
if (LocInfoMgrConns.get(OrgSystemID) != null) {
LocInfoMgrConnection LocInfoMgrConn = (LocInfoMgrConnection) LocInfoMgrConns.get(OrgSystemID);
// 判断是否重新登录
if (LocInfoMgrConn.getConnStatus() == Connection.CONNECT
|| LocInfoMgrConn.getConnStatus() == Connection.LOGIN_ON) {
result = true;
} else {
LocInfoMgrConn.close();
LocInfoMgrConns.delete(OrgSystemID);
result = false;
}
}
return result;
}
/**
* 添加新的 收发模块连接
* @param sc
*/
public void addNewLocInfoMgrConn(SocketChannel sc) {
log.info("新的连接:" + sc.socket().getRemoteSocketAddress());
Login login = checkLogin(sc);
if(login!=null){
addNewLocInfoMgrConn(sc,login);
}
}
/**
* 检查收发模块登录是否正确
* 1、登录包是否超时
* 2、是否重复登录
* 3、用户名密码是否正确
* @param sc
* @return BindReq 如果登录成功,返回登录请求包,失败,返回空
*/
protected Login checkLogin(SocketChannel sc) {
int check = -1;
// 接收登录请求包
ByteBuffer readBuf = receiveBindReq(sc);
if (readBuf.position() < Login.HEADER_LENGTH) {
log.error("接收包登录超时" + sc);
closeSocket(sc);
return null;
}
readBuf.flip();
Login loginReq = new Login();
try {
loginReq.readMsg(readBuf);
} catch (ProtocolException mpe) {
mpe.printStackTrace();
closeSocket(sc);
return null;
}
/* 判断该连接是否已存在 */
if (this.checkLocInfoMgrConnExist(loginReq.getOrgSystemID())) {
log.warn("该模块连接已存在[" + loginReq.getOrgSystemID() + "]," + sc);
LoginRsp loginRsp = new LoginRsp();
loginRsp.setResult(LoginRsp.ERR_RELOGIN);
try {
write(sc, loginRsp);
} catch (IOException ioe) {
log.error(ioe);
} finally {
closeSocket(sc);
}
return null;
}
try {
check = loginDao.checkUser(loginReq.getUserId(), loginReq
.getPassword(), loginReq.getOrgSystemID(),loginReq.getDstSystemID(),sc.socket().getRemoteSocketAddress());
} catch (SQLException sqle) {
log.error("loginDao.checkUser", sqle);
// 日志
}
// 判断用户名和密码是否正确
if (check>0) {
log.warn("到登录信息表验证错误[" + loginReq + "错误号"+check+"]," + sc);
LoginRsp loginRsp = new LoginRsp();
loginRsp.setSequenceId(loginReq.getSequenceId());
loginRsp.setOrgSystemID(loginReq.getDstSystemID());
loginRsp.setDstSystemID(loginReq.getOrgSystemID());
loginRsp.setResult(check);
try {
write(sc, loginRsp);
} catch (IOException ioe) {
ioe.printStackTrace();
} finally {
closeSocket(sc);
}
return null;
}
return loginReq;
}
/**
* 添加已经经过登录包检验过的,新的收发模块连接,
* @param sc
* @param login
*/
protected void addNewLocInfoMgrConn(SocketChannel sc, Login login) {
/** 登录成功,后加入到LocInfoMgrConns集合,保存起该连接 */
LocInfoMgrConnection conn = new LocInfoMgrConnection(ioProcessor, eventDispatcher,
LocInfoMgrEncoder, this);
LocInfoMgr ei = new LocInfoMgr();
ei.setEmseNo(String.valueOf(login.getOrgSystemID()));
LocInfoMgrConfig config = new LocInfoMgrConfig();
config.setEmseInfo(ei);
config.setConnID(login.getOrgSystemID());
conn.setConfig(config);
boolean b=false;
try {
if (!conn.start(sc)) {
log.error("位置信息模块模块登录时,连接失败");
}else{
conn.getConfig().setConnStatus(Connection.LOGIN_ON);
LoginRsp loginRsp = new LoginRsp();
loginRsp.setSequenceId(login.getSequenceId());
loginRsp.setResult(LoginRsp.SUCC);
b = conn.sendMessage(loginRsp);
if(b){
LocInfoMgrConns.put(login.getOrgSystemID(), conn);
log.info("位置信息模块登录成功:" + conn);
}
}
} catch (IOException ioe) {
log.error(ioe);
}finally{
if(!b){
closeSocket(sc);
if (conn != null) {
conn.close();
conn=null;
}
}
}
}
/**
* 把序列号转换前半个字节提取出来,转为模块编号
* @param sequenceNumber
* @return
*/
protected int transformModuleId(int sequenceNumber) {
//逻辑右移,高位补0
return (sequenceNumber >>> 28);
}
/**
* 根据模块编号得到LocInfoMgr连接
* @param key 模块编号
* @return MspConnection
*/
public LocInfoMgrConnection getLocInfoMgrConnection(int key) {
return (LocInfoMgrConnection) LocInfoMgrConns.get(key);
}
/**
* 根据收发模块的编号,发送消息到收发模块
* @param msg 要发送的消息
* @param destConn 收发模块连接
* @param srcConn 来源实体连接
* @return true-消息发模块成功,false 消息发送失败
*/
private boolean sendToLocInfoMgr(LeMessage msg,Connection destConn) {
//检查收发模块是否为空
if (destConn == null ){
log.error("发送消息时,收发模块连接为空,消息:" + msg);// + " 消息来源:"+srcConn );
return false;
}
//检查收发模块连接是否正常
if (!destConn.isConnected()) {
// 写日志
log.error("发送消息时,收发模块未连接:"+ destConn + " 消息:" + msg );
return false;
}
//消息是否为空
if(msg==null){
log.warn("向收发模块发送消息为空");//,消息来源:"+srcConn);
return false;
}
boolean result = false;
try {
result = destConn.sendMessage(msg);
} catch (Exception e) {
log.error("发送消息到收发模块异常:"+destConn,e);
}
if(!result){
log.error("发送消息到收发模块失败:"+ destConn + msg);
}
return result;
}
/**
*
* @return
*/
public int getLocInfoMgrCount() {
return LocInfoMgrConns.getSize();
}
/**
* 把LocInfoMgr的消息写入SC
*
* @param sc
* @param msg
* @return
* @throws IOException
*/
protected boolean write(SocketChannel sc, LbpMessage msg)
throws IOException {
ByteBuffer writeBF = ByteBuffer.allocate(1024);
writeBF.clear();
msg.writeMsg(writeBF);
writeBF.flip();
while (writeBF.remaining() > 0) {
if (sc.write(writeBF) == 0) {
return false;
}
}
return true;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -