📄 mmsmm7sendsvcimpl.java
字号:
package com.asiainfo.batchsend.mms.mm7api;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import com.asiainfo.batchsend.CtrCenEntry;
import com.asiainfo.batchsend.mms.mm7api.MM7ConifgUtil;
import com.asiainfo.batchsend.model.MmsPush;
import com.asiainfo.batchsend.sms.boss.DBCon;
import com.asiainfo.batchsend.util.ChannelUtil;
import com.asiainfo.databusi.bo.NbCampChannels;
import com.asiainfo.databusi.platform.util.MMSSendData;
import com.asiainfo.databusi.util.DataBusiDbUtils;
import com.asiainfo.databusi.util.IdGen;
import com.asiainfo.servlet.SystemCommonListener;
import com.cmcc.mm7.vasp.message.MM7RSRes;
import com.cmcc.mm7.vasp.message.MM7SubmitReq;
import com.cmcc.mm7.vasp.message.MM7SubmitRes;
import com.cmcc.mm7.vasp.service.MM7Sender;
/**
* <p>Title: </p>
* <p>Description: </p>
* <p>Company: Asiainfo Technologies(China),Inc </p>
* <p>Date: Nov 6, 2007 3:55:51 PM </p>
* <p>Email to: jiangyl@asiainfo.com </p>
* @author jiangyl
* @version 1.0
*/
public class MmsMm7SendSvcImpl extends Thread {
//发送状态 Status 如果已经达到发送次数的上限,则将其状态置为2现在的状态:0,未发送,1,发功成功,2 发送失败, 3发送中*/
//所有的实现需要根据实际的任务去查找数据,taskId在CtrCenEntry中获取
public static final int TODO = 0;
public static final int DONE = 1;
public static final int DOING = 3;
public static final int FAIL = 2;
private static final Logger log = Logger.getLogger(MmsMm7SendSvcImpl.class);
private static String strDBType = DataBusiDbUtils.BACKTYPE;
private String channelId;
private String sql = "";
private NbCampChannels channel = null;
private MM7Sender mm7Sender = null;
private boolean safeExit = false;
public void safeExit(){
safeExit = true;
}
public void run() {
while ( !safeExit ){
try{
List list = loadData();
if ( list != null && list.size() > 0 ){
sendData(list);
}else{
this.threadSleep(10000);
}
}catch(Exception e){
log.error("MMS发送线程发送严重的错误",e);
}
}
}
public MmsMm7SendSvcImpl(String channelId ){
setName("MMS-MmsMm7SvcImpl");
try{
mm7Sender = new MM7Sender(MM7ConifgUtil.getMM7Config());
}catch(Exception e){
log.error("MmsMm7SendSvcImpl初始化发送严重的错误",e);
}
this.channelId = channelId;
//因为中间有可能修改时间短,因此需要即时从数据库中提取
channel = ChannelUtil.getChannelById(channelId);
sql = " select id, camp_id, obj_id, msisdn, file_path, subject, status, start_date,end_date " +
" from nb_mms_push where status = ? and task_id = ? ";
if (strDBType.equalsIgnoreCase("oracle")) {
sql += " and rownum < 101";
}else if(strDBType.equalsIgnoreCase("db2")) {
sql += " fetch first 100 rows only ";
}
}
//这里只根据任务提取数据,主要是因为根据调度任务传进来的taskId已经具备发送的条件
String udpate = " update nb_mms_push t set t.status = ?, send_count = send_count + 1 where t.id = ? ";
public List loadData() {
//获得当前执行的任务ID
String taskId = CtrCenEntry.getInstance().getTaskIdByChannel(channelId);
if ( taskId == null )
return null;
synchronized( channelId ){
Connection conn = null;
List list = new ArrayList();
try {
conn = DBCon.getShenYunDataSource().getConnection();
conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement(sql);
PreparedStatement ups = conn.prepareStatement(udpate);
ps.setInt(1, TODO);
ps.setString(2, taskId);
ResultSet rs = ps.executeQuery();
while(rs.next()){
MmsPush sms = new MmsPush();
sms.setId( rs.getLong("id"));
sms.setCampId( rs.getString("camp_id"));
sms.setObjId( rs.getString("obj_id").trim());
sms.setMsisdn( rs.getString("msisdn").trim());
sms.setFilePath( rs.getString("file_path").trim());
sms.setSubject( rs.getString("subject").trim());
list.add(sms);
ups.setInt(1, DONE);
ups.setLong(2, sms.getId());
ups.addBatch();
}
ups.executeBatch();
ps.close();
ups.close();
conn.commit();
log.debug("深运平台从nb_mms_push表在装载数据时获取" + list.size() + "条记录,更新" + list.size() +"条记录");
} catch (Exception e) {
try{
conn.rollback();
}catch(Exception ee){
}
log.error("深运平台在装载数据时发生严重的错误",e);
} finally {
if(conn!=null){
try {
conn.close();
} catch (SQLException e) {
}
}
}
return list;
}
}
public boolean sendData(List list) {
MmsPush mms = (MmsPush)list.get(0);
String filePath = SystemCommonListener.realPath + "file" + java.io.File.separator + mms.getFilePath();
long begin = 0;
for ( int i = 0; i < list.size(); i++ ){
begin = System.currentTimeMillis();
try{
mms = (MmsPush)list.get(i);
MM7SubmitReq submit = new MM7SubmitReq();
submit.addTo( mms.getMsisdn());
submit.setSubject( mms.getSubject());
submit.setTransactionID(IdGen.getNextSmssqVal()+""); /*设置MM7_submit.REQ/MM7_submit.RES对的标识,必备*/
submit.setVASPID(MM7ConifgUtil.spid); //设置SP代码,必备 icpId
submit.setVASID(MM7ConifgUtil.id); //设置服务代码,必备 ismgPort
submit.setServiceCode(MM7ConifgUtil.sc); //设置业务代码,必备 serviceType
submit.setSenderAddress(MM7ConifgUtil.sa); //设置MM始发方的地址(填写SP的服务代码,或者填写让用户回复SP的长号码,长号码构成:SP的服务代码+业务代码+操作码),必备
submit.setChargedPartyID(MM7ConifgUtil.ff); //设置付费方的手机号码,必备
submit.setChargedParty((byte)4);
submit.setDeliveryReport(false);
submit.setReadReply(false);
submit.setContent(MMSSendData.getSendData(filePath));
MM7RSRes rsRes = (MM7RSRes)mm7Sender.send(submit);
log.info("******************************\r\n spend time = " + (System.currentTimeMillis()-begin) + "\r\n rsRes.getStatusCode()= " + rsRes.getStatusCode());
boolean flag = false;
if(rsRes instanceof MM7SubmitRes){
if( rsRes.getStatusCode() == 1000 )
flag = true;
}
begin = 100 - (System.currentTimeMillis()-begin) -30;
threadSleep(begin);
if ( !flag ){
fail(mms);
}
}catch(Exception e){
fail(mms);
log.error("mms send error",e);
}
}
return true;
}
public void fail(MmsPush push) {
int count = channel.getSendFailCount().intValue();
Connection conn = null;
String update = "update nb_mms_push t set t.status = (case when send_count >= " + count + " then 2 else 0 end) where t.id = ? and t.camp_id = ? ";
try {
conn = DBCon.getShenYunDataSource().getConnection();
conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement(update);
ps.setLong(1, push.getId() );
ps.setString(2, push.getCampId());
ps.addBatch();
ps.executeBatch();
conn.commit();
} catch (SQLException e) {
try{
conn.rollback();
}catch(SQLException se){
}
log.error("深运平台在设置nb_mms_push表发送失败状态的时候发生严重的错误",e);
} finally {
if(conn!=null){
try {
conn.close();
} catch (SQLException e) {
}
}
}
}
public void threadSleep(long msec) {
if ( msec <= 0 )
return;
try {
Thread.sleep(msec);
} catch (Exception e) {
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -