📄 cmppconnection.java
字号:
+"\n"+"spId:"+sm.spId
+"\n"+"msgid:"+sm.msgId
+"\n"+"msgServiceType:"+sm.msgServiceType
+"\n"+"msgMode:"+sm.msgMode
+"\n"+"msgSrcAddr:"+sm.msgSrcAddr
+"\n"+"msgDestAddr:"+sm.msgDestAddr
+"\n"+"msgDataCoding:"+sm.msgDataCoding
+"\n"+"msgDeliverTime:"+sm.msgDeliverTime
+"\n"+"msgSmLength:"+sm.msgLength
+"\n"+"msgContent:"+sm.msgContent,2);
break;
case CmppConstants.Cmpp_Deliver_Resp:
PublicConstants.writeLog.info( "Cmpp Send <Cmpp_Deliver_Resp> to ISMG ." +" status:"+sm.headCmdStatus+" headSeqcNo:"+sm.headSeqcNo,0 );
break;
default:
break;
}
}
/**
 * Closes this connection.
 *
 * Raises the shutdown flag (the worker loops in this class poll it),
 * disconnects, and reports the disconnect on stdout and in the log.
 */
public void shutDown() {
    // A logout handshake used to be attempted here; it is kept disabled:
    /*if( (myConnType==CmppConstants.CLIENT)&&
    (! mystate.equals(Constants.WORKING)) ){
    submitLogout();
    PublicFuction.threadSleep(2000) ;
    }*/
    myShutdownFlag = true;
    disconnect();
    // Removal from the connection manager is likewise disabled:
    //myDelFlag = true;
    //(CmppConnectionManager.instance()).delConncection(myid);
    final String notice = "Disconnect " + myid;
    System.out.println(notice);
    PublicConstants.writeLog.info(notice, 0);
}
/**
 * Queues a received CMPP message into whichever of the two receive
 * buffers is currently selected by {@code vcmppMsgFlag}.
 *
 * @param cmppMsg the message to queue
 * @return {@code true} if the message was queued; {@code false} if the two
 *         buffers together already hold more than
 *         {@code ComConstants.CmppVECTOR_NUM - 10} entries, or if any
 *         exception occurred (the exception is logged)
 */
public boolean addCmppMsg(ShortMsgCmpp cmppMsg) {
    boolean accepted = false;
    try {
        final int buffered = vcmppMsg1.size() + vcmppMsg2.size();
        if (buffered > (ComConstants.CmppVECTOR_NUM - 10)) {
            return false;
        }
        // Flag value 1 selects buffer 1; any other value selects buffer 2.
        final Vector target = (vcmppMsgFlag == 1) ? vcmppMsg1 : vcmppMsg2;
        target.add(cmppMsg);
        accepted = true;
    } catch (Exception e) {
        PublicConstants.writeLog.error("Add cmppMsg to Vector Exception:" + e.toString());
        accepted = false;
    }
    return accepted;
}
/**
 * Returns how many received messages are still sitting in the two
 * receive buffers.
 *
 * @return the combined size of both buffers, or {@code -1} if reading
 *         the sizes threw an exception
 */
public int getNotDealMsgNum() {
    int waiting;
    try {
        final int inFirst = vcmppMsg1.size();
        final int inSecond = vcmppMsg2.size();
        waiting = inFirst + inSecond;
    } catch (Exception e) {
        waiting = -1;
    }
    return waiting;
}
/**
 * Returns how many submit messages are currently held in the
 * pending-submit list.
 *
 * @return the size of {@code vCmppSubmitMsg}, or {@code -1} if reading
 *         the size threw an exception
 */
public int getNotSubmitMsgNum() {
    int pending;
    try {
        pending = vCmppSubmitMsg.size();
    } catch (Exception e) {
        pending = -1;
    }
    return pending;
}
/**
 * Dispatches one received CMPP message according to its command id.
 *
 * <ul>
 * <li>{@code Cmpp_Submit_Resp}: matched against the pending-submit list.</li>
 * <li>{@code Cmpp_Deliver}: persisted, then handed to the operation manager.</li>
 * <li>{@code Cmpp_Report}: persisted.</li>
 * </ul>
 * Any other command id is ignored.
 *
 * @param cmpp_sm the received message
 */
private void handleCmppMsg(ShortMsgCmpp cmpp_sm) {
    switch (cmpp_sm.getCommandId()) {
        case CmppConstants.Cmpp_Submit_Resp: // normal Cmpp_Submit response
            applySubmitRespToPending(cmpp_sm);
            break;
        case CmppConstants.Cmpp_Deliver: // normal Cmpp_Deliver
            forwardDeliverToOperation(cmpp_sm);
            break;
        case CmppConstants.Cmpp_Report: // normal Cmpp_Report
            PublicConstants.writeLog.info("handleCmppMsg Cmpp_Report :" + cmpp_sm.msgDestAddr, 1);
            if (!CmppConnectionManager.instance().saveCmppData.saveCmppReportMsg(cmpp_sm)) {
                PublicConstants.writeLog.info("SaveCmpp_Report Msg erro" + cmpp_sm.msgSrcAddr, 0);
            }
            break;
        default:
            break;
    }
}

/**
 * Applies a submit response to the pending submit message that carries the
 * same sequence number.
 *
 * The matching entry receives the response's message id and result. When the
 * result is anything other than 8 the entry is saved and removed from the
 * pending list; with result 8 it stays in the list.
 * NOTE(review): 8 is presumably the CMPP flow-control result, left in place
 * so the resend thread retries it; confirm against the protocol spec.
 *
 * @param resp the received Cmpp_Submit_Resp
 */
private void applySubmitRespToPending(ShortMsgCmpp resp) {
    synchronized (vCmppSubmitMsg) {
        for (int i = 0; i < vCmppSubmitMsg.size(); i++) {
            // The try sits inside the loop so a failure on one entry is
            // logged and the scan continues with the next entry.
            try {
                ShortMsgCmpp pending = (ShortMsgCmpp) vCmppSubmitMsg.elementAt(i);
                if (resp.headSeqcNo != pending.headSeqcNo) {
                    continue;
                }
                pending.submit_msg_id = resp.submit_msg_id;
                pending.headCmdStatus = (int) resp.activeResult;
                if (resp.activeResult != 8) {
                    if (!CmppConnectionManager.instance().saveCmppData.saveCmppSubmitMsg(pending)) {
                        PublicConstants.writeLog.error("have save a submit Msg Fualt!");
                    } else {
                        PublicConstants.writeLog.info("CmppConnection::handleCmppMsg Save submit Msg Success!", 2);
                    }
                    vCmppSubmitMsg.remove(i);
                }
                // At most one pending entry is updated per response.
                break;
            } catch (Exception e) {
                PublicConstants.writeLog.error("Handle Cmpp_Submit_Resp Exception:" + e.toString());
            }
        }
    }
}

/**
 * Persists a deliver message and passes it to the operation manager.
 *
 * A failed save is only logged; the message is forwarded regardless. Any
 * exception raised while forwarding is logged and swallowed.
 *
 * @param deliver the received Cmpp_Deliver
 */
private void forwardDeliverToOperation(ShortMsgCmpp deliver) {
    PublicConstants.writeLog.info("handleCmppMsg Cmpp_Deliver :" + deliver.msgDestAddr, 1);
    if (!CmppConnectionManager.instance().saveCmppData.saveCmppDeliverMsg(deliver)) {
        PublicConstants.writeLog.info("Save Delieve Msg erro" + deliver.msgSrcAddr, 0);
    }
    try {
        // Test counter: print a progress line once every 200 deliveries.
        TestMOnum1++;
        if ((TestMOnum1 % 200) == 1) {
            System.out.println("Cmpp_Deliver :" + TestMOnum1 + " handleCmppMsg ...");
        }
        // Trimmed once here; the logs and the bean below use the trimmed text.
        deliver.msgContent = deliver.msgContent.trim();
        // Send to the business-processing module.
        com.wayout.wayoutsp.publics.PublicConstants.writeLog.info("Send a message to OperationMaster", 1);
        com.wayout.wayoutsp.publics.PublicConstants.writeLog.info("cmpp_sm.msgSrcAddr==" + deliver.msgSrcAddr, 1);
        com.wayout.wayoutsp.publics.PublicConstants.writeLog.info("cmpp_sm.msgContent==" + deliver.msgContent, 1);
        com.wayout.wayoutsp.publics.PublicConstants.writeLog.info("cmpp_sm.msgDestAddr==" + deliver.msgDestAddr, 1);
        ReceiveDataBean recvData = new ReceiveDataBean();
        recvData.ismgMsgId = deliver.msgId;
        recvData.destAddr = deliver.msgDestAddr;
        recvData.servieceType = deliver.msgServiceType;
        recvData.srcAddr = deliver.msgSrcAddr;
        recvData.registeredDelivery = deliver.msgMode;
        recvData.msgContent = deliver.msgContent;
        recvData.linkId = deliver.msgLinkId;
        com.wayout.wayoutsp.operation.OperManager.getInstance().handReceiverData(recvData);
    } catch (Exception e) {
        PublicConstants.writeLog.error("Cmpp call smpp sendMsg: " + e.toString());
    }
}
/**
 * Starts the message-handling thread.
 *
 * The thread reads messages from the two receive buffers and handles them,
 * alternating between the buffers: while {@code vcmppMsgFlag} is 1 it
 * processes buffer 2, otherwise buffer 1, and then flips the flag. It runs
 * until {@code myShutdownFlag} is set.
 *
 * Each buffer is emptied by taking a snapshot and clearing it in one step
 * under the Vector's own monitor. Iterating first and clearing afterwards
 * would discard any message appended between the two steps. As a result,
 * messages in the batch currently being handled are no longer counted in
 * the buffer's size.
 */
public void handleMsgThread() {
    Runnable r = new Runnable() {
        public void run() {
            while (!myShutdownFlag) {
                try {
                    if (vcmppMsgFlag == 1) {
                        // Flag 1 selects buffer 1 for new messages, so process buffer 2.
                        drain(vcmppMsg2);
                        PublicFuction.threadSleep(500);
                        if (vcmppMsg1.size() <= 0) {
                            // Nothing waiting in the other buffer: idle a little longer.
                            PublicFuction.threadSleep(1000);
                        }
                        vcmppMsgFlag = 2;
                    } else {
                        drain(vcmppMsg1);
                        PublicFuction.threadSleep(500);
                        if (vcmppMsg2.size() <= 0) {
                            PublicFuction.threadSleep(1000);
                        }
                        vcmppMsgFlag = 1;
                    }
                } catch (Exception e) {
                    PublicConstants.writeLog.error("Cmpp handleMsgThread exception ." + e);
                }
            }
            PublicConstants.writeLog.info("Cmpp handleMsgThread exit ! myid:" + myid, 0);
        }

        /** Atomically empties the buffer, then handles every message that was in it. */
        private void drain(Vector buffer) {
            Object[] batch;
            // Vector's methods lock on the instance, so a concurrent add()
            // lands either in this snapshot or in the buffer afterwards.
            synchronized (buffer) {
                batch = buffer.toArray();
                buffer.removeAllElements();
            }
            for (int i = 0; i < batch.length; i++) {
                // One bad message must not stop the rest of the batch.
                try {
                    handleCmppMsg((ShortMsgCmpp) batch[i]);
                } catch (Exception e) {
                    PublicConstants.writeLog.error("Handle cmppMsg Exception:" + e.toString());
                }
            }
        }
    };
    readVectorThread = new Thread(r);
    readVectorThread.start();
}
/**
 * Starts the resend thread.
 *
 * Until {@code myShutdownFlag} is set, the thread walks the pending-submit
 * list about every 10 seconds. For each entry whose last send is more than
 * 150 seconds old:
 * <ul>
 * <li>if it has already been sent 3 or more times, it is treated as expired:
 *     saved and removed from the list;</li>
 * <li>otherwise it is given a fresh sequence number and sent again.</li>
 * </ul>
 *
 * The list is inspected and updated under its lock; the network send happens
 * after the lock is released. After a removal the index is not advanced, so
 * the element that shifts into the freed slot is still examined in the same
 * pass.
 */
private void resendMsgThread() {
    Runnable r = new Runnable() {
        public void run() {
            // Commented-out code in the original names ComConstants.CmppRESEND_COUNT
            // and ComConstants.CmppRESEND_INTERVAL here; the literals are kept.
            final int maxSendCount = 3;
            final long resendIntervalMs = 150 * 1000L;
            while (!myShutdownFlag) {
                try {
                    int number = 0;
                    while (number < vCmppSubmitMsg.size()) {
                        ShortMsgCmpp smc = null;
                        boolean removed = false;
                        synchronized (vCmppSubmitMsg) {
                            // The list may have shrunk while waiting for the lock.
                            if (number >= vCmppSubmitMsg.size()) {
                                break;
                            }
                            ShortMsgCmpp pending = (ShortMsgCmpp) vCmppSubmitMsg.elementAt(number);
                            long currentTime = System.currentTimeMillis();
                            boolean overdue = (currentTime - pending.lastSendtime) > resendIntervalMs;
                            if (overdue && pending.sendCount >= maxSendCount) {
                                // Out of retries: save the message and drop it.
                                if (!CmppConnectionManager.instance().saveCmppData.saveCmppSubmitMsg(pending)) {
                                    PublicConstants.writeLog.info("Resend Msg saving a submit Msg Fualt!", 0);
                                }
                                vCmppSubmitMsg.remove(number);
                                removed = true;
                            } else if (overdue) {
                                pending.headSeqcNo = ComConstants.getSeqNo();
                                smc = pending;
                                System.out.println("Resend... seq:" + smc.headSeqcNo + " tel:" + smc.msgDestAddr);
                                PublicConstants.writeLog.info("Resend...", 0);
                                pending.lastSendtime = currentTime;
                                pending.sendCount++;
                            }
                        }
                        if (smc != null) {
                            send(smc.getPackage());
                        }
                        PublicFuction.threadSleep(300);
                        if (smc != null) {
                            // Extra pause after an actual resend.
                            PublicFuction.threadSleep(1000);
                        }
                        if (!removed) {
                            number++;
                        }
                    }
                } catch (Exception e) {
                    PublicConstants.writeLog.error("Cmpp resendMsgThread exception ." + e);
                    //PublicFuction.threadSleep(ComConstants.CmppRESEND_SLEEPTIME);
                }
                PublicFuction.threadSleep(10000);
            }
            PublicConstants.writeLog.info("Cmpp resendMsgThread exit ! myid:" + myid, 0);
        }
    };
    resendThread = new Thread(r);
    resendThread.start();
}
//private CmppSaveData saveData = null;
// for deliver response package
private static ShortMsgCmpp deliverRespSM = new ShortMsgCmpp();
// Records when the connection was last active; used to decide whether to send Active.
private long mylastactivetime;
// Set by shutDown() and polled by the handler and resend threads, so it
// must be volatile for those threads to see the change.
private volatile boolean myShutdownFlag = false;
// Whether this connection needs to be removed from the routing.
private boolean myDelFlag = false;
private String mystate;
private String myid;
private String myip;
private int myport;
private Socket mySocket;
private InputStream myinputstream;
private OutputStream myoutputstream;
public DataInputStream dataInStream;
//public DataOutputStream dataOutStream;
private Object _lock = new Object();
private byte myLastVersion;
private byte myBindType;
private byte myConnType; // as client or server
private byte[] myMBEauth; // save the submitlogin's auth
//private byte[] myMBFMauth;
private String myMBEname;
// The two receive buffers that addCmppMsg fills and the handler thread empties in turn.
private volatile Vector vcmppMsg1 = null;
private volatile Vector vcmppMsg2 = null;
// Submit messages held for a response or resend (see handleCmppMsg and resendMsgThread).
private volatile Vector vCmppSubmitMsg = null;
// Selects the buffer addCmppMsg writes to (1 = vcmppMsg1, otherwise vcmppMsg2).
// Written by the handler thread and read by addCmppMsg, hence volatile.
private volatile int vcmppMsgFlag;
private Thread resendThread;
private Thread readVectorThread;
public int mysign = 0; // for test
// Test counters. NOTE(review): volatile does not make ++ atomic; counts may be
// slightly off if several threads increment them.
private static volatile int TestMOnum = 0;
private static volatile int TestMOnum1 = 0;
private static volatile int TestReportnum = 0;
private static volatile int TestMTnum = 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -