⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consumablequeueplugin.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                        SubscriptionInfo[] subInfoArr = this.serverScope.getTopicAccessor().getSubscriptionInfoArrDirtyRead(this.topicId);
            ArrayList subInfoList = new ArrayList();
            for (int i=0; i < subInfoArr.length; i++) subInfoList.add(subInfoArr[i]);

            for (int i=0; i < lst.size(); i++) {
               if (!this.isReady) return;
               MsgQueueHistoryEntry entry = (MsgQueueHistoryEntry)lst.get(i);
               MsgUnitWrapper msgUnitWrapper = (entry).getMsgUnitWrapper();
               if (msgUnitWrapper != null) {
                  if (!this.distributeOneEntry(msgUnitWrapper, entry, subInfoList)) {
                     this.isReady = false;
                     this.isRunning = false;
                     return;
                  }
               }
            }
         }
         this.isReady = true;
      }
      catch (Throwable ex) {
         this.isReady = false;
         this.isRunning = false;
         ex.printStackTrace();
         log.severe("processQueue: " + ex.getMessage());
      }
   }

   /**
    * Distributes one single entry taken from the history queue to at most one
    * subscriber.
    * <p>
    * The method first rejects the entry (removing it from the history queue)
    * if any subscription in the list is a 'dirtyRead' one, since that mode is
    * not supported by this plugin. Otherwise the subscriptions are scanned in
    * list order; subscriptions which may not receive the message, which have
    * no dispatch manager, whose connection is not alive, or whose filter
    * rejects the message are skipped. The first remaining subscription gets
    * the message via {@link #doDistribute}.
    * <p>
    * Failures are handled inside this method: on an exception other than a
    * communication exception the distribution is given up via
    * {@link #givingUpDistribution} and the entry is removed from the history
    * queue. A communication exception just moves on to the next subscription.
    *
    * @param msgUnitWrapper the message to distribute; if null the
    *        distribution is given up for this entry
    * @param entry the history queue entry which belongs to the message
    * @param subInfoList contains the SubscriptionInfo objects to scan. Before
    *        a subscription is tried it is moved to the end of this list, so
    *        the next invocation starts with another one (simple round robin
    *        load balancing).
    * @return true if the entry has been processed (sent successfully, or
    *         given up) and the invoker may continue with the next entry;
    *         false if none of the subscribers were able to receive the
    *         message, or if an unexpected exception was caught by the
    *         outermost handler (to tell the invoker not to continue with
    *         distribution until the next event).
    */
   private boolean distributeOneEntry(MsgUnitWrapper msgUnitWrapper, MsgQueueHistoryEntry entry, List subInfoList) {
      try {
         if (msgUnitWrapper == null) {
            log.severe("distributeOneEntry() MsgUnitWrapper is null");
            Thread.dumpStack();
            givingUpDistribution(null, msgUnitWrapper, entry, null);
            return true; // let the loop continue: other entries could be OK
         }
         if (log.isLoggable(Level.FINER)) log.finer("distributeOneEntry '" + msgUnitWrapper.getUniqueId() + "' '" + msgUnitWrapper.getKeyOid() + "'");

         // Take a copy of the list entries (a current snapshot).
         // The list itself is modified below (round robin reordering), so we
         // must not iterate over it directly.
         SubscriptionInfo[] subInfoArr = (SubscriptionInfo[])subInfoList.toArray(new SubscriptionInfo[subInfoList.size()]);

         // First pass: a single 'dirtyRead' subscription makes the whole entry unsupported
         for (int ii=0; ii<subInfoArr.length; ii++) {
            SubscriptionInfo sub = subInfoArr[ii];
            if (TopicHandler.isDirtyRead(sub, msgUnitWrapper)) {
               log.severe("ConsumableQueuePlugin used together with 'dirtyRead' is not supported");
               TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId);
               if (topicHandler == null) return true;
               try {
                  I_Queue srcQueue = topicHandler.getHistoryQueue();
                  if (srcQueue != null) srcQueue.removeRandom(entry);
               }
               finally {
                  this.serverScope.getTopicAccessor().release(topicHandler);
               }
               return true; // even if it has not been sent
            }
         }

         // Second pass: hand the message to the first subscription able to take it
         for (int ii=0; ii<subInfoArr.length; ii++) {
            SubscriptionInfo sub = subInfoArr[ii];

            if (!TopicHandler.subscriberMayReceiveIt(sub, msgUnitWrapper)) continue;
            //Has no effect:
            //if (!this.topicHandler.checkIfAllowedToSend(null, sub)) continue;

            // this is specific for this plugin
            if (sub.getSessionInfo().getDispatchManager() == null) continue;
            if (!sub.getSessionInfo().getDispatchManager().getDispatchConnectionsHandler().isAlive()) continue;

            try {
               try {
                  TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId);
                  if (topicHandler == null) return true;
                  try {
                     // the 'false' here is to tell the filter not to send a dead letter in case of an ex
                     if (!topicHandler.checkFilter(null, sub, msgUnitWrapper, false)) continue;
                  }
                  finally {
                     this.serverScope.getTopicAccessor().release(topicHandler);
                  }
               }
               catch (XmlBlasterException ex) {
                  // continue;
                  givingUpDistribution(sub, msgUnitWrapper, entry, ex);
                  return true; // because the entry has been removed from the history queue
               }

               // put the current dispatcher at the end of the list for next invocation (round robin)
               subInfoList.remove(sub);
               subInfoList.add(sub);

               MsgQueueUpdateEntry updateEntry = TopicHandler.createEntryFromWrapper(msgUnitWrapper,sub);
               UpdateReturnQosServer retQos = doDistribute(sub, updateEntry);
               if (log.isLoggable(Level.FINE)) {
                  if (retQos == null) log.fine("distributeOneEntry: the return object was null: callback has not sent the message (dirty reads ?)");
               }
               if (retQos == null || retQos.getException() == null) {
                  TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId);
                  if (topicHandler == null) return true;
                  try {
                     I_Queue srcQueue = topicHandler.getHistoryQueue();
                     if (srcQueue != null) srcQueue.removeRandom(entry); // success
                  }
                  finally {
                     this.serverScope.getTopicAccessor().release(topicHandler);
                  }
                  if (log.isLoggable(Level.FINE)) log.fine("distributeOneEntry: successfully removed entry from queue");
                  return true;
               }
               else {
                  log.severe("distributeOneEntry an exception occured: " + retQos.getException().getMessage());
                  Throwable ex = retQos.getException();
                  // continue if it is a communication exception stop otherwise
                  if (ex instanceof XmlBlasterException && ((XmlBlasterException)ex).isCommunication()) continue;
                  // we pass null for the exception since we don't want to shut down the dispatcher
                  givingUpDistribution(sub, msgUnitWrapper, entry, null);
                  return true; //since removed
               }
            }
            catch (Throwable e) {
               e.printStackTrace();
               givingUpDistribution(sub, msgUnitWrapper, entry, e);
               return true;
            }
         }
      }
      catch (Throwable ex) {
         ex.printStackTrace();
         log.severe("distributeOneEntry " + ex.getMessage());
         givingUpDistribution(null, msgUnitWrapper, entry, ex);
         // TODO or should we return true here to allow to continue ?
         // I think it is a serious ex: probably does not make sense to cont.
      }
      return false;
   }

   /**
    * Gives up the distribution of one entry: reports the failure and removes
    * the entry from the history queue.
    * <p>
    * If both a subscription and an exception are given, the exception is
    * passed to the dispatch manager of the subscriber's session
    * (<code>internalError</code>); in all other cases the entry is handed to
    * the request broker as a dead message. Afterwards the entry is removed
    * from the history queue of the topic, if the topic and its history queue
    * are still available.
    * <p>
    * An XmlBlasterException raised on the way is logged and not propagated.
    *
    * @param sub the subscription for which the delivery failed, may be null
    * @param msgUnitWrapper the message which could not be delivered; may be
    *        null, it is only used to name the publisher in the log output
    * @param entry the history queue entry to report and to remove
    * @param e the reason of the failure, may be null
    */
   private void givingUpDistribution(SubscriptionInfo sub, MsgUnitWrapper msgUnitWrapper, MsgQueueEntry entry, Throwable e) {
      try {
         String id = "";
         if (sub != null) id = sub.getSessionInfo().getId();
         String exTxt = "";
         if (e != null) exTxt = e.toString();
         // distributeOneEntry() calls us with a null msgUnitWrapper: don't
         // fail with a NullPointerException just for the sake of a log text
         SessionName publisherName = null;
         if (msgUnitWrapper != null) publisherName = msgUnitWrapper.getMsgQosData().getSender();
         if (log.isLoggable(Level.FINE)) log.fine("Sending of message from " + publisherName + " to " +
                            id + " failed: " + exTxt);

         if (sub != null && e != null)
            sub.getSessionInfo().getDispatchManager().internalError(e); // calls MsgErrorHandler
         else {
            this.serverScope.getRequestBroker().deadMessage(new MsgQueueEntry[] { entry }, null, ME + ".givingUpDistribution: " + exTxt);
         }
         // remove the entry from the history queue now that a dead letter has been sent.
         TopicHandler topicHandler = this.serverScope.getTopicAccessor().access(this.topicId);
         // same guard as in distributeOneEntry(): without a topic handler
         // there is nothing to remove and nothing to release
         if (topicHandler == null) return;
         try {
            I_Queue historyQueue = topicHandler.getHistoryQueue();
            if (historyQueue != null)
               historyQueue.removeRandom(entry);
         }
         finally {
            this.serverScope.getTopicAccessor().release(topicHandler);
         }
      }
      catch (XmlBlasterException ex) {
         log.severe("givingUpDistribution: " + ex.getMessage());
         ex.printStackTrace();
      }
   }

   /**
    * Sends one update entry synchronously to the given subscriber: a new
    * DispatchWorker for the dispatch manager of the subscriber's session is
    * created and run directly in the calling thread.
    *
    * @param sub the subscription whose session shall receive the entry
    * @param entry the entry to send; it is marked to keep its return object
    * @return the return object of the entry after the worker has run, may be
    *         null (the caller handles this case)
    * @throws XmlBlasterException declared for the caller, which treats any
    *         exception as a reason to give up the distribution
    */
   private UpdateReturnQosServer doDistribute(SubscriptionInfo sub, MsgQueueUpdateEntry entry) throws XmlBlasterException {
      if (log.isLoggable(Level.FINER)) log.finer("doDistribute");
      // this is a sync call (all in the same thread)
      entry.setWantReturnObject(true);
      DispatchWorker worker = new DispatchWorker(this.global, sub.getSessionInfo().getDispatchManager());
      ArrayList list = new ArrayList();
      list.add(entry);
      worker.run(list);
      return (UpdateReturnQosServer)entry.getReturnObj();
   }

   /**
    * Returns the priority of this listener, always <code>PRIO_05</code>.
    *
    * @see org.xmlBlaster.engine.I_SubscriptionListener#getPriority()
    */
   public Integer getPriority() {
      return PRIO_05;
   }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -