📄 memorypersistencemessagelauncherstatusobserver.java
字号:
package com.infobank.superchannel.observer.implementation;import java.util.LinkedList;import java.util.List;import java.util.Queue;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicReference;import org.apache.commons.configuration.Configuration;import com.infobank.superchannel.daemon.messagelauncher.MessageLauncherDaemon;import com.infobank.superchannel.observer.ChannelStatusObserver;import com.infobank.superchannel.observer.MessageLauncherStatusObserver;import com.infobank.superchannel.pojo.GenericLog;import com.infobank.superchannel.pojo.MessageBucketLog;import com.infobank.superchannel.pojo.TextMessage;import com.infobank.superchannel.pojo.TextMessageLog;import com.infobank.superchannel.util.DaemonConfiguration;/** * @author Xiaoguang Sun * @version 类说明: MemoryPersistence状态观察者 */public class MemoryPersistenceMessageLauncherStatusObserver extends MemoryPersistenceLogObserver implements MessageLauncherStatusObserver{ private List<TextMessageLog> changeMessageStatusLog=new LinkedList<TextMessageLog>(); private Object changeMessageStatusLogMutex=new Object(); private static int maximumChangeMessageStatusLogSize=100; private List<MessageBucketLog> newMassMessageBucketLog=new LinkedList<MessageBucketLog>(); private Object newMassMessageBucketLogMutex=new Object(); private static int maximumMassMessageBucketLogSize=100; private List<MessageBucketLog> newGeneralMessageBucketLog=new LinkedList<MessageBucketLog>(); private Object newGeneralMessageBucketLogMutex=new Object(); private static int maximumGeneralMessageBucketLogSize=100; private AtomicInteger massMessageQueueSize=new AtomicInteger(0); private AtomicInteger generalMessageQueueSize=new AtomicInteger(0); private AtomicReference<String> state=new AtomicReference<String>(new String()); public static void init(MessageLauncherDaemon daemon){ String tag=daemon.getTag(); DaemonConfiguration config=DaemonConfiguration.getInstance(); StringBuilder sb=new StringBuilder(); sb.append("/daemon[@type='MessageLauncher' and @tag='"); sb.append(tag); sb.append("']/observer[@type='MessageLauncherStatusObserver']/capacity[@persistence='Memory']/"); String basePath=sb.toString(); sb.append("changeMessageStatus"); maximumChangeMessageStatusLogSize=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(basePath); sb.append("massMessageBucket"); maximumMassMessageBucketLogSize=config.getInt(sb.toString()); sb=new StringBuilder(); sb.append(basePath); sb.append("generalMessageBucket"); maximumGeneralMessageBucketLogSize=config.getInt(sb.toString()); } /** * Check the size of a given list. If it is greater than maxsize remove the first one * to keep it smaller than the preset size * @param list * @param maxsize */ private void checkListSize(List list,int maxsize){ int listSize=list.size(); if(listSize>=maxsize){ //list.subList(0, listSize-maxsize+1).clear(); list.remove(0); } } /** * 获取自上次调用此方法后产生的所有消息状态更新日志 * @param token 用户访问令牌 * @return 自上次调用此方法后产生的所有消息状态更新日志 */ public List<TextMessageLog> getChangeMessageStatusLog(){ List<TextMessageLog> log=null; synchronized(changeMessageStatusLogMutex){ log=changeMessageStatusLog; changeMessageStatusLog=new LinkedList<TextMessageLog>(); } return log; } /** * 获取自上次调用此方法后产生的所有新批量消息组日志 * @param token 用户访问令牌 * @return 自上次调用此方法后产生的所有新批量消息组日志 */ public List<MessageBucketLog> getNewMassMessageBucketLog(){ List<MessageBucketLog> log=null; synchronized(newMassMessageBucketLogMutex){ log=newMassMessageBucketLog; newMassMessageBucketLog=new LinkedList<MessageBucketLog>(); } return log; } /** * 获取自上次调用此方法后产生的所有新普通消息组日志 * @param token 用户访问令牌 * @return 自上次调用此方法后产生的所有新普通消息组日志 */ public List<MessageBucketLog> getNewGeneralMessageBucketLog(){ List<MessageBucketLog> log=null; synchronized(newGeneralMessageBucketLogMutex){ log=newGeneralMessageBucketLog; newGeneralMessageBucketLog=new LinkedList<MessageBucketLog>(); } return log; } /** * 获取当前批量消息队列尺寸 * @param token 用户访问令牌 * @return 当前批量消息队列尺寸 */ public int getMassMessageQueueSize(){ return massMessageQueueSize.get(); } /** * 获取当前普通消息队列尺寸 * @param token 用户访问令牌 * @return 当前普通消息队列尺寸 */ public int getGeneralMessageQueueSize(){ return generalMessageQueueSize.get(); } /** * 消息状态变更 * @param message 状态变更的消息 */ public void changeMessageStatus(TextMessage message){ synchronized(changeMessageStatusLogMutex){ checkListSize(changeMessageStatusLog,maximumChangeMessageStatusLogSize); changeMessageStatusLog.add(new TextMessageLog(message)); } } /** * 设置普通信息队列尺寸 * @param size 普通信息队列尺寸 */ public void generalMessageQueueSize(int size) { generalMessageQueueSize.set(size); } /** * 设置批量信息队列尺寸 * @param size 批量信息队列尺寸 */ public void massMessageQueueSize(int size) { massMessageQueueSize.set(size); } /** * 新的普通消息组到来 * @param bucket 新普通消息组 */ public void newGeneralMessageBucket(Queue<TextMessage> bucket) { MessageBucketLog log=null; synchronized(newGeneralMessageBucketLogMutex){ checkListSize(newGeneralMessageBucketLog,maximumGeneralMessageBucketLogSize); log=new MessageBucketLog(bucket); newGeneralMessageBucketLog.add(log); } } /** * 新的批量消息组到来 * @param bucket 新批量消息组 */ public void newMassMessageBucket(Queue<TextMessage> bucket) { MessageBucketLog log=null; synchronized(newMassMessageBucketLogMutex){ checkListSize(newMassMessageBucketLog,maximumMassMessageBucketLogSize); log=new MessageBucketLog(bucket); newMassMessageBucketLog.add(log); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -