📄 locprefixioprocess.java
字号:
package ffcs.lbp.le;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.sql.SQLException;
import java.util.Iterator;
import net.gleamynode.netty2.MessageRecognizer;
import ffcs.lbp.LbpMessage;
import ffcs.lbp.ProtocolException;
import ffcs.lbp.SagClient.SagCycLoc;
import ffcs.lbp.SagClient.SagStopCycLoc;
import ffcs.lbp.common.Connection;
import ffcs.lbp.common.ConnectionFactory;
import ffcs.lbp.common.LbpUtil;
import ffcs.lbp.common.LocInfoMgrConfig;
import ffcs.lbp.common.LocInfoMgrIOProcess;
import ffcs.lbp.common.LocInfoMgrMessageRecognizer;
import ffcs.lbp.common.MessageCount;
import ffcs.lbp.common.SimpleHashtable;
import ffcs.lbp.dao.EmseInfoDao;
import ffcs.lbp.dao.LocInfoMgr;
import ffcs.lbp.dao.LoginInfoDao;
import ffcs.lbp.le.message.AddLocUser;
import ffcs.lbp.le.message.AddLocUserRsp;
import ffcs.lbp.le.message.CycLoc;
import ffcs.lbp.le.message.CycLocRsp;
import ffcs.lbp.le.message.DelLocUser;
import ffcs.lbp.le.message.DelLocUserRsp;
import ffcs.lbp.le.message.Dstan;
import ffcs.lbp.le.message.DstanRsp;
import ffcs.lbp.le.message.GetUserLoc;
import ffcs.lbp.le.message.GetUserLocRsp;
import ffcs.lbp.le.message.LeMessage;
import ffcs.lbp.le.message.Login;
import ffcs.lbp.le.message.LoginRsp;
import ffcs.lbp.le.message.StopCycLoc;
import ffcs.lbp.le.message.StopCycLocRsp;
import ffcs.lbp.le.message.TriArea;
import ffcs.lbp.le.message.TriAreaRsp;
import ffcs.lbp.le.message.TriAreaStop;
import ffcs.lbp.le.message.TriAreaStopRsp;
import ffcs.lbp.le.message.UserTrig;
import ffcs.lbp.le.message.UserTrigRsp;
import ffcs.lbp.le.message.tlv.TLVTable;
import ffcs.lbp.le.message.tlv.Tag;
import ffcs.logging.Log;
import ffcs.logging.LogFactory;
import ffcs.util.SysUtil;
/**
* <p>
* Title: 小区推送LBP项目
* </p>
* <p>
* Description:
* 该类实现位置信息管理模块与定位前置之间的通信,实现以下功能 1、实现功能 4、流量控制等
* </p>
* <p>
* 位置信息管理模块与定位前置之间采用路由表
* </p>
* <p>
* Copyright: 2008 福建福富软件技术股份有限公司
* </p>
* <p>
* Company: 福建福富软件技术股份有限公司
* </p>
* @author chenxin
* @version v1.0
*/
public class LocPrefixIOProcess extends LocInfoMgrIOProcess {
private static Log log = LogFactory.getLog(LocPrefixIOProcess.class);
private final static int MAX_LocPre_SIZE = 3;
private final static int CONN_FLAG_SERVER=1;
protected EmseInfoDao emseDao = null;
// 位置管理模块作为客户端时,连接到定位前置的连接集
private SimpleHashtable clientConns = new SimpleHashtable(MAX_LocPre_SIZE);
/*// 位置管理模块作为服务端时,定位前置连接上去的连接集
private SimpleHashtable serverConns = new SimpleHashtable(MAX_LocPre_SIZE);*/
// 协议解析类
private MessageRecognizer LocPrefixEncoder = new LocPrefixMessageRecognizer();
// 最后一次实体从数据库同步时间
private long lastSynchEmseInfoTime = System.currentTimeMillis();
private boolean initialize = false;
// 提供给msp网关监听服务端口
private int mspgwPort = -1;
// 外部实体数量,该数量值越大,数据处理线程越多
private int emseCount = 1;// 实体数量,根据该值计算处理线程的数量
/* // 测试时候使用
private MessageCount emseCounter = MessageCount.getMspgwMsgCount();
private MessageCount mspCounter = MessageCount.getMspgwMspMsgCount();*/
// 最后链路状态通知时间
private long lastLinkNotifyTime = System.currentTimeMillis();
/**
* 构造函数
*/
public LocPrefixIOProcess() {
/* emseDao = new EmseInfoDao();
Module module = Config.getInstance().getModule(
Constant.LocPrefix_MODEUL_NAME);
mspgwPort = Integer.parseInt(module.getProperties("LocPrefix_PORT"));
try {
List emseList = emseDao.getEmseInfoList(Constant.EMSE_TYPE_LocPrefix,
runIntfSeq);
if (emseList != null) {
emseCount = emseList.size();
}
} catch (SQLException sqle) {
log.error("emseDao查询实体信息列表出错", sqle);
}*/
// 添加需要流量控制的实体集,从该流量中
// super.addFlowLocInfoMgrConn(clientConns);
// super.addFlowLocInfoMgrConn(serverConns);
}
/**
* 得到MspMessageRecognizer类,解析收发模块发来的消息
*
* @return MspMessageRecognizer
*/
public LocInfoMgrMessageRecognizer getLocInfoMgrMessageRecognizer(){
return new LocPrefixMsgRecognized();
}
/**
* 从位置信息管理模块收到消息,消息判断后,进行相应处理
* @param conn
* 连接对象
* @param message
* 收到的消息
*/
public void receivedFromLocInfoMgr(Connection conn,LbpMessage message) {
// 测试的时候使用,记录消息包的数量
// MessageCount.incrementReceive((conn.getConnID()), message.getPackFlag());
// 进行消息转换
if (!(message instanceof LeMessage)) {
log.error("LbpMessage转换成LeMessage时错误:" + message);
return;
}
LeMessage msg = (LeMessage) message;
/**
* 流程追踪功能,流程
* 1.先判断流程追踪开关是否为TRUE,为TRUE进行第二步
* 2.判断消息头是否带有流程追踪消息
* 3.发送流程追踪结果消息到UPD服务器
*/
/* if (traceFlag && msg.getTraceFlag()>0) {
this.processMspSmTrace(msg,conn);
}*/
if (!LeMessage.checkCommandID(message.getCommandId())) {
log.error("消息被丢弃,原因:命令ID错误:" + message.getCommandId() + " 来源:"
+ conn);
return;
}
switch (message.getCommandId()) {
case LeMessage.GetUserLoc :
processGetUserLoc(conn,message);
break;
case LeMessage.AddLocUser :
processAddLocUser(conn,message);
break;
case LeMessage.DelLocUser :
processDelLocUser(conn,message);
break;
case LeMessage.UserTrig :
processUserTrig(conn,message);
break;
case LeMessage.CycLoc :
processCycLoc(conn,message);
break;
case LeMessage.StopCycLoc :
processStopCycLoc(conn,message);
break;
case LeMessage.TriArea :
processTriArea(conn,message);
break;
case LeMessage.TriAreaStop :
processTriAreaStop(conn,message);
break;
case LeMessage.Dstan :
processDstan(conn,message);
break;
default:
log.warn("未知command_id:0x"
+ Integer.toHexString(msg.getCommandId()) + msg);
break;
}
}
/**
* 处理从收发模块发来的流程追踪消息流程追踪消息,
* @param mspSm 要追踪的消息
* @param srcConn 消息来源
*/
private void processMspSmTrace(LeMessage mspSm, Connection srcConn) {
/* if (mspSm == null) {
log.warn("流程追踪消息为空:" + mspSm + " 来源:" + srcConn);
return;
}
try {
byte dataCoding = 1;
byte[] smMessage = null;
String srcMsisdn = null;
String destMsisdn = null;
*//**
* 流程追踪的时候,根据来源实体区分消息是从外部实体,还是从收发模块来,进行消息ID转换
* #define ForwardSMReq_In 0x00003082
#define ForwardSMRsp_In 0x80003082
#define ForwardSRReq_In 0x00003083
#define ForwardSRRsp_In 0x80003083
#define QueryServiceReq_In 0x00003081
#define QueryServiceRsp_In 0x80003081
#define ForwardSMReq_Out 0x00003092
#define ForwardSMRsp_Out 0x80003092
#define ForwardSRReq_Out 0x00003093
#define ForwardSRRsp_Out 0x80003093
#define QueryServiceReq_Out 0x00003091
#define QueryServiceRsp_Out 0x80003091
*//*
int traceComID =mspSm.getCommandId();
switch (mspSm.getCommandId()) {
case MspgwMessage.MSP_FORWARD_SM:
MspForwardSm mds = (MspForwardSm) mspSm.getSmMessage();
dataCoding = mds.getDataCoding();
smMessage = mds.getMessage();
srcMsisdn = mds.getSourceAddr();
destMsisdn = mds.getDestinationAddr();
traceComID = TraceSmResult.ForwardSMReq_Out;
break;
case MspgwMessage.MSP_FORWARD_SM_RESP:
traceComID = TraceSmResult.ForwardSMRsp_Out;
// 该消息无,dataCoding,message,主叫,被叫字段
break;
case MspgwMessage.MSP_FORWARD_SR:
traceComID = TraceSmResult.ForwardSRReq_Out;
// 该消息无,dataCoding,message,主叫,被叫字段
break;
case MspgwMessage.MSP_FORWARD_SR_RESP:
traceComID = TraceSmResult.ForwardSRRsp_Out;
// 该消息无,dataCoding,message,主叫,被叫字段
break;
case MspgwMessage.MSP_QUERY_SERVICE:
// 该消息无,dataCoding,message,主叫,被叫字段
MspQueryService mqs = (MspQueryService) mspSm.getSmMessage();
destMsisdn = mqs.getQueryAddr();
traceComID = TraceSmResult.QueryServiceReq_Out;
break;
case MspgwMessage.MSP_QUERY_SERVICE_RESP:
traceComID = TraceSmResult.QueryServiceRsp_Out;
// 该消息无,dataCoding,message,主叫,被叫字段
break;
default:
log.warn("未知道消息包:" + mspSm);
}
// 对流程追踪的步骤加1
mspSm.setTraceStepNo((mspSm.getTraceStepNo() + 1));
sendTraceUDP(mspSm, srcConn.getConnID(), srcConn.getConnName(),
dataCoding, smMessage, srcMsisdn, destMsisdn,traceComID);
} catch (Exception e) {
log.error("处理发送流程追踪消息错误",e);
}*/
}
/**
* 把消息发送到SMSC
*
* @param msg
* 消息对象
*/
private boolean sendToLocInfoMgr(LeMessage msg, Connection conn) {
if (conn == null || !conn.isConnected()) {
super.notifyLinkStatusToLocInfoMgr(conn);
log.error("发送消息失败:外部实体未连接上:" + conn + msg);
return false;
}
// 测试的时候使用,记录消息包的数量
MessageCount msgCon=new MessageCount(msg.getOrgSystemID()+"",msg.getCommandId());
msgCon.incrementReceive(msg.getOrgSystemID(),msg.getCommandId());
try {
if (!conn.sendMessage(msg)) {
// 记录失败信息,并进行相应处理
log.error("该消息发送失败实体:" + conn + msg);
return false;
}
} catch (Exception e) {
log.error("发送消息异常,实体:" + conn + msg, e);
// 不关闭链路,链路状态统一由链路状态维护线程统一管理
return false;
}
return true;
}
public void sendToLocInfoMgr(LeMessage msg, int serverId) {
sendToLocInfoMgr(msg, getLocInfoMgrConnection(serverId));
}
/**
* 数据数据库同步实体信息
*/
private void syncEmseInfo() {
/* List emseList = null;
try {
emseList = emseDao.getEmseInfoList(Constant.EMSE_TYPE_MSPGW,
runIntfSeq);
} catch (SQLException sqle) {
log.error("emseDao查询实体信息列表出错", sqle);
// 写日志
}
lastSynchEmseInfoTime = System.currentTimeMillis();
if (emseList == null) {
log.warn("实体信息表中没有实体类型为MSPGW网关的记录:" + Constant.EMSE_TYPE_MSPGW);
return;
}
如果实体表中的实体被删除,需要特殊处理
for (int i = 0; i < emseList.size(); i++) {
LocInfoMgr locInfoMgr = (LocInfoMgr) emseList.get(i);
if (clientConns.get(locInfoMgr.getEmseId()) != null) {
Connection conn = (Connection) clientConns.get(locInfoMgr
.getEmseId());
conn.getConfig().setEmseInfo(locInfoMgr);
} else {
addNewEmseConn(locInfoMgr, clientConns);
}
// 更新服务端实体信息
if (serverConns.get(locInfoMgr.getEmseId()) != null) {
Connection conn = (Connection) serverConns.get(locInfoMgr
.getEmseId());
conn.getConfig().setEmseInfo(locInfoMgr);
}
}*/
}
/**
* 取得外部实体的连接数量
*
* @return int 外部实体的数量
*/
public int getEmseCount() {
return emseCount;
}
/**
* 添加新的客户端
*
* @param sc
* 连接客户端
*/
public void addNewLocInfoMgrConn(SocketChannel sc) {
// 根据本地端口判断新连接的是否为msp网关接入,否则为收发模块登录
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -