📄 cmpplinker.java
字号:
package com.ekun.common.sms;
/**
* <p>Title: </p>
*
* <p>Description: </p>
*
* <p>Copyright: Copyright (c) 2006</p>
*
* <p>Company: </p>
*
* @author ekun
* @version 1.0
*/
import java.io.*;
import com.ekun.common.log.*;
import com.ekun.sms.cmpp2.*;
import com.ekun.sms.common.*;
import java.util.ArrayList;
import java.util.Properties;
public class CmppLinker
{
private static Logger logger = LoggerFactory.getLogger(CmppLinker.class);
//是否返回流量控制错(submitResponse)
private String iniFileName = ""; //配置文件名
private static int SUBMIT_RETRY_TIMES = 2; //submit重试次数
private CmppApis cm;
private CmppMsgHandler processor;
private SlipWindowCtrl slipCtrl;
private int socketTimeOut = 10 * 1000; //socket超时单位:毫秒!
//用于限定每秒发送多少条短信!
private static int SubmitPreSecond = 150;
private RecvTrd recvTrd = null;
private ReSendTrd reSendTrd = null;
private boolean linkerTerminal = false;
private boolean isTooFaster = false;
private static CmppLinker instance = null;
private Object sockSendLock = new Object();
private Object reConnLock = new Object();
private boolean isWaitForConn = false;
public String sendPort;
/**
* 内部类,实现数据的收取,同时维护Socket连接!
*/
private class RecvTrd
extends Thread
{
long oldTime = System.currentTimeMillis();
private int sendActive(int spaceSecond)
{
int nRet = 0;
if ( (System.currentTimeMillis() - oldTime) > spaceSecond)
{
nRet = sendOtherPacket(new RecCActiveTest());
oldTime = System.currentTimeMillis();
}
return nRet;
}
public void run()
{
int nRet = -1;
RecCCmppPacket packet = new RecCCmppPacket();
while (!linkerTerminal)
{
try
{
nRet = cm.nCMPP_RecvPacket(packet);
if (nRet >= 0)
parseRcvPacket(packet, nRet);
else if (0 != sendActive( -1))
{
reOpen();
}
//如果发送线程还在等待,则通知他!
if (isWaitForConn)
{
synchronized (reConnLock)
{
reConnLock.notifyAll();
}
}
}
catch(Exception ex)
{
logger.error(ex);
}
}
logger.error("严重错误:Exit CMPP Recv Thread!!!");
System.out.println("Exit CMPP Recv Thread!!!");
}
}
private class ReSendTrd
extends Thread
{
public void run()
{
while (!linkerTerminal)
{
//获取所有需要重发的数据
ArrayList pends = slipCtrl.getPending();
if (pends.size() <= 0)
{
try
{
sleep(1000);
}
catch (InterruptedException ie)
{}
}
java.util.Iterator it = pends.iterator();
while (it.hasNext())
{
SmsInfo smsInfo = (SmsInfo) it.next();
//判断是否无效请求!
if ( (smsInfo.SeqID != -999) || (smsInfo.DBMsgTo.trim().length() != 0))
{
logger.info("CMPP重发阻塞数据:" + smsInfo);
sendSubmit(smsInfo);
}
else
{
logger.info("CMPP重发阻塞数据,未匹配的Response:" + smsInfo);
}
}
}
}
}
/**
* 默认构造函数
*/
private CmppLinker()
{
processor = null;
cm = new CmppApis();
linkerTerminal = false;
slipCtrl = new SlipWindowCtrl();
}
public static CmppLinker getInstance()
{
synchronized (CmppLinker.class)
{
if (instance == null)
{
instance = new CmppLinker();
}
}
return instance;
}
/**
* 注册消息处理器,目前先处理一个,以后可添加多个!
* @param processor:消息处理器
*/
public void setProcessor(CmppMsgHandler processor)
{
this.processor = processor;
}
/**
* 打开连接,开始服务
*/
public void reOpen()
{
logger.debug("CMPPLinker begin reOpen...");
connectToISMG();
slipCtrl.setSlipWindowSize(cm.sendSpeed);
System.out.println("连接ISMG成功!");
logger.debug("Connect ISMG successed!");
if (recvTrd == null)
{
recvTrd = new RecvTrd();
recvTrd.setName("CMPPLinker.RecvTrd");
recvTrd.start();
}
if (reSendTrd == null)
{
reSendTrd = new ReSendTrd();
reSendTrd.setName("CMPPLinker.ReSendTrd");
reSendTrd.start();
}
synchronized (reConnLock)
{
reConnLock.notifyAll();
}
logger.debug("CMPPLinker reOpen Complete!");
}
/**
* 执行清除动作,关闭线程及对象!
*/
public void cleanup()
{
linkerTerminal = true;
cm.nCMPP_Terminate();
}
/**
* 解析ISMG下发的数据包
* @param packet
*/
private void parseRcvPacket(RecCCmppPacket packet, int nRet)
{
if (processor == null)
return;
try
{
switch (packet.nCmdId)
{
case CmppPacket.CMPP_Deliver:
if ( (nRet == 0) && (processor != null))
{
RecCDeliverRep recDeliverRep = new RecCDeliverRep();
recDeliverRep.nSeqId = packet.recDeliver.nSeqId;
recDeliverRep.usMsgId = new byte[8];
System.arraycopy(packet.recDeliver.usMsgId, 0,
recDeliverRep.usMsgId, 0, 8);
recDeliverRep.ucResult = 0;
sendOtherPacket(recDeliverRep);
processor.onCmppMO(packet.recDeliver);
}
else if ( (nRet == 1) && (processor != null))
{
processor.onRcvReceipt(packet.recReceipt);
}
break;
case CmppPacket.CMPP_Submit_REP:
//1、得到response的SeqID,用以匹配等待对象!
String key = String.valueOf(nRet);
//2、从滑动窗口中获得等待对象
SmsInfo respSmsInfo = new SmsInfo();
respSmsInfo.SMSMsgID = BuffMemoUtil.getMsgId(packet.recSubmitRep.
usMsgId);
respSmsInfo.SendResult = packet.recSubmitRep.ucResult;
SmsInfo smsInfo = (SmsInfo) slipCtrl.popPending(key, respSmsInfo);
if ( (processor != null) && (smsInfo != null))
{
//3、赋SMSID,以便后续状态报告时匹配之用!
smsInfo.SMSMsgID = respSmsInfo.SMSMsgID;
smsInfo.SendResult = respSmsInfo.SendResult;
//4、触发onMT消息
processor.onCmppMT(smsInfo);
}
if (packet.recSubmitRep.ucResult == 8)
{
logger.info("ISMG 返回流量控制错误!");
isTooFaster = true;
}
break;
case CmppPacket.CMPP_Active_Test:
RecCActiveTestRep precActiveRep = new RecCActiveTestRep();
precActiveRep.nSeqId = packet.recActiveTest.nSeqId;
precActiveRep.ucSuccId = 0;
synchronized (sockSendLock)
{
cm.nCMPP_SendActiveRep(precActiveRep);
}
break;
case CmppPacket.CMPP_Terminate:
RecCTerminateRep precPkg = new RecCTerminateRep();
precPkg.nSeqId = packet.recTerminate.nSeqId;
synchronized (sockSendLock)
{
cm.nCMPP_ReplyTerminate(precPkg);
}
break;
default:
;
}
}
catch (Exception e)
{
logger.error("doRcvPacket Error!", e);
}
}
/**
* 读取配置参数
* @return boolean
*/
private boolean loadConfig()
{
Properties prop = new Properties();
if ( (iniFileName == null) || (iniFileName.equals("")))
{
logger.error("找不到CMPP API 配置文件!");
return false;
}
try
{
prop.load(new FileInputStream(iniFileName));
}
catch (IOException e)
{
logger.error("CMPP API 配置文件打开出错!");
return false;
}
cm.SPID = prop.getProperty("SP_ID", "").trim();
cm.SPPassword = prop.getProperty("SP_Pwd", "").trim();
cm.CorpID = prop.getProperty("CorpID", "").trim();
cm.hostIp = prop.getProperty("HostAddr", "").trim();
sendPort = prop.getProperty("SendPort").trim();
String sPort = prop.getProperty("HostPort", "").trim();
String sSendSpeed = prop.getProperty("SendSpeed", "60").trim();
if (cm.SPID.equals("") || cm.SPPassword.equals("") || cm.CorpID.equals(""))
{
logger.error("CMPP API 配置文件读取错误: SP_ID/SP_Pwd/CorpID参数读取失败!");
return false;
}
try
{
cm.port = Integer.parseInt(sPort.trim());
cm.sendSpeed = Integer.parseInt(sSendSpeed.trim());
cm.hostIp = cm.hostIp.trim();
}
catch (NumberFormatException e)
{
logger.error("CMPP API 配置文件读取错误: HostAddr/HostPort/SendSpeed参数读取失败!");
return false;
}
String smaxMsgAscii = prop.getProperty("MaxMsgLenAscii", "");
String smaxMsgOther = prop.getProperty("MaxMsgLenOther", "");
try
{
cm.maxMsgLenAscii = Integer.parseInt(smaxMsgAscii.trim());
cm.maxMsgLenOther = Integer.parseInt(smaxMsgOther.trim());
if (cm.maxMsgLenAscii > 160)
cm.maxMsgLenAscii = 159;
if (cm.maxMsgLenOther > 140)
cm.maxMsgLenOther = 140;
}
catch (NumberFormatException e)
{
cm.maxMsgLenAscii = 159;
cm.maxMsgLenOther = 140;
logger.error("CMPP API 配置文件读取错误: MaxMsgLenAscii/MaxMsgLenOther参数读取失败!");
return false;
}
logger.debug("ConnInfo:\nSPID:\t" + cm.SPID +
"\nCorpID:\t" + cm.CorpID +
"\nSPPWD:\t" + cm.SPPassword +
"\nHOSTIP:\t" + cm.hostIp +
"\nPORT:\t" + cm.port +
"\nSendSpeed:\t" + cm.sendSpeed);
return true;
}
/**
* 发送CMPP_CONNECT请求
* @return:连接情况
*/
private void connectToISMG()
{
/*每次读取配置文件,保证动态加载*/
while (!linkerTerminal)
{
disconnectToISMG();
if (!loadConfig())
{
try
{
Thread.currentThread().sleep(1000);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -