📄 messagelauncherdaemon.java.svn-base
字号:
} else{ String token=args[idx]; UUID sid=UUID.fromString(token); if(sid==null) throw new RuntimeException("invalid token format"); ControllerUserToken aToken=new ControllerUserToken(sid); internalUserToken.set(aToken); } } } if(tag.get()==null) throw new RuntimeException("no tag is provided"); if(internalUserToken.get()==null) throw new RuntimeException("no user token is provided"); if(noConsole){ // redirect stderr and stdout to null PrintStream nullStream=new PrintStream(new OutputStream(){ public void close(){ } public void flush(){ } public void write(byte[] b){ } public void write(byte[] b, int off, int len){ } public void write(int b){ } }); System.setErr(nullStream); System.setOut(nullStream); } try{ MessageLauncherDaemon daemon=new MessageLauncherDaemon(); instance.set(daemon); if(!MessageLauncherDaemonState.init(daemon)){ throw new RuntimeException("Could not initialize daemon state"); } if(!DaemonConfiguration.init()){ throw new RuntimeException("Could not initialize daemon configuration"); } config.set(DaemonConfiguration.getInstance()); daemon.observer.set(new FilePersistenceMessageLauncherStatusObserver(daemon)); StringBuilder sb=new StringBuilder(); sb.append("/daemon[@type='MessageLauncher' and @tag='"); sb.append(tag.get()); sb.append("']/message/queue[@type='Mass']/@threshold"); daemon.massMessageThreshold.set(config.get().getInt(sb.toString())); if(!MassMessageQueue.init(daemon)){ throw new RuntimeException("Could not initialize mass message queue"); } if(!GeneralMessageQueue.init(daemon)){ throw new RuntimeException("Could not initialize mass message queue"); } if(!DaemonTaskExecutor.init(daemon)){ throw new RuntimeException("Could not initialize task executor"); } if(!DatabaseConnectionPool.init(daemon)){ throw new RuntimeException("Could not initialize database connection pool"); } executor.set(DaemonTaskExecutor.getInstance()); daemon.setState(MessageLauncherDaemonStoppedState.getInstance()); daemon.start(); // have to initialize channel after the daemon has been initialized // work around here. we need to find a better solution later if(!ChannelManager.init(daemon)){ throw new RuntimeException("Could not initialize channel plugin manager"); } controller=new MessageLauncherControllerImpl(daemon); } catch(Exception e){ System.err.println("Client Exception: "+e.toString()); e.printStackTrace(); } } /** * 暂停守护进程 */ public boolean pause() { return state.get().pause(); } /** * 重新启动守护进程 */ public boolean restart() { return state.get().restart(); } /** * 继续运行暂停中的守护进程 */ public boolean resume() { return state.get().resume(); } /** * 启动守护进程 */ public boolean start() { return state.get().start(); } /** * 停止守护进程 */ public boolean stop() { return state.get().stop(); } /** * 从数据库缓冲服务器获取新的待发短信息 * @return 新的待发短信息 */ public Queue<TextMessage> fetchNewMessage(){ return state.get().fetchNewMessage(); } /** * 新增短消息到消息队列 * @param messages 新的待发短信息 */ public void addMessage(Queue<TextMessage> messages){ state.get().addMessage(messages); } /** * 新增短消息状态变更请求到等待队列 * @param message 状态更新的短消息 */ public void addStatusUpdate(TextMessage message){ state.get().addStatusUpdate(message); } /** * 将状态更新请求队列内所有待更新项目发送给数据库缓冲服务器 * @return 操作是否成功 */ public boolean updateMessageStatus(){ return state.get().updateMessageStatus(); } /** * 取得当前活动的信息发送服务器状态观察者 * @return 当前活动的信息发送服务器状态观察者 */ public MessageLauncherStatusObserver getObserver(){ return observer.get(); } /** * 从批量消息队列取出一条消息待发 * @return 新消息 */ @Override public TextMessage getMassMessage(){ return state.get().getMassMessage(); } /** * 从普通消息队列取出一条消息待发 * @return 新消息 */ @Override public TextMessage getGeneralMessage(){ return state.get().getGeneralMessage(); } /** * 从普通消息队列取出一定数量的消息待发 * @return 一批新消息 */ @Override public Queue<TextMessage> getGeneralMessage(int number) { return state.get().getGeneralMessage(number); } /** * 从批量消息队列取出一定数量的消息待发 * @return 一批新消息 */ @Override public Queue<TextMessage> getMassMessage(int number) { return state.get().getMassMessage(number); } /** * 当前信息发送服务器是否出于暂停态 * @return 当前信息发送服务器是否出于暂停态 */ @Override public boolean isPaused() { return state.get().isPaused(); } /** * 当前信息发送服务器是否可运行 * @return 当前信息发送服务器是否可运行 */ @Override public boolean isRunnable() { return state.get().isRunnable(); } /** * 重新加载守护进程配置 */ public boolean reload() { return state.get().reload(); } /** * 设置信息发送服务器的当前活动状态监控器 * @param observer 信息发送服务器的状态监控器 */ public void attachObserver(MessageLauncherStatusObserver observer){ this.observer.set(observer); synchronized(generalMessageQueueMutex){ observer.generalMessageQueueSize(generalMessageQueue.getMessageQueueSize()); } synchronized(massMessageQueueMutex){ observer.massMessageQueueSize(massMessageQueue.getMessageQueueSize()); } } /** * 撤销信息发送服务器的当前活动状态监控器 * @param observer 信息发送服务器的状态监控器 */ public void detachObserver(MessageLauncherStatusObserver observer){ this.observer.compareAndSet(observer, new FilePersistenceMessageLauncherStatusObserver(instance.get())); } /** * 获取守护进程当前状态 */ public String getState(){ return state.get().getState(); } /** * 强制将状态待更新消息回写回数据库,用于在致命错误发送时将内存中数据同步回数据库中(仅供状态机使用,不对外开放) */ public void stateForceUpdateStatusQueue(){ try{ DAO dao=DAO.getInstance(); synchronized(statusUpdateQueue){ dao.updateMessageStatus(statusUpdateQueue); statusUpdateQueue.clear(); } } catch(Exception ex){ ex.printStackTrace(); } } @Override public boolean newIncomingMessage(TextMessage message) { return state.get().newIncomingMessage(message); } public void saveIncomingMessage(){ state.get().saveIncomingMessage(); } public boolean isMessageQueueFull(){ boolean result=false; synchronized(massMessageQueueMutex){ result=massMessageQueue.isFull(); } if(result) return true; synchronized(generalMessageQueueMutex){ result=generalMessageQueue.isFull(); } return result; } public String getTag(){ return tag.get(); } public ControllerUserToken getInternalUserToken(){ return internalUserToken.get(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -