📄 memorypersistencedatabasecachestatusobserver.java.svn-base
字号:
package com.infobank.superchannel.observer.implementation;import java.util.Date;import java.util.LinkedList;import java.util.List;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicReference;import org.apache.commons.configuration.Configuration;import com.infobank.superchannel.daemon.databasecache.DatabaseCacheDaemon;import com.infobank.superchannel.observer.DatabaseCacheStatusObserver;import com.infobank.superchannel.pojo.BlackList;import com.infobank.superchannel.pojo.BlackListLog;import com.infobank.superchannel.pojo.GenericLog;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.pojo.TextMessageLog;import com.infobank.superchannel.util.DaemonConfiguration;/** * @author Xiaoguang Sun * @version 类说明: MemoryPersistence状态观察者 */public class MemoryPersistenceDatabaseCacheStatusObserver extends MemoryPersistenceLogObserver implements DatabaseCacheStatusObserver { private List<TextMessageLog> blockMessageLog=new LinkedList<TextMessageLog>(); private Object blockMessageLogMutex=new Object(); private static int maximumBlockMessageLogSize=100; private List<TextMessageLog> newMessageLog=new LinkedList<TextMessageLog>(); private Object newMessageLogMutex=new Object(); private static int maximumNewMessageLogSize=100; private List<TextMessageLog> updateMessageLog=new LinkedList<TextMessageLog>(); private Object updateMessageLogMutex=new Object(); private static int maximumUpdateMessageLogSize=100; private List<TextMessageLog> newMessageStatusLog=new LinkedList<TextMessageLog>(); private Object newMessageStatusLogMutex=new Object(); private static int maximumNewMessageStatusLogSize=100; private List<TextMessageLog> newIncomingMessageLog=new LinkedList<TextMessageLog>(); private Object newIncomingMessageLogMutex=new Object(); private static int maximumNewIncomingMessageLogSize=100; private List<BlackListLog> blackListLog=new LinkedList<BlackListLog>(); private Object blackListLogMutex=new Object(); private static int maximumBlackListLogSize=100; private AtomicInteger incomingMessageQueueSize=new AtomicInteger(0); private AtomicInteger messageQueueSize=new AtomicInteger(0); private AtomicInteger numberOfFailedMessages=new AtomicInteger(0); private AtomicInteger numberOfSuccessfulMessages=new AtomicInteger(0); private AtomicInteger statusQueueSize=new AtomicInteger(0); private AtomicReference<String> state=new AtomicReference<String>(new String()); public static void init(DatabaseCacheDaemon daemon){ DaemonConfiguration config=DaemonConfiguration.getInstance(); String basePath="/daemon[@type='DatabaseCache']/observer[@type='DatabaseCacheStatusObserver']/capacity[@persistence='Memory']/"; StringBuilder sb=new StringBuilder(); sb.append(basePath); sb.append("blockMessage"); maximumBlockMessageLogSize=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(basePath); sb.append("newMessage"); maximumNewMessageLogSize=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(basePath); sb.append("updateMessage"); maximumUpdateMessageLogSize=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(basePath); sb.append("newMessageStatus"); maximumNewMessageStatusLogSize=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(basePath); sb.append("newIncomingMessage"); maximumNewIncomingMessageLogSize=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(basePath); sb.append("blackList"); maximumBlackListLogSize=config.getInt(sb.toString()); } /** * 取得自上次调用此方法后被过滤掉的短信息内容日志 * @return 自上次调用此方法后被过滤掉的短信息内容日志 */ public List<TextMessageLog> getBlockMessageLog(){ List<TextMessageLog> log=null; synchronized(blockMessageLogMutex){ if(blockMessageLog.isEmpty()) return null; log=blockMessageLog; blockMessageLog=new LinkedList<TextMessageLog>(); } return log; } /** * 取得自上次调用此方法后从数据库中取出的待发短信息日志 * @return 自上次调用此方法后从数据库中取出的待发短信息日志 */ public List<TextMessageLog> getNewMessageLog(){ List<TextMessageLog> log=null; synchronized(newMessageLogMutex){ if(newMessageLog.isEmpty()) return null; log=newMessageLog; newMessageLog=new LinkedList<TextMessageLog>(); } return log; } /** * 取得自上次调用此方法后被过短信息状态被回写回数据库的日志 * @return 自上次调用此方法后被过短信息状态被回写回数据库的日志 */ public List<TextMessageLog> getUpdateMessageLog(){ List<TextMessageLog> log=null; synchronized(updateMessageLogMutex){ if(updateMessageLog.isEmpty()) return null; log=updateMessageLog; updateMessageLog=new LinkedList<TextMessageLog>(); } return log; } /** * 取得自上次调用此方法后新增短信息状态更新请求的日志 * @return 自上次调用此方法后新增短信息状态更新请求的日志 */ public List<TextMessageLog> getNewMessageStatusLog(){ List<TextMessageLog> log=null; synchronized(newMessageStatusLogMutex){ if(newMessageStatusLog.isEmpty()) return null; log=newMessageStatusLog; newMessageStatusLog=new LinkedList<TextMessageLog>(); } return log; } /** * 取得自上次调用此方法后所有黑名单更新日志 * @return 自上次调用此方法后所有黑名单更新日志 */ public List<BlackListLog> getBlackListLog(){ List<BlackListLog> log=null; synchronized(blackListLogMutex){ if(blackListLog.isEmpty()) return null; log=blackListLog; blackListLog=new LinkedList<BlackListLog>(); } return log; } /** * 取得待发消息队列尺寸 * @param token 用户访问令牌 * @return 待发消息队列尺寸 */ public int getMessageQueueSize(){ return messageQueueSize.get(); } /** * 取得累计发送失败短信息数目 * @param token 用户访问令牌 * @return 累计发送失败短信息数目 */ public int getNumberOfFailedMessages(){ return numberOfFailedMessages.get(); } /** * 取得累计发送成功短信息数目 * @param token 用户访问令牌 * @return 累计发送成功短信息数目 */ public int getNumberOfSuccessfulMessages(){ return numberOfSuccessfulMessages.get(); } /** * 取得状态更新回写队列尺寸 * @param token 用户访问令牌 * @return 状态更新回写队列尺寸 */ public int getStatusQueueSize(){ return statusQueueSize.get(); } private void checkListSize(List list,int maxsize){ int listSize=list.size(); if(listSize>=maxsize){ //list.subList(0, listSize-maxsize+1).clear(); list.remove(0); } } /** * 设置新的消息队列尺寸 * @param size 新尺寸 */ public void messageQueueSize(int size) { messageQueueSize.set(size); } /** * 从数据库中取出一条新待发短消息 * @param message 新待发短消息 */ public void newMessage(TextMessage message) { synchronized(newMessageLogMutex){ checkListSize(newMessageLog,maximumNewMessageLogSize); newMessageLog.add(new TextMessageLog(message)); } } /** * 过滤掉一条短消息 * @param message 被过滤掉的短消息 */ public void blockMessage(TextMessage message) { synchronized(blockMessageLogMutex){ checkListSize(blockMessageLog,maximumBlockMessageLogSize); blockMessageLog.add(new TextMessageLog(message)); } } /** * 设置累计发送失败的短信息条数 * @param number 累计发送失败的短信息条数 */ public void numberOfFailedMessages(int number) { numberOfFailedMessages.set(number); } /** * 设置累计发送成功的短信息条数 * @param number 累计发送成功的短信息条数 */ public void numberOfSuccessfulMessages(int number) { numberOfSuccessfulMessages.set(number); } /** * 重新读取黑名单 * @param list 黑名单 */ public void reloadBlackList(BlackList list) { synchronized(blackListLogMutex){ checkListSize(blackListLog,maximumBlackListLogSize); blackListLog.add(new BlackListLog(list)); } } /** * 设置新的状态更新等待队列尺寸 * @param size 新尺寸 */ public void statusQueueSize(int size) { statusQueueSize.set(size); } /** * 将一条短消息的状态更新回写回数据库 * @param message 状态更新的短消息 */ public void updateMessageStatus(TextMessage message) { synchronized(updateMessageLogMutex){ checkListSize(updateMessageLog,maximumUpdateMessageLogSize); updateMessageLog.add(new TextMessageLog(message)); } } /** * 有一条状态更新的短信被添加到待更新请求队列 * @param message 状态更新的短信 */ public void newMessageStatus(TextMessage message) { synchronized(newMessageStatusLogMutex){ checkListSize(newMessageStatusLog,maximumNewMessageStatusLogSize); newMessageStatusLog.add(new TextMessageLog(message)); } } public void incomingMessageQueueSize(int size) { incomingMessageQueueSize.set(size); } public void newIncomingMessage(TextMessage message) { synchronized(newIncomingMessageLogMutex){ checkListSize(newIncomingMessageLog,maximumNewIncomingMessageLogSize); newIncomingMessageLog.add(new TextMessageLog(message)); } } public int getIncomingMessageQueueSize() { return incomingMessageQueueSize.get(); } public List<TextMessageLog> getNewIncomingMessageLog() { List<TextMessageLog> log=null; synchronized(newIncomingMessageLogMutex){ if(newIncomingMessageLog.isEmpty()) return null; log=newIncomingMessageLog; newIncomingMessageLog=new LinkedList<TextMessageLog>(); } return log; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -