⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchworker.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
字号:
/*------------------------------------------------------------------------------Name:      DispatchWorker.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.dispatch;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.dispatch.plugins.I_MsgDispatchInterceptor;import java.util.ArrayList;/** * Takes messages from the queue and tries to send them back to a client.  * @author xmlBlaster@marcelruff.info */public final class DispatchWorker implements Runnable{   public final String ME;   private static Logger log = Logger.getLogger(DispatchWorker.class.getName());   private DispatchManager dispatchManager;   private I_Queue msgQueue;   public DispatchWorker(Global glob, DispatchManager mgr) {      this.dispatchManager = mgr;      this.msgQueue = mgr.getQueue();      ME = "DispatchWorker-" + this.msgQueue.getStorageId();    }   /**    * Synchronous push mode.     * The 'synchronous' ACK is transported in entryList(i).getReturnObj()    * which contains pre parsed objects like SubscribeQosReturn etc.    */   public void run(ArrayList entryList) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Starting push remote dispatch of " + ((entryList!=null)?entryList.size():0) + " entries.");      MsgQueueEntry[] entries = null;      try {         I_MsgDispatchInterceptor msgInterceptor = dispatchManager.getMsgDispatchInterceptor();         if (msgInterceptor != null) {            log.severe("Communication dispatch plugin support is missing in sync mode - not implemented");            /*!!! filter or whatever            try {               entryList = msgInterceptor.handleNextMessages(dispatchManager, entryList); // should call prepareMsgsFromQueue() immediately            }            catch (Throwable e) {               entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);               throw e;            }            */         }         if (entryList == null || entryList.size() < 1) {            if (log.isLoggable(Level.FINE)) log.fine("Got zero messages from to deliver, expected at least one");            return;         }         entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);                  if (log.isLoggable(Level.FINE)) log.fine("Sending now " + entries.length + " messages ...");                  dispatchManager.getDispatchConnectionsHandler().send(entries); // entries are filled with return values         /*ArrayList defaultEntries = */this.dispatchManager.filterDistributorEntries(entryList, null);         if (log.isLoggable(Level.FINE)) log.fine("Commit of successful sending of " + entryList.size() + " messages done, current queue size is " + this.msgQueue.getNumOfEntries() + " '" + ((MsgQueueEntry)entryList.get(0)).getLogId() + "'");      }      catch(Throwable throwable) {         ArrayList entriesWithNoDistributor = this.dispatchManager.filterDistributorEntries(entryList, throwable);         if (entriesWithNoDistributor.size() > 0) dispatchManager.handleSyncWorkerException(entriesWithNoDistributor, throwable);      }   }   /**    * Asynchronous pull mode, invoked by DispatchWorkerPool.execute() -> see DispatchManager calling it    */   public void run() {      if (log.isLoggable(Level.FINER)) log.finer("Starting remote dispatch with " + this.msgQueue.getNumOfEntries() + " entries.");      ArrayList entryList = null;      ArrayList entryListChecked = null;      try {         I_MsgDispatchInterceptor msgInterceptor = dispatchManager.getMsgDispatchInterceptor();         if (msgInterceptor != null) {               entryListChecked = msgInterceptor.handleNextMessages(dispatchManager, null); // should call prepareMsgsFromQueue() immediately               entryList = entryListChecked;         }         else {            //synchronized (this.msgQueue) {               //entryList = (MsgQueueEntry[])this.msgQueue.take(-1); --> get()               // not blocking and only all of the same priority:               entryList = this.msgQueue.peekSamePriority(dispatchManager.getBurstModeMaxEntries(), dispatchManager.getBurstModeMaxBytes()); // -1, -1L -> get all entries in cache               entryListChecked = dispatchManager.prepareMsgsFromQueue(entryList);            //}         }         if (entryList == null || entryList.size() < 1) {            if (log.isLoggable(Level.FINE)) log.fine("Got zero messages from queue, expected at least one, can happen if messages expired or client disconnected in the mean time.");            return;         }         if (entryListChecked != null && entryListChecked.size() > 0) {            MsgQueueEntry[] entries = (MsgQueueEntry[])entryListChecked.toArray(new MsgQueueEntry[entryListChecked.size()]);                        if (log.isLoggable(Level.FINE)) log.fine("Sending now " + entries.length + " messages ..., current queue size is " + this.msgQueue.getNumOfEntries() + " '" + entries[0].getLogId() + "'");                        dispatchManager.getDispatchConnectionsHandler().send(entries);            // Here an exception is thrown or            // the RETURN value is transferred in the entries[i].getReturnObj(), for oneway updates it is null                        if (log.isLoggable(Level.FINE)) log.fine("Sending of " + entries.length + " messages done, current queue size is " + this.msgQueue.getNumOfEntries());         }         MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);         dispatchManager.removeFromQueue(entries, true);      }      catch(Throwable throwable) {         try {            dispatchManager.handleWorkerException(entryList, throwable);         }         catch (Throwable e) {            e.printStackTrace();            StringBuffer buf = new StringBuffer(2048);            for (int i=0; i<entryList.size(); i++)               buf.append(" ").append(((MsgQueueEntry)entryList.get(i)).getLogId());            log.severe("Commit of sending of " +                  entryList.size() + " messages failed, current queue size is " +                  this.msgQueue.getNumOfEntries() + " '" + buf.toString() + "': " + e.toString());         }         // ArrayList entriesWithNoDistributor = this.sendAsyncResponseEvent(entryList);         // if (entriesWithNoDistributor.size() > 0) dispatchManager.handleWorkerException(entriesWithNoDistributor, throwable);      }      finally {         this.dispatchManager.setDispatchWorkerIsActive(false);         entryList = null;         shutdown();      }   }      void shutdown() {      // Commented out to avoid NPE      //this.dispatchManager = null;      //this.msgQueue = null;   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -