📄 databasecachedaemon.java.svn-base
字号:
package com.infobank.superchannel.daemon.databasecache;import java.io.PrintStream;import java.io.OutputStream;import java.util.Collection;import java.util.Hashtable;import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicReference;import java.util.UUID;import com.infobank.superchannel.admin.controller.DatabaseCacheController;import com.infobank.superchannel.admin.controller.implementation.DatabaseCacheControllerImpl;import com.infobank.superchannel.daemon.GenericDaemon;import com.infobank.superchannel.database.DAO;import com.infobank.superchannel.observer.DatabaseCacheStatusObserver;import com.infobank.superchannel.pojo.BlackList;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.pojo.ControllerUserToken;import com.infobank.superchannel.util.DaemonConfiguration;import com.infobank.superchannel.util.DaemonTaskExecutor;import com.infobank.superchannel.util.DatabaseConnectionPool;/** * @author Xiaoguang Sun * @version 类说明: 数据库缓冲服务器守护进程实现 */public class DatabaseCacheDaemon extends DatabaseCacheDaemonState implements GenericDaemon, DatabaseCache{ // 当前活动状态机 private static AtomicReference<DatabaseCacheDaemonState> state=new AtomicReference<DatabaseCacheDaemonState>(DatabaseCacheDaemonStoppedState.getInstance()); // 服务器守护进程实例 private static AtomicReference<DatabaseCacheDaemon> instance=new AtomicReference<DatabaseCacheDaemon>(null); // 服务器配置 private static AtomicReference<DaemonConfiguration> config=new AtomicReference<DaemonConfiguration>(null); // 任务执行 private static AtomicReference<DaemonTaskExecutor> executor=new AtomicReference<DaemonTaskExecutor>(null); // 发送失败短信最多重试次数 private static AtomicInteger maxRetry=new AtomicInteger(0); // 累计发送失败的短信息数目 private static AtomicInteger numberOfFailedMessages=new AtomicInteger(0); // 累计发送成功的短信息数目 private static AtomicInteger numberOfSuccessfulMessages=new AtomicInteger(0); // 数据库缓冲服务器控制器 private static DatabaseCacheController controller=null; // 数据库缓冲服务器状态观察者 private static DatabaseCacheStatusObserver observer=null; // 消息队列锁 private Object messageQueueMutex=new Object(); // 消息队列 private Hashtable<String,Queue<TextMessage> > messageQueue=new Hashtable<String,Queue<TextMessage>>(); // 状态更新队列锁 private Object statusQueueMutex=new Object(); // 状态更新队列 private Hashtable<Integer,TextMessage> statusQueue=new Hashtable<Integer,TextMessage>(); // 上行短信息队列锁 private Object incomingMessageQueueMutex=new Object(); // 上行短信息队列 private Queue<TextMessage> incomingMessageQueue=new LinkedList<TextMessage>(); // 黑名单 private Hashtable<String,BlackList> blackListMap=new Hashtable<String,BlackList>(); // message queue capacity private static AtomicInteger messageQueueCapacity=new AtomicInteger(0); // internal user token private static AtomicReference<ControllerUserToken> internalUserToken=new AtomicReference<ControllerUserToken>(null); protected void stateSetBlackList(BlackList blacklist){ blackListMap.put(blacklist.tag, blacklist); observer.reloadBlackList(blacklist); } /** * 添加任务(仅供状态机使用,不对外开放) * @param task 任务对象 */ protected void stateAddTask(Runnable task){ // 将任务放入线程池 executor.get().addTask(task); } /** * 添加新待发信息到信息队列(仅供状态机使用,不对外开放) * @param message 待发信息 */ private void stateAddNewMessage(TextMessage message){ // 获取当前活动的黑名单 BlackList black=blackListMap.get(message.getTag()); if(black!=null&&black.blacklist.containsKey(message.getPhone())){ /** * 如果黑名单不为空并且当前发送的消息接收者的号码在黑名单中,则将其过滤 * 并通知观察者 */ message.setStatus(TextMessage.STATUS.BLOCKED); observer.blockMessage(message); // 将状态变更回写回数据库中 synchronized(statusQueueMutex){ statusQueue.put(message.getMessageID(),message); observer.statusQueueSize(statusQueue.size()); } } else{ // 否则将信息添加到信息队列,通知观察者 observer.newMessage(message); synchronized(messageQueueMutex){ String tag=message.getTag(); Queue<TextMessage> queue=messageQueue.get(tag); if(queue==null){ queue=new LinkedList<TextMessage>(); messageQueue.put(tag,queue); } queue.add(message); observer.messageQueueSize(messageQueue.size()); } } } /** * 添加新待发信息到信息队列(仅供状态机使用,不对外开放) * @param messages 待发信息 * @param tag message tag */ protected void stateAddNewMessage(Queue<TextMessage> messages, String tag){ Queue<TextMessage> queue=null; // prepare queue for tag if it is not ready synchronized(messageQueueMutex){ queue=messageQueue.get(tag); if(queue==null){ queue=new LinkedList<TextMessage>(); messageQueue.put(tag,queue); } } // 遍历所有的待发短消息 for(TextMessage message:messages){ BlackList black=blackListMap.get(message.getTag()); // 如果黑名单不为空并且短消息接收者在黑名单中则将其过滤,否则将其放入消息队列 if(black!=null&&black.blacklist.containsKey(message.getPhone())){ message.setStatus(TextMessage.STATUS.BLOCKED); observer.blockMessage(message); // 将状态更新回写回数据库中 synchronized(statusQueueMutex){ statusQueue.put(message.getMessageID(),message); observer.statusQueueSize(statusQueue.size()); } } else{ observer.newMessage(message); synchronized(messageQueueMutex){ queue.add(message); observer.messageQueueSize(messageQueue.size()); } } } } /** * 从信息队列获取当前所有待发信息(仅供状态机使用,不对外开放) * @param tag message tag * @return 当前所有待发信息 */ protected Queue<TextMessage> stateGetNewMessage(String tag){ // 取出当前的消息队列返回给调用者,并新建一个新的空队列 Queue<TextMessage> result=null; synchronized(messageQueueMutex){ if(messageQueue.isEmpty()){ return null; } result=messageQueue.get(tag); messageQueue.put(tag,new LinkedList<TextMessage>()); } observer.messageQueueSize(0); return result; } /** * 从信息状态更新队列获取当前所有状态待更新信息(仅供状态机使用,不对外开放) * @param return 当前所有状态待更新信息 */ protected Queue<TextMessage> stateGetPendingStatusUpdate(){ Queue<TextMessage> pendingStatus=null; synchronized(statusQueueMutex){ // 如果状态队列为空则直接返回 if(statusQueue.isEmpty()){ return null; } // 将待更新状态取出,通知状态观察者新的更新队列尺寸 pendingStatus=new LinkedList<TextMessage>(); pendingStatus.addAll(statusQueue.values()); statusQueue.clear(); observer.statusQueueSize(0); } // 通知状态观察者消息状态正在被回写 for(TextMessage message: pendingStatus){ observer.updateMessageStatus(message); } return pendingStatus; } /** * 将状态待更新信息增加到状态待更新队列(仅供状态机使用,不对外开放) * @param status 状态待更新信息 */ protected void stateUpdateMessageStatus(Queue<TextMessage> status){ // 处理从消息发送服务器返回的状态更新请求 TextMessage message=status.poll(); while(message!=null){ // 通知状态观察者 observer.newMessageStatus(message); // 取出消息的id,如果消息状态为失败,则检查已重试次数,如果没有超出上限则直接将其重新插入待发消息队列,重新发送 // 如果发送状态为成功,则将其添加到待更新状态队列,并通知状态观察者 if(message.isStatus(TextMessage.STATUS.DELIVERED)){ if(message.isResult(TextMessage.RESULT.FAILED)){ int retry=message.getRetry(); if(retry<maxRetry.intValue()){ // retry. don't add number of failed messages message.setRetry(retry+1); stateAddNewMessage(message); message=status.poll(); continue; }else{ // failed finally, increase number of failed message observer.numberOfFailedMessages(numberOfFailedMessages.incrementAndGet()); } }else{ if(message.isResult(TextMessage.RESULT.SUCCESSFUL)){ // increase number of successful message observer.numberOfSuccessfulMessages(numberOfSuccessfulMessages.incrementAndGet()); } } } synchronized(statusQueueMutex){ // don't need to check if the sending status is in status queue or not. // because a previous status can only be sending statusQueue.put(message.getMessageID(), message); observer.statusQueueSize(statusQueue.size()); } message=status.poll(); } } /** * 将新上行短信添加到队列(仅供状态机使用,不对外开发) * @param messages 新上行短信 * @return 操作是否成功 */ public boolean stateNewIncomingMessage(Queue<TextMessage> messages) { synchronized(incomingMessageQueueMutex){ incomingMessageQueue.addAll(messages); observer.incomingMessageQueueSize(incomingMessageQueue.size()); } for(TextMessage message:messages){ observer.newIncomingMessage(message); } return true; } /** * 从上行短信息队列取出新信息 * @return 取出的新上行短信 */ public Queue<TextMessage> stateGetNewIncomingMessage() { Queue<TextMessage> result=null; synchronized(incomingMessageQueueMutex){ // 如果上行短信队列为空则直接返回 if(incomingMessageQueue.isEmpty()) return null; // 将上行短信取出 result=incomingMessageQueue; incomingMessageQueue=new LinkedList<TextMessage>(); } observer.incomingMessageQueueSize(0); return result; } /**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -