📄 bossreceivesvcimpl.java
字号:
package com.asiainfo.batchsend.sms.boss;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import com.asiainfo.batchsend.model.ReceiveSms;
import com.asiainfo.batchsend.model.SmsPush;
import com.asiainfo.databusi.util.CharUtil;
/**
 * <p>Title: BossReceiveSvcImpl</p>
 * <p>Description: {@link BossSvc} implementation for received messages. It reads rows from the
 * <code>sms_receive_10086100</code> table (connection from <code>DBCon.getConnection()</code>),
 * inserts them into the <code>sms_received</code> table (connection from
 * <code>DBCon.getShenYunDataSource()</code>), and deletes source rows in
 * {@link #successDone(List)}.</p>
 * <p>Company: Asiainfo Technologies(China),Inc </p>
 * <p>Date: Nov 6, 2007 3:55:51 PM </p>
 * <p>Email to: jiangyl@asiainfo.com </p>
 * @author jiangyl
 * @version 1.0
 */
public class BossReceiveSvcImpl implements BossSvc {
    private static final Logger log = Logger.getLogger(BossReceiveSvcImpl.class);

    /** Channel id written to <code>sms_received.camp_channel_id</code> for every inserted row. */
    private String channelId = "";

    /**
     * @param channelId channel id stored with every row inserted by {@link #sendData(List)}
     */
    public BossReceiveSvcImpl(String channelId){
        this.channelId = channelId;
    }

    // Check whether there are tasks available to send.
    String hasLoadData = " select DONE_CODE,SRC_ADDR,DST_ADDR,TEXT,RECEIVE_DATE,DONE_DATE,STATUS " +
            " from sms_receive_10086100 where status =" + TODO + " and rownum <= 3" ;

    /**
     * Checks whether <code>sms_receive_10086100</code> holds at least one row in status TODO.
     *
     * @return true if the query returned a row; false if it returned none or an
     *         {@link SQLException} occurred (the error is logged)
     */
    public boolean hasLoadData() {
        Connection conn = null;
        java.sql.Statement stmt = null;
        ResultSet rs = null;
        boolean flag = false;
        try {
            conn = DBCon.getConnection();
            conn.setAutoCommit(false);
            stmt = conn.createStatement();
            rs = stmt.executeQuery(hasLoadData);
            // Only existence matters; one row is enough.
            flag = rs.next();
        } catch (SQLException e) {
            log.error("深运平台检查BOSS的sms_receive_10086100时发生严重的错误",e);
        } finally {
            // Release in reverse order of acquisition, also on the error path.
            closeQuietly(rs);
            closeQuietly(stmt);
            closeQuietly(conn);
        }
        return flag;
    }

    // Original note (translated): data is fetched by task only here, mainly because the taskId
    // passed in by the scheduler already meets the conditions for sending.
    // NOTE(review): no taskId is used by the queries below; this comment looks stale.
    String loadData = " select DONE_CODE,SRC_ADDR,DST_ADDR,TEXT,RECEIVE_DATE,DONE_DATE,STATUS " +
            " from sms_receive_10086100 where status = ? and rownum <= 10 ";
    String loadDataUpdate = " update sms_receive_10086100 t set t.status = ? where t.src_addr = ?";

    /**
     * Sleeps for <code>Config12350.sleepAfterReceived</code> milliseconds, then fetches up to 10
     * rows in status TODO from <code>sms_receive_10086100</code> and marks them DONE in the same
     * transaction.
     *
     * @return list of {@link ReceiveSms}; empty when nothing was found or when any error occurred
     *         (the transaction is rolled back and the error is logged)
     */
    public List loadData() {
        this.threadSleep(Config12350.sleepAfterReceived);
        Connection conn = null;
        PreparedStatement ps = null;
        PreparedStatement ups = null;
        ResultSet rs = null;
        List list = new ArrayList();
        try {
            conn = DBCon.getConnection();
            conn.setAutoCommit(false);
            ps = conn.prepareStatement(loadData);
            ups = conn.prepareStatement(loadDataUpdate);
            ps.setInt(1, TODO);
            rs = ps.executeQuery();
            while (rs.next()) {
                ReceiveSms sms = new ReceiveSms();
                sms.setDoneCode(rs.getLong("DONE_CODE"));
                // NOTE(review): a NULL SRC_ADDR/DST_ADDR/TEXT makes trim() throw, which fails
                // the whole batch via the catch below; confirm these columns are NOT NULL.
                sms.setSrcAddr(rs.getString("SRC_ADDR").trim());
                sms.setDstAddr(rs.getString("DST_ADDR").trim());
                sms.setText(rs.getString("TEXT").trim());
                sms.setStatus(rs.getInt("STATUS"));
                list.add(sms);

                // NOTE(review): rows are matched by src_addr, not DONE_CODE, so every row with
                // the same src_addr is marked DONE, including rows not fetched here. Confirm.
                ups.setInt(1, DONE);
                ups.setString(2, sms.getSrcAddr());
                ups.addBatch();
            }
            ups.executeBatch();
            conn.commit();
            System.out.println("深运平台从BOSS的sms_receive_10086100中获取" + list.size() + "条记录,更新" + list.size() +"条记录");
        } catch (Exception e) {
            // Nothing was committed, so hand back an empty list rather than partial data.
            list = new ArrayList();
            rollbackQuietly(conn);
            log.error("深运平台从BOSS的sms_receive_10086100中获取发送数据时发生严重的错误",e);
        } finally {
            closeQuietly(rs);
            closeQuietly(ups);
            closeQuietly(ps);
            closeQuietly(conn);
        }
        return list;
    }

    String pSend = " insert into sms_received(msisdn, received_msg, status, flag, received_time,imsg_port,camp_channel_id) "
            + " values(?,?,0,0,current timestamp,?,?) ";

    /**
     * Inserts every {@link ReceiveSms} of <code>list</code> into <code>sms_received</code> in one
     * batch and one transaction.
     *
     * @param list list of {@link ReceiveSms} to insert
     * @return true if the batch was committed; false if an {@link SQLException} occurred (the
     *         transaction is rolled back and the error is logged)
     */
    public boolean sendData(List list) {
        boolean flag = true;
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            conn = DBCon.getShenYunDataSource().getConnection();
            conn.setAutoCommit(false);
            ps = conn.prepareStatement(pSend);
            for ( int i = 0; i < list.size(); i++ ){
                ReceiveSms sms = (ReceiveSms)list.get(i);
                ps.setString(1, sms.getSrcAddr());
                ps.setString(2, CharUtil.convertCharToDb( sms.getText()));
                ps.setString(3, sms.getDstAddr());
                ps.setString(4,this.channelId);
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
            System.out.println("向深运平台的sms_received数据库表插入数据,共插入了" + list.size() + "条记录");
        } catch (SQLException e) {
            // conn is still null when getConnection() itself failed; the helper tolerates that
            // so this method returns false instead of throwing a NullPointerException.
            rollbackQuietly(conn);
            flag = false;
            log.error("向深运平台的sms_received数据库表插入数据时发生严重的错误",e);
        } finally {
            closeQuietly(ps);
            closeQuietly(conn);
        }
        return flag;
    }

    /**
     * Deletes the rows of <code>sms_receive_10086100</code> whose <code>src_addr</code> matches an
     * entry of <code>list</code>, in one batch and one transaction.
     *
     * @param list list of {@link ReceiveSms} whose source rows should be removed
     * @return true if the delete was committed; false if any error occurred (the transaction is
     *         rolled back and the error is logged)
     */
    public boolean successDone(List list) {
        boolean flag = true;
        Connection conn = null;
        PreparedStatement psDelete = null;
        // NOTE(review): deletes by src_addr, not DONE_CODE, so rows with the same src_addr that
        // were never loaded are removed as well. Confirm this is intended.
        String delete = " delete from sms_receive_10086100 t where t.src_addr = ? ";
        try {
            conn = DBCon.getConnection();
            conn.setAutoCommit(false);
            psDelete = conn.prepareStatement(delete);
            for ( int i = 0; i < list.size(); i++ ){
                psDelete.setString(1, ((ReceiveSms)list.get(i)).getSrcAddr());
                psDelete.addBatch();
            }
            psDelete.executeBatch();
            conn.commit();
        } catch (Exception e) {
            rollbackQuietly(conn);
            // Report the failure to the caller and keep the cause in the log.
            flag = false;
            log.error("BOSS接收线程删除时发生了严重的错误", e);
        } finally {
            // No commit here: the success path already committed and the failure path must
            // not commit a partially applied batch.
            closeQuietly(psDelete);
            closeQuietly(conn);
        }
        return flag;
    }

    /**
     * Sets <code>sms_push.status</code> to FAIL for every entry of <code>list</code>, in one
     * batch and one transaction. An {@link SQLException} is rolled back and logged.
     *
     * @param list list whose elements are cast to {@link SmsPush}
     */
    public void failDone(List list) {
        Connection conn = null;
        PreparedStatement ps = null;
        // NOTE(review): the other methods of this class work on ReceiveSms, yet this one casts
        // to SmsPush and updates sms_push. Verify what callers actually pass here.
        String update = "update sms_push t set t.status = ? where t.id = ? ";
        try {
            conn = DBCon.getShenYunDataSource().getConnection();
            conn.setAutoCommit(false);
            ps = conn.prepareStatement(update);
            for ( int i = 0; i < list.size(); i++ ){
                ps.setInt(1, FAIL);
                ps.setLong(2, ( (SmsPush)list.get(i) ).getId() );
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
        } catch (SQLException e) {
            // conn may still be null if getConnection() failed; the helper tolerates that.
            rollbackQuietly(conn);
            log.error("BOSS发送线程在设置发送失败状态的时候发生严重的错误",e);
        } finally {
            closeQuietly(ps);
            closeQuietly(conn);
        }
    }

    /**
     * Sleeps for <code>msec</code> milliseconds. Values of zero or less return immediately.
     * If the sleep is interrupted, the thread's interrupt status is restored.
     */
    private void threadSleep(long msec) {
        if (msec <= 0) {
            return;
        }
        try {
            Thread.sleep(msec);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the code that owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the channel id passed to the constructor
     */
    public String getChannel() {
        return this.channelId;
    }

    /** Rolls back <code>conn</code> if it is non-null; a failure is logged, never thrown. */
    private static void rollbackQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.rollback();
        } catch (Exception e) {
            log.warn("Failed to roll back JDBC transaction", e);
        }
    }

    /** Closes <code>rs</code> if it is non-null; a failure is logged, never thrown. */
    private static void closeQuietly(ResultSet rs) {
        if (rs == null) {
            return;
        }
        try {
            rs.close();
        } catch (SQLException e) {
            log.warn("Failed to close JDBC result set", e);
        }
    }

    /** Closes <code>stmt</code> if it is non-null; a failure is logged, never thrown. */
    private static void closeQuietly(java.sql.Statement stmt) {
        if (stmt == null) {
            return;
        }
        try {
            stmt.close();
        } catch (SQLException e) {
            log.warn("Failed to close JDBC statement", e);
        }
    }

    /** Closes <code>conn</code> if it is non-null; a failure is logged, never thrown. */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            log.warn("Failed to close JDBC connection", e);
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -