⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consumablequeueplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*------------------------------------------------------------------------------Name:      ConsumableQueuePlugin.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine.distributor.plugins;import java.util.ArrayList;import java.util.List;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.engine.MsgUnitWrapper;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.SubscriptionEvent;import org.xmlBlaster.engine.SubscriptionInfo;import org.xmlBlaster.engine.TopicHandler;import org.xmlBlaster.engine.distributor.I_MsgDistributor;import org.xmlBlaster.engine.qos.UpdateReturnQosServer;import org.xmlBlaster.engine.queuemsg.MsgQueueHistoryEntry;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.dispatch.ConnectionStateEnum;import org.xmlBlaster.util.dispatch.DispatchManager;import org.xmlBlaster.util.dispatch.DispatchWorker;import org.xmlBlaster.util.dispatch.I_ConnectionStatusListener;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;/** * ConsumableQueuePlugin * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> *  */public class ConsumableQueuePlugin implements I_MsgDistributor, I_ConnectionStatusListener {   private final static String ME = "ConsumableQueuePlugin";   // int status;   boolean isReady;   boolean isRunning;   private Global global;   private static Logger log = Logger.getLogger(ConsumableQueuePlugin.class.getName());   private PluginInfo pluginInfo;   private ServerScope serverScope;   private String topicId; // <key oid="..."   
private Object mutex = new Object();   /**    * The default constructor. Currently does nothing.    */      public ConsumableQueuePlugin() {      this.isReady = false;   }   /**    * Invoked on status changes when it shall start to distribute     * entries. This can either happen on publish, on subscribe or when     * a dispatcher becomes alive again. This method is synchronized to avoid     * more threads running concurrently (see processHistoryQueue).    */   private void toRunning() {      if (log.isLoggable(Level.FINER)) log.finer("toRunning, isRunning='" + this.isRunning + "' isReady='" + this.isReady + "'");      synchronized (this.mutex) {         if (this.isRunning || !this.isReady) return;         this.isRunning = true;      }      try {         // the global owns a thread pool (Doug Lea's executor pattern)         this.global.getDispatchWorkerPool().execute(new ConsumableQueueWorker(log, this));      }      catch (InterruptedException ex) {         log.severe("toRunning: exception " + ex.getMessage());         ex.printStackTrace();      }   }   /**    * @see org.xmlBlaster.engine.distributor.I_MsgDistributor#distribute(org.xmlBlaster.engine.TopicHandler, org.xmlBlaster.authentication.SessionInfo, org.xmlBlaster.engine.MsgUnitWrapper)    * Invoked by the TopicHandler on publish or subscribe. Starts the distributor thread and     * returnes immeditately. From here distribution is handled by another thread.    
**/
   public void distribute(MsgUnitWrapper msgUnitWrapper) {
      if (log.isLoggable(Level.FINER)) {
         log.finer("distribute");
      }
      toRunning();
   }

   /**
    * Initializes the plugin: stores the global and the plugin info, looks up the
    * server scope and the id of the topic this plugin belongs to, marks the
    * plugin as ready and triggers the distribution.
    *
    * @param global the global from which the server scope is looked up
    * @param pluginInfo the plugin info; its user data is cast to a TopicHandler
    */
   public void init(Global global, PluginInfo pluginInfo)
      throws XmlBlasterException {
      this.global = global;
      if (log.isLoggable(Level.FINER)) {
         log.finer("init");
      }
      this.pluginInfo = pluginInfo;
      Object scopeEntry = global.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope);
      this.serverScope = (ServerScope) scopeEntry;
      TopicHandler owner = (TopicHandler) pluginInfo.getUserData();
      this.topicId = owner.getUniqueKey();
      this.isReady = true;
      toRunning();
   }

   /**
    * @return the type as reported by the plugin info
    */
   public String getType() {
      String type = this.pluginInfo.getType();
      return type;
   }

   /**
    * @return the version as reported by the plugin info
    */
   public String getVersion() {
      String version = this.pluginInfo.getVersion();
      return version;
   }

   /**
    * Invokes subscriptionRemove for every subscription currently known for
    * this topic and then marks the plugin as not ready.
    */
   public void shutdown() throws XmlBlasterException {
      if (log.isLoggable(Level.FINER)) {
         log.finer("shutdown");
      }
      SubscriptionInfo[] subscriptions = this.serverScope.getTopicAccessor().getSubscriptionInfoArrDirtyRead(this.topicId);
      int idx = 0;
      while (idx < subscriptions.length) {
         subscriptionRemove(new SubscriptionEvent(subscriptions[idx]));
         idx++;
      }
      this.isReady = false;
   }

   /**
    * Navigates from a subscription over its session to the dispatch manager.
    * If any element of that chain is missing, a severe message is logged, the
    * current stack is dumped and null is returned.
    *
    * @param subscriptionInfo the subscription to start from, may be null
    * @return the dispatch manager of the session owning the subscription or null
    */
   private final DispatchManager getDispatchManager(SubscriptionInfo subscriptionInfo) {
      if (subscriptionInfo == null) {
         log.severe("getDispatchManager the subscriptionInfo object is null");
         Thread.dumpStack();
         return null;
      }

      SessionInfo session = subscriptionInfo.getSessionInfo();
      if (session == null) {
         log.severe("getDispatchManager the sessionInfo object is null");
         Thread.dumpStack();
         return null;
      }

      DispatchManager manager = session.getDispatchManager();
      if (manager == null) {
         log.severe("getDispatchManager the dispatcherManager object is null");
         Thread.dumpStack();
         return null;
      }

      return manager;
   }

   /**
    * Invoked when a subscriber is added to the TopicHandler. Registers this
    * plugin as connection status listener on the dispatch manager of the
    * subscriber (if one is found), marks the plugin as ready and triggers the
    * distribution.
    *
    * @param e the event carrying the added subscription
    */
   public void subscriptionAdd(SubscriptionEvent e)
      throws XmlBlasterException {
      SubscriptionInfo added = e.getSubscriptionInfo();
      if (log.isLoggable(Level.FINER)) {
         log.finer("onAddSubscriber");
      }
      DispatchManager manager = getDispatchManager(added);
      if (manager != null) {
         manager.addConnectionStatusListener(this);
      }
      this.isReady = true;
      toRunning();
   }

   /**
    * Invoked when a subscriber is removed from the TopicHandler. Unregisters
    * this plugin as connection status listener from the dispatch manager of
    * the subscriber (if one is found).
    *
    * @param e the event carrying the removed subscription
    */
   public void subscriptionRemove(SubscriptionEvent e) throws XmlBlasterException {
      SubscriptionInfo removed = e.getSubscriptionInfo();
      if (log.isLoggable(Level.FINER)) {
         log.finer("onRemoveSubscriber");
      }
      DispatchManager manager = getDispatchManager(removed);
      if (manager != null) {
         manager.removeConnectionStatusListener(this);
      }
   }

   /**
    * Event arriving from one DispatchManager telling this plugin it can
    * start to distribute again: marks the plugin as ready and triggers the
    * distribution.
    */
   public void toAlive(DispatchManager dispatchManager, ConnectionStateEnum oldState) {
      if (log.isLoggable(Level.FINER)) {
         log.finer("toAlive");
      }
      this.isReady = true;
      toRunning();
   }

   /**
    * Does nothing.
    */
   public void toPolling(DispatchManager dispatchManager, ConnectionStateEnum oldState) {
   }

   /**
    * Does nothing.
    */
   public void toDead(DispatchManager dispatchManager, ConnectionStateEnum oldState, String errorText) {
   }

   /**
    * Takes entries from the history queue and distributes it to the dispatcher
    * framework until there are entries available or until the dispatcher framework
    * is alive.
*/   void processHistoryQueue() {      if (log.isLoggable(Level.FINER)) log.finer("processQueue");      try {         ArrayList lst = null;         while (true) {            //synchronized(this) {               TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId);               try {                  I_Queue historyQueue = topicHandler.getHistoryQueue();                  if (historyQueue == null) {                     this.isRunning = false;                     break;                  }                  lst = historyQueue.peek(-1, -1L);                  if (log.isLoggable(Level.FINE)) log.fine("processQueue: processing '" + lst.size() + "' entries from queue");                  if (lst == null || lst.size() < 1) {                     this.isRunning = false;                     break;                  }               }               finally {                  this.serverScope.getTopicAccessor().release(topicHandler);               }            //}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -