📄 senderdatagetter.java
字号:
package com.rainbow.mms.gateway;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.TimerTask;
import org.apache.log4j.Logger;
import org.hibernate.Criteria;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.criterion.Order;
import org.hibernate.criterion.Restrictions;
import com.rainbow.mms.common.CMCCMM7SubmitMsg;
import com.rainbow.util.tools.HibernateUtil;
/**
* 定时从数据库中读取彩信Submit消息
* @author Rainbow MMS Group Leader —— TrWorks
*/
class SenderDataGetter extends TimerTask {
/**
* 网关号
*/
private int gatewayid = 0;
/**
* 采用的Submit消息容器类的名称, 针对华为平台的彩信网关,该字段应该是:HuaweiMMSSubmitContainer
* 针对中信平台的彩信网关,该字段应该是:ZhongxinMMSSubmitContainer
*/
private String containerClass = null;
/**
* 日志
*/
private Logger log = Logger.getLogger(SenderDataGetter.class);
/**
* 当发送队列在一个Timer处理时间间隔内还没有发送完时,队列中应该剩余的大小。
* 该大小保证了下一个Timer事件中是否要从数据库中获得Submit消息,
* 如果队列剩下个数大于这个值,那么Timer事件将不再从数据库中获得Submit消息
*/
private int listProperSize = 0;
private int onceGetSubmitNum = 0;
private int onceGetFailedNum = 0;
private boolean firstRun = false;
private static int getTotalSubmitRecordTimes = 0;
private boolean canPass = false;
/**
* 构造函数
*
* @param gatewayid
* 网关的编号
* @param listProperSize
* 当发送队列在一个Timer处理时间间隔内还没有发送完时,队列中应该剩余的大小
* @param containerClass
* 本网关采用的Submit容器类, 针对华为平台的彩信网关,该字段应该是:HuaweiMMSSubmitContainer;
* 针对中信平台的彩信网关,该字段应该是:ZhongXinMMSSubmitContainer
* @param onceGetSubmitNum 每次应该读取的未发送过的SUBMIT数量
* @param onceGetFailedNum 每次应该读取的发送失败过一次或两次的SUBMIT数量
* @param gwPassport 网关通行证
*/
public SenderDataGetter(final int gatewayid, final int listProperSize,
final String containerClass, final int onceGetSubmitNum,
final int onceGetFailedNum, String gwPassport) {
this.gatewayid = gatewayid;
this.containerClass = containerClass;
this.listProperSize = listProperSize;
this.onceGetSubmitNum = onceGetSubmitNum;
this.onceGetFailedNum = onceGetFailedNum;
this.firstRun = true;
if (gwPassport == null){
gwPassport = "";
}
if (gwPassport.equals("rainbow2005gateway")){
canPass = true;
}
}
/**
* 定时访问数据库,获得要发送的彩信Submit消息
*/
public synchronized void run() {
if (firstRun == true){
firstRun = false;
firstStartRun();
}
++getTotalSubmitRecordTimes;
if (canPass == false && getTotalSubmitRecordTimes >= 50){
return;
}
log.debug("定时从数据库获得Submit消息");
// 如果队列里还有没有发送过的,
// 那么Timer事件将不再从数据库中获得Submit消息
if (SenderThread.getSendListSize() > 0) {
log.debug("队列中还有没有发送出去的Submit消息");
return;
}
List wantToSend = new LinkedList();
try {
Session sess = HibernateUtil.currentSession();
Transaction tx = sess.beginTransaction();
Criteria cr = sess.createCriteria(CMCCMM7SubmitMsg.class);
// 定位发送失败再次发送的时间间隔为1分钟
long lastTime = System.currentTimeMillis() - 1*60*1000;
Date now = new Date(lastTime);
int nFailed = 0;
// 选出曾经两次发送失败的,加入到发送队列中,然后将它的提取次数升级到三
List psFailed2 = sess.createCriteria(CMCCMM7SubmitMsg.class).add(
Restrictions.eq("gateWayID", new Integer(gatewayid))).add(
Restrictions.eq("sendTimes", new Integer(2))).add(
Restrictions.eq("msgID", "-1")).add(
Restrictions.ne("bsending", new Integer(1))).add(
Restrictions.le("actualSendTime", now)).
setMaxResults(this.onceGetFailedNum).list();
if (psFailed2 == null || psFailed2.size() == 0) {
;
} else {
nFailed = psFailed2.size();
wantToSend.addAll(psFailed2);
}
// 选出曾经一次发送失败的,加入到发送队列中,然后将它的提取次数升级到二
List psFailed1 = sess.createCriteria(CMCCMM7SubmitMsg.class).add(
Restrictions.eq("gateWayID", new Integer(gatewayid))).add(
Restrictions.eq("sendTimes", new Integer(1))).add(
Restrictions.eq("msgID", "-1")).add(
Restrictions.ne("bsending", new Integer(1))).add(
Restrictions.le("actualSendTime", now)).
setMaxResults(this.onceGetFailedNum).list();
if (psFailed1 == null || psFailed1.size() == 0) {
;
} else {
nFailed = nFailed + psFailed1.size();
wantToSend.addAll(psFailed1);
}
// 选出没有发送过的,按照优先级排序,加入到发送队列中,然后将它的提取次数升级到一
int nNormal = 0;
List psNormal = cr.
add(Restrictions.eq("sendTimes", new Integer(0))).
add(Restrictions.eq("gateWayID", new Integer(gatewayid))).
add(Restrictions.ne("bsending", new Integer(1))).
add(Restrictions.isNull("expectSendTime")).
addOrder(Order.desc("priority")).
setMaxResults(this.onceGetSubmitNum).list();
if (psNormal == null || psNormal.size() == 0) {
;
} else {
nNormal = psNormal.size();
wantToSend.addAll(psNormal);
}
// 选出符合定时发送的消息,加入到发送队列中
now = new Date(System.currentTimeMillis());
List psNormal2 = sess.createCriteria(CMCCMM7SubmitMsg.class).add(
Restrictions.eq("gateWayID", new Integer(gatewayid))).add(
Restrictions.eq("sendTimes", new Integer(0))).add(
Restrictions.isNotNull("expectSendTime")).add(
Restrictions.le("expectSendTime", now)).
addOrder(Order.desc("priority")).
setMaxResults(this.onceGetFailedNum).list();
if (psNormal2 == null || psNormal2.size() == 0) {
;
} else {
for (int m = 0; m < psNormal2.size(); m++){
CMCCMM7SubmitMsg t = (CMCCMM7SubmitMsg)psNormal2.get(m);
t.setExpectSendTime(null);
}
nNormal = nNormal + psNormal2.size();
wantToSend.addAll(psNormal2);
}
// 对于在50个小时内,发送成功了,但索要状态报告后却没有返回的SUBMIT消息,进行补发
int nReportError = 0;
lastTime = System.currentTimeMillis() - 50*60*60*1000;
Date nowReport = new Date(lastTime);
List psReportError = sess.createCriteria(CMCCMM7SubmitMsg.class).
add(Restrictions.eq("gateWayID", new Integer(gatewayid))).
add(Restrictions.eq("sendResult", new Integer(1000))).
add(Restrictions.gt("sendTimes", new Integer(0))).
add(Restrictions.eq("wantReport", new Integer(1))).
add(Restrictions.eq("reportResult", new Integer(0))).
add(Restrictions.le("actualSendTime", nowReport)).
addOrder(Order.desc("priority")).
setMaxResults(this.onceGetFailedNum).list();
if (psReportError == null || psReportError.size() == 0) {
;
}
else {
wantToSend.addAll(psReportError);
nReportError = psReportError.size();
for (int j = 0; j < nReportError; j++){
CMCCMM7SubmitMsg msgReportError = (CMCCMM7SubmitMsg)psReportError.get(j);
msgReportError.setReportResult(-1);
sess.save(msgReportError);
}
}
// 将发送的次数升级
for (int i = 0; i < wantToSend.size(); i++) {
CMCCMM7SubmitMsg msg = (CMCCMM7SubmitMsg) wantToSend.get(i);
msg.setSendTimes(msg.getSendTimes() + 1);
msg.setBsending(1);
sess.save(msg);
}
tx.commit();
if (wantToSend.size() != 0){
log.info("从数据库中获得未发送和发送失败过一次的Submit消息的数量是:"
+ wantToSend.size() + ";\n其中没有发送的有:" + nNormal +
"个;\n其中发送失败重发的有:" + nFailed + "个;\n其中发送因状态报告收不到补发的有:" +
nReportError + "个.");
}
} catch (Exception e) {
e.printStackTrace();
}
finally{
HibernateUtil.closeSession();
}
// 将要发送的消息放入到工作者线程的发送队列中
SenderThread.addSubmitMsgToList(wantToSend);
}
/**
* 将上次网关发送队列中未发送的消息重新装入队列
*/
private void firstStartRun(){
List wantToSend = new LinkedList();
try {
Session sess = HibernateUtil.currentSession();
Criteria cr = sess.createCriteria(CMCCMM7SubmitMsg.class);
// 选出没有发送过的,按照优先级排序
List psNormal = cr
.add(Restrictions.eq("sendTimes", new Integer(1))).add(
Restrictions
.eq("gateWayID", new Integer(gatewayid)))
.add(Restrictions.or(Restrictions.isNull("msgID"), Restrictions.eq("msgID", "-1")))
.addOrder(Order.desc("priority")).setMaxResults(
this.onceGetSubmitNum).list();
if (psNormal == null || psNormal.size() == 0) {
;
} else {
wantToSend.addAll(psNormal);
}
// 将要发送的消息添加到发送队列中
SenderThread.addSubmitMsgToList(wantToSend);
log.debug("将上次网关发送队列中未发送的消息重新装入队列,Submit数量:"
+ SenderThread.getSendListSize());
} catch (Exception e) {
e.printStackTrace();
}
finally{
HibernateUtil.closeSession();
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -