📄 saveincomingmessagetask.java
字号:
package com.infobank.superchannel.daemon.databasecache;import java.util.Queue;import org.apache.commons.configuration.Configuration;import com.infobank.superchannel.database.DAO;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.pojo.TextMessageStatusSetting;import com.infobank.superchannel.pojo.TextMessageResultSetting;import com.infobank.superchannel.util.DaemonConfiguration;import com.infobank.superchannel.util.DatabaseConnectionPool;/** * @author Xiaoguang Sun * @version 类说明: save incoming text message task */public class SaveIncomingMessageTask implements Runnable { public class IncomingMessageTableSetting{ // 回写用数据源名称 public String dataSource; // 回写上行短信表名 public String tableName; public TextMessageStatusSetting status; public TextMessageResultSetting result; } // 数据库缓冲服务器对象 private DatabaseCacheDaemon daemon=null; // 每次回写数据时间间隔(单位毫秒:1000毫秒=1秒) private int interval=0; // table to save incoming message private IncomingMessageTableSetting[] settings; // number of tables to save incoming message private int numberOfSettings; // is incoming message enabled private boolean enabled=false; /** * 构造函数 * @param daemon 数据库缓冲服务器对象 */ public SaveIncomingMessageTask(DatabaseCacheDaemon daemon){ // 读取上行信息配置 this.daemon=daemon; DaemonConfiguration config=DaemonConfiguration.getInstance(); numberOfSettings=config.getMaxIndex("/daemon[@type='DatabaseCache']/task/updater/incoming/table"); if(numberOfSettings<=0) return; settings=new IncomingMessageTableSetting[numberOfSettings]; for(int idx=1;idx<=numberOfSettings;++idx){ int tableIndex=idx-1; IncomingMessageTableSetting setting=new IncomingMessageTableSetting(); StringBuilder sb=new StringBuilder(); sb.append("/daemon[@type='DatabaseCache']/task/updater/incoming/table["); sb.append(idx); sb.append("]/"); String basePath=sb.toString(); // 重新回写时间间隔 interval=config.getInt("/daemon[@type='DatabaseCache']/task/updater/incoming/interval"); sb=new StringBuilder(); sb.append(basePath); sb.append("@datasource"); setting.dataSource=config.getString(sb.toString()); if(setting.dataSource==null) setting.dataSource="default"; sb=new StringBuilder(); sb.append(basePath); sb.append("name"); setting.tableName=config.getString(sb.toString()); sb=new StringBuilder(); sb.append(basePath); sb.append("@status"); String status=config.getString(sb.toString()); if(status!=null){ sb=new StringBuilder(); sb.append("/daemon[@type='DatabaseCache']/message/status[@name='"); sb.append(status); sb.append("']/"); String statusBasePath=sb.toString(); sb.append("delivered"); int delivered=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(statusBasePath); sb.append("sending"); int sending=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(statusBasePath); sb.append("blocked"); int blocked=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(statusBasePath); sb.append("waiting"); int waiting=config.getInt(sb.toString()); TextMessageStatusSetting statusSetting=new TextMessageStatusSetting(waiting,sending,delivered,blocked); setting.status=statusSetting; } sb=new StringBuilder(); sb.append(basePath); sb.append("@result"); String result=config.getString(sb.toString()); if(result!=null){ sb=new StringBuilder(); sb.append("/daemon[@type='DatabaseCache']/message/result[@name='"); sb.append(result); sb.append("']/"); String resultBasePath=sb.toString(); sb.append("successful"); int successful=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(resultBasePath); sb.append("failed"); int failed=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(resultBasePath); sb.append("overflow"); int overflow=config.getInt(sb.toString()); TextMessageResultSetting resultSetting=new TextMessageResultSetting(successful,failed,overflow); setting.result=resultSetting; } settings[idx-1]=setting; } enabled=true; } /** * 信息状态回写任务执行体 */ public void run() { if(!enabled) return; DAO dao=DAO.getInstance(); DatabaseConnectionPool dbcp=DatabaseConnectionPool.getInstance(); while(daemon.isRunnable()){ try{ if(daemon.isPaused()){ Thread.sleep(interval); continue; } } catch (InterruptedException ex){ // interrupted. probably want to exit if(!daemon.isRunnable()){ System.out.println("正在停止数据更新任务"); return; } } // 取得待回写上行短信 Queue<TextMessage> messages=daemon.getNewIncomingMessage(); dao.saveIncomingMessage(settings,messages); try { Thread.sleep(interval); //查询表的间隔 } catch (Exception e) { System.err.println("Interrupted"); return; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -