⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 topichandler.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      SubscriptionInfo subs = null;      synchronized(this.subscriberMap) {         subs = (SubscriptionInfo)this.subscriberMap.remove(subscriptionInfoUniqueKey);      }      if (subs == null && !isDead() && !isSoftErased()) {         //Thread.currentThread().dumpStack();         log.warning(ME+": can't unsubscribe, you where not subscribed to subscription ID=" + subscriptionInfoUniqueKey);      }      if (log.isLoggable(Level.FINE)) log.fine(ME+": After size of subscriberMap = " + getNumSubscribers());      if (isDead()) {         if (this.subscriptionListener != null && subs != null) {            try {               this.subscriptionListener.subscriptionRemove(new SubscriptionEvent(subs));            }            catch (XmlBlasterException ex) {               log.severe(ME+": removeSubscriber: an exception occured: " + ex.getMessage());            }         }         return subs; // during cleanup process      }      ArrayList notifyList = null;      if (!hasCacheEntries() && !hasExactSubscribers()) {         if (isUnconfigured())            notifyList = toDead(this.creatorSessionName, null, null);         else {            try {               notifyList = toUnreferenced(false, false);            }            catch (XmlBlasterException e) {               log.severe(ME+": Internal problem with removeSubscriber: " + e.getMessage() + ": " + toXml());            }         }      }      if (this.subscriptionListener != null && subs != null) {         try {            this.subscriptionListener.subscriptionRemove(new SubscriptionEvent(subs));         }         catch (XmlBlasterException ex) {            log.severe(ME+": removeSubscriber: an exception occured: " + ex.getMessage());         }      }      if (notifyList != null) notifySubscribersAboutErase(notifyList); // must be outside the synchronize      return subs;   }   /**    * This is the unique key of the topic and MsgUnit    * <p />    * @return the &lt;key oid='...'>    */   public String getUniqueKey() {      return uniqueKey;   }   /**    * @return The key data of this topic (not DOM parsed) or null of not yet known    */   public MsgKeyData getMsgKeyData() {      return this.msgKeyData;   }   /**    * What is the MIME type of this message content?    * <p />    * @return the MIME type of the MsgUnit.content or null if not known    */   public String getContentMime() {      return (this.msgKeyData != null) ? this.msgKeyData.getContentMime() : null;   }   public String getContentMimeExtended() {      return (this.msgKeyData != null) ? this.msgKeyData.getContentMimeExtended() : null;   }   /**    * Access the raw CORBA msgUnit    * @return MsgUnit object   public MsgUnit getMsgUnit() throws XmlBlasterException {      return getMsgUnitWrapper().getMsgUnit();   }    */   /**    * Send updates to all subscribed clients.    * <p />    * @param publisherSessionInfo The sessionInfo of the publisher or null if not known or not online    */   private final void invokeCallbackAndHandleFailure(SessionInfo publisherSessionInfo, MsgUnitWrapper msgUnitWrapper) throws XmlBlasterException {      if (msgUnitWrapper == null) {         Thread.dumpStack();         throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "MsgUnitWrapper is null");      }      if (log.isLoggable(Level.FINE)) log.fine(ME+": Going to update dependent clients for " + msgUnitWrapper.getKeyOid() + ", subscriberMap.size() = " + getNumSubscribers());      if (this.distributor != null &&  !msgUnitWrapper.isInternal()) { // if there is a plugin         this.distributor.distribute(msgUnitWrapper);         return;      }      // Take a copy of the map entries (a current snapshot)      // If we would iterate over the map directly we can risk a java.util.ConcurrentModificationException      // when one of the callback fails and the entry is removed by the callback worker thread      SubscriptionInfo[] subInfoArr = getSubscriptionInfoArr();      Set removeSet = null;      for (int ii=0; ii<subInfoArr.length; ii++) {         SubscriptionInfo sub = subInfoArr[ii];         if (!subscriberMayReceiveIt(sub, msgUnitWrapper)) continue;         if (invokeCallback(publisherSessionInfo, sub, msgUnitWrapper, true) < 1) {            if (removeSet == null) removeSet = new HashSet();            removeSet.add(sub); // We can't delete directly since we are in the iterator         }      }      if (removeSet != null) handleCallbackFailed(removeSet);   }   /**    * Checks if it is allowed to send the entry to the callback queue.    * @param publisherSessionInfo    * @param sub    * @return true if it is configured, there is a callback, and the topic is referenced    */   private boolean checkIfAllowedToSend(SessionInfo publisherSessionInfo, SubscriptionInfo sub) {      if (!sub.getSessionInfo().hasCallback()) {         log.warning(ME+": A client which subscribes " + sub.toXml() + " should have a callback server: "                       + sub.getSessionInfo().toXml("", (Properties)null));         Thread.dumpStack();         return false;      }      if (isUnconfigured()) {         log.warning(ME+": invokeCallback() not supported, this MsgUnit was created by a subscribe() and not a publish()");         return false;      }      if (isUnreferenced()) {         log.severe(ME+": PANIC: invoke callback is strange in state 'UNREFERENCED'");         Thread.dumpStack();         return false;      }      return true;   }   /**    * Checks if the filters allow this message to be sent to the specified session    *    * @param publisherSessionInfo    * @param sub    * @param msgUnitWrapper    * @return true if the message is approved to be sent, false otherwise    * @throws XmlBlasterException in case an exception happened when checking the filters.    * This method handles internally the publishing of dead letters in case of a throwable    * and after that it throws this XmlBlasterException to notify the invoked about the    * abnormal flow.    */   public final boolean checkFilter(SessionInfo publisherSessionInfo, SubscriptionInfo sub, MsgUnitWrapper msgUnitWrapper, boolean handleException)      throws XmlBlasterException {      AccessFilterQos[] filterQos = sub.getAccessFilterArr();      if (filterQos != null) {         //SubjectInfo publisher = (publisherSessionInfo == null) ? null : publisherSessionInfo.getSubjectInfo();         //SubjectInfo destination = (sub.getSessionInfo() == null) ? null : sub.getSessionInfo().getSubjectInfo();         for (int ii=0; ii<filterQos.length; ii++) {            try {               I_AccessFilter filter = requestBroker.getAccessPluginManager().getAccessFilter(                                         filterQos[ii].getType(), filterQos[ii].getVersion(),                                         getContentMime(), getContentMimeExtended());               if (filter != null && filter.match(sub.getSessionInfo(),                          msgUnitWrapper.getMsgUnit(), filterQos[ii].getQuery()) == false) {                  return false;               }            }            catch (Throwable e) {               // sender =      publisherSessionInfo.getLoginName()               // receiver =    sub.getSessionInfo().getLoginName()               // 1. We just log the situation:               SessionName publisherName = (publisherSessionInfo != null) ? publisherSessionInfo.getSessionName() :                                  msgUnitWrapper.getMsgQosData().getSender();               String reason = "Mime access filter '" + filterQos[ii].getType() + "' for message '" +                         msgUnitWrapper.getLogId() + "' from sender '" + publisherName + "' to subscriber '" +                         sub.getSessionInfo().getSessionName() + "' threw an exception, we don't deliver " +                         "the message to the subscriber: " + e.toString();               if (log.isLoggable(Level.FINE)) log.fine(ME+": "+reason);               if (handleException) {                  MsgQueueEntry[] entries = {                       new MsgQueueUpdateEntry(serverScope, msgUnitWrapper, sub.getMsgQueue().getStorageId(),                                   sub.getSessionInfo().getSessionName(), sub.getSubSourceSubscriptionId(),                                   sub.getSubscribeQosServer().getWantUpdateOneway()) };                  requestBroker.deadMessage(entries, null, reason);               }               // 2. This error handling is wrong as the plugin should not invalidate the subscribe:               //sub.getSessionInfo().getDispatchManager().internalError(e); // calls MsgErrorHandler               // 3. This error handling is wrong as we handle a subscribe and not a publish:               /*               MsgQueueEntry entry =                    new MsgQueueUpdateEntry(serverScope, msgUnitWrapper, sub.getMsgQueue().getStorageId(),                                sub.getSessionInfo().getSessionName(), sub.getSubSourceSubscriptionId(),                                sub.getSubscribeQosServer().getWantUpdateOneway());               publisherSessionInfo.getMsgErrorHandler().handleError(new MsgErrorInfo(serverScope, entry, null, e));               */               //retCount++;               throw new XmlBlasterException(this.serverScope, ErrorCode.INTERNAL_UNKNOWN, ME , "checkFilter: " + reason);            }         }      } // if filterQos      return true;   }   /**    * Checks if the subscriber is a cluster and the message has the 'dirtyRead' flag set.    * @param sub    * @param msgQosData    * @return true if dirtyRead is set, false otherwise.    */   public static boolean isDirtyRead(SubscriptionInfo sub, MsgUnitWrapper msgUnitWrapper)      throws XmlBlasterException {      MsgQosData msgQosData = msgUnitWrapper.getMsgQosData();      if (sub.getSessionInfo().getSubjectInfo().isCluster()) {         if (log.isLoggable(Level.FINEST)) log.finest("TopicHandler: Slave node '" + sub.getSessionInfo() + "' has dirty read message '" + msgUnitWrapper.toXml());         if (msgQosData.dirtyRead(sub.getSessionInfo().getSubjectInfo().getNodeId())) {            if (log.isLoggable(Level.FINE)) log.fine("TopicHandler: Slave node '" + sub.getSessionInfo() + "' has dirty read message '" + sub.getSubscriptionId() + "', '" + sub.getKeyData().getOid() + "' we don't need to send it back");            return true;         }      }      return false;   }   public static final MsgQueueUpdateEntry createEntryFromWrapper(MsgUnitWrapper msgUnitWrapper, SubscriptionInfo sub)      throws XmlBlasterException {      return new MsgQueueUpdateEntry(msgUnitWrapper.getServerScope(), msgUnitWrapper, sub.getMsgQueue().getStorageId(),               sub.getSessionInfo().getSessionName(), sub.getSubSourceSubscriptionId(),               sub.getSubscribeQosServer().getWantUpdateOneway());   }   /**    * Put the message into the callback queue of the subscribed client (Pub/Sub mode only).    * @param publisherSessionInfo The sessionInfo of the publisher or null if not known or not online    * @param sub The subscription handle of the client    * @return -1 in case it was not able to complete the invocation due to an incorrect status (for example    * if it is unconfigured, unreferenced or if the session has no callback). Returns 0 if it was not able    * to complete the request even if the status was OK, 1 if successful.    * Never throws an exception.    * Returning -1 tells the invoker not to continue with these invocations (performance)    */   private final int invokeCallback(SessionInfo publisherSessionInfo, SubscriptionInfo sub,      MsgUnitWrapper msgUnitWrapper, boolean doErrorHandling) {      if (!checkIfAllowedToSend(publisherSessionInfo, sub)) return -1;      if (msgUnitWrapper == null) {         log.severe(ME+": invokeCallback() MsgUnitWrapper is null: " +                       ((publisherSessionInfo != null) ? publisherSessionInfo.toXml() + "\n" : "") +                       ((sub != null) ? sub.toXml() + "\n" : "") +                       ((this.historyQueue != null) ? this.historyQueue.toXml("") : ""));         Thread.dumpStack();         return 0;         //throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_ILLEGALARGUMENT, ME, "MsgUnitWrapper is null");      }      try {         if (isDirtyRead(sub, msgUnitWrapper)) return 1;         try {            if (!checkFilter(publisherSessionInfo, sub, msgUnitWrapper, true)) return 1;         }         catch (XmlBlasterException ex) {            if (log.isLoggable(Level.FINEST)) log.finest(ex.getMessage());            return 0;         }         if (log.isLoggable(Level.FINER)) log.finer(ME+": pushing update() mess

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -