📄 cmppconnectionmanager.java
字号:
package com.wayout.wayoutsp.communication;
import java.util.*;
import java.net.*;
import java.io.*;
import com.wayout.wayoutsp.publics.*;
import com.wayout.wayoutsp.communication.IsmgInfo;
/**
* <p>Title: 公司运营项目</p>
* <p>Description: 实现短信SP之功能.短信互动系统</p>
* <p>Copyright: Copyright (c) 2002</p>
* <p>Company: wayout</p>
* @author wayout Software
* @version 1.01
*/
public class CmppConnectionManager extends Thread{
/**
* 构造Cmpp连接管理器.
* <p><b>处理流程.</b></p>
* 1 为消息路由创建hash表 <br>
* 2 启动写数据库线程 <br>
* 3 与短信网关建立连接 <br>
* 4 启动消息重发线程 <br>
* @param name Cmpp管理器名称
* @param size hash表大小
*/
public CmppConnectionManager(String name, int size) {
System.out.println();
System.out.println("Create CmppConnectionManager ...");
PublicConstants.writeLog.info("Create CmppConnectionManager ...",0);
myname = name;
mysize = size > 0 ? size : 2;
myconnection = new Hashtable (size);
cmppConnMangerinit();
startSaveDataThread();
connectIsmg();
setupCmppServer(ComConstants.o_IsmgInfo.ismgMoPort);
//startReSendThread();
startMaintanceThread();
}
public boolean setupCmppServer(int port)
{
try
{
mysvrsock = new ServerSocket( port );
svrCmppThread = new Thread(super, "MPCPServerThread");
svrCmppThread.start() ;
PublicConstants.writeLog.info("Cmpp create server OK ! + port"+port,0);
return true;
}
catch (Exception e)
{
PublicConstants.writeLog.error("Cmpp setupMPCPServer Exception" + e);
return false;
}
}
/**
 * Accept loop: until the shutdown flag is set, waits for a client on the
 * server socket and hands each accepted socket to a new ChildThreadDeliver,
 * which is started immediately.
 * The server socket is closed when the loop ends. Any exception ends the
 * loop, triggers a best-effort close of the server socket and is logged.
 */
public void run()
{
    try
    {
        while (!myShutdownFlag)
        {
            Socket client = mysvrsock.accept();
            // Each accepted connection is served by its own worker thread.
            new ChildThreadDeliver(client).start();
        }
        // Loop left because the shutdown flag was set.
        mysvrsock.close();
    }
    catch (Exception loopFailure)
    {
        try
        {
            mysvrsock.close();
        }
        catch (Exception ignored)
        {
            // Best-effort close; the original failure is what gets reported below.
        }
        PublicConstants.writeLog.error("setupCmppServer.run " + loopFailure.toString());
    }
}
/**
 * Accept loop that serves each client inline on the calling thread.
 * For every accepted socket it binds the socket to {@code connectionDeliever},
 * then receives three packets in order: the first two are passed to
 * {@code handle}, the third is read and discarded.
 * A failure while serving one client is logged and that client's socket is
 * closed; the loop then continues with the next client. A failure of the
 * server socket itself ends the loop and is logged.
 */
public void startServerAcceptThread()
{
    try
    {
        while (!myShutdownFlag)
        {
            Socket socket = mysvrsock.accept();
            try
            {
                connectionDeliever.setupSocket(socket);
                byte[] login = connectionDeliever.recv();
                System.out.println("login......");
                connectionDeliever.handle(login);
                byte[] deliver = connectionDeliever.recv();
                connectionDeliever.handle(deliver);
                System.out.println("deliver_rep.....");
                // Third packet is consumed but intentionally not handled.
                connectionDeliever.recv();
                System.out.println("logout...");
                // NOTE(review): the socket is not closed on the success path here;
                // presumably ConnectionServer owns it after setupSocket - confirm.
            }
            catch (Exception e1)
            {
                // Previously swallowed silently; record why this client was dropped.
                PublicConstants.writeLog.error("startServerAcceptThread client session " + e1);
                try
                {
                    socket.close();
                }
                catch (Exception closeFailure)
                {
                    // A failed close of one client socket must not stop the accept loop.
                    PublicConstants.writeLog.error("startServerAcceptThread socket close " + closeFailure);
                }
            }
        }
        // Loop left because the shutdown flag was set.
        mysvrsock.close();
    }
    catch (Exception e2)
    {
        try
        {
            mysvrsock.close();
        }
        catch (Exception ignored)
        {
            // Best-effort close; the original failure is reported below.
        }
        PublicConstants.writeLog.error("setupCmppServer.run " + e2.toString());
    }
}
/**
 * Creates the submit connections to the ISMG and the deliver-side
 * ConnectionServer object.
 * Does nothing unless the gateway status equals ComConstants.VALID.
 */
private void connectIsmg()
{
    if (ComConstants.o_IsmgInfo.ismgStatus != ComConstants.VALID) {
        return;
    }

    // NOTE(review): the loop count comes from IsmgRecvConNum while the bind type
    // passed is SEND_ONLY; the original inline comment called these "receive-type"
    // connections - confirm which is intended.
    int index = 0;
    while (index < ComConstants.IsmgRecvConNum) {
        connectCmppSubmit(
            ComConstants.o_IsmgInfo.ismgName,
            ComConstants.o_IsmgInfo.ismgId,
            ComConstants.o_IsmgInfo.ismgIp,
            ComConstants.o_IsmgInfo.ismgMtPort,
            ComConstants.SEND_ONLY,
            10 + index);
        // Short pause between consecutive connection attempts.
        PublicFuction.threadSleep(20);
        index++;
    }

    // Deliver traffic is handled by a server-side object instead of the
    // client-side connection loop that used to follow here (now disabled).
    connectionDeliever = new ConnectionServer();
}
/**
 * Returns the shared CmppConnectionManager, creating it on the first call
 * with the name "TW CmppConnectionManager" and a size of 8.
 * The method is synchronized.
 * @return the shared CmppConnectionManager
 */
public static synchronized CmppConnectionManager instance() {
    if (mbcpCM != null) {
        return mbcpCM;
    }
    mbcpCM = new CmppConnectionManager("TW CmppConnectionManager", 8);
    return mbcpCM;
}
/**
 * Connects to the ISMG as a client.
 * Creates a new CmppConnection, stores it in {@code connectionSubmit}, starts it,
 * attempts to connect and, when the connect succeeds, logs in.
 * @param name server name
 * @param mbe_id server ID
 * @param ip server IP address
 * @param port server service port
 * @param bindtype connection bind type
 * @param iii passed through unchanged to the CmppConnection constructor
 * @return
 * false: the connection could not be established (connect failed or an exception occurred) <br>
 * true : the connection was established
 */
public boolean connectCmppSubmit(String name, String mbe_id, String ip, int port, byte bindtype, int iii) {
    try {
        // Build the connection object and start it before connecting.
        connectionSubmit = new CmppConnection(name, mbe_id, ip, port, bindtype, iii);
        connectionSubmit.start();
        boolean connected = connectionSubmit.connect();
        if (connected) {
            connectionSubmit.Login();
        } else {
            // Previously this path still returned true, contradicting the documented contract.
            PublicConstants.writeLog.error("Cmpp connectCmpp " + mbe_id + " connect failed");
        }
        return connected;
    } catch (NullPointerException e) {
        // Kept separate so the existing log text for this case is unchanged.
        PublicConstants.writeLog.error("CmppConnectionManager.myconnection.put " + name + e );
        return false;
    }
    catch (Exception e) {
        PublicConstants.writeLog.error("Cmpp connectCmpp " + mbe_id +" " + e);
        return false;
    }
}
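/*
 * Documentation of the disabled connectCmpp() method that is commented out below.
 * Connects to the ISMG as a client.
 * name     server name
 * mbe_id   server ID
 * ip       server IP address
 * port     server service port
 * bindtype connection bind type
 * iii      (undocumented in the original)
 * Returns:
 * false: the connection could not be established
 * true : the connection was established
 */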
// public boolean connectCmpp(String name, String mbe_id, String ip, int port, byte bindtype, int iii) {
// int ret ;
// try {
// //构造连接对象
// //if ( connectionDeliever== null )
// connectionDeliever = new CmppConnection(name, mbe_id, ip, port, bindtype, iii);
// connectionDeliever.start() ;
// if( connectionDeliever.connect() )
// connectionDeliever.Login();
// //启动连接上的接受数据线程
// //recvThread = new Thread(connectionDeliever);
// //recvThread.start();
// return true;
// } catch (NullPointerException e) {
// PublicConstants.writeLog.error("CmppConnectionManager.myconnection.put " + name + e );
// return false;
// }
// catch (Exception e) {
// PublicConstants.writeLog.error("Cmpp connectCmpp " + mbe_id +" " + e);
// return false;
// }
// }
/**
 * Loads the ISMG parameters from the database into {@code ComConstants.o_IsmgInfo},
 * logs the loaded values together with the resend settings, and then reads the
 * communication settings from the configuration file.
 * Any exception is logged and not propagated.
 */
private static void cmppConnMangerinit(){
    try {
        ComConstants.o_IsmgInfo = DatabaseAccess.instance().getIsmgPara(PublicConstants.SectionNum,PublicConstants.DealerType);
        PublicConstants.writeLog.info("ComConstants.o_IsmgInfo.Name\t"+ComConstants.o_IsmgInfo.ismgName,1);
        PublicConstants.writeLog.info("ComConstants.o_IsmgInfo.Ip\t"+ComConstants.o_IsmgInfo.ismgIp,1);
        PublicConstants.writeLog.info("ComConstants.o_IsmgInfo.status\t"+ComConstants.o_IsmgInfo.ismgStatus,1);
        PublicConstants.writeLog.info("ComConstants.o_IsmgInfo.MO_SvrPort\t"+ComConstants.o_IsmgInfo.ismgMoPort,1);
        PublicConstants.writeLog.info("ComConstants.o_IsmgInfo.MT_SvrPort\t"+ComConstants.o_IsmgInfo.ismgMtPort,1);
        PublicConstants.writeLog.info("ComConstants.CmppRESEND_COUNT\t"+ComConstants.CmppRESEND_COUNT,1);
        PublicConstants.writeLog.info("ComConstants.CmppRESEND_INTERVAL\t"+ComConstants.CmppRESEND_INTERVAL,1);
        PublicConstants.writeLog.info("ComConstants.CmppRESEND_LIVETIME\t"+ComConstants.CmppRESEND_LIVETIME,1);
        loadCfgFileForCom();
    } catch (Exception rce) {
        // Log the exception itself rather than getMessage(): the message alone drops
        // the exception type and is null for e.g. a bare NullPointerException.
        PublicConstants.writeLog.error("cmppConnMangerinit: " + rce);
    }
}
/**
* 从文件读取所需参数
* @return
* true :成功
* false:失败
*/
private static boolean loadCfgFileForCom() {
System.out.println("Reading Cfg from file for Com...");
try {
InputStream is = new FileInputStream(PublicConstants.ConfigFile) ;
Properties pt = new Properties() ;
pt.load(is) ;
ComConstants.wayoutsplastversion=
java.lang.Byte.parseByte(pt.getProperty("comm.LastVersion","12")) ;
ComConstants.CmppVECTOR_NUM=
Integer.parseInt( pt.getProperty("comm.MSG_VECTOR_NUM\t","10000"));
System.out.println("CmppMSGVECTOR_NUM : \t" + ComConstants.CmppVECTOR_NUM);
ComConstants.CmppRCVSOCK_TIMEOUT =
Integer.parseInt( pt.getProperty("comm.CmppHEARTBEAT_INTERVAL","30000") );
System.out.println("CmppHEARTBEAT_INTERVAL : \t" + ComConstants.CmppRCVSOCK_TIMEOUT);
ComConstants.CmppRESEND_SLEEPTIME =
Integer.parseInt( pt.getProperty("comm.CmppRESEND_SLEEPTIME","10000") );
System.out.println("Comm.CmppRESEND_SLEEPTIME : \t" + ComConstants.CmppRESEND_SLEEPTIME);
ComConstants.CmppMAINTANCE_SLEEPTIME =
Integer.parseInt( pt.getProperty("Comm.CmppMAINTANCE_SLEEPTIME","10000") );
System.out.println("Comm.CmppMAINTANCE_SLEEPTIME : \t" + ComConstants.CmppMAINTANCE_SLEEPTIME);
ComConstants.CmppSAVEDATA_SLEEPTIME =
Integer.parseInt( pt.getProperty("Comm.CmppSAVEDATA_SLEEPTIME","500"));
System.out.println("Comm.CmppSAVEDATA_SLEEPTIME : \t" + ComConstants.CmppSAVEDATA_SLEEPTIME);
is.close() ;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -