📄 consumablequeueplugin.java
字号:
/*------------------------------------------------------------------------------Name: ConsumableQueuePlugin.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine.distributor.plugins;import java.util.ArrayList;import java.util.List;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.engine.MsgUnitWrapper;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.SubscriptionEvent;import org.xmlBlaster.engine.SubscriptionInfo;import org.xmlBlaster.engine.TopicHandler;import org.xmlBlaster.engine.distributor.I_MsgDistributor;import org.xmlBlaster.engine.qos.UpdateReturnQosServer;import org.xmlBlaster.engine.queuemsg.MsgQueueHistoryEntry;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.dispatch.DispatchManager;import org.xmlBlaster.util.dispatch.DispatchWorker;import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;/** * ConsumableQueuePlugin * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> * */public class ConsumableQueuePlugin implements I_MsgDistributor, I_ConnectionStatusListener { private final static String ME = "ConsumableQueuePlugin"; // int status; boolean isReady; boolean isRunning; private Global global; private static Logger log = Logger.getLogger(ConsumableQueuePlugin.class.getName()); private PluginInfo pluginInfo; private ServerScope serverScope; private String topicId; // <key oid="..." private Object mutex = new Object(); /** * The default constructor. Currently does nothing. */ public ConsumableQueuePlugin() { this.isReady = false; } /** * Invoked on status changes when it shall start to distribute * entries. This can either happen on publish, on subscribe or when * a dispatcher becomes alive again. This method is synchronized to avoid * more threads running concurrently (see processHistoryQueue). */ private void toRunning() { if (log.isLoggable(Level.FINER)) log.finer("toRunning, isRunning='" + this.isRunning + "' isReady='" + this.isReady + "'"); synchronized (this.mutex) { if (this.isRunning || !this.isReady) return; this.isRunning = true; } try { // the global owns a thread pool (Doug Lea's executor pattern) this.global.getDispatchWorkerPool().execute(new ConsumableQueueWorker(log, this)); } catch (InterruptedException ex) { log.severe("toRunning: exception " + ex.getMessage()); ex.printStackTrace(); } } /** * @see org.xmlBlaster.engine.distributor.I_MsgDistributor#distribute(org.xmlBlaster.engine.TopicHandler, org.xmlBlaster.authentication.SessionInfo, org.xmlBlaster.engine.MsgUnitWrapper) * Invoked by the TopicHandler on publish or subscribe. Starts the distributor thread and * returnes immeditately. From here distribution is handled by another thread. **/ public void distribute(MsgUnitWrapper msgUnitWrapper) { if (log.isLoggable(Level.FINER)) log.finer("distribute"); toRunning(); } /** * Initializes the plugin */ public void init(Global global, PluginInfo pluginInfo) throws XmlBlasterException { this.global = global; if (log.isLoggable(Level.FINER)) log.finer("init"); this.pluginInfo = pluginInfo; this.serverScope = (org.xmlBlaster.engine.ServerScope)this.global.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope); TopicHandler topicHandler = (TopicHandler)this.pluginInfo.getUserData(); this.topicId = topicHandler.getUniqueKey(); this.isReady = true; toRunning(); } public String getType() { return this.pluginInfo.getType(); } public String getVersion() { return this.pluginInfo.getVersion(); } /** * It removes all subscriptions done on this topic */ public void shutdown() throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("shutdown"); SubscriptionInfo[] subs = this.serverScope.getTopicAccessor().getSubscriptionInfoArrDirtyRead(this.topicId); for (int i=0; i < subs.length; i++) subscriptionRemove(new SubscriptionEvent(subs[i])); this.isReady = false; } private final DispatchManager getDispatchManager(SubscriptionInfo subscriptionInfo) { if (subscriptionInfo == null) { log.severe("getDispatchManager the subscriptionInfo object is null"); Thread.dumpStack(); return null; } SessionInfo sessionInfo = subscriptionInfo.getSessionInfo(); if (sessionInfo == null) { log.severe("getDispatchManager the sessionInfo object is null"); Thread.dumpStack(); return null; } DispatchManager dispatchManager = sessionInfo.getDispatchManager(); if (dispatchManager == null) { log.severe("getDispatchManager the dispatcherManager object is null"); Thread.dumpStack(); return null; } return dispatchManager; } /** * Invoked when a subscriber is added to the TopicHandler * @param subscriptionInfo */ public void subscriptionAdd(SubscriptionEvent e) throws XmlBlasterException { SubscriptionInfo subscriptionInfo = e.getSubscriptionInfo(); if (log.isLoggable(Level.FINER)) log.finer("onAddSubscriber"); DispatchManager dispatchManager = getDispatchManager(subscriptionInfo); if (dispatchManager != null) dispatchManager.addConnectionStatusListener(this); this.isReady = true; toRunning(); } /** * Invoked when a subscriber is removed from the TopicHandler * @param subscriptionInfo */ public void subscriptionRemove(SubscriptionEvent e) throws XmlBlasterException { SubscriptionInfo subscriptionInfo = e.getSubscriptionInfo(); if (log.isLoggable(Level.FINER)) log.finer("onRemoveSubscriber"); DispatchManager dispatchManager = getDispatchManager(subscriptionInfo); if (dispatchManager != null) dispatchManager.removeConnectionStatusListener(this); } /** * Event arriving from one DispatchManager telling this plugin it can * start distribute again. */ public void toAlive(DispatchManager dispatchManager, ConnectionStateEnum oldState) { if (log.isLoggable(Level.FINER)) log.finer("toAlive"); this.isReady = true; toRunning(); } public void toPolling(DispatchManager dispatchManager, ConnectionStateEnum oldState) { } public void toDead(DispatchManager dispatchManager, ConnectionStateEnum oldState, String errorText) { } /** * Takes entries from the history queue and distributes it to the dispatcher * framework until there are entries available or until the dispatcher framework * is alive. */ void processHistoryQueue() { if (log.isLoggable(Level.FINER)) log.finer("processQueue"); try { ArrayList lst = null; while (true) { //synchronized(this) { TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId); try { I_Queue historyQueue = topicHandler.getHistoryQueue(); if (historyQueue == null) { this.isRunning = false; break; } lst = historyQueue.peek(-1, -1L); if (log.isLoggable(Level.FINE)) log.fine("processQueue: processing '" + lst.size() + "' entries from queue"); if (lst == null || lst.size() < 1) { this.isRunning = false; break; } } finally { this.serverScope.getTopicAccessor().release(topicHandler); } //}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -