📄 databasecachedaemon.java
字号:
* 设置数据库缓冲服务器的当前状态(仅供状态机使用,不对外开放) * @param state 新状态 */ protected void setState(DatabaseCacheDaemonState state){ DatabaseCacheDaemon.state.set(state); } /** * 数据库缓冲服务器的入口函数 * @param args 命令行参数 */ public static void main(String[] args){ boolean noConsole=false; for(int idx=0;idx<args.length;++idx){ if(args[idx].equalsIgnoreCase("-noconsole")){ noConsole=true; continue; } if(args[idx].equalsIgnoreCase("-token")){ if(++idx==args.length){ throw new RuntimeException("invalid token parameter"); } else{ String token=args[idx]; UUID sid=UUID.fromString(token); if(sid==null) throw new RuntimeException("invalid token format"); ControllerUserToken aToken=new ControllerUserToken(sid); internalUserToken.set(aToken); } } } if(internalUserToken.get()==null) throw new RuntimeException("no user token is provided"); if(noConsole){ // redirect stderr and stdout to null PrintStream nullStream=new PrintStream(new OutputStream(){ public void close(){ } public void flush(){ } public void write(byte[] b){ } public void write(byte[] b, int off, int len){ } public void write(int b){ } }); System.setErr(nullStream); System.setOut(nullStream); } // 创建数据库缓冲服务器实例 DatabaseCacheDaemon daemon=new DatabaseCacheDaemon(); instance.set(daemon); // 初始化状态机 if(!DatabaseCacheDaemonState.init(daemon)){ throw new RuntimeException("无法初始化状态机"); } // 初始化服务器配置 if(!DaemonConfiguration.init()){ throw new RuntimeException("无法初始化服务器配置"); } config.set(DaemonConfiguration.getInstance()); // 设置消息发送重试次数 maxRetry.set(config.get().getInt("/daemon[@type='DatabaseCache']/message/maxRetry")); messageQueueCapacity.set(config.get().getInt("/daemon[@type='DatabaseCache']/message/queue/capacity")); // 初始化服务器任务执行器 if(!DaemonTaskExecutor.init(daemon)){ throw new RuntimeException("无法初始化任务执行器"); } executor.set(DaemonTaskExecutor.getInstance()); // 初始化数据库连接池,创建默认数据源 if(!DatabaseConnectionPool.init(daemon)){ throw new RuntimeException("无法初始化数据库连接池"); } LoadBlackListTask.init(daemon); // 设置状态机 daemon.setState(DatabaseCacheDaemonStoppedState.getInstance()); // 创建数据库缓冲服务器控制端 controller=new DatabaseCacheControllerImpl(daemon); // 启动服务器 if(!daemon.start()){ // 启动失败 DaemonTaskExecutor.fini(daemon); DatabaseConnectionPool.fini(daemon); System.exit(0); } } /** * 暂停数据库缓冲服务器 * @return 操作是否成功 */ public boolean pause() { return state.get().pause(); } /** * 重新启动数据库缓冲服务器 * @return 操作是否成功 */ public boolean restart() { return state.get().restart(); } /** * 继续以暂停的数据库缓冲服务器 * @return 操作是否成功 */ public boolean resume() { return state.get().resume(); } /** * 启动数据库缓冲服务器 * @return 操作是否成功 */ public boolean start() { return state.get().start(); } /** * 停止数据库缓冲服务器 * @return 操作是否成功 */ public boolean stop() { return state.get().stop(); } /** * 新增待发信息到待发信息队列 * @param messages 待发消息 * @param tag message tag * @return 操作是否成功 */ public boolean addNewMessage(Queue<TextMessage> messages, String tag){ if(messages==null||messages.isEmpty()) return false; return state.get().addNewMessage(messages, tag); } /** * 取出待发短信息队列中所有的信息 * @param tag message tag * @return 待发短信息队列中所有的信息 */ public Queue<TextMessage> getNewMessage(String tag){ return state.get().getNewMessage(tag); } /** * 将所给信息的新状态更新回数据库 * @param status 信息新状态更新请求 * @return 操作是否成功 */ public boolean updateMessageStatus(Queue<TextMessage> status){ if(status==null||status.size()==0) return false; return state.get().updateMessageStatus(status); } /** * 取出信息状态待更新队列中所有状态待更新的信息 * @return 信息状态待更新队列中所有状态待更新的信息 */ public Queue<TextMessage> getPendingStatusUpdate(){ return state.get().getPendingStatusUpdate(); } /** * 当前数据库缓冲服务器是否可运行 * @return 当前数据库缓冲服务器是否可运行 */ @Override public boolean isRunnable(){ return state.get().isRunnable(); } /** * 当前数据库缓冲服务器是否出于暂停态 * @return 当前数据库缓冲服务器是否出于暂停态 */ @Override public boolean isPaused() { return state.get().isPaused(); } /** * 重新读取数据库缓冲服务器的配置 * @return 操作是否成功 */ public boolean reload() { return state.get().reload(); } /** * 设置数据库缓冲服务器的当前活动状态监控器 * @param observer 数据库缓冲服务器的状态监控器 */ public void attachObserver(DatabaseCacheStatusObserver observer){ DatabaseCacheDaemon.observer=observer; // 将当前状态信息更新到新的状态监控器 synchronized(incomingMessageQueueMutex){ observer.incomingMessageQueueSize(incomingMessageQueue.size()); } synchronized(messageQueueMutex){ observer.messageQueueSize(messageQueue.size()); } observer.numberOfFailedMessages(numberOfFailedMessages.get()); observer.numberOfSuccessfulMessages(numberOfSuccessfulMessages.get()); Collection<BlackList> blackLists=blackListMap.values(); for(BlackList blackList:blackLists){ observer.reloadBlackList(blackList); } synchronized(statusQueueMutex){ observer.statusQueueSize(statusQueue.size()); } } /** * 获得数据库缓冲服务器的当前状态 * @return 数据库缓冲服务器的当前状态 */ public String getState(){ return state.get().getState(); } /** * 强制将状态待更新消息回写回数据库(仅供状态机使用,不对外开放) */ protected void stateForceUpdateStatusQueue(){ try{ // 在系统出现异常的情况下,强制将内存中的状态数据回写回数据库 DAO dao=DAO.getInstance(); Queue<TextMessage> status=new LinkedList<TextMessage>(); status.addAll(statusQueue.values()); dao.updateMessageStatus(status); } catch(Exception ex){ observer.error(ex); } } @Override public void setBlackList(BlackList blacklist) { state.get().setBlackList(blacklist); } public boolean newIncomingMessage(Queue<TextMessage> messages){ return state.get().newIncomingMessage(messages); } public Queue<TextMessage> getNewIncomingMessage() { return state.get().getNewIncomingMessage(); } public boolean isMessageQueueFull(){ synchronized(messageQueueMutex){ return messageQueue.size()>=messageQueueCapacity.get(); } } public ControllerUserToken getInternalUserToken(){ return internalUserToken.get(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -