📄 connectionserver.java
字号:
package com.wayout.wayoutsp.communication;/** * Cmpp连接器 * <p>Title: CmppConnection Cmpp连接器</p> * <p>Description: 负责依照Cmpp协议与相关实体(如SP、MPCA等)建立连接,进行收包和发包</p> * <p>Copyright: 通讯小组 Copyright (c) 2002.7</p> * <p>Company: wayout</p> * @author wayout * @version 1.0 */import java.io.*;import java.net.*;import java.util.*;import com.wayout.wayoutsp.publics.*;public class ConnectionServer extends Thread { /** * 建立MPCP连接,以服务端身份<br> * 构造数据缓冲<br> * 启动处理数据线程<br> * @param socket 传入socket描述符 */ public ConnectionServer() { try { vcmppMsg1=new Vector(100,100); vcmppMsg2=new Vector(100,100); vcmppMsgFlag = 1; //first use vector 1 handleMsgThread();// cmpp_sm = new ShortMsgCmpp(); } catch (Exception e) { System.out.println("ConnectionServer(): " + e.toString()); } } public boolean setupSocket(Socket socket) { mySocket = socket; myLastVersion = 0; try { mySocket.setSoTimeout(ComConstants.CmppRCVSOCK_TIMEOUT); myinputstream = mySocket.getInputStream() ; myoutputstream = mySocket.getOutputStream() ; dataInStream = new DataInputStream(myinputstream); myShutdownFlag = false; return true; } catch (Exception e) { System.out.println("CmppConnection(Socket): " + e.toString()); return false; }} //connect Cmpp //if Exception then throws public boolean connect() {//throws Exception { try{ mySocket = new Socket( myip , myport ); mySocket.setSoTimeout(ComConstants.CmppRCVSOCK_TIMEOUT); //mySocket.setReceiveBufferSize(8192); //mySocket.setSendBufferSize(8192); myinputstream = mySocket.getInputStream() ; myoutputstream = mySocket.getOutputStream() ; dataInStream = new DataInputStream(myinputstream); //dataOutStream = new DataOutputStream(outStream); return true; }catch (Exception e){ disconnect(); mystate= ComConstants.NOTCONNECTED; PublicConstants.writeLog.info("Cmpp Connected ISMG fail(bindtype:"+myBindType+"):"+e.toString(),0); return false; } } /** * 在本连接上发送消息 * 若连接异常,则线程退出,并更新路由表 * @param sm ShortMsgCmpp对象 */ public void send(ShortMsgCmpp sm) { try { debugMsg(sm); byte[] bb = sm.getPackage(); if ( bb == null ) { PublicConstants.writeLog.info("Cmpp send pack is null .",0); return; } if ( (bb.length >ComConstants.Cmpp_MAX_MSG_LEN) || (bb.length <ComConstants.Cmpp_MIN_MSG_LEN) ) { PublicConstants.writeLog.info("Cmpp reSend pack length error : "+bb.length,0 ); return; } if( PublicConstants.PackDEBUGLevel > 0 )//输出报文十六进制代码 PublicConstants.writeLog.packDebug("Cmpp SEND to :"+myid+"(bindType:"+myBindType+") \t",bb); if(sm.headCmdID==CmppConstants.Cmpp_Submit) { TestMTnum ++; if( TestMTnum %200 ==1 ) System.out.println("Cmpp_Submit :"+TestMTnum+" sended :"+System.currentTimeMillis()); sm.firstSendtime=System.currentTimeMillis(); sm.lastSendtime=System.currentTimeMillis(); sm.sendCount=1; synchronized(vCmppSubmitMsg) { //PublicConstants.writeLog.info("vCmppSubmitMsg add a Submit ShortMsg"); vCmppSubmitMsg.add(sm); } } //if ( !getStatus().equals(ComConstants.WORKING) ) { // return ; //}// synchronized ( _lock ){ myoutputstream.write( bb ); myoutputstream.flush() ;// } PublicFuction.threadSleep(10); } catch(NullPointerException e) // bb is null { PublicConstants.writeLog.info ("Cmpp "+myid+" send pack is null: "+e.toString(),0 ) ; } catch(SocketException e) { PublicConstants.writeLog.info("Cmpp "+myid+" send disconnected SocketException: "+e.toString(),0); disconnect(); } catch (IOException e) { PublicConstants.writeLog.info("Cmpp "+myid+" send IOException: "+e.toString(),0); disconnect(); } catch (Exception e) { PublicConstants.writeLog.info("Cmpp "+myid+" send disconnected Exception: "+e.toString(),0); disconnect();} } /** * 在本连接上发送消息 * 若连接异常,则线程退出,并更新路由表 * @param bb 字节流 (用于重发消息) */ public void send(byte[] bb) { try { if ( bb == null ) { PublicConstants.writeLog.info("Cmpp reSend pack is null .",0); return; } if ( (bb.length >ComConstants.Cmpp_MAX_MSG_LEN) || (bb.length <ComConstants.Cmpp_MIN_MSG_LEN) ) { PublicConstants.writeLog.info("Cmpp reSend pack length error : "+bb.length,0 ); return; } //System.out.println( "Cmpp reSend a pack to "+ myid+" . sendlen :"+ bb.length ); /*if( Constants.CmppPACK_DEBUG ) PublicConstants.writeLog.packDebug("Cmpp RESEND \t",bb);*/ synchronized ( _lock ){ myoutputstream.write( bb ); myoutputstream.flush() ; } } catch(NullPointerException e) // bb is null { PublicConstants.writeLog.info ("Cmpp "+myid+" reSend pack is null: "+e.getMessage(),0 ) ; } catch(SocketException e) { //System.out.println ("Cmpp "+myid+" reSend disconnected: "+e.getMessage() ) ; PublicConstants.writeLog.info("Cmpp "+myid+" reSend disconnected: "+e.getMessage(),0); disconnect();// myDelFlag = true;// (CmppConnectionManager.instance()).delConncection(myid);// myShutdownFlag = true; } catch (IOException e) { PublicConstants.writeLog.info ("Cmpp "+myid+" reSend disconnected: "+e.getMessage(),0 ) ; disconnect();// myDelFlag = true;// (CmppConnectionManager.instance()).delConncection(myid);// myShutdownFlag = true; } } /** * 接收数据线程<br> * 接收阻塞,收取完整数据包后调用处理数据方法<br> */ public void run() { mylastactivetime=System.currentTimeMillis(); while (!myShutdownFlag) { if(mystate != ComConstants.NOTCONNECTED ){ try { byte[] buf = recv(); //System.out.println("Cmpp received a pack len: "+ (buf.length+4)); if ( buf.length == 1 ) { if( buf[0]==ComConstants.ERROR_INTERRUPT ) { //System.out.println("Cmpp "+myid+ " sock int .!!!!!!!!!!!!!!1 "); if ( ( System.currentTimeMillis()-mylastactivetime > 60*1000) && (mystate.equals(ComConstants.WORKING) ) ) {// shutDown();// PublicConstants.writeLog.info("Conn Thread Stop OK........"); continue; } else { //timeout disconnect //disconnect(); //myDelFlag = true; // (CmppConnectionManager.instance()).delConncection(myid); //PublicConstants.writeLog.error("Cmpp "+myid+ " sock timeout . Disconnected . "); continue ; //return; } } else if( buf[0]==ComConstants.ERROR_SOCKET ) { disconnect(); //myDelFlag = true;// (CmppConnectionManager.instance()).delConncection(myid); System.out.println("Cmpp ERROR_SOCKET"+myid+ " sock error . Disconnected .") ; PublicConstants.writeLog.error("Cmpp ERROR_SOCKET"+myid+ " sock error . Disconnected ."); continue ; //return; } else continue; // for future } //PublicFuction.threadSleep(200); mylastactivetime = System.currentTimeMillis() ; handle(buf); } catch (Exception ee) { PublicConstants.writeLog .error("Cmpp "+myid+ " handle error: "+ee.toString() ); } }else{ PublicFuction.threadSleep(2000) ; } } // end while } /** * 在本连接上接收数据 * 根据SMPP报文的长度字段从输入数据流中读取相应的报文 * @return * receivedData <br> * receivedData[0]=ERROR_SOCKET SOCKET出错<br> * receivedData[0]=ERROR_INTERRUPT SOCKET超时中断<br> */ public byte[] recv()//接受的数据不包含总共长度 { byte[] receivedData; try { int dataSize = dataInStream.readInt();// if( (dataSize<20)||(dataSize>ComConstants.Cmpp_MAX_MSG_LEN) ) {// receivedData = new byte[1];// receivedData[0] = ComConstants.ERROR_SOCKET ;// return receivedData;// } receivedData = new byte[dataSize - 4]; //注意:recievedata 不包含消息包长度这四个字节 int dataTotalSizeToRead = dataSize - 4; int dataToReadLeft = dataSize - 4; int dataThisTimeRead; while(dataToReadLeft > 0) { dataThisTimeRead = dataInStream.read(receivedData,dataTotalSizeToRead - dataToReadLeft,dataToReadLeft); dataToReadLeft -= dataThisTimeRead; } } catch(NullPointerException e) { PublicConstants.writeLog.error("Cmpp recv connection null: "+e.toString() ) ; receivedData = new byte[1]; receivedData[0] = ComConstants.ERROR_SOCKET ; } catch(InterruptedIOException e) { receivedData = new byte[1]; receivedData[0] = ComConstants.ERROR_INTERRUPT ; } catch(EOFException e) { receivedData = new byte[1]; receivedData[0] = ComConstants.ERROR_INTERRUPT ; } catch(Exception e) { PublicConstants.writeLog.error("Cmpp recv socket: "+e.toString() ) ; System.out.println("Cmpp recv socket: "+e.toString() ) ; receivedData = new byte[1]; receivedData[0] = ComConstants.ERROR_SOCKET ; } return receivedData; } /** * 处理连接上收取的数据<br> * 解包<br> * 将下行消息写入数据缓冲,使用两个缓冲,交替使用<br> * @param rcvdata 未解包数据流 */ public synchronized void handle(byte[] rcvdata) { int unPackCode = CmppConstants.error; int ret; boolean sendToSmsc = true; if( PublicConstants.PackDEBUGLevel > 0 )//输出报文十六进制代码 PublicConstants.writeLog.packDebug("Cmpp RECV from "+myid+"(bindType:"+myBindType+" len:"+(rcvdata.length+4)+")",rcvdata); /*if( Constants.CmppPACK_DEBUG ){ System.out.println(); ` System.out.println("Cmpp receive packet:"); for(int i=0 ; i<rcvdata.length ; i++ ) System.out.print("["+rcvdata[i]+"] "); System.out.println(); }*/ cmpp_sm = new ShortMsgCmpp(); cmpp_sm.parsePackage( rcvdata );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -