⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchmanager.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
      getMsgErrorHandler().handleError(new MsgErrorInfo(glob, (MsgQueueEntry)null, this, ex));      shutdown();   }      public void postSendNotification(MsgQueueEntry entry) {      MsgQueueEntry[] entries = new MsgQueueEntry[] { entry };      postSendNotification(entries);   }      public void postSendNotification(MsgQueueEntry[] entries) {      I_PostSendListener postSendListener = this.dispatchConnectionsHandler.getPostSendListener();      if (postSendListener != null) {         try {            postSendListener.postSend(entries);         }         catch (Throwable e) {            e.printStackTrace();            log.warning("postSendListener.postSend() exception: " + e.toString());         }      }   }      /**    * Notify I_PostSendListener about problem.     * <p>    * Typically XmlBlasterAccess is notified when message came asynchronously from queue    *      * @param entryList    * @param ex    * @return true if processed    * @see I_PostSendListener#postSend(MsgQueueEntry) for explanation    */   public boolean sendingFailedNotification(MsgQueueEntry[] entries, XmlBlasterException ex) {      I_PostSendListener postSendListener = this.dispatchConnectionsHandler.getPostSendListener();      if (postSendListener == null)         return false;      try {         return postSendListener.sendingFailed(entries, ex);      }      catch (Throwable e) {         e.printStackTrace();         log.warning("postSendListener.sendingFailed() exception: " + e.toString());         return false;      }   }   /**    * Called by DispatchWorker if an Exception occured in sync mode    * Only on client side    */   void handleSyncWorkerException(ArrayList entryList, Throwable throwable) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer(ME+": Sync delivery failed connection state is " + this.dispatchConnectionsHandler.getState().toString() + ": " + throwable.toString());      XmlBlasterException xmlBlasterException = XmlBlasterException.convert(glob,ME,null,throwable);      if (xmlBlasterException.isUser()) {         // Exception from remote client from update(), pass it to error handler and carry on ...?         // A PublishPlugin could throw it         MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);         getMsgErrorHandler().handleErrorSync(new MsgErrorInfo(glob, entries, this, xmlBlasterException));         return;      }      else if (xmlBlasterException.isCommunication()) {         if (this.msgInterceptor != null && isPolling()) { // If we have a plugin it shall handle it            try {               entryList = this.msgInterceptor.handleNextMessages(this, entryList);               if (entryList != null && entryList.size() > 0) {                  MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);                  getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, xmlBlasterException));               }            }            catch (XmlBlasterException xmlBlasterException2) {               internalError(xmlBlasterException2);            }            if (entryList != null && entryList.size() > 0) {               MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);               getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, xmlBlasterException));            }            return;         }         // Exception from connection to remote client (e.g. from Corba layer)         // DispatchManager handles this         // Error handling in sync mode         // 1. throwExceptionBackToPusher         // 2. Switch to async mode and collect message (wait on better times)         // 3. If we have serious problems (programming exceptions or isDead()) throw exception back         // 4. Pass exception to an error handler plugin         switchToASyncMode();         // Simulate return values, and manipulate missing informations into entries ...         I_QueueEntry[] entries = (I_QueueEntry[])entryList.toArray(new I_QueueEntry[entryList.size()]);         getDispatchConnectionsHandler().createFakedReturnObjects(entries, Constants.STATE_OK, Constants.INFO_QUEUED);         msgQueue.put(entries, I_Queue.IGNORE_PUT_INTERCEPTOR);         if (log.isLoggable(Level.FINE)) log.fine(ME+": Delivery failed, pushed " + entries.length + " entries into tail back queue");      }      else {         if (log.isLoggable(Level.FINE)) log.fine(ME+": Invocation failed: " + xmlBlasterException.getMessage());         throw xmlBlasterException;      }   }   /**    * Messages are successfully sent, remove them now from queue (sort of a commit()):    * We remove filtered/destroyed messages as well (which doen't show up in entryListChecked)    * @param postSendNotify TODO    */   public void removeFromQueue(MsgQueueEntry[] entries, boolean postSendNotify) throws XmlBlasterException {      I_MsgDispatchInterceptor msgInterceptor = getMsgDispatchInterceptor();      MsgUnit[] msgUnits = null;      if (msgInterceptor != null) { // we need to do this before removal since the msgUnits are weak references and would be deleted by gc         msgUnits = new MsgUnit[entries.length];         for (int i=0; i < msgUnits.length; i++) {            msgUnits[i] = entries[i].getMsgUnit();         }      }      this.msgQueue.removeRandom(entries);      /*(currently only done in sync invocation)      ArrayList defaultEntries = sendAsyncResponseEvent(entryList);      if (defaultEntries.size() > 0) {         MsgQueueEntry[] entries = (MsgQueueEntry[])defaultEntries.toArray(new MsgQueueEntry[defaultEntries.size()]);         this.msgQueue.removeRandom(entries);      }      */            if (postSendNotify)         postSendNotification(entries);            if (msgInterceptor != null) {         msgInterceptor.postHandleNextMessages(this, msgUnits);      }            if (log.isLoggable(Level.FINE)) log.fine("Commit of successful sending of " +            entries.length + " messages done, current queue size is " +            this.msgQueue.getNumOfEntries() + " '" + entries[0].getLogId() + "'");   }   /**    * Called by DispatchWorker if an Exception occurred in async mode.     * @throws XmlBlasterException should never happen but is possible during removing entries from queue    */   void handleWorkerException(ArrayList entryList, Throwable throwable) throws XmlBlasterException {      // Note: The DispatchManager is notified about connection problems directly by its DispatchConnectionsHandler      //       we don't need to take care of ErrorCode.COMMUNICATION*      if (log.isLoggable(Level.FINER)) log.finer(ME+": Async delivery failed connection state is " + this.dispatchConnectionsHandler.getState().toString() + ": " + throwable.toString());      //Thread.currentThread().dumpStack();      if (entryList == null) {         if (!this.isShutdown)            log.warning(ME+": Didn't expect null entryList in handleWorkerException() for throwable " + throwable.getMessage() + toXml(""));         return;      }      getDispatchStatistic().setLastDeliveryException(throwable.toString());      getDispatchStatistic().incrNumDeliveryExceptions(1);      if (throwable instanceof XmlBlasterException) {         XmlBlasterException ex = (XmlBlasterException)throwable;         if (log.isLoggable(Level.FINE)) log.fine(ME+": Invocation or callback failed: " + ex.getMessage());         if (ex.isUser()) {            // Exception from remote client from update(), pass it to error handler and carry on ...            MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);            boolean isHandled = sendingFailedNotification(entries, ex);            if (isHandled)               removeFromQueue(entries, false);            else               getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));         }         else if (ex.isCommunication()) {            if (this.msgInterceptor != null && isPolling()) { // If we have a plugin it shall handle it               try {                  entryList = this.msgInterceptor.handleNextMessages(this, entryList);                  if (entryList != null && entryList.size() > 0) {                     MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);                     getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));                  }               }               catch (XmlBlasterException ex2) {                  internalError(ex2);               }               if (entryList != null && entryList.size() > 0) {                  MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);                  getMsgErrorHandler().handleError(new MsgErrorInfo(glob, entries, this, ex));               }            }            // Exception from connection to remote client (e.g. from Corba layer)            // DispatchManager handles this         }         else {            //log.severe(ME+": Callback failed: " + ex.toString());            //ex.printStackTrace();            MsgQueueEntry[] entries = (MsgQueueEntry[])entryList.toArray(new MsgQueueEntry[entryList.size()]);            boolean isHandled = sendingFailedNotification(entries, ex);            if (isHandled)               removeFromQueue(entries, false);            else               internalError(ex);         }      }      else {         //log.severe(ME+": Callback failed: " + throwable.toString());         //throwable.printStackTrace();         XmlBlasterException ex = new XmlBlasterException(glob, ErrorCode.COMMUNICATION_NOCONNECTION_DEAD, ME, "", throwable);         // sendingFailedNotification() not called as the msgs remain in queue until problem is resolved by admin         internalError(ex);      }   }   public I_MsgErrorHandler getMsgErrorHandler() {      return this.failureListener;   }   /**    * We register a QueuePutListener and all put() into the queue are    * intercepted - our put() is called instead.    * We then deliver this QueueEntry directly to the remote    * connection and return synchronously the returned value or the    * Exception if one is thrown.    */   public void switchToSyncMode() {      if (this.isSyncMode) return;      synchronized (this) {         if (this.isSyncMode) return;         if (this.syncDispatchWorker == null) this.syncDispatchWorker = new DispatchWorker(glob, this);         this.isSyncMode = true;         log.info(ME+": Switched to synchronous message delivery");         if (this.timerKey != null)            log.severe(ME+": Burst mode timer was activated and we switched to synchronous delivery" +                          " - handling of this situation is not coded yet");         removeBurstModeTimer();      }   }   /**    * Switch back to asynchronous mode.    * Our thread pool will take the messages out of the queue    * and deliver them in asynchronous mode.    */   public void switchToASyncMode() {      if (!this.isSyncMode) return;      synchronized (this) {         if (!this.isSyncMode) return;         //this.msgQueue.removePutListener(this);         this.isSyncMode = false;         activateDispatchWorker(); // just in case there are some messages pending in the queue         log.info(ME+": Switched to asynchronous message delivery");      }   }   /**    * @see I_QueuePutListener#putPre(I_QueueEntry)    */   public boolean putPre(I_QueueEntry queueEntry) throws XmlBlasterException {      //I_QueueEntry[] queueEntries = new I_QueueEntry[1];      //queueEntries[0] = queueEntry;      return putPre(new I_QueueEntry[] { queueEntry });   }   /**    * @see #putPre(I_QueueEntry)    * @see I_QueuePutListener#putPre(I_QueueEntry[])    */   public boolean putPre(I_QueueEntry[] queueEntries) throws XmlBlasterException {      if (!this.isSyncMode) {         if (this.inAliveTransition) {            // Do not allow other threads to put messages to queue during transition to alive            synchronized (ALIVE_TRANSITION_MONITOR) {               // don't allow            }         }         return true; // Add entry to queue      }      if (log.isLoggable(Level.FINE)) log.fine(ME+": putPre() - Got " + queueEntries.length + " QueueEntries to deliver synchronously ...");      ArrayList entryList = new ArrayList(queueEntries.length);      for (int ii=0; ii<queueEntries.length; ii++) {         if (this.trySyncMode && !this.isSyncMode && queueEntries[ii] instanceof MsgQueueGetEntry) { // this.trySyncMode === isClientSide            throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_UNAVAILABLE, ME, "You can't call get() in asynchronous mode (gets can't be queued because we don't know its return value)");         }         entryList.add(queueEntries[ii]);      }      this.syncDispatchWorker.run(entryList);      return false;   }   /**    * @see I_QueuePutListener#putPost(I_QueueEntry)    */   public void putPost(I_QueueEntry queueEntry) throws XmlBlasterException {      if (!this.isSyncMode) {         if (this.dispatcherActive) notifyAboutNewEntry();         if (((MsgQueueEntry)queueEntry).wantReturnObj()) {            // Simulate return values, and manipulate missing informations into entries ...            I_QueueEntry[] entries = new I_QueueEntry[] { queueEntry };            getDispatchConnectionsHandler().createFakedReturnObjects(entries, Constants.STATE_OK, Constants.INFO_QUEUED);         }      }   }   /**    * @see #putPost(I_QueueEntry)    * @see I_QueuePutListener#putPost(I_QueueEntry[])    */   public void putPost(I_QueueEntry[] queueEntries) throws XmlBlasterException {      if (!this.isSyncMode) {         if (this.dispatcherActive) notifyAboutNewEntry();         if (queueEntries.length > 0 && ((MsgQueueEntry)queueEntries[0]).wantReturnObj()) {            // Simulate return values, and manipulate missing informations into entries ...            getDispatchConnectionsHandler().createFakedReturnObjects(queueEntries, Constants.STATE_OK, Constants.INFO_QUEUED);         }      }   }   /**    * Here we prepare messages which are coming directly from the queue.    * <ol>    *   <li>We eliminate destroyed messages</li>    *   <li>We make a shallow copy of the message.    *       We need to do this, out messages are references directly into the queue.    *       The delivery framework is later changing the QoS    *       and plugins may change the content - and this should not modify the queue entries</li>    * </ol>    */   public ArrayList prepareMsgsFromQueue(ArrayList entryList) {      if (entryList == null || entryList.size() < 1) {         if (log.isLoggable(Level.FINE)) log.fine(ME+": Got zero messages from queue, expected at least one, can happen if client disconnected in the mean time: " + toXml(""));         return null;      }      return prepareMsgsFromQueue(ME, log, this.msgQueue, entryList);   }   public static ArrayList prepareMsgsFromQueue(String logId, Logger log, I_Queue queue, ArrayList entryList) {      // Remove all expired messages and do a shallow copy      int size = entryList.size();      ArrayList result = new ArrayList(size);      for (int ii=0; ii<size; ii++) {         MsgQueueEntry entry = (MsgQueueEntry)entryList.get(ii);         // Take care to remove the filtered away messages from the queue as well         if (entry.isDestroyed()) {            log.info(logId+": Message " + entry.getLogId() + " is destroyed, ignoring it");            if (log.isLoggable(Level.FINE)) log.fine("Message " + entry.getLogId() + " is destroyed, ignoring it: " + entry.toXml());            try {               queue.removeRandom(entry); // Probably change to use [] for better performance            }            catch (Throwable e) {               log.severe(logId+": Internal error when removing expired message " + entry.getLogId() + " from queue, no recovery implemented, we continue: " + e.toString());            }            continue;         }         result.add(entry.clone()); // expired messages are sent as well      }      return result;   }   /**    * When somebody puts a new entry into the queue, we want to be    * notified about this after the entry is fed.    * <p>    * Called by I_Queue.putPost()    */   public void notifyAboutNewEntry() {      if (log.isLoggable(Level.FINER)) log.finer(ME+": Entering notifyAboutNewEntry("+this.notifyCounter+")");      this.notifyCounter++;      //activateDispatchWorker();      if (checkSending(true) == false)         return;      if (useBurstModeTimer() == true)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -