📄 messagelauncherdaemon.java.svn-base
字号:
package com.infobank.superchannel.daemon.messagelauncher;import java.io.OutputStream;import java.io.PrintStream;import java.lang.reflect.Method;import java.rmi.RemoteException;import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.Callable;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicReference;import java.util.UUID;import com.infobank.superchannel.admin.controller.MessageLauncherController;import com.infobank.superchannel.admin.controller.implementation.MessageLauncherControllerImpl;import com.infobank.superchannel.admin.monitor.ChannelStatusMonitor;import com.infobank.superchannel.daemon.GenericDaemon;import com.infobank.superchannel.daemon.databasecache.DatabaseCache;import com.infobank.superchannel.daemon.databasecache.DatabaseCacheDaemon;import com.infobank.superchannel.daemon.databasecache.DatabaseCacheDaemonState;import com.infobank.superchannel.daemon.databasecache.DatabaseCacheDaemonStoppedState;import com.infobank.superchannel.daemon.messagelauncher.channel.ChannelManager;import com.infobank.superchannel.daemon.messagelauncher.channel.ChannelPlugin;import com.infobank.superchannel.database.DAO;import com.infobank.superchannel.observer.MessageLauncherStatusObserver;import com.infobank.superchannel.observer.implementation.FilePersistenceMessageLauncherStatusObserver;import com.infobank.superchannel.observer.implementation.MemoryPersistenceMessageLauncherStatusObserver;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.pojo.ControllerUserToken;import com.infobank.superchannel.util.DaemonConfiguration;import com.infobank.superchannel.util.DaemonTaskExecutor;import com.infobank.superchannel.util.DatabaseConnectionPool;/** * @author Xiaoguang Sun * @version 类说明: 消息发送服务器 */public class MessageLauncherDaemon extends MessageLauncherDaemonState implements GenericDaemon{ // 批量短信息队列 private MessageQueue massMessageQueue=new MassMessageQueue(); // 批量短信息队列并发锁 private Object massMessageQueueMutex=new Object(); // 普通短信息队列 private MessageQueue generalMessageQueue=new GeneralMessageQueue(); // 普通短信息队列锁 private Object generalMessageQueueMutex=new Object(); // 批量短信息判定阈值 private AtomicInteger massMessageThreshold=new AtomicInteger(0); // 消息发送服务器的控制器 private static MessageLauncherController controller=null; // 消息发送服务器的状态观察者 private AtomicReference<MessageLauncherStatusObserver> observer=new AtomicReference<MessageLauncherStatusObserver>(); // 当前活动状态机 private static AtomicReference<MessageLauncherDaemonState> state=new AtomicReference<MessageLauncherDaemonState>(MessageLauncherDaemonStoppedState.getInstance()); // 单件实例 private static AtomicReference<MessageLauncherDaemon> instance=new AtomicReference<MessageLauncherDaemon>(null); // 配置信息 private static AtomicReference<DaemonConfiguration> config=new AtomicReference<DaemonConfiguration>(null); // 任务执行线程池 private static AtomicReference<DaemonTaskExecutor> executor=new AtomicReference<DaemonTaskExecutor>(null); // 数据库缓冲服务器的远程连接 private AtomicReference<DatabaseCache> databaseCache=new AtomicReference<DatabaseCache>(null); // 状态更新队列 private Queue<TextMessage> statusUpdateQueue=new LinkedList<TextMessage>(); // 状态更新队列并发锁 private Object statusUpdateQueueMutex=new Object(); // Incoming message 队列 private Queue<TextMessage> incomingMessageQueue=new LinkedList<TextMessage>(); // Incoming message 队列并发锁 private Object incomingMessageQueueMutex=new Object(); // Message tag private static AtomicReference<String> tag=new AtomicReference<String>(null); // internal user token private static AtomicReference<ControllerUserToken> internalUserToken=new AtomicReference<ControllerUserToken>(null); /** * 设定当前活动状态机 * @param state 新状态机 */ protected void setState(MessageLauncherDaemonState state){ MessageLauncherDaemon.state.set(state); } /** * 新增Incoming短信息save请求到请求队列(仅供状态机使用,不对外开放) * @param message incoming 消息 */ protected boolean stateNewIncomingMessage(TextMessage message){ synchronized(incomingMessageQueueMutex){ incomingMessageQueue.add(message); } return true; } /** * 新增短信息状态更新请求到请求队列(仅供状态机使用,不对外开放) * @param message 状态更新的消息 */ protected void stateAddStatusUpdate(TextMessage message){ synchronized(statusUpdateQueueMutex){ statusUpdateQueue.add(message); } } /** * 将状态更新请求队列内所有待更新项目发送给数据库缓冲服务器(仅供状态机使用,不对外开放) * @return 操作是否成功 */ protected boolean stateUpdateMessageStatus(){ Queue<TextMessage> status=null; synchronized(statusUpdateQueueMutex){ // 取出状态待更新信息队列中所有的状态更新请求 if(statusUpdateQueue.isEmpty()) return true; status=statusUpdateQueue; statusUpdateQueue=new LinkedList<TextMessage>(); } if(status==null||status.size()==0) return true; try{ // 将状态更新请求发送回数据库缓冲池 databaseCache.get().updateMessageStatus(status); } catch(RemoteException ex){ ex.printStackTrace(); observer.get().fatal(ex); return false; } return true; } /** * 设置数据库缓冲服务器远程连接(仅供状态机使用,不对外开放) * @param cache 数据库缓冲服务器远程连接 */ protected void stateSetDatabaseCache(DatabaseCache cache){ databaseCache.set(cache); } /** * 设置数据库缓冲服务器远程连接(仅供状态机使用,不对外开放) * @param cache 数据库缓冲服务器远程连接 */ protected void stateAddTask(Runnable task){ executor.get().addTask(task); } /** * 从数据库缓冲服务器获取新的待发短信息(仅供状态机使用,不对外开放) * @param cache 数据库缓冲服务器远程连接 * @return 新的待发短信息 */ protected Queue<TextMessage> stateFetchNewMessage(){ try{ return databaseCache.get().getNewMessage(tag.get()); }catch(RemoteException ex){ ex.printStackTrace(); observer.get().fatal(ex); } return null; } /** * 将上行短信发送给数据库缓冲服务器 */ protected void stateSaveIncomingMessage(){ Queue<TextMessage> incomingMessages=null; synchronized(incomingMessageQueueMutex){ incomingMessages=incomingMessageQueue; incomingMessageQueue=new LinkedList<TextMessage>(); } if(incomingMessages==null||incomingMessages.size()==0) return; try{ databaseCache.get().newIncomingMessage(incomingMessages); }catch(RemoteException ex){ ex.printStackTrace(); observer.get().fatal(ex); } } /** * 增加新消息到消息队列(仅供状态机使用,不对外开放) * @param message 新消息 */ protected void stateAddMessage(TextMessage message){ if(message==null) return; synchronized(generalMessageQueueMutex){ generalMessageQueue.addMessage(message); } Queue<TextMessage> bucket=new LinkedList<TextMessage>(); bucket.add(message); observer.get().newGeneralMessageBucket(bucket); } /** * 从批量消息队列取出一条消息待发(仅供状态机使用,不对外开放) * @return 新消息 */ protected TextMessage stateGetMassMessage(){ synchronized(massMessageQueueMutex){ TextMessage result=massMessageQueue.getMessage(); observer.get().massMessageQueueSize(massMessageQueue.getMessageQueueSize()); return result; } } /** * 从普通消息队列取出一条消息待发(仅供状态机使用,不对外开放) * @return 新消息 */ protected TextMessage stateGetGeneralMessage(){ synchronized(generalMessageQueueMutex){ TextMessage result=generalMessageQueue.getMessage(); observer.get().generalMessageQueueSize(generalMessageQueue.getMessageQueueSize()); return result; } } /** * 增加一批内容相同的新消息到消息队列(仅供状态机使用,不对外开放) * @param message 一批内容相同的新消息 */ protected void stateAddMessage(Queue<TextMessage> messages){ if(messages==null) return; // 如果此批消息的数目大于预先设定的阈值,将其判定为批量消息,否则判定为普通消息 if(messages.size()>=massMessageThreshold.get()){ observer.get().newMassMessageBucket(messages); synchronized(massMessageQueueMutex){ massMessageQueue.addMessage(messages); observer.get().massMessageQueueSize(massMessageQueue.getMessageQueueSize()); } }else{ observer.get().newGeneralMessageBucket(messages); synchronized(generalMessageQueueMutex){ generalMessageQueue.addMessage(messages); observer.get().generalMessageQueueSize(generalMessageQueue.getMessageQueueSize()); } } } /** * 从批量消息队列取出一定数量的消息待发(仅供状态机使用,不对外开放) * @return 一批新消息 */ protected Queue<TextMessage> stateGetMassMessage(int number){ synchronized(massMessageQueueMutex){ Queue<TextMessage> result=massMessageQueue.getMessage(number); observer.get().massMessageQueueSize(massMessageQueue.getMessageQueueSize()); return result; } } /** * 从普通消息队列取出一定数量的消息待发(仅供状态机使用,不对外开放) * @return 一批新消息 */ protected Queue<TextMessage> stateGetGeneralMessage(int number){ synchronized(generalMessageQueueMutex){ Queue<TextMessage> result=generalMessageQueue.getMessage(number); observer.get().generalMessageQueueSize(generalMessageQueue.getMessageQueueSize()); return result; } } /** * 信息发送服务器入口点 * @param args 参数 */ public static void main(String[] args) { boolean noConsole=false; for(int idx=0;idx<args.length;++idx){ if(args[idx].equalsIgnoreCase("-noconsole")){ noConsole=true; continue; } if(args[idx].equalsIgnoreCase("-tag")){ if(++idx==args.length){ throw new RuntimeException("invalid tag parameter"); }else{ tag.set(args[idx]); } continue; } if(args[idx].equalsIgnoreCase("-token")){ if(++idx==args.length){ throw new RuntimeException("invalid token parameter");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -