⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 databasecachedaemon.java

📁 监控整个SP系统的SMS信息通道的发送情况并且生成日志。 支持权限判断。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package com.infobank.superchannel.daemon.databasecache;import java.io.PrintStream;import java.io.OutputStream;import java.util.Collection;import java.util.Hashtable;import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicReference;import java.util.UUID;import com.infobank.superchannel.admin.controller.DatabaseCacheController;import com.infobank.superchannel.admin.controller.implementation.DatabaseCacheControllerImpl;import com.infobank.superchannel.daemon.GenericDaemon;import com.infobank.superchannel.database.DAO;import com.infobank.superchannel.observer.DatabaseCacheStatusObserver;import com.infobank.superchannel.pojo.BlackList;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.pojo.ControllerUserToken;import com.infobank.superchannel.util.DaemonConfiguration;import com.infobank.superchannel.util.DaemonTaskExecutor;import com.infobank.superchannel.util.DatabaseConnectionPool;/** * @author Xiaoguang Sun * @version 类说明: 数据库缓冲服务器守护进程实现 */public class DatabaseCacheDaemon extends DatabaseCacheDaemonState implements GenericDaemon, DatabaseCache{  // 当前活动状态机  private static AtomicReference<DatabaseCacheDaemonState> state=new AtomicReference<DatabaseCacheDaemonState>(DatabaseCacheDaemonStoppedState.getInstance());  // 服务器守护进程实例  private static AtomicReference<DatabaseCacheDaemon> instance=new AtomicReference<DatabaseCacheDaemon>(null);  // 服务器配置  private static AtomicReference<DaemonConfiguration> config=new AtomicReference<DaemonConfiguration>(null);  // 任务执行  private static AtomicReference<DaemonTaskExecutor> executor=new AtomicReference<DaemonTaskExecutor>(null);  // 发送失败短信最多重试次数  private static AtomicInteger maxRetry=new AtomicInteger(0);  // 累计发送失败的短信息数目  private static AtomicInteger numberOfFailedMessages=new AtomicInteger(0);  // 累计发送成功的短信息数目  private static AtomicInteger numberOfSuccessfulMessages=new AtomicInteger(0);  // 数据库缓冲服务器控制器  private static DatabaseCacheController controller=null;  // 数据库缓冲服务器状态观察者  private static DatabaseCacheStatusObserver observer=null;  // 消息队列锁  private Object messageQueueMutex=new Object();  // 消息队列  private Hashtable<String,Queue<TextMessage> > messageQueue=new Hashtable<String,Queue<TextMessage>>();  // 状态更新队列锁  private Object statusQueueMutex=new Object();  // 状态更新队列  private Hashtable<Integer,TextMessage> statusQueue=new Hashtable<Integer,TextMessage>();  // 上行短信息队列锁  private Object incomingMessageQueueMutex=new Object();  // 上行短信息队列  private Queue<TextMessage> incomingMessageQueue=new LinkedList<TextMessage>();  // 黑名单    private Hashtable<String,BlackList> blackListMap=new Hashtable<String,BlackList>();    // message queue capacity  private static AtomicInteger messageQueueCapacity=new AtomicInteger(0);    // internal user token    private static AtomicReference<ControllerUserToken> internalUserToken=new AtomicReference<ControllerUserToken>(null);  protected void stateSetBlackList(BlackList blacklist){    blackListMap.put(blacklist.tag, blacklist);    observer.reloadBlackList(blacklist);  }  /**   * 添加任务(仅供状态机使用,不对外开放)   * @param task 任务对象   */  protected void stateAddTask(Runnable task){    // 将任务放入线程池    executor.get().addTask(task);  }  /**   * 添加新待发信息到信息队列(仅供状态机使用,不对外开放)   * @param message 待发信息   */  private void stateAddNewMessage(TextMessage message){    // 获取当前活动的黑名单    BlackList black=blackListMap.get(message.getTag());    if(black!=null&&black.blacklist.containsKey(message.getPhone())){      /**       * 如果黑名单不为空并且当前发送的消息接收者的号码在黑名单中,则将其过滤       * 并通知观察者       */      message.setStatus(TextMessage.STATUS.BLOCKED);      observer.blockMessage(message);      // 将状态变更回写回数据库中      synchronized(statusQueueMutex){        statusQueue.put(message.getMessageID(),message);        observer.statusQueueSize(statusQueue.size());      }    }    else{      // 否则将信息添加到信息队列,通知观察者       observer.newMessage(message);      synchronized(messageQueueMutex){        String tag=message.getTag();        Queue<TextMessage> queue=messageQueue.get(tag);        if(queue==null){          queue=new LinkedList<TextMessage>();          messageQueue.put(tag,queue);        }        queue.add(message);        observer.messageQueueSize(messageQueue.size());      }    }  }  /**   * 添加新待发信息到信息队列(仅供状态机使用,不对外开放)   * @param messages 待发信息   * @param tag message tag   */  protected void stateAddNewMessage(Queue<TextMessage> messages, String tag){    Queue<TextMessage> queue=null;    // prepare queue for tag if it is not ready    synchronized(messageQueueMutex){      queue=messageQueue.get(tag);      if(queue==null){        queue=new LinkedList<TextMessage>();        messageQueue.put(tag,queue);      }    }        // 遍历所有的待发短消息    for(TextMessage message:messages){      BlackList black=blackListMap.get(message.getTag());      // 如果黑名单不为空并且短消息接收者在黑名单中则将其过滤,否则将其放入消息队列      if(black!=null&&black.blacklist.containsKey(message.getPhone())){        message.setStatus(TextMessage.STATUS.BLOCKED);        observer.blockMessage(message);        // 将状态更新回写回数据库中        synchronized(statusQueueMutex){          statusQueue.put(message.getMessageID(),message);          observer.statusQueueSize(statusQueue.size());        }      }      else{        observer.newMessage(message);        synchronized(messageQueueMutex){          queue.add(message);          observer.messageQueueSize(messageQueue.size());        }      }    }  }  /**   * 从信息队列获取当前所有待发信息(仅供状态机使用,不对外开放)   * @param tag message tag   * @return 当前所有待发信息   */  protected Queue<TextMessage> stateGetNewMessage(String tag){    // 取出当前的消息队列返回给调用者,并新建一个新的空队列    Queue<TextMessage> result=null;    synchronized(messageQueueMutex){      if(messageQueue.isEmpty()){        return null;      }            result=messageQueue.get(tag);      messageQueue.put(tag,new LinkedList<TextMessage>());    }    observer.messageQueueSize(0);    return result;  }  /**   * 从信息状态更新队列获取当前所有状态待更新信息(仅供状态机使用,不对外开放)   * @param return 当前所有状态待更新信息   */  protected Queue<TextMessage> stateGetPendingStatusUpdate(){    Queue<TextMessage> pendingStatus=null;    synchronized(statusQueueMutex){      // 如果状态队列为空则直接返回      if(statusQueue.isEmpty()){        return null;      }      // 将待更新状态取出,通知状态观察者新的更新队列尺寸      pendingStatus=new LinkedList<TextMessage>();      pendingStatus.addAll(statusQueue.values());      statusQueue.clear();      observer.statusQueueSize(0);    }    // 通知状态观察者消息状态正在被回写    for(TextMessage message: pendingStatus){      observer.updateMessageStatus(message);    }    return pendingStatus;  }  /**   * 将状态待更新信息增加到状态待更新队列(仅供状态机使用,不对外开放)   * @param status 状态待更新信息   */  protected void stateUpdateMessageStatus(Queue<TextMessage> status){    // 处理从消息发送服务器返回的状态更新请求    TextMessage message=status.poll();    while(message!=null){      // 通知状态观察者      observer.newMessageStatus(message);      // 取出消息的id,如果消息状态为失败,则检查已重试次数,如果没有超出上限则直接将其重新插入待发消息队列,重新发送      // 如果发送状态为成功,则将其添加到待更新状态队列,并通知状态观察者      if(message.isStatus(TextMessage.STATUS.DELIVERED)){        if(message.isResult(TextMessage.RESULT.FAILED)){            int retry=message.getRetry();            if(retry<maxRetry.intValue()){              // retry. don't add number of failed messages              message.setRetry(retry+1);              stateAddNewMessage(message);              message=status.poll();              continue;            }else{              // failed finally, increase number of failed message              observer.numberOfFailedMessages(numberOfFailedMessages.incrementAndGet());            }        }else{          if(message.isResult(TextMessage.RESULT.SUCCESSFUL)){            // increase number of successful message            observer.numberOfSuccessfulMessages(numberOfSuccessfulMessages.incrementAndGet());          }        }      }      synchronized(statusQueueMutex){        // don't need to check if the sending status is in status queue or not.        // because a previous status can only be sending        statusQueue.put(message.getMessageID(), message);        observer.statusQueueSize(statusQueue.size());      }      message=status.poll();    }  }  /**   * 将新上行短信添加到队列(仅供状态机使用,不对外开发)   * @param messages 新上行短信   * @return 操作是否成功   */  public boolean stateNewIncomingMessage(Queue<TextMessage> messages) {    synchronized(incomingMessageQueueMutex){      incomingMessageQueue.addAll(messages);      observer.incomingMessageQueueSize(incomingMessageQueue.size());    }    for(TextMessage message:messages){      observer.newIncomingMessage(message);    }    return true;  }  /**   * 从上行短信息队列取出新信息   * @return 取出的新上行短信   */  public Queue<TextMessage> stateGetNewIncomingMessage() {    Queue<TextMessage> result=null;    synchronized(incomingMessageQueueMutex){      // 如果上行短信队列为空则直接返回      if(incomingMessageQueue.isEmpty())        return null;      // 将上行短信取出      result=incomingMessageQueue;      incomingMessageQueue=new LinkedList<TextMessage>();    }    observer.incomingMessageQueueSize(0);    return result;  }  /**

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -