⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchmanager.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
         return;
      startWorkerThread(false);
   }

   /**
    * Counts how often a new entry was added since the current worker thread was started.
    *
    * @return the current counter value; startWorkerThread() resets it to 0
    *         each time it schedules a new worker
    */
   public int getNotifyCounter() {
      return this.notifyCounter;
   }

   /**
    * Give the callback worker thread a kick to deliver the messages.
    * <p>
    * Nothing happens if checkSending() vetoes the delivery. If burst mode is
    * configured the delivery is deferred to the burst mode timer, otherwise a
    * worker is started immediately.
    */
   private void activateDispatchWorker() {
      if (checkSending(false) == false)
         return;

      if (useBurstModeTimer() == true)
         return;

      startWorkerThread(false);
   }

   /**
    * Arms the burst mode timer if burst mode is configured (collectTime &gt; 0)
    * and no timer is pending yet.
    *
    * @return true if burst mode is in use (a timer is pending or was just started),
    *         false if collectTime &lt;= 0 or we are shutdown
    */
   private boolean useBurstModeTimer() {
      if (collectTime <= 0L) return false;

      // Messages are sent delayed on timeout (burst mode)
      if (log.isLoggable(Level.FINE)) log.fine(ME+": Executing useBurstModeTimer() collectTime=" + collectTime + " dispatchWorkerIsActive=" + dispatchWorkerIsActive);

      synchronized (this) {
         if (this.isShutdown) return false;
         if (this.timerKey == null) {
            if (log.isLoggable(Level.FINE)) log.fine(ME+": Starting burstMode timer with " + collectTime + " msec");
            this.timerKey = this.glob.getBurstModeTimer().addTimeoutListener(this, collectTime, null);
         }
      }
      return true;
   }

   /**
    * Remove the burst mode timer.
    * <p>
    * Does nothing if no timer is currently registered, so it is safe to call repeatedly.
    */
   private void removeBurstModeTimer() {
      synchronized (this) {
         if (this.timerKey != null) {
            this.glob.getBurstModeTimer().removeTimeoutListener(timerKey);
            this.timerKey = null;
         }
      }
   }

   /**
    * Schedules a DispatchWorker on the dispatch worker pool unless one is
    * already active or we are shutdown.
    *
    * @param fromTimeout for logging only
    */
   private void startWorkerThread(boolean fromTimeout) {
      if (this.dispatchWorkerIsActive == false) {
         synchronized (this) {
            if (this.isShutdown) {
               if (log.isLoggable(Level.FINE)) log.fine(ME+": startWorkerThread() failed, we are shutdown: " + toXml(""));
               return;
            }
            // Check again under the lock: another thread may have started a worker meanwhile
            if (this.dispatchWorkerIsActive == false) { // send message directly
               this.dispatchWorkerIsActive = true;
               this.notifyCounter = 0;
               try {
                  this.glob.getDispatchWorkerPool().execute(new DispatchWorker(glob, this));
               }
               catch (Throwable e) {
                  // The worker never started, so allow the next kick to try again
                  this.dispatchWorkerIsActive = false;
                  // Hand the throwable to the logger so the stack trace ends up in the log
                  log.log(Level.SEVERE, ME+": Unexpected error occurred: " + e.toString(), e);
               }
            }
         }
         return;
      }

      if (fromTimeout) {
         if (log.isLoggable(Level.FINE)) log.fine(ME+": Burst mode timeout occurred, last callback worker thread is not finished - we do nothing (the worker thread will give us a kick)");
      }
      else {
         if (log.isLoggable(Level.FINE)) log.fine(ME+": Last callback worker thread is not finished - we do nothing (the worker thread will give us a kick)");
      }
   }

   /**
    * @return the isDead() state reported by the dispatch connections handler
    */
   public boolean isDead() {
      return this.dispatchConnectionsHandler.isDead();
   }

   /**
    * @return the isPolling() state reported by the dispatch connections handler
    */
   public boolean isPolling() {
      return this.dispatchConnectionsHandler.isPolling();
   }

   /**
    * Can be called when client connection is lost (NOT the callback connection).
    * Currently only detected by the SOCKET protocol plugin.
    * Others can only detect lost clients with their callback protocol pings
    */
   public void lostClientConnection() {
      log.warning(ME+": Lost client connection");
      // If SOCKET: the cb connection is lost as well and we can go to polling mode
      pingCallbackServer(false);
   }

   /**
    * Forces a ping on the current dispatch connection.
    * Does nothing if there is no current dispatch connection.
    *
    * @param sync true: ping in the calling thread,
    *             false: schedule the ping on the ping timer with a delay of 0
    */
   public void pingCallbackServer(boolean sync) {
      DispatchConnection dispatchConnection = this.dispatchConnectionsHandler.getCurrentDispatchConnection();
      if (dispatchConnection != null) {
         if (sync) {
            dispatchConnection.timeout(null); // force a ping
         }
         else {
            // force a ping via another thread
            this.glob.getPingTimer().addTimeoutListener(dispatchConnection, 0L, null);
         }
      }
   }

   /**
    * Decides whether a delivery attempt makes sense right now.
    * <p>
    * Besides checking, this method has side effects when not called by the
    * publisher thread: it calls shutdown() if the queue is shutdown and
    * givingUpDelivery() if the connection is dead.
    *
    * @param isPublisherThread We take care that the publisher thread, coming through putPost()
    *        does never too much work to return fast enough and avoid possible dead locks.
    * @return true if status is OK and we can try to send a message
    */
   private boolean checkSending(boolean isPublisherThread) {
      if (this.isShutdown) {
         if (log.isLoggable(Level.FINE)) log.fine(ME+": The dispatcher is shutdown, can't activate callback worker thread" + toXml(""));
         return false; // assert
      }

      if (this.isSyncMode) {
         return false;
      }

      if (!this.dispatcherActive) {
         return false;
      }

      if (msgQueue.isShutdown() && !isPublisherThread) { // assert
         if (log.isLoggable(Level.FINE)) log.fine(ME+": The queue is shutdown, can't activate callback worker thread.");
         // e.g. client has disconnected in the meantime.
         //Thread.currentThread().dumpStack();
         shutdown();
         return false;
      }

      if (this.dispatchConnectionsHandler.isUndef()) {
         if (log.isLoggable(Level.FINE)) log.fine(ME+": Not connected yet, state is UNDEF");
         return false;
      }

      if (this.dispatchConnectionsHandler.isDead() && !isPublisherThread) {
         String text = "No recoverable remote connection available, giving up queue " + msgQueue.getStorageId() + ".";
         if (log.isLoggable(Level.FINE)) log.fine(ME+": "+text);
         givingUpDelivery(new XmlBlasterException(glob,ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, text));
         return false;
      }

      if (msgQueue.getNumOfEntries() == 0L) {
         return false;
      }

      if (this.msgInterceptor != null) {
         if (this.msgInterceptor.doActivate(this) == false) {
            if (log.isLoggable(Level.FINE)) log.fine(ME+": this.msgInterceptor.doActivate==false");
            return false; // A plugin told us to suppress sending the message
         }
         // Deliberately returns before the polling check below, see the comment there
         return true;
      }

      /*
       * The msgInterceptor plugin needs to have a chance to take care of this even in polling mode
       */
      if (this.dispatchConnectionsHandler.isPolling()) {
         if (log.isLoggable(Level.FINE)) log.fine(ME+": Can't send message as connection is lost and we are polling");
         return false;
      }

      return true;
   }

   /**
    * We are notified about the burst mode timeout through this method.
    * @param userData You get bounced back your userData which you passed
    *                 with Timeout.addTimeoutListener()
    */
   public void timeout(Object userData) {
      // The timer has fired, so its key is stale. Clear it under the same monitor
      // that useBurstModeTimer() and removeBurstModeTimer() use to guard timerKey.
      synchronized (this) {
         this.timerKey = null;
      }
      if (log.isLoggable(Level.FINE)) log.fine(ME+": Burst mode timeout occurred, queue entries=" + msgQueue.getNumOfEntries() + ", starting callback worker thread ...");
      startWorkerThread(true);
   }

   /**
    * @return The interceptor plugin if available, otherwise null
    */
   public I_MsgDispatchInterceptor getMsgDispatchInterceptor() {
      return this.msgInterceptor;
   }

   /**
    * Set new callback addresses, typically after a session login/logout
    *
    * @param addr the addresses passed on to the dispatch connections handler
    * @throws XmlBlasterException if the dispatch connections handler fails to initialize
    */
   public void setAddresses(AddressBase[] addr) throws XmlBlasterException {
      this.dispatchConnectionsHandler.initialize(addr);
   }

   /**
    * Switch on/off the sending of messages.
    * <p>
    * With several addresses the setting of the last array element wins.
    * A null array leaves the current setting untouched.
    *
    * @param addrArr the addresses to read the dispatcherActive setting from, may be null
    */
   private void initDispatcherActive(AddressBase[] addrArr) {
      if (addrArr != null) {
         for (int ii=0; ii<addrArr.length; ii++) { // TODO: How to handle setting of multiple addresses??
            this.dispatcherActive = addrArr[ii].isDispatcherActive();
         }
      }
   }

   /**
    * The worker notifies us that it is finished, if messages are available
    * it is triggered again.
    *
    * @param val the new worker state; false additionally triggers the follow-up
    *            handling (another delivery round or the switch to sync mode)
    */
   void setDispatchWorkerIsActive(boolean val) {
      this.dispatchWorkerIsActive = val;
      if (val == false) {
         if (this.isShutdown) {
            if (log.isLoggable(Level.FINE)) log.fine(ME+": setDispatchWorkerIsActive(" + val + ") failed, we are shutdown: " + toXml(""));
            return;
         }

         if (msgQueue.getNumOfEntries() > 0) {
            if (log.isLoggable(Level.FINE)) log.fine(ME+": Finished callback job. Giving a kick to send the remaining " + msgQueue.getNumOfEntries() + " messages.");
            try {
               // Assure the queue is flushed with another worker
               activateDispatchWorker();
            }
            catch(Throwable e) {
               // Hand the throwable to the logger so the stack trace ends up in the log
               log.log(Level.SEVERE, ME+": "+e.toString(), e);
            }
         }
         else {
            if (this.trySyncMode && !this.isSyncMode) {
               switchToSyncMode();
            }
         }
      }
   }

   /**
    * Called locally and from TopicHandler when internal error (Throwable) occurred to avoid infinite looping
    * <p>
    * Gives up the delivery and shuts this dispatch manager down. A throwable which
    * is not an XmlBlasterException is wrapped into one first.
    *
    * @param throwable the error which caused the panic
    */
   public void internalError(Throwable throwable) {
      givingUpDelivery((throwable instanceof XmlBlasterException) ? (XmlBlasterException)throwable :
                       new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "", throwable));
      log.severe(ME+": PANIC: Internal error, doing shutdown: " + throwable.getMessage());
      shutdown();
   }

   /**
    * @return A container holding some statistical delivery information
    */
   public DispatchStatistic getDispatchStatistic() {
      return this.dispatchConnectionsHandler.getDispatchStatistic();
   }

   /**
    * @return true if shutdown() has been called
    */
   public boolean isShutdown() {
      return this.isShutdown;
   }

   /**
    * Stop all callback drivers of this client.
    * Possibly invoked twice (givingUpDelivery() calls it indirectly as well)
    * We don't shutdown the corresponding queue.
    */
   public void shutdown() {
      if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering shutdown ...");
      if (this.isShutdown) return;
      synchronized (this) {
         // Check again under the lock: only the first caller performs the shutdown
         if (this.isShutdown) return;
         this.isShutdown = true;

         this.msgQueue.removePutListener(this);

         // remove all ConnectionStatusListeners
         this.connectionStatusListeners.clear();

         removeBurstModeTimer();

         // NOTE: We would need to remove the 'final' qualifier to be able to set to null

         if (this.msgInterceptor != null) {
            try {
               this.msgInterceptor.shutdown(this);
            }
            catch (XmlBlasterException e) {
               log.warning(ME+": Ignoring problems during shutdown of plugin: " + e.getMessage());
            }
            //this.msgInterceptor = null;
         }

         if (this.dispatchConnectionsHandler != null) {
            this.dispatchConnectionsHandler.shutdown();
            //this.dispatchConnectionsHandler = null;
         }

         // Second call after the plugin and connection handler are down;
         // it is a no-op if no timer is registered
         removeBurstModeTimer();

         //this.msgQueue = null;
         //this.failureListener = null;
         //this.securityInterceptor = null;

         //if (this.dispatchWorkerPool != null) {
         //   this.dispatchWorkerPool.shutdown(); NO: not here, is the scope and duty of Global
         //   this.dispatchWorkerPool = null;
         //}

         if (this.syncDispatchWorker != null)
            this.syncDispatchWorker.shutdown();
      }
   }

   /**
    * For logging
    *
    * @return the id of the storage id of the message queue
    */
   public String getId() {
      return this.msgQueue.getStorageId().getId();
   }

   /**
    * Dump state of this object into a XML ASCII string.
    * <br>
    * @param extraOffset indenting of tags for nice output, null is treated as ""
    * @return internal state as a XML ASCII string
    */
   public String toXml(String extraOffset) {
      StringBuffer sb = new StringBuffer(2000);
      if (extraOffset == null) extraOffset = "";
      String offset = Constants.OFFSET + extraOffset;

      sb.append(offset).append("<DispatchManager id='");
      // getId() dereferences msgQueue, so it must sit behind the same null guard as numEntries
      if (this.msgQueue != null) {
         sb.append(getId());
         sb.append("' numEntries='").append(this.msgQueue.getNumOfEntries());
      }
      sb.append("' isShutdown='").append(this.isShutdown).append("'>");
      sb.append(this.dispatchConnectionsHandler.toXml(extraOffset+Constants.INDENT));
      sb.append(offset).append(" <dispatchWorkerIsActive>").append(dispatchWorkerIsActive).append("</dispatchWorkerIsActive>");
      sb.append(offset).append("</DispatchManager>");

      return sb.toString();
   }

   /**
    * Inhibits/activates the delivery of asynchronous dispatches of messages.
    * Activating triggers notifyAboutNewEntry().
    *
    * @param dispatcherActive true to activate the delivery, false to inhibit it
    */
   public void setDispatcherActive(boolean dispatcherActive) {
      if (log.isLoggable(Level.FINE)) log.fine(ME+": Changed dispatcherActive from " + this.dispatcherActive + " to " + dispatcherActive);
      this.dispatcherActive = dispatcherActive;
      if (this.dispatcherActive) notifyAboutNewEntry();
   }

   /**
    *
    * @return true if the dispatcher is currently activated, i.e. if it is
    * able to deliver asynchronously messages from the callback queue.
    */
   public boolean isDispatcherActive() {
      return this.dispatcherActive;
   }

   /**
    * Delegates to the dispatch connections handler.
    *
    * @param entries the entries to filter
    * @param ex the throwable passed along to the handler
    * @return the list returned by the dispatch connections handler
    */
   public ArrayList filterDistributorEntries(ArrayList entries, Throwable ex) {
      return this.dispatchConnectionsHandler.filterDistributorEntries(entries, ex);
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -