⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 messagelauncherdaemon.java.svn-base

📁 监控整个SP系统的SMS信息通道的发送情况并且生成日志。 支持权限判断。
💻 SVN-BASE
📖 第 1 页 / 共 2 页
字号:
package com.infobank.superchannel.daemon.messagelauncher;import java.io.OutputStream;import java.io.PrintStream;import java.lang.reflect.Method;import java.rmi.RemoteException;import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.Callable;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicReference;import java.util.UUID;import com.infobank.superchannel.admin.controller.MessageLauncherController;import com.infobank.superchannel.admin.controller.implementation.MessageLauncherControllerImpl;import com.infobank.superchannel.admin.monitor.ChannelStatusMonitor;import com.infobank.superchannel.daemon.GenericDaemon;import com.infobank.superchannel.daemon.databasecache.DatabaseCache;import com.infobank.superchannel.daemon.databasecache.DatabaseCacheDaemon;import com.infobank.superchannel.daemon.databasecache.DatabaseCacheDaemonState;import com.infobank.superchannel.daemon.databasecache.DatabaseCacheDaemonStoppedState;import com.infobank.superchannel.daemon.messagelauncher.channel.ChannelManager;import com.infobank.superchannel.daemon.messagelauncher.channel.ChannelPlugin;import com.infobank.superchannel.database.DAO;import com.infobank.superchannel.observer.MessageLauncherStatusObserver;import com.infobank.superchannel.observer.implementation.FilePersistenceMessageLauncherStatusObserver;import com.infobank.superchannel.observer.implementation.MemoryPersistenceMessageLauncherStatusObserver;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.pojo.ControllerUserToken;import com.infobank.superchannel.util.DaemonConfiguration;import com.infobank.superchannel.util.DaemonTaskExecutor;import com.infobank.superchannel.util.DatabaseConnectionPool;/** * @author Xiaoguang Sun * @version 类说明: 消息发送服务器 */public class MessageLauncherDaemon extends MessageLauncherDaemonState implements GenericDaemon{  // 批量短信息队列  private MessageQueue massMessageQueue=new MassMessageQueue();  // 批量短信息队列并发锁  private Object massMessageQueueMutex=new Object();  // 普通短信息队列  private MessageQueue generalMessageQueue=new GeneralMessageQueue();  // 普通短信息队列锁  private Object generalMessageQueueMutex=new Object();    // 批量短信息判定阈值  private AtomicInteger massMessageThreshold=new AtomicInteger(0);    // 消息发送服务器的控制器  private static MessageLauncherController controller=null;    // 消息发送服务器的状态观察者  private AtomicReference<MessageLauncherStatusObserver> observer=new AtomicReference<MessageLauncherStatusObserver>();  // 当前活动状态机  private static AtomicReference<MessageLauncherDaemonState> state=new AtomicReference<MessageLauncherDaemonState>(MessageLauncherDaemonStoppedState.getInstance());    // 单件实例  private static AtomicReference<MessageLauncherDaemon> instance=new AtomicReference<MessageLauncherDaemon>(null);    // 配置信息  private static AtomicReference<DaemonConfiguration> config=new AtomicReference<DaemonConfiguration>(null);  // 任务执行线程池  private static AtomicReference<DaemonTaskExecutor> executor=new AtomicReference<DaemonTaskExecutor>(null);  // 数据库缓冲服务器的远程连接  private AtomicReference<DatabaseCache> databaseCache=new AtomicReference<DatabaseCache>(null);  // 状态更新队列  private Queue<TextMessage> statusUpdateQueue=new LinkedList<TextMessage>();    // 状态更新队列并发锁  private Object statusUpdateQueueMutex=new Object();    // Incoming message 队列  private Queue<TextMessage> incomingMessageQueue=new LinkedList<TextMessage>();    // Incoming message 队列并发锁  private Object incomingMessageQueueMutex=new Object();    // Message tag  private static AtomicReference<String> tag=new AtomicReference<String>(null);    // internal user token    private static AtomicReference<ControllerUserToken> internalUserToken=new AtomicReference<ControllerUserToken>(null);  /**   * 设定当前活动状态机   * @param state 新状态机   */  protected void setState(MessageLauncherDaemonState state){    MessageLauncherDaemon.state.set(state);  }    /**   * 新增Incoming短信息save请求到请求队列(仅供状态机使用,不对外开放)   * @param message incoming 消息   */  protected boolean stateNewIncomingMessage(TextMessage message){    synchronized(incomingMessageQueueMutex){      incomingMessageQueue.add(message);    }    return true;  }  /**   * 新增短信息状态更新请求到请求队列(仅供状态机使用,不对外开放)   * @param message 状态更新的消息   */  protected void stateAddStatusUpdate(TextMessage message){    synchronized(statusUpdateQueueMutex){      statusUpdateQueue.add(message);    }  }    /**   * 将状态更新请求队列内所有待更新项目发送给数据库缓冲服务器(仅供状态机使用,不对外开放)   * @return 操作是否成功   */  protected boolean stateUpdateMessageStatus(){    Queue<TextMessage> status=null;    synchronized(statusUpdateQueueMutex){      // 取出状态待更新信息队列中所有的状态更新请求      if(statusUpdateQueue.isEmpty())        return true;      status=statusUpdateQueue;      statusUpdateQueue=new LinkedList<TextMessage>();    }        if(status==null||status.size()==0)      return true;    try{      // 将状态更新请求发送回数据库缓冲池      databaseCache.get().updateMessageStatus(status);    }    catch(RemoteException ex){      ex.printStackTrace();            observer.get().fatal(ex);      return false;    }        return true;  }    /**   * 设置数据库缓冲服务器远程连接(仅供状态机使用,不对外开放)   * @param cache 数据库缓冲服务器远程连接   */  protected void stateSetDatabaseCache(DatabaseCache cache){    databaseCache.set(cache);  }    /**   * 设置数据库缓冲服务器远程连接(仅供状态机使用,不对外开放)   * @param cache 数据库缓冲服务器远程连接   */  protected void stateAddTask(Runnable task){    executor.get().addTask(task);  }  /**   * 从数据库缓冲服务器获取新的待发短信息(仅供状态机使用,不对外开放)   * @param cache 数据库缓冲服务器远程连接   * @return 新的待发短信息   */  protected Queue<TextMessage> stateFetchNewMessage(){    try{      return databaseCache.get().getNewMessage(tag.get());    }catch(RemoteException ex){      ex.printStackTrace();            observer.get().fatal(ex);    }    return null;  }  /**   * 将上行短信发送给数据库缓冲服务器   */  protected void stateSaveIncomingMessage(){    Queue<TextMessage> incomingMessages=null;    synchronized(incomingMessageQueueMutex){      incomingMessages=incomingMessageQueue;      incomingMessageQueue=new LinkedList<TextMessage>();    }        if(incomingMessages==null||incomingMessages.size()==0)      return;    try{      databaseCache.get().newIncomingMessage(incomingMessages);    }catch(RemoteException ex){      ex.printStackTrace();            observer.get().fatal(ex);    }  }  /**   * 增加新消息到消息队列(仅供状态机使用,不对外开放)   * @param message 新消息   */  protected void stateAddMessage(TextMessage message){    if(message==null)      return;    synchronized(generalMessageQueueMutex){      generalMessageQueue.addMessage(message);    }    Queue<TextMessage> bucket=new LinkedList<TextMessage>();    bucket.add(message);    observer.get().newGeneralMessageBucket(bucket);  }    /**   * 从批量消息队列取出一条消息待发(仅供状态机使用,不对外开放)   * @return 新消息   */  protected TextMessage stateGetMassMessage(){    synchronized(massMessageQueueMutex){      TextMessage result=massMessageQueue.getMessage();      observer.get().massMessageQueueSize(massMessageQueue.getMessageQueueSize());      return result;    }  }    /**   * 从普通消息队列取出一条消息待发(仅供状态机使用,不对外开放)   * @return 新消息   */  protected TextMessage stateGetGeneralMessage(){    synchronized(generalMessageQueueMutex){      TextMessage result=generalMessageQueue.getMessage();      observer.get().generalMessageQueueSize(generalMessageQueue.getMessageQueueSize());      return result;    }  }  /**   * 增加一批内容相同的新消息到消息队列(仅供状态机使用,不对外开放)   * @param message 一批内容相同的新消息   */  protected void stateAddMessage(Queue<TextMessage> messages){    if(messages==null)      return;        // 如果此批消息的数目大于预先设定的阈值,将其判定为批量消息,否则判定为普通消息    if(messages.size()>=massMessageThreshold.get()){      observer.get().newMassMessageBucket(messages);      synchronized(massMessageQueueMutex){        massMessageQueue.addMessage(messages);        observer.get().massMessageQueueSize(massMessageQueue.getMessageQueueSize());      }    }else{      observer.get().newGeneralMessageBucket(messages);      synchronized(generalMessageQueueMutex){        generalMessageQueue.addMessage(messages);        observer.get().generalMessageQueueSize(generalMessageQueue.getMessageQueueSize());      }    }  }    /**   * 从批量消息队列取出一定数量的消息待发(仅供状态机使用,不对外开放)   * @return 一批新消息   */  protected Queue<TextMessage> stateGetMassMessage(int number){    synchronized(massMessageQueueMutex){      Queue<TextMessage> result=massMessageQueue.getMessage(number);      observer.get().massMessageQueueSize(massMessageQueue.getMessageQueueSize());      return result;    }  }  /**   * 从普通消息队列取出一定数量的消息待发(仅供状态机使用,不对外开放)   * @return 一批新消息   */  protected Queue<TextMessage> stateGetGeneralMessage(int number){    synchronized(generalMessageQueueMutex){      Queue<TextMessage> result=generalMessageQueue.getMessage(number);      observer.get().generalMessageQueueSize(generalMessageQueue.getMessageQueueSize());      return result;    }  }  /**   * 信息发送服务器入口点   * @param args 参数   */  public static void main(String[] args) {      boolean noConsole=false;      for(int idx=0;idx<args.length;++idx){        if(args[idx].equalsIgnoreCase("-noconsole")){          noConsole=true;          continue;        }        if(args[idx].equalsIgnoreCase("-tag")){          if(++idx==args.length){            throw new RuntimeException("invalid tag parameter");          }else{            tag.set(args[idx]);          }          continue;        }        if(args[idx].equalsIgnoreCase("-token")){          if(++idx==args.length){            throw new RuntimeException("invalid token parameter");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -