📄 cmppconnection.java
字号:
package com.wayout.wayoutsp.communication;
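/**
* Cmpp connector.
* <p>Title: CmppConnection, the Cmpp connector</p>
* <p>Description: Establishes connections with peer entities (such as SP, MPCA) according to the Cmpp protocol, and receives and sends packets</p>
* <p>Copyright: Communication team, Copyright (c) 2002.7</p>
* <p>Company: wayout</p>
* @author wayout
* @version 1.0
*/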
import java.io.*;
import java.net.*;
import java.util.*;
import com.wayout.wayoutsp.publics.*;
public class CmppConnection extends Thread {
/**
 * Creates a Cmpp connection object that plays the client role towards a server.
 * <p>
 * The constructor only records the peer parameters, marks the connection as
 * not connected, allocates the message buffers and then calls
 * {@code handleMsgThread()} and {@code resendMsgThread()} (both defined
 * elsewhere in this class; presumably they start the worker threads - confirm).
 * No socket is opened here.
 * <p>
 * Any exception raised while allocating the buffers or calling those two
 * methods is logged and swallowed, so the constructor itself does not
 * propagate it.
 *
 * @param name server name
 * @param id server ID
 * @param ip server IP
 * @param port server service port
 * @param bindtype connection bind type
 * @param iii flag used for debugging
 */
public CmppConnection(String name, String id, String ip, int port, byte bindtype, int iii) {
    // Peer identity and addressing; the strings are copied, as in the original design.
    this.mysign = iii;
    this.myMBEname = new String(name);
    this.myid = new String(id);
    this.myip = new String(ip);
    this.myport = port;
    this.myBindType = bindtype;

    // Initial connection bookkeeping.
    this.mystate = ComConstants.NOTCONNECTED;
    this.myLastVersion = 0;
    this.myConnType = CmppConstants.CLIENT;

    try {
        // Two half-sized receive buffers plus one full-sized buffer for submitted messages.
        int halfCapacity = ComConstants.CmppVECTOR_NUM / 2;
        this.vcmppMsg1 = new Vector(halfCapacity, 100);
        this.vcmppMsg2 = new Vector(halfCapacity, 100);
        this.vCmppSubmitMsg = new Vector(ComConstants.CmppVECTOR_NUM, 100);
        this.vcmppMsgFlag = 1; // buffer 1 is used first

        handleMsgThread();
        resendMsgThread();
    } catch (Exception ex) {
        PublicConstants.writeLog.info("Cmpp Create Connetion "+ex.toString(),0);
    }
}
/**
 * Opens the TCP socket to the configured server address and prepares the
 * input and output streams used by the receive and send paths.
 * <p>
 * On any exception the connection is torn down via {@code disconnect()},
 * the state is reset to {@code ComConstants.NOTCONNECTED}, the failure is
 * logged and {@code false} is returned. The state is not changed here on success.
 *
 * @return {@code true} if the socket and its streams were set up,
 *         {@code false} if any step failed
 */
public boolean connect() {
    boolean opened;
    try {
        mySocket = new Socket(myip, myport);
        // Receive timeout so that a blocking read does not wait forever.
        mySocket.setSoTimeout(ComConstants.CmppRCVSOCK_TIMEOUT);
        myinputstream = mySocket.getInputStream();
        myoutputstream = mySocket.getOutputStream();
        dataInStream = new DataInputStream(myinputstream);
        opened = true;
    } catch (Exception e) {
        disconnect();
        mystate = ComConstants.NOTCONNECTED;
        PublicConstants.writeLog.info("Cmpp Connected ISMG fail(bindtype:"+myBindType+"):"+e.toString(),0);
        opened = false;
    }
    return opened;
}
/**
 * Sends one message over this connection.
 * <p>
 * The message is packed with {@code sm.getPackage()}; a null package or one
 * whose length is outside
 * [{@code ComConstants.Cmpp_MIN_MSG_LEN}, {@code ComConstants.Cmpp_MAX_MSG_LEN}]
 * is logged and dropped. Submit messages are stamped with their send time and
 * send count and are appended to {@code vCmppSubmitMsg} before the bytes are
 * written. After a successful write the calling thread sleeps for 100 ms.
 * <p>
 * No exception is propagated: socket, I/O and other failures are logged and
 * followed by {@code disconnect()}; a {@code NullPointerException} is only logged.
 *
 * @param sm the ShortMsgCmpp object to send
 */
public void send(ShortMsgCmpp sm) {
    try {
        debugMsg(sm);
        byte[] bb = sm.getPackage();
        if (bb == null) {
            PublicConstants.writeLog.info("Cmpp send pack is null .",0);
            return;
        }
        if ((bb.length > ComConstants.Cmpp_MAX_MSG_LEN) || (bb.length < ComConstants.Cmpp_MIN_MSG_LEN)) {
            // This is the first-send path, so the log must not say "reSend".
            PublicConstants.writeLog.info("Cmpp send pack length error : "+bb.length,0);
            return;
        }
        if (PublicConstants.PackDEBUGLevel > 0) {
            // Dump the packet as hexadecimal.
            PublicConstants.writeLog.packDebug("Cmpp SEND to :"+myid+"(bindType:"+myBindType+") \t",bb);
        }
        if (sm.headCmdID == CmppConstants.Cmpp_Submit) {
            TestMTnum++;
            if (TestMTnum % 200 == 1) {
                System.out.println("Cmpp_Submit :"+TestMTnum+" sended :"+System.currentTimeMillis());
            }
            // One clock read so that first and last send time are identical on the first send.
            long now = System.currentTimeMillis();
            sm.firstSendtime = now;
            sm.lastSendtime = now;
            sm.sendCount = 1;
            // Queued before the write, so a failed write still leaves the message in this list.
            synchronized (vCmppSubmitMsg) {
                vCmppSubmitMsg.add(sm);
            }
        }
        // Serialize writers so that packets are not interleaved on the stream.
        synchronized (_lock) {
            myoutputstream.write(bb);
            myoutputstream.flush();
        }
        PublicFuction.threadSleep(100);
    } catch (NullPointerException e) {
        // bb itself was checked above; this is reached when sm or the output stream is null.
        PublicConstants.writeLog.info ("Cmpp "+myid+" send pack is null: "+e.toString(),0 ) ;
    } catch (SocketException e) {
        PublicConstants.writeLog.info("Cmpp "+myid+" send disconnected SocketException: "+e.toString(),0);
        disconnect();
    } catch (IOException e) {
        PublicConstants.writeLog.info("Cmpp "+myid+" send IOException: "+e.toString(),0);
        disconnect();
    } catch (Exception e) {
        PublicConstants.writeLog.info("Cmpp "+myid+" send disconnected Exception: "+e.toString(),0);
        disconnect();
    }
}
/**
 * Writes an already packed message to this connection (used for resending).
 * <p>
 * A null array, or one whose length is outside
 * [{@code ComConstants.Cmpp_MIN_MSG_LEN}, {@code ComConstants.Cmpp_MAX_MSG_LEN}],
 * is logged and dropped. An I/O failure while writing (including a socket
 * failure) is logged and followed by {@code disconnect()}; a
 * {@code NullPointerException} is only logged.
 *
 * @param bb the raw packet bytes to write
 */
public void send(byte[] bb) {
    try {
        if (bb == null) {
            PublicConstants.writeLog.info("Cmpp reSend pack is null .",0);
            return;
        }

        final int packLength = bb.length;
        final boolean lengthInRange =
                (packLength <= ComConstants.Cmpp_MAX_MSG_LEN) && (packLength >= ComConstants.Cmpp_MIN_MSG_LEN);
        if (!lengthInRange) {
            PublicConstants.writeLog.info("Cmpp reSend pack length error : "+packLength,0 );
            return;
        }

        // Serialize writers so that packets are not interleaved on the stream.
        synchronized (_lock) {
            myoutputstream.write(bb);
            myoutputstream.flush();
        }
    } catch (NullPointerException npe) {
        // bb was checked above, so this is reached when the output stream reference is null.
        PublicConstants.writeLog.info ("Cmpp "+myid+" reSend pack is null: "+npe.getMessage(),0 ) ;
    } catch (IOException ioe) {
        // SocketException is an IOException and was handled identically, so one handler covers both.
        PublicConstants.writeLog.info("Cmpp "+myid+" reSend disconnected: "+ioe.getMessage(),0);
        disconnect();
    }
}
/**
 * Receive loop of this connection thread.
 * <p>
 * Until the shutdown flag is set, the loop either sleeps for two seconds
 * while the state is {@code ComConstants.NOTCONNECTED}, or blocks in
 * {@code recv()} for one packet. A one-byte result from {@code recv()} is
 * an error marker: both {@code ERROR_INTERRUPT} and {@code ERROR_SOCKET}
 * lead to {@code disconnect()}; any other marker is ignored. A regular
 * packet refreshes the last-active timestamp and is passed to
 * {@code handle(byte[])}. Exceptions raised inside one iteration are logged
 * and the loop continues.
 */
public void run() {
    mylastactivetime = System.currentTimeMillis();
    while (!myShutdownFlag) {
        // NOTE(review): the state is compared by identity/value with ==; confirm the type of mystate.
        if (mystate == ComConstants.NOTCONNECTED) {
            PublicFuction.threadSleep(2000) ;
            continue;
        }
        try {
            byte[] packet = recv();
            if (packet.length == 1) {
                // One-byte results are error markers, never payload.
                byte marker = packet[0];
                if (marker == ComConstants.ERROR_INTERRUPT) {
                    PublicConstants.writeLog.info("Cmpp ERROR_INTERRUPT"+myid+ " sock error .",2);
                    disconnect();
                } else if (marker == ComConstants.ERROR_SOCKET) {
                    disconnect();
                    System.out.println("Cmpp ERROR_SOCKET"+myid+ " sock error . Disconnected .") ;
                    PublicConstants.writeLog.error("Cmpp ERROR_SOCKET"+myid+ " sock error . Disconnected .");
                }
                // Unknown markers are reserved for future use and skipped as well.
                continue;
            }
            mylastactivetime = System.currentTimeMillis() ;
            handle(packet);
        } catch (Exception ee) {
            PublicConstants.writeLog .error("Cmpp "+myid+ " handle error: "+ee.toString() );
        }
    }
}
/**
 * Receives one packet from this connection.
 * <p>
 * Reads the 4-byte total length from the input stream and then exactly
 * {@code length - 4} further bytes. The returned array does NOT contain the
 * four length bytes.
 * <p>
 * Failures are reported through a one-byte array instead of an exception:
 * <ul>
 * <li>{@code ComConstants.ERROR_SOCKET}: the length field is below 20 or above
 *     {@code ComConstants.Cmpp_MAX_MSG_LEN}, the stream reference is null, or
 *     any other exception occurred;</li>
 * <li>{@code ComConstants.ERROR_INTERRUPT}: the read was interrupted
 *     (for example by the socket receive timeout) or the stream ended
 *     before a complete packet was read.</li>
 * </ul>
 *
 * @return the packet without its length field, or a one-byte error marker
 */
private byte[] recv() {
    byte[] receivedData;
    try {
        int dataSize = dataInStream.readInt();
        // NOTE(review): the lower bound 20 is hard-coded here while the send paths use
        // ComConstants.Cmpp_MIN_MSG_LEN - confirm whether the two should agree.
        if ((dataSize < 20) || (dataSize > ComConstants.Cmpp_MAX_MSG_LEN)) {
            receivedData = new byte[1];
            receivedData[0] = ComConstants.ERROR_SOCKET ;
            return receivedData;
        }
        receivedData = new byte[dataSize - 4]; // the four length bytes are not part of the result
        // readFully loops until the buffer is full and throws EOFException if the stream
        // ends early, instead of mis-counting a -1 return value from read().
        dataInStream.readFully(receivedData);
    } catch (NullPointerException e) {
        PublicConstants.writeLog.error("Cmpp recv connection null: "+e.toString() ) ;
        receivedData = new byte[1];
        receivedData[0] = ComConstants.ERROR_SOCKET ;
    } catch (InterruptedIOException e) {
        receivedData = new byte[1];
        receivedData[0] = ComConstants.ERROR_INTERRUPT ;
    } catch (EOFException e) {
        receivedData = new byte[1];
        receivedData[0] = ComConstants.ERROR_INTERRUPT ;
    } catch (Exception e) {
        PublicConstants.writeLog.error("Cmpp recv socket: "+e.toString() ) ;
        System.out.println("Cmpp recv socket: "+e.toString() ) ;
        receivedData = new byte[1];
        receivedData[0] = ComConstants.ERROR_SOCKET ;
    }
    return receivedData;
}
/**
* 处理连接上收取的数据<br>
* 解包<br>
* 将下行消息写入数据缓冲,使用两个缓冲,交替使用<br>
* @param rcvdata 未解包数据流
*/
public void handle(byte[] rcvdata) {
int unPackCode = CmppConstants.error;
int ret;
boolean sendToSmsc = true;
if( PublicConstants.PackDEBUGLevel > 0 )//输出报文十六进制代码
PublicConstants.writeLog.packDebug("Cmpp RECV from "+myid+"(bindType:"+myBindType+" len:"+(rcvdata.length+4)+")",rcvdata);
/*if( Constants.CmppPACK_DEBUG ){
System.out.println(); `
System.out.println("Cmpp receive packet:");
for(int i=0 ; i<rcvdata.length ; i++ )
System.out.print("["+rcvdata[i]+"] ");
System.out.println();
}*/
ShortMsgCmpp cmpp_sm = new ShortMsgCmpp();
cmpp_sm.parsePackage( rcvdata );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -