// File: WapPspSendSvcImpl.java (the original page header also carried a code-viewer "font size" control label)
package com.asiainfo.batchsend.wappush.psp;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import com.asiainfo.batchsend.CtrCenEntry;
import com.asiainfo.databusi.util.CharUtil;
import com.asiainfo.batchsend.model.WapPush;
import com.asiainfo.batchsend.sms.boss.DBCon;
import com.asiainfo.batchsend.util.ChannelUtil;
import com.asiainfo.batchsend.util.DateUtil;
import com.asiainfo.batchsend.util.FileUtil;
import com.asiainfo.batchsend.util.FtpUtil;
import com.asiainfo.batchsend.util.TaskIdUtil;
import com.asiainfo.databusi.bo.NbCampChannels;
import com.asiainfo.databusi.util.DataBusiDbUtils;
/**
 * <p>Title: WAP push sender for the PSP platform</p>
 * <p>Description: Worker thread bound to one channel. It repeatedly loads pending
 * rows of the channel's current task from <code>nb_wap_push</code>, writes the
 * three PSP hand-over files (msisdn list, task XML, check file) and, when FTP is
 * enabled in <code>WapConfig</code>, uploads them to the PSP platform.</p>
 * <p>Company: Asiainfo Technologies(China),Inc </p>
 * <p>Date: Nov 6, 2007 3:55:51 PM </p>
 * <p>Email to: jiangyl@asiainfo.com </p>
 * @author jiangyl
 * @version 1.0
 */
public class WapPspSendSvcImpl extends Thread {

    /*
     * Row status values of nb_wap_push:
     * 0 = not sent, 1 = sent successfully, 2 = failed (the send-count limit was
     * reached), 3 = sending.
     * Data is always looked up by the task that is currently running; the task id
     * is obtained from CtrCenEntry.
     */
    public static final int TODO = 0;
    public static final int DONE = 1;
    public static final int DOING = 3;
    public static final int FAIL = 2;

    private static final Logger log = Logger.getLogger(WapPspSendSvcImpl.class);
    private static String strDBType = DataBusiDbUtils.BACKTYPE;

    /** Pause used when there is nothing to send and after an unexpected error. */
    private static final long IDLE_SLEEP_MILLIS = 10000L;

    private String channelId;
    private String sql = "";
    private NbCampChannels channel = null;

    /** Set from another thread by {@link #safeExit()}; volatile so the run loop sees the change. */
    private volatile boolean safeExit = false;

    /*
     * Only the task id is used to select rows: a task id handed over by the
     * scheduler is already eligible for sending.
     * The (misspelled) field name and its package visibility are kept unchanged
     * so that any existing same-package access keeps compiling.
     */
    String udpate = " update nb_wap_push t set t.status = ?, send_count = send_count + 1 where t.id = ? ";

    /**
     * Creates the sender for one channel and prepares the row-loading query.
     * The batch size limit is appended in the dialect named by
     * <code>DataBusiDbUtils.BACKTYPE</code> (oracle / db2); any other value leaves
     * the query without a limit.
     *
     * @param channelId id of the channel this thread serves
     */
    public WapPspSendSvcImpl(String channelId) {
        setName("WAP-WapPspSendSvcImpl");
        this.channelId = channelId;
        channel = ChannelUtil.getChannelById(channelId);
        sql = " select id, task_id, camp_id, obj_id, msisdn, wap_url, wap_subject, status, start_date, end_date "
                + " from nb_wap_push where status = ? and task_id = ? ";
        // Literal-first comparison: an unset BACKTYPE must not fail construction with an NPE.
        if ("oracle".equalsIgnoreCase(strDBType)) {
            sql += " and rownum <= " + WapConfig.LOADCOUNT;
        } else if ("db2".equalsIgnoreCase(strDBType)) {
            sql += " fetch first " + WapConfig.LOADCOUNT + " rows only ";
        }
    }

    /** Asks the run loop to stop after the current iteration. */
    public void safeExit() {
        safeExit = true;
    }

    /**
     * Main loop: load a batch, send it, record the outcome; sleep when idle.
     * Runs until {@link #safeExit()} is called.
     */
    public void run() {
        while (!safeExit) {
            try {
                List list = loadData();
                if (list == null || list.isEmpty()) {
                    threadSleep(IDLE_SLEEP_MILLIS);
                    continue;
                }
                dispatch(list);
            } catch (Exception e) {
                log.error("WAP发送线程发生严重的错误", e);
                // Back off instead of retrying in a hot loop while the failure persists.
                threadSleep(IDLE_SLEEP_MILLIS);
            }
        }
    }

    /**
     * Sends one loaded batch and records the outcome. loadData() has already
     * marked the rows DONE, so a failed hand-over must be reverted through
     * failDone(); otherwise the rows would be lost without ever being sent.
     */
    private void dispatch(List list) {
        boolean sent;
        try {
            sent = sendData(list);
        } catch (RuntimeException e) {
            failDone(list);
            throw e;
        }
        if (sent) {
            successDone(list);
        } else {
            failDone(list);
        }
    }

    /**
     * Loads the pending (status TODO) rows of the channel's current task and, in
     * the same transaction, marks every loaded row DONE and increments its
     * send_count.
     *
     * @return <code>null</code> when the channel has no current task; otherwise
     *         the loaded {@link WapPush} rows. The list is empty when nothing is
     *         pending or when loading failed: after a rollback nothing is
     *         returned, because sending rows that are still TODO in the database
     *         would make them go out twice.
     */
    public List loadData() {
        // Id of the task currently executing on this channel.
        String taskId = CtrCenEntry.getInstance().getTaskIdByChannel(channelId);
        if (taskId == null) {
            return null;
        }
        // NOTE(review): this locks on the channelId String instance; whether other
        // threads share that same instance cannot be told from this file.
        synchronized (channelId) {
            List list = new ArrayList();
            Connection conn = null;
            PreparedStatement ps = null;
            PreparedStatement ups = null;
            ResultSet rs = null;
            try {
                conn = DBCon.getShenYunDataSource().getConnection();
                conn.setAutoCommit(false);
                ps = conn.prepareStatement(sql);
                ups = conn.prepareStatement(udpate);
                ps.setInt(1, TODO);
                ps.setString(2, taskId);
                rs = ps.executeQuery();
                while (rs.next()) {
                    WapPush sms = new WapPush();
                    sms.setId(rs.getLong("id"));
                    sms.setTaskId(rs.getString("task_id"));
                    sms.setCampId(rs.getString("camp_id"));
                    sms.setObjId(rs.getString("obj_id").trim());
                    sms.setMsisdn(rs.getString("msisdn").trim());
                    sms.setWapSubject(CharUtil.convertCharToGBK(rs.getString("wap_subject").trim()));
                    sms.setWapUrl(rs.getString("wap_url").trim());
                    // Dates are reduced to their first eight characters after removing dashes.
                    sms.setStartDate(rs.getString("start_date").replaceAll("-", "").substring(0, 8));
                    sms.setEndDate(rs.getString("end_date").replaceAll("-", "").substring(0, 8));
                    list.add(sms);

                    ups.setInt(1, DONE);
                    ups.setLong(2, sms.getId());
                    ups.addBatch();
                }
                if (!list.isEmpty()) {
                    ups.executeBatch();
                }
                conn.commit();
                log.debug("深运平台从nb_wap_push表在装载数据时获取" + list.size() + "条记录,更新" + list.size() + "条记录");
            } catch (Exception e) {
                rollbackQuietly(conn);
                // The status update was rolled back, so none of these rows may be sent now.
                list.clear();
                log.error("深运平台在装载数据时发生严重的错误", e);
            } finally {
                closeQuietly(rs);
                closeQuietly(ups);
                closeQuietly(ps);
                closeQuietly(conn);
            }
            return list;
        }
    }

    /**
     * Writes the msisdn, task and check files for the batch and, when
     * <code>WapConfig.FTP</code> is "1", uploads them to the PSP platform and
     * moves the local copies away.
     *
     * @param list batch of {@link WapPush} rows; the first row supplies the task
     *             level data (task id, subject, url, dates)
     * @return <code>true</code> when the files were written (and uploaded, if FTP
     *         is enabled); <code>false</code> for an empty batch or when writing
     *         or uploading failed
     */
    public boolean sendData(List list) {
        if (list == null || list.isEmpty()) {
            // Nothing to hand over; return before a file sequence number is consumed.
            return false;
        }
        int taskNum = TaskIdUtil.getWAPNextId();
        String num = taskNum < 10 ? "0" + taskNum : String.valueOf(taskNum);
        String date = DateUtil.getYYYYMMDD();
        String taskInfoFile = "task" + num + "_" + date + ".xml";
        String userGroupFile = "mobile" + num + "_" + date + ".txt";
        String checkFileName = "check" + num + "_" + date + ".txt";

        WapPush push = (WapPush) list.get(0);
        // 1. content of the msisdn file
        StringBuffer msisdns = getMsisdnsFile(list);
        // 2. content of the task file
        StringBuffer taskxx = getTaskxx(push);
        // 3. content of the check file
        StringBuffer checkFile = new StringBuffer();
        checkFile.append("TASKINFO = ").append(taskInfoFile).append("\r\n");
        checkFile.append("USERGROUP = ").append(userGroupFile).append("\r\n");
        checkFile.append("USERGROUPCOUNT = ").append(list.size()).append("\r\n");
        checkFile.append("$$end$$");

        // 4. FTP client towards the PSP platform
        FtpUtil ftp = new FtpUtil(WapConfig.user, WapConfig.password, WapConfig.server, WapConfig.dataPath);
        boolean flag = false;
        boolean uploaded = false;
        try {
            // 5-7. save the msisdn, task and check files locally
            FileUtil.saveWapFile(msisdns, userGroupFile);
            FileUtil.saveWapFile(taskxx, taskInfoFile);
            FileUtil.saveWapFile(checkFile, checkFileName);
            if (WapConfig.FTP.trim().equals("1")) {
                ftp.connect(WapConfig.ftpPath);
                ftp.put(taskInfoFile);
                ftp.put(userGroupFile);
                ftp.put(checkFileName);
                uploaded = true;
                FileUtil.moveWapFile(userGroupFile);
                FileUtil.moveWapFile(taskInfoFile);
                FileUtil.moveWapFile(checkFileName);
            }
            flag = true;
        } catch (IOException e) {
            if (uploaded) {
                // All three files are already on the PSP side; only moving the local
                // copies failed. Reporting failure here would re-send the batch and
                // reuse the file sequence number.
                flag = true;
                log.error("WAP PUSH文件已上传到PSP平台,但本地文件归档失败", e);
            } else {
                flag = false;
                try {
                    FileUtil.deleteWapFile(userGroupFile);
                    FileUtil.deleteWapFile(taskInfoFile);
                    FileUtil.deleteWapFile(checkFileName);
                    // Give the unused file sequence number back.
                    TaskIdUtil.setWAPPreId();
                } catch (Exception ee) {
                    log.warn("WAP PUSH发送失败后清理本地文件时出错", ee);
                }
                log.error("WAP PUSH文件生成或上传到PSP平台失败", e);
            }
        } finally {
            try {
                ftp.close();
            } catch (Exception ignored) {
                // Best effort: the connection may never have been opened.
            }
        }
        return flag;
    }

    /**
     * Builds the msisdn file content: one number per line, CRLF terminated, with
     * every non-ASCII character replaced by the ASCII encoder's substitute.
     */
    private StringBuffer getMsisdnsFile(List list) {
        StringBuffer msisdns = new StringBuffer();
        for (int i = 0; i < list.size(); i++) {
            WapPush push = (WapPush) list.get(i);
            msisdns.append(push.getMsisdn()).append("\r\n");
        }
        try {
            // Decode with the same charset used to encode; the platform default
            // charset is not guaranteed to be ASCII compatible.
            byte[] ascii = msisdns.toString().getBytes("US-ASCII");
            return new StringBuffer(new String(ascii, "US-ASCII"));
        } catch (IOException e) {
            log.warn("手机号码文件内容转换为ASCII编码失败,使用原始内容", e);
            return msisdns;
        }
    }

    /**
     * Builds the PSP task XML for the batch. The channel is re-read on every call
     * so that a send window changed while the thread is running is picked up.
     *
     * NOTE(review): subject and url are inserted without XML escaping. Whether
     * the stored values are already escaped cannot be told from this file, so
     * this is left as it was; confirm before adding escaping.
     */
    private StringBuffer getTaskxx(WapPush push) {
        channel = ChannelUtil.getChannelById(channelId);
        if (channel == null) {
            throw new IllegalStateException("No channel found for channelId " + channelId);
        }
        // Pad into local values only: the channel object comes from ChannelUtil and
        // rendering the XML should not modify it.
        String start1 = padHour(channel.getStartTime());
        String end1 = padHour(channel.getEndDate());
        String start2 = padHour(channel.getStartTime2());
        String end2 = padHour(channel.getEndDate2());

        StringBuffer taskxx = new StringBuffer();
        taskxx.append("<?xml version = \"1.0\" encoding=\"gb2312\"?>\r\n");
        taskxx.append("<psp_task>\r\n");
        taskxx.append(" <request_task_id>" + WapConfig.SENDID + "_" + DateUtil.getYYYYMMDDHHMMSS() + "</request_task_id>\r\n");
        // The task name carries our task id: when the PSP log is parsed later, the
        // send result is matched by task id and msisdn.
        taskxx.append(" <task_name>" + push.getTaskId() + "</task_name>\r\n");
        taskxx.append(" <task_desc>" + push.getWapSubject() + "</task_desc>\r\n");
        taskxx.append(" <message>\r\n");
        taskxx.append(" <type>1</type>\r\n");
        taskxx.append(" <content>" + push.getWapSubject() + "</content>\r\n");
        taskxx.append(" <href>" + push.getWapUrl() + "</href>\r\n");
        taskxx.append(" </message>\r\n");
        taskxx.append(" <period>\r\n");
        taskxx.append(" <push_date start=\"" + push.getStartDate() + "\" end=\"" + push.getEndDate() + "\"/>\r\n");
        taskxx.append(" <push_time>\r\n");
        taskxx.append(" <time start=\"" + start1 + ":00\" end=\"" + end1 + ":00\"/>\r\n");
        taskxx.append(" <time start=\"" + start2 + ":00\" end=\"" + end2 + ":00\"/>\r\n");
        taskxx.append(" </push_time>\r\n");
        taskxx.append(" </period>\r\n");
        taskxx.append(" <sender>\r\n");
        taskxx.append(" <id>" + WapConfig.SENDID + "</id>\r\n");
        taskxx.append(" <passwd>" + WapConfig.SENDPWD + "</passwd>\r\n");
        taskxx.append(" <ip>" + WapConfig.IP + "</ip>\r\n");
        taskxx.append(" </sender>\r\n");
        taskxx.append(" <reserved>保留字段</reserved>\r\n");
        taskxx.append("</psp_task>");
        return taskxx;
    }

    /** Trims the value and left-pads a single-character value with "0". */
    private static String padHour(String hour) {
        String trimmed = hour.trim();
        return trimmed.length() == 1 ? "0" + trimmed : trimmed;
    }

    /**
     * Hook invoked after a batch was handed over successfully. Nothing to do:
     * the rows were already marked DONE when they were loaded.
     *
     * @param list the batch that was sent
     * @return always <code>true</code>
     */
    public boolean successDone(List list) {
        return true;
    }

    /**
     * Reverts the status of a batch that could not be handed over: a row whose
     * send_count has reached <code>WapConfig.SENDCOUNT</code> becomes FAIL, any
     * other row goes back to TODO so that it is loaded again.
     *
     * @param list the {@link WapPush} rows of the failed batch; null or empty is a no-op
     */
    public void failDone(List list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        int count = WapConfig.SENDCOUNT;
        // ">=" rather than "=": a row whose counter is already past the limit must
        // not be retried forever.
        String update = "update nb_wap_push t set t.status = (case when send_count >= " + count
                + " then " + FAIL + " else " + TODO + " end) where t.id = ? ";
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = DBCon.getShenYunDataSource().getConnection();
            conn.setAutoCommit(false);
            ps = conn.prepareStatement(update);
            for (int i = 0; i < list.size(); i++) {
                ps.setLong(1, ((WapPush) list.get(i)).getId());
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
        } catch (SQLException e) {
            rollbackQuietly(conn);
            log.error("深运平台在设置nb_wap_push表发送失败状态的时候发生严重的错误", e);
        } finally {
            closeQuietly(ps);
            closeQuietly(conn);
        }
    }

    /**
     * Sleeps for the given time. An interrupt only ends the sleep early. The
     * interrupt flag is deliberately not re-asserted: the run loop is controlled
     * by safeExit alone, and a still-set flag would make every later sleep return
     * immediately, turning the idle loop into a busy loop.
     *
     * @param msec time to sleep in milliseconds
     */
    public void threadSleep(long msec) {
        try {
            Thread.sleep(msec);
        } catch (InterruptedException ignored) {
            // Woken up early; see the method comment.
        }
    }

    /** Rolls the transaction back if a connection was obtained; failures are only logged. */
    private static void rollbackQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.rollback();
        } catch (Exception e) {
            log.warn("回滚数据库事务失败", e);
        }
    }

    /** Closes the result set if present; a failure to close is ignored. */
    private static void closeQuietly(ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException ignored) {
                // Nothing useful can be done about a failed close.
            }
        }
    }

    /** Closes the statement if present; a failure to close is ignored. */
    private static void closeQuietly(PreparedStatement ps) {
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException ignored) {
                // Nothing useful can be done about a failed close.
            }
        }
    }

    /** Closes the connection if present; a failure to close is ignored. */
    private static void closeQuietly(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException ignored) {
                // Nothing useful can be done about a failed close.
            }
        }
    }
}
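/*
 * Code-viewer keyboard shortcuts (page footer captured with the source; not part of the program):
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Switch theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */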