📄 mmsendworker.java
字号:
/**
* <pre>
* 这里说明修改记录,包括修改人,修改时间和修改目的和内容
* --修改时间--修改人---------修改目的和内容-----------
* 2006/2/22 刘华锋 从数据库获取彩信有SQLException的时候,重新连接数据库
*
* </pre>
*/
package com.sxit.mms.sender;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.log4j.Logger;
import com.cmcc.mm7.vasp.common.MMContent;
import com.cmcc.mm7.vasp.message.MM7SubmitReq;
import com.sxit.mms.MMSQueue;
/**
* 从mmsqueue表里面取出MMSQueue对象,然后根据要求组合成MMessaage,存储在发送队列里面,由发送线程来发送
*
* @author HuaFeng
* @version 1.0 (2006-1-13 13:57:33)
*
*/
public class MMSendWorker extends Thread {
private final static Logger log = Logger.getLogger(MMSendWorker.class);
/**
* 数据库连接。自己建立
*/
private Connection con;
/**
* 存储要发送的mmessage,发送线程同步此队列,从里面取出彩信对象,然后发送出去
*/
private LinkedList sendqueue;
/**
* 一次性最多从缓冲表里面取出的MMSQueue对象个数,从主线程里面获取
*/
private int count;
/**
* 如果这次没有取出来,则下次再取的时间间隔多少,以毫秒为单位,从主线程里面获取此参数
*/
private int intervalGetWaitingTime;
/**
* 如果这次取出来了,则下次再取时设定的时间间隔,从主线程里面获取此参数
*/
private int nextGetWaitingTime;
/**
* 存储封装了MMessage对象的临时队列
*/
private LinkedList mmsStoreQueue;
/**
* 仅仅存储从MMSQueue表里面取出来的数据
*/
private ArrayList mmsQueueList;
/**
* 是否终止此获取彩信信息的线程
*/
public static boolean stop;
private MMToolKit tool;
public MMSendWorker(LinkedList sendqueue) {
this.sendqueue = sendqueue;
this.count = CommonProperties.perGetCount;
this.nextGetWaitingTime = CommonProperties.nextGetWaitingTime;
this.intervalGetWaitingTime = CommonProperties.intervalGetWaitingTime;
mmsStoreQueue = new LinkedList();
mmsQueueList = new ArrayList();
tool = new MMToolKit();
}
public void run() {
// 从mmsqueue里面取出对象,存放到mmsStoreQueue里面
// 如果这次取出来了,则下次再取时设定时间间隔(nextGetWaitingTime)
// 如果这次没有取出来,则间隔的时间(intervalGetWaitingTime)应大于nextGetWaitingTime
// 建立和数据库的连接,只建立一次
getConnection();
long begin = System.currentTimeMillis();
while (!stop) {
try {
createStoreQueue(con, count);
if (log.isDebugEnabled()) {
log.debug("mmsQueueList.size()=" + mmsQueueList.size());
}
createMMsgQueue();
if (log.isDebugEnabled()) {
log.debug("mmsStoreQueue.size()=" + mmsStoreQueue.size());
}
}
catch (SQLException e) {
log.error("MMSendWorkder run():" + e);
try {
Thread.sleep(intervalGetWaitingTime);
}
catch (Exception ee) {
log.error(ee);
}
getConnection();
log.info("SQLException,重新建立起和数据库之间的连接!");
// continue;
}
if (mmsStoreQueue.size() > 0) {// 如果有彩信,同步存储队列,将mm放入此队列中
begin = System.currentTimeMillis();
synchronized (sendqueue) {
sendqueue.addAll(mmsStoreQueue);
//如果采用优先级控制的话,则使用如下的方法
//sendqueue.addAll(0,mmsStoreQueue);
sendqueue.notify();
}
mmsStoreQueue.clear();// 清空临时存储对象
try {
log.info("成功取得彩信,等待发送.获取线程休眠" + (nextGetWaitingTime / 1000) + "秒!");
Thread.sleep(nextGetWaitingTime);
}
catch (Exception e) {
log.error(e);
}
}
else if (System.currentTimeMillis() - begin > CommonProperties.maxWaitingTime) {
begin = System.currentTimeMillis();
mmsStoreQueue.add(sendTestMMS());
log.info("彩信获取线程发送一条测试彩信!");
}
else {
try {
Thread.sleep(intervalGetWaitingTime);
log.info("暂时没有彩信,休眠" + (intervalGetWaitingTime / 1000) + "秒!");
}
catch (Exception e) {
log.error(e);
}
}
}
log.info("获取彩信队列线程终止,退出此服务!");
try {
if (con != null)
con.close();
log.info("获取彩信队列数据库连接关闭成功");
}
catch (Exception e) {
log.error(e);
}
}
/**
* 一次性从缓冲表取出count条数据,取出后即删除此批取出的数据
*
* @param con
* @param count
* @return
* @throws Exception
*/
private void createStoreQueue(Connection con, int count) throws SQLException {
// 这个地方应考虑优先级的一些问题。
String sql = "select queueid,corpid,spnumber,servicecode,vasfrom,vasto,sendtype,reportflag,readreply,feemobile,feemobiletype,subject,mmdir,contenttype,priority,feevalue,ReplyChargingSize,ReplyCharging,expiretime,deliverytime,EarliestDeliveryTime,replydeadline,AllowAdaptations,isvisible,messageclass from mmsqueue where rownum<=?";
String delSql = "delete from mmsqueue where queueid=?";
PreparedStatement stmt = null;
PreparedStatement stmtDel = null;
ResultSet rs = null;
// mmsQueueList = new ArrayList();//不再分配对象,一直使用此对象初始化的时候使用的那个
try {
con.setAutoCommit(false);
stmt = con.prepareStatement(sql);
stmt.setInt(1, count);
stmtDel = con.prepareStatement(delSql);
rs = stmt.executeQuery();
while (rs.next()) {
long queueid = rs.getLong("queueid");
String corpid = rs.getString("corpid");
String spnumber = rs.getString("spnumber");
String servicecode = rs.getString("servicecode");
MMSQueue queue = new MMSQueue(corpid, spnumber, servicecode);
queue.setQueueID(queueid);
queue.setVasFrom(rs.getString("vasfrom"));
queue.setVasTO(rs.getString("vasto"));
queue.setSendType(rs.getInt("sendtype"));// VAS.TO?VAS.Bcc?VAS.cc
queue.setReportFlag(rs.getInt("reportFlag"));
queue.setReadReply(rs.getInt("readreply"));
queue.setFeeMobile(rs.getString("feemobile"));
queue.setFeeMobileType(rs.getInt("feemobiletype"));
queue.setSubject(rs.getString("subject"));
queue.setMMDir(rs.getString("mmdir"));
queue.setContentType(rs.getInt("contenttype"));
queue.setPriority(rs.getInt("priority"));
queue.setFeeValue(rs.getString("feevalue"));
queue.setReplyChargingSize(rs.getLong("ReplyChargingSize"));
queue.setReplyCharging(rs.getInt("ReplyCharging"));
queue.setExpireTime(rs.getTimestamp("expiretime"));
queue.setDeliveryTime(rs.getTimestamp("deliverytime"));
queue.setEarliestDeliveryTime(rs.getTimestamp("EarliestDeliveryTime"));
queue.setReplyDeadLine(rs.getTimestamp("replydeadline"));
queue.setAllowAdaptations(rs.getInt("AllowAdaptations"));
queue.setIsVisible(rs.getInt("isvisible"));
queue.setMessageClass(rs.getString("messageclass"));
mmsQueueList.add(queue);
// 取出彩信并删除之
stmtDel.setLong(1, queueid);
stmtDel.addBatch();
}
// 从缓冲队列里面删掉这些个彩信了
stmtDel.executeBatch();
con.commit();
}
catch (SQLException e) { //造成的问题应该是数据库连接被关闭了的原因
mmsQueueList.clear();// 清除所有已经取到的数据
log.error("createStoreQueue:" + e);
con.rollback();
throw e;
}
finally {
con.setAutoCommit(true);
if (rs != null)
rs.close();
if (stmt != null)
stmt.close();
if (stmtDel != null)
stmtDel.close();
if (log.isDebugEnabled())
log.debug("此批彩信数据取出完毕并游标关闭!");
}
}
/**
* 根据刚才批量取出来的彩信,组合彩信内容
*
* @param mmsqueues
*/
private void createMMsgQueue() {
MM7SubmitReq submit = null;
MMSQueue queue = null;
MM7SubmitReq tempSubmit = null;
String tempDir = "";
// mmsStoreQueue.clear();//先清空队列里面已有的mmsubmit信息
for (int i = 0; i < mmsQueueList.size(); i++) {
try {
queue = (MMSQueue) mmsQueueList.get(i);
submit = tool.createMM(queue, tempSubmit, tempDir);
// if (log.isDebugEnabled())
// log.debug("submit.getTo()=" + submit.getTo());
tempSubmit = submit;
tempDir = queue.getMMDir();
mmsStoreQueue.add(submit);
}
catch (Exception e) {
log.error("MMSendWorker createMMsgQueue():" + e);
}
}
// 清空存储在mmsQueueList中的发送队列对象
mmsQueueList.clear();
if (log.isDebugEnabled())
log.debug("清除缓冲队列中的数据OK!");
}
/**
* 建立和数据库之间的连接
*
*/
private void getConnection() {
boolean isconnected = false;
while (!isconnected) {
try {
// getConnection();
Class.forName(CommonProperties.driver);
con = DriverManager.getConnection(CommonProperties.dburl, CommonProperties.dbuser,
CommonProperties.dbpass);
log.info("建立和彩信缓冲表数据库之间的连接成功!");
isconnected = true;
}
catch (Exception e) {
log.error("MMSendWorker getConnection():" + e);
try {
Thread.sleep(intervalGetWaitingTime);
}
catch (Exception ee) {
log.error(e);
}
// log.error(e);
}
}
}
/**
* 发送一条测试彩信,以保证发送线程不会莫名的断掉
*
* @return
*/
private MM7SubmitReq sendTestMMS() {
MM7SubmitReq submit = new MM7SubmitReq();
submit.setTransactionID(String.valueOf(System.currentTimeMillis()));
submit.addTo(CommonProperties.testMobile);
submit.setVASID(CommonProperties.mmsSPNumber);
submit.setVASPID(CommonProperties.mmsCorpID);
System.out.println(submit.getVASID()+"---"+submit.getVASPID());
submit.setServiceCode(CommonProperties.freeServiceCode);
System.out.println("******"+submit.getServiceCode()+"====");
// submit.setChargedPartyID("13608300000");
submit.setChargedParty((byte) 1); // 接收方付费
submit.setDeliveryReport(false); // 测试彩信,不需要状态报告
submit.setReadReply(false);// 不需要读后回复
// 填写SP的服务代码,或者填写让用户回复SP的长号码,长号码构成:SP的服务代码+业务代码+操作码),
submit.setSenderAddress(CommonProperties.mmsSPNumber);
System.out.println("******"+submit.getSenderAddress()+"====");
submit.setSubject("测试彩信,保证链路畅通!");
MMContent content = new MMContent();
content.setContentType(com.cmcc.mm7.vasp.common.MMConstants.ContentType.MULTIPART_MIXED);
content.setContentLocation("mm7Test");
content.setContentID("mm7Test");
MMContent subcontent = MMContent.createFromString("MM7SubmitReq Send Test!");
subcontent.setContentType(com.cmcc.mm7.vasp.common.MMConstants.ContentType.TEXT);
subcontent.setContentID("mm7TestText");
subcontent.setContentLocation("mm7TestText");
content.addSubContent(subcontent);
submit.setContent(content);
return submit;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -