📄 connectionserver.java
字号:
+"\n"+"msgSrcAddr:"+sm.msgSrcAddr +"\n"+"msgDestAddr:"+sm.msgDestAddr +"\n"+"msgDataCoding:"+sm.msgDataCoding +"\n"+"msgDeliverTime:"+sm.msgDeliverTime +"\n"+"msgSmLength:"+sm.msgLength +"\n"+"msgContent:"+sm.msgContent,2); break; case CmppConstants.Cmpp_Deliver_Resp: PublicConstants.writeLog.info( "Cmpp Send <Cmpp_Deliver_Resp> to ISMG ." +" status:"+sm.headCmdStatus+" headSeqcNo:"+sm.headSeqcNo,0 ); break; default: break; } } /** * 关闭连接 */ public void shutDown() { /*if( (myConnType==CmppConstants.CLIENT)&& (! mystate.equals(Constants.WORKING)) ){ submitLogout(); PublicFuction.threadSleep(2000) ; }*/ myShutdownFlag = true; disconnect(); //myDelFlag = true; //(CmppConnectionManager.instance()).delConncection(myid); System.out.println("Disconnect "+myid); PublicConstants.writeLog.info("Disconnect "+myid,0); } //Add cmppMsg into Vector public boolean addCmppMsg(ShortMsgCmpp cmppMsg) { try { if(vcmppMsg1.size()+vcmppMsg2.size() > (ComConstants.CmppVECTOR_NUM-10) ) return false; if(vcmppMsgFlag ==1 ) { vcmppMsg1.add(cmppMsg); } else { vcmppMsg2.add(cmppMsg); } return true; } catch(Exception e) { PublicConstants.writeLog.error("Add cmppMsg to Vector Exception:"+e.toString()); return false; } } public int getNotDealMsgNum(){ try{ return vcmppMsg1.size()+vcmppMsg2.size(); }catch (Exception e){ return -1; } } public int getNotSubmitMsgNum(){ try{ return vCmppSubmitMsg.size(); }catch (Exception e){ return -1; } } //Handle cmppMsg private void handleCmppMsg(ShortMsgCmpp cmpp_sm) { int unPackCode = CmppConstants.error; boolean sendToSmsc = true; switch(cmpp_sm.getCommandId()) { case CmppConstants.Cmpp_Submit_Resp: //(正常Cmpp_Submit回应) //handleSubmitResp(sm); synchronized(vCmppSubmitMsg) { //System.out.println("Handle Cmpp_Submit_Resp"); for(int i=0; i< vCmppSubmitMsg.size(); i++) { try { if(cmpp_sm.headSeqcNo==((ShortMsgCmpp)vCmppSubmitMsg.elementAt(i)).headSeqcNo) { ((ShortMsgCmpp)vCmppSubmitMsg.elementAt(i)).submit_msg_id=cmpp_sm.submit_msg_id; if(cmpp_sm.activeResult!=8) { ((ShortMsgCmpp)vCmppSubmitMsg.elementAt(i)).headCmdStatus=(int)cmpp_sm.activeResult; if(!CmppConnectionManager.instance().saveCmppData.saveCmppSubmitMsg((ShortMsgCmpp)vCmppSubmitMsg.elementAt(i))) { PublicConstants.writeLog.error("have save a submit Msg Fualt!"); } else{ PublicConstants.writeLog.info("CmppConnection::handleCmppMsg Save submit Msg Success!!!",2); } vCmppSubmitMsg.remove(i); } else { ((ShortMsgCmpp)vCmppSubmitMsg.elementAt(i)).headCmdStatus=(int)cmpp_sm.activeResult; } break; } } catch(Exception e) { PublicConstants.writeLog.error("Handle Cmpp_Submit_Resp Exception:"+e.toString()); } } //PublicFuction.threadSleep(500); } break; case CmppConstants.Cmpp_Deliver: //(正常Cmpp_Deliver) PublicConstants.writeLog.info("handleCmppMsg Cmpp_Deliver :"+cmpp_sm.msgDestAddr,1 ); if(!CmppConnectionManager.instance().saveCmppData.saveCmppDeliverMsg(cmpp_sm)) { PublicConstants.writeLog.info("Save Delieve Msg erro"+cmpp_sm.msgSrcAddr,0); } try { TestMOnum1 ++ ; if( ( TestMOnum1 %200 ) == 1 ) System.out.println("Cmpp_Deliver :"+TestMOnum1 +" handleCmppMsg ..."); cmpp_sm.msgContent=cmpp_sm.msgContent.trim(); //发送到业务处理模块 com.wayout.wayoutsp.publics.PublicConstants.writeLog.info("Send a message to OperationMaster",1); com.wayout.wayoutsp.publics.PublicConstants.writeLog.info("cmpp_sm.msgSrcAddr=="+cmpp_sm.msgSrcAddr,1); com.wayout.wayoutsp.publics.PublicConstants.writeLog.info("cmpp_sm.msgContent=="+cmpp_sm.msgContent,1); com.wayout.wayoutsp.publics.PublicConstants.writeLog.info("cmpp_sm.msgDestAddr=="+cmpp_sm.msgDestAddr,1); ReceiveDataBean recvData = new ReceiveDataBean(); recvData.ismgMsgId = cmpp_sm.msgId ; recvData.destAddr = cmpp_sm.msgDestAddr ; recvData.servieceType = cmpp_sm.msgServiceType ; recvData.srcAddr = cmpp_sm.msgSrcAddr ; recvData.registeredDelivery = cmpp_sm.msgMode ; cmpp_sm.msgContent =cmpp_sm.msgContent.trim(); recvData.msgContent = cmpp_sm.msgContent; recvData.linkId = cmpp_sm.msgLinkId; com.wayout.wayoutsp.operation.OperManager.getInstance().handReceiverData(recvData); } catch (Exception e) { sendToSmsc = false; PublicConstants.writeLog.error("Cmpp call smpp sendMsg: "+e.toString() ); } break; case CmppConstants.Cmpp_Report: //(正常Cmpp_Report) PublicConstants.writeLog.info("handleCmppMsg Cmpp_Report :"+cmpp_sm.msgDestAddr,1 ); if(!CmppConnectionManager.instance().saveCmppData.saveCmppReportMsg(cmpp_sm)) { PublicConstants.writeLog.info("SaveCmpp_Report Msg erro"+cmpp_sm.msgSrcAddr,0); } break; } } /** * 处理数据线程<br> * 从缓冲中读取数据,并处理<br> * 交替处理两个缓冲 */ public void handleMsgThread() { Runnable r=new Runnable() { public void run() { ShortMsgCmpp cmpp_sm = null; //System.out.println("Cmpp a readVector Thread started!"); while (!myShutdownFlag)//while(1) avoid exception { try { if(vcmppMsgFlag ==1)// then deal the vector 2 {// System.out.println("MO handle vector 1 "); for(int i=0; i< vcmppMsg2.size(); i++) { try { cmpp_sm=(ShortMsgCmpp)vcmppMsg2.elementAt(i); handleCmppMsg(cmpp_sm); } catch(Exception e) { PublicConstants.writeLog.error("Handle cmppMsg Exception:"+e.toString()); } } if( vcmppMsg2.size() >0 ) vcmppMsg2.removeAllElements() ; PublicFuction.threadSleep(500); if(vcmppMsg1.size() <= 0 ) PublicFuction.threadSleep(1000); vcmppMsgFlag =2; } else //deal vector 1 {// System.out.println("MO handle vector 2 "); for(int i=0; i< vcmppMsg1.size(); i++) { try { cmpp_sm=(ShortMsgCmpp)vcmppMsg1.elementAt(i); handleCmppMsg(cmpp_sm); } catch(Exception e) { PublicConstants.writeLog.error("Handle cmppMsg Exception:"+e.toString()); } } if( vcmppMsg1.size() > 0) vcmppMsg1.removeAllElements() ; PublicFuction.threadSleep(500); if(vcmppMsg2.size() <= 0 ) PublicFuction.threadSleep(1000); vcmppMsgFlag =1; } } catch(Exception e) { PublicConstants.writeLog .error("Cmpp handleMsgThread exception ."+e) ; } }//end while(1) // PublicConstants.writeLog.info("Cmpp handleMsgThread exit ! myid:"+myid,0) ; } }; readVectorThread = new Thread(r); readVectorThread.start(); } /** * 启动重发线程<br> * 判断需重发数据和过期数据,并进行相应处理<br> */ private void resendMsgThread() { Runnable r=new Runnable() { public void run() { while (!myShutdownFlag)//while(1) avoid exception { try { int number =0; boolean sleep = false; while(number < vCmppSubmitMsg.size()) { ShortMsgCmpp smc = null; synchronized(vCmppSubmitMsg){ long currentTime=System.currentTimeMillis(); if( ( ((ShortMsgCmpp)vCmppSubmitMsg.elementAt(number)).sendCount >= 3 ) //ComConstants.CmppRESEND_COUNT ) && (currentTime - ((ShortMsgCmpp)vCmppSubmitMsg.elementAt(number)).lastSendtime>150*1000 ) ) //ComConstants.CmppRESEND_INTERVAL*1000) ) { if(!CmppConnectionManager.instance().saveCmppData.saveCmppSubmitMsg((ShortMsgCmpp)vCmppSubmitMsg.elementAt(number))) { PublicConstants.writeLog.info("Resend Msg saving a submit Msg Fualt!",0); } vCmppSubmitMsg.remove(number); } else if( //( ((ShortMsgCmpp)vCmppSubmitMsg.elementAt(number)).headCmdStatus==8 ) && ( (currentTime-((ShortMsgCmpp)vCmppSubmitMsg.elementAt(number)).lastSendtime)>150*1000 ) ) //ComConstants.CmppRESEND_INTERVAL*1000) ) { ( (ShortMsgCmpp)vCmppSubmitMsg.elementAt(number) ).headSeqcNo = ComConstants.getSeqNo(); smc = (ShortMsgCmpp)vCmppSubmitMsg.elementAt(number); System.out.println("Resend... seq:"+smc.headSeqcNo+" tel:"+smc.msgDestAddr ); PublicConstants.writeLog.info("Resend...",0); //send(((ShortMsgCmpp)vCmppSubmitMsg.elementAt(number)).getPackage()); ((ShortMsgCmpp)vCmppSubmitMsg.elementAt(number)).lastSendtime=currentTime; ((ShortMsgCmpp)vCmppSubmitMsg.elementAt(number)).sendCount++; sleep = true; } } if ( smc != null ) send ( smc.getPackage() ); PublicFuction.threadSleep(300); if(sleep) PublicFuction.threadSleep(1000); number ++; sleep = false; } } catch(Exception e) { PublicConstants.writeLog .error("Cmpp resendMsgThread exception ."+e) ; //PublicFuction.threadSleep(ComConstants.CmppRESEND_SLEEPTIME); } PublicFuction.threadSleep(10000); }//end while(1) PublicConstants.writeLog.info("Cmpp resendMsgThread exit ! myid:"+myid,0) ; } }; resendThread = new Thread(r); resendThread.start(); } //private CmppSaveData saveData = null; //for deliver response package private static ShortMsgCmpp deliverRespSM = new ShortMsgCmpp(); private long mylastactivetime; //记录连接上次活动时间,判断是否发Active private static boolean myShutdownFlag = false; private boolean myDelFlag = false;//是否需从路由中删除该连接 private String mystate; private String myid; private String myip; private int myport; private Socket mySocket; private InputStream myinputstream; private OutputStream myoutputstream; public DataInputStream dataInStream; //public DataOutputStream dataOutStream; private Object _lock = new Object(); private byte myLastVersion; private byte myBindType; private byte myConnType; // as client or server private byte[] myMBEauth; // save the submitlogin's auth //private byte[] myMBFMauth; private String myMBEname; private volatile Vector vcmppMsg1=null; private volatile Vector vcmppMsg2=null; private volatile Vector vCmppSubmitMsg=null; private int vcmppMsgFlag; private Thread resendThread; private Thread readVectorThread; public int mysign =0; //for test private ShortMsgCmpp cmpp_sm; private volatile static int TestMOnum = 0; private volatile static int TestMOnum1 = 0; private volatile static int TestReportnum = 0; private volatile static int TestMTnum = 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -