📄 readmessagetask.java
字号:
package com.infobank.superchannel.daemon.databasecache;import java.util.Queue;import org.apache.commons.configuration.Configuration;import com.infobank.superchannel.database.DAO;import com.infobank.superchannel.database.DAOListener;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.pojo.TextMessageResultSetting;import com.infobank.superchannel.pojo.TextMessageStatusSetting;import com.infobank.superchannel.util.DaemonConfiguration;import com.infobank.superchannel.util.DatabaseConnectionPool;/** * @author Xiaoguang Sun * @version 类说明: 读取新信息任务 */public class ReadMessageTask implements Runnable,DAOListener{ // 应扫描的表名 private String tableName=null; // 数据库缓冲服务器对象 private DatabaseCacheDaemon daemon=null; // 每次扫描扫描间隔时间(单位毫秒:1000毫秒=1秒) private int interval=5000; // 每次扫描最多读取信息条数 private int maxRecordNumber=0; // 特服号约束条件 private String callbackPattern=null; // datasource name private String dataSource=null; // status setting private TextMessageStatusSetting statusSetting=null; // result setting private TextMessageResultSetting resultSetting=null; private String tag=null; /** * 构造函数 * @param daemon 数据库缓冲服务器对象 * @param tableName 此任务应扫描的数据库表名 * @param dataSource Datasource for this task * @param callbackPattern callback pattern * @param maxRecordNumber maximum record number in one read * @param interval interval of each reading * @param statusSetting status setting * @param resultSetting result setting * @param tag tag */ public ReadMessageTask(DatabaseCacheDaemon daemon, String tableName, String dataSource,String callbackPattern,int maxRecordNumber,int interval, TextMessageStatusSetting statusSetting, TextMessageResultSetting resultSetting, String tag){ this.daemon=daemon; this.tableName=tableName; this.dataSource=dataSource; this.callbackPattern=callbackPattern; this.maxRecordNumber=maxRecordNumber; this.interval=interval; this.statusSetting=statusSetting; this.resultSetting=resultSetting; this.tag=tag; } /** * 新信息读取任务执行体 */ public void run() { DAO dao=DAO.getInstance(); DatabaseConnectionPool dbcp=DatabaseConnectionPool.getInstance(); while(daemon.isRunnable()){ try{ if(daemon.isPaused()){ Thread.sleep(interval); continue; } } catch (InterruptedException ex){ // interrupted. probably want to exit if(!daemon.isRunnable()){ System.out.println("正在停止数据读取任务"); return; } } // 获取待发信息 dao.getPendingMessage(dataSource,tableName,callbackPattern,maxRecordNumber,statusSetting, resultSetting, tag, this); try { Thread.sleep(interval); //查询表的间隔 } catch (Exception e) { return; } } } public void onMessages(Queue<TextMessage> messages) { while(daemon.isMessageQueueFull()){ try{ Thread.sleep(interval); } catch(Exception ex){ return; } } while(!daemon.addNewMessage(messages, tag)){ // message can not be added into message queue if(!daemon.isRunnable()){ // server is exiting. // the new message does not get delivered however the status are not changed // these message will be delivered next time return; } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -