⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msgerrorhandler.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
字号:
/*------------------------------------------------------------------------------Name:      MsgErrorHandler.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.qos.storage.QueuePropertyBase;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.dispatch.DispatchManager;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.error.I_MsgErrorHandler;import org.xmlBlaster.util.error.I_MsgErrorInfo;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.client.qos.DisconnectQos;import java.util.ArrayList;/** * The default error recovery implementation for messages which are lost * in time and universe. * @author xmlBlaster@marcelruff.info * @author michele@laghi.eu */public final class MsgErrorHandler implements I_MsgErrorHandler{   private final String ME;   private final long MAX_BYTES = 1000000L; // to avoid out of mem, max 1 MB during error handling      private final ServerScope glob;   private static Logger log = Logger.getLogger(MsgErrorHandler.class.getName());   private /*final -> shutdown*/ SessionInfo sessionInfo;   /**    * @param sessionInfo Can be null (e.g. for Subject errors)    */   public MsgErrorHandler(ServerScope glob, SessionInfo sessionInfo) {      this.ME = "MsgErrorHandler-" + ((sessionInfo==null) ? "" : sessionInfo.getId());      this.glob = glob;      this.sessionInfo = sessionInfo;   }   /**    * The final recovery, all informations necessary are transported in msgErrorInfo    * We expect three error types:    * <table border="1">    *   <tr><th>ErrorCode</th><th>Action</th></tr>    *   <tr>    *     <td>ErrorCode.USER*</td>    *     <td> User errors are thrown from remote clients update() method,    *       we handle only the bounced back messages.    *       We take care to remove them from the queue</td>    *   </tr>    *   <tr>    *     <td>ErrorCode.COMMUNICATION*</td>    *     <td>The connection went to state==DEAD, we recover all    *       messages from the queue</td>    *   </tr>    *   <tr>    *     <td>Other exceptions from mime access plugin</td>    *     <td>Those messages have not entered the queue yet, they have no impact on    *         the dispatcher framework (no queue flushing etc.)</td>    *  </tr>    *   <tr>    *     <td>Other exceptions from dispatcher framework</td>    *     <td>They are handled as internal problems,    *       we do the same as with COMMUNICATION* exception</td>    *  </tr>    * </table>    */   public void handleError(I_MsgErrorInfo msgErrorInfo) {      if (msgErrorInfo == null) return;      XmlBlasterException xmlBlasterException = msgErrorInfo.getXmlBlasterException();      ErrorCode errorCode = xmlBlasterException.getErrorCode();      String message = xmlBlasterException.getMessage();      MsgQueueEntry[] msgQueueEntries = msgErrorInfo.getMsgQueueEntries();      DispatchManager dispatchManager = (this.sessionInfo == null) ? null : this.sessionInfo.getDispatchManager();      I_Queue msgQueue = msgErrorInfo.getQueue();  // is null if entry is not yet in queue      if (log.isLoggable(Level.FINER)) log.finer("Error handling started: " + msgErrorInfo.toString());      if (msgQueueEntries != null && msgQueueEntries.length > 0) {         // 1. Generate dead letters from passed messages         glob.getRequestBroker().deadMessage(msgQueueEntries, msgQueue, message);         if (msgQueue != null) {            // Remove the above published dead message from the queue            try {               if (log.isLoggable(Level.FINE)) log.fine("Removing " + msgQueueEntries.length + " dead messages from queue");               long removed = 0L;               boolean tmp[] = msgQueue.removeRandom(msgQueueEntries);               for (int i=0; i < tmp.length; i++) if (tmp[i]) removed++;               if (removed != msgQueueEntries.length) {                  log.warning("Expected to remove " + msgQueueEntries.length + " messages from queue but where only " + removed + ": " + message);               }            }            catch (XmlBlasterException e) {               log.warning("Can't remove " + msgQueueEntries.length + " messages from queue: " + e.getMessage() + ". Original cause was: " + message);            }         }      }      if (xmlBlasterException.isUser()) {         // The update() method from the client has thrown a ErrorCode.USER* error         if (log.isLoggable(Level.FINE)) log.fine("Error handlig for exception " + errorCode.toString() + " done");         return;      }      // 2. Generate dead letters if there are some entries in the queue      long size = (msgQueue == null) ? 0 : msgQueue.getNumOfEntries();      if (log.isLoggable(Level.FINE)) log.fine("Flushing " + size + " remaining message from queue");      if (size > 0) {         try {            QueuePropertyBase queueProperty = (QueuePropertyBase)msgQueue.getProperties();            if (queueProperty == null || queueProperty.onFailureDeadMessage()) {               while (msgQueue.getNumOfEntries() > 0L) {                  ArrayList list = msgQueue.peek(-1, MAX_BYTES);                  MsgQueueEntry[] msgArr = (MsgQueueEntry[])list.toArray(new MsgQueueEntry[list.size()]);                  if (msgArr.length > 0) {                     glob.getRequestBroker().deadMessage(msgArr, (I_Queue)null, message);                  }                  if (msgArr.length > 0) {                     msgQueue.removeRandom(msgArr);                  }               }            }            else {               log.severe("PANIC: Only onFailure='" + Constants.ONOVERFLOW_DEADMESSAGE +                     "' is implemented, " + msgQueue.getNumOfEntries() + " messages are lost: " + message);            }         }         catch(Throwable e) {            e.printStackTrace();            log.severe("PANIC: givingUpDelivery failed, " + size +                           " messages are lost: " + message + ": " + e.toString());         }      }      // We do a auto logout if the callback is down      if (dispatchManager == null || dispatchManager.isDead()) {         if (log.isLoggable(Level.FINE)) log.fine("Doing error handling for dead connection state ...");         if (dispatchManager!=null) dispatchManager.shutdown();         // 3. Kill login session         if (this.sessionInfo != null && // if callback has been configured (async)              sessionInfo.getConnectQos().getSessionCbQueueProperty().getCallbackAddresses().length > 0) {                        try {               //if (address == null || address.getOnExhaustKillSession()) {                  log.warning("Callback server is lost, killing login session of client " +                                ((msgQueue == null) ? "unknown" : msgQueue.getStorageId().toString()) +                                ": " + message);                  try {                     DisconnectQos disconnectQos = new DisconnectQos(glob);                     disconnectQos.deleteSubjectQueue(false);                     glob.getAuthenticate().disconnect(this.sessionInfo.getAddressServer(),                                          this.sessionInfo.getSecretSessionId(), disconnectQos.toXml());                  }                  catch (Throwable e) {                     log.severe("PANIC: givingUpDelivery error handling failed, " +                                    size + " messages are lost: " + message + ": " + e.toString());                  }               //}               //else {               //   log.error(ME, "PANIC: givingUpDelivery error handling failed, '" + address.getOnExhaust() +               //       "' is not supported, " + size + " messages are lost: " + message);               //}            }            catch(Throwable e) {               log.severe("PANIC: givingUpDelivery error handling failed, " + size +                              " messages are lost: " + message + ": " + e.toString());            }         }      }   }   /**    * This should never happen on server side, so we just call handleError(I_MsgErrorInfo).     * @exception XmlBlasterException is thrown if we are in sync mode and we have no COMMUNICATION problem,    * the client shall handle it himself    */   public void handleErrorSync(I_MsgErrorInfo msgErrorInfo) throws XmlBlasterException {      if (log.isLoggable(Level.FINE)) log.fine("Unexpected sync error handling invocation, we try our best");      //Thread.currentThread().dumpStack();      handleError(msgErrorInfo);   }   public void shutdown() {      //this.sessionInfo = null;   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -