⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 topichandler.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      StatusQosData qos = new StatusQosData(serverScope, MethodName.PUBLISH);      qos.setKeyOid(this.uniqueKey);      qos.setState(Constants.STATE_OK);      qos.setRcvTimestamp(publishQosServer.getRcvTimestamp());      publishReturnQos = new PublishReturnQos(serverScope, qos);      MsgKeyData msgKeyData = (MsgKeyData)msgUnit.getKeyData();      msgQosData = (MsgQosData)msgUnit.getQosData();      /* Happens in RequestBroker already      if (msgQosData.getSender() == null) {         msgQosData.setSender(publisherSessionInfo.getSessionName());      }      */      // Do a log.warning if topic meta XML is different      if (!clientTagLog) {         try {            XmlKey xmlKey = this.xmlKey;            if (xmlKey != null) {               String newTags = msgKeyData.getClientTags();               if (newTags != null && newTags.length() > 0) {                  String oldTags = ((MsgKeyData)xmlKey.getKeyData()).getClientTags();                  if (!newTags.equals(oldTags)) {                     log.warning(ME+": Changing topic meta information from '" + oldTags + "' to '" + newTags + "' is not supported and this change is ignored, please check your publisher.");                     clientTagLog = true;                  }               }            }         }         catch (Throwable e) {            e.printStackTrace();            log.severe(ME+": Ignoring unexpected exception during meta info check:" + e.toString());         }      }      if (msgQosData.isAdministrative()) {         if ( isUnconfigured() || isSoftErased() || allowedToReconfigureTopicAndFixWrongLimits(msgQosData)) {            administrativeInitialize(msgKeyData, msgQosData, publishQosServer);            if (!msgQosData.isFromPersistenceStore()) {               msgQosData.setAdministrative(true);               msgQosData.setRcvTimestamp(this.topicEntry.getMsgQosData().getRcvTimestamp());               msgQosData.setPersistent(true);               this.topicEntry.setMsgUnit(msgUnit);               this.requestBroker.changePersistentTopicHandler(this.topicEntry);            }         }         else {            log.warning(ME+": Sorry we are in state '" + getStateStr() + "', reconfiguring TopicHandler is not yet supported, we ignore the reconfiguration request");         }         if (this.handlerIsNewCreated) {            this.handlerIsNewCreated = false;            // Check all known query subscriptions if the new message fits as well (does it only if TopicHandler is new)            serverScope.getRequestBroker().checkExistingSubscriptions(publisherSessionInfo, this, publishQosServer);         }         if (msgQosData.isFromPersistenceStore()) {            log.info(ME+": Topic is successfully recovered from persistency to state " + getStateStr() +                     //((requestBroker.getTopicStore()!=null) ? (" '" + requestBroker.getTopicStore().getStorageId() + "'") : "") +                     " with " + getNumOfHistoryEntries() + " history entries (" + getNumOfCacheEntries() + " currently referenced msgUnits are loaded).");         }         else {            log.info(ME+": Topic is successfully configured by administrative message.");         }         publishReturnQos.getData().setStateInfo("Administrative configuration request handled");         return publishReturnQos;      }      if (!this.administrativeInitialize) {         administrativeInitialize(msgKeyData, msgQosData, publishQosServer);      }      if (!isAlive()) {         toAlive();      }      if (this.handlerIsNewCreated) {         // Check all known query subscriptions if the new message fits as well (does it only if TopicHandler is new)         serverScope.getRequestBroker().checkExistingSubscriptions(publisherSessionInfo, this, publishQosServer);         this.handlerIsNewCreated = false;      }      int initialCounter = 1; // Force referenceCount until update queues are filled (volatile messages)      MsgUnitWrapper msgUnitWrapper = null;      try { // finally         boolean changed = true;         final boolean isInvisiblePtp = publishQosServer.isPtp() && !publishQosServer.isSubscribable();         final boolean addToHistoryQueue = this.historyQueue != null && !isInvisiblePtp;         if (!isInvisiblePtp) {  // readonly is only checked for Pub/Sub?            if (this.topicProperty.isReadonly() && hasHistoryEntries()) {               log.warning(ME+": Sorry, published message '" + msgKeyData.getOid() + "' rejected, topic is readonly.");               throw new XmlBlasterException(serverScope, ErrorCode.USER_PUBLISH_READONLY, ME, "Sorry, published message '" + msgKeyData.getOid() + "' rejected, topic is readonly.");            }         }         msgUnitWrapper = new MsgUnitWrapper(serverScope, msgUnit, this.msgUnitCache, initialCounter, 0, -1);         if (!isAlive()) {             toAlive();         }         // Forcing RAM entry temporary (reset in finally below) to avoid performance critical harddisk IO during initialization, every callback/subject/history queue put()/take() is changing the reference counter of MsgUnitWrapper. For persistent messages this needs to be written to harddisk         // If the server crashed during this RAM operation it is not critical as the publisher didn't get an ACK yet         synchronized(this.msgUnitWrapperUnderConstructionMutex) {            // A queue (e.g. callback queue) could swap its entry and reload it during this initialization phase,            // in this case we need to assure that it receives our RAM based MsgUnitWrapper (with all current settings)            // in case it changes the referenceCounter            this.msgUnitWrapperUnderConstruction = msgUnitWrapper;         }         try {            // marker if we are working on the history queue            // A historyQueue.take() could trigger entry.remove() -> topicHandler.entryDestroyed() -> toUnreferenced() ==> The topic would be invalid for the current publish            this.isHistoryHandling = true;            if (addToHistoryQueue && msgUnitWrapper.hasRemainingLife()) { // no sense to remember               if (msgQosData.isForceUpdate() == false && hasHistoryEntries()) {                  MsgQueueHistoryEntry entry = (MsgQueueHistoryEntry)this.historyQueue.peek();                  if (entry != null) {                     MsgUnitWrapper old = entry.getMsgUnitWrapper();                     if (old != null) {                        changed = !old.getMsgUnit().sameContent(msgUnit.getContent());                     }                  }               }               try { // Cleanup if persistent queue was temporary unavailable                  long numHist = getNumOfHistoryEntries();                  if (numHist > 1L && numHist > this.historyQueue.getMaxNumOfEntries()) {                     long count = numHist-this.historyQueue.getMaxNumOfEntries();                     // TODO: Implement count>1 in takeLowest():                     ArrayList entryList = this.historyQueue.takeLowest((int)count, -1L, null, false);                     if (entryList.size() != count) {                        log.severe(ME+": Can't remove expected entry, entryList.size()=" + entryList.size() + ": " + this.historyQueue.toXml(""));                     }                  }               }               catch (XmlBlasterException e) {                  log.severe(ME+": History queue take() problem: " + e.getMessage());               }               try { // increments reference counter += 1                  this.historyQueue.put(new MsgQueueHistoryEntry(serverScope, msgUnitWrapper, this.historyQueue.getStorageId()), I_Queue.USE_PUT_INTERCEPTOR);               }               catch (XmlBlasterException e) {                  log.severe(ME+": History queue put() problem: " + e.getMessage());               }               try {                  long numHist = getNumOfHistoryEntries();                  if (numHist > 1L && numHist > this.historyQueue.getMaxNumOfEntries()) {                     ArrayList entryList = this.historyQueue.takeLowest(1, -1L, null, false);                     if (entryList.size() != 1) {                        throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_UNKNOWN, ME,                              "Can't remove expected entry, entryList.size()=" + entryList.size() + ": " + this.historyQueue.toXml(""));                     }                     MsgQueueHistoryEntry entry = (MsgQueueHistoryEntry)entryList.get(0);                     if (log.isLoggable(Level.FINE)) { if (!entry.isInternal()) log.fine(ME+": Removed oldest entry in history queue."); }                  }               }               catch (XmlBlasterException e) {                  log.severe(ME+": History queue take() problem: " + e.getMessage());               }            }         }         finally {            this.isHistoryHandling = false;         }         // NOTE: Putting entries into callback queues must be outside of a synchronized(topicHandler) to avoid deadlock         //       The DispatchWorker removes a MsgUnitWrapper entry from the msgstore (see entryDestroyed()) and would deadlock         //       This is currently addressed as the MsgUnitWrapper.lookup is a dirty read on the topicHandler         //----- 2a. now we can send updates to all destination clients:         if (publishQosServer.isPtp()) {            /*publishReturnQos =*/ forwardToDestinations(publisherSessionInfo, msgUnitWrapper, publishQosServer);            if (!publishQosServer.isSubscribable()) {               publishReturnQos.getData().setStateInfo("PtP request handled");               return publishReturnQos;            }         }         //----- 2b. now we can send updates to all subscribed clients:         if (log.isLoggable(Level.FINE)) log.fine(ME+": Message " + msgUnit.getLogId() + " handled, now we can send updates to all interested clients.");         if (changed || msgQosData.isForceUpdate()) { // if the content changed of the publisher forces updates ...            invokeCallbackAndHandleFailure(publisherSessionInfo, msgUnitWrapper);         }         msgUnitWrapper.startExpiryTimer();      }      catch (XmlBlasterException e) {         log.severe(ME+": "+e.getMessage());         throw e;      }      catch (Throwable e) {         log.severe(ME+": "+e.toString());         e.printStackTrace();         throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_UNKNOWN, "TopicHandler", "", e);      }      finally {         if (msgUnitWrapper != null) {            try {               // Event to check if counter == 0 to remove cache entry again (happens e.g. for volatile msg without a no subscription)               // MsgUnitWrapper calls topicEntry.destroyed(MsgUnitWrapper) if it is in destroyed state               if (initialCounter != 0) {                  msgUnitWrapper.setReferenceCounter((-1)*initialCounter);               }               if (!msgUnitWrapper.isDestroyed()) {                  this.msgUnitCache.put(msgUnitWrapper);               }            }            finally {               synchronized(this.msgUnitWrapperUnderConstructionMutex) {                  this.msgUnitWrapperUnderConstruction = null;               }            }         }      }      return publishReturnQos;   }   /**    * Check if the MsgUnitWrapper is owned by the TopicHandler (during construction).    * NOTE: You need to synchronize this call over msgUnitCache    */   boolean isInMsgStore(MsgUnitWrapper msgUnitWrapper) {      synchronized(this.msgUnitWrapperUnderConstructionMutex) {         return this.msgUnitWrapperUnderConstruction == null || this.msgUnitWrapperUnderConstruction.getUniqueId() != msgUnitWrapper.getUniqueId();         //return !this.msgUnitWrapperUnderConstruction.containsKey(new Long(msgUnitWrapper.getUniqueId()));      }   }   /**    * Forward PtP messages.    * TODO: On exception continue to other destinations and return the    *       successful/not-successful destinations in PublishReturnQos!!!    */   private void forwardToDestinations(SessionInfo publisherSessionInfo,      MsgUnitWrapper cacheEntry, PublishQosServer publishQos)      throws XmlBlasterException {      // NOTE: cluster forwarded PtP destinations are removed already from this list:      Destination[] destinationArr = publishQos.getDestinationArr(); // !!! add XPath client query here !!!      Authenticate authenticate = this.requestBroker.getAuthenticate();      //-----    Send message to every destination client      for (int ii = 0; ii < destinationArr.length; ii++) {         Destination destination = destinationArr[ii];         if (log.isLoggable(Level.FINE)) log.fine(ME+": Working on PtP message for destination [" + destination.getDestination() + "]");         SessionName destinationSessionName = destination.getDestination();         boolean destinationIsSession = destinationSessionName.isSession();         boolean forceQueing = destination.forceQueuing();         boolean wantsPtP = true; // TODO if destination never has logged in spam would be possible!         SubjectInfo destinationClient = null;         // Handle PtP to subject in a thread safe manner         if (!destinationIsSession) { // -> subject            // 3 + 6 (force queing ignored since same reaction for both)            destinationClient = authenticate.getSubjectInfoByName(destination.getDestination());            if (!forceQueing && destinationClient==null) {               String tmp = ME+": Sending PtP message '" + cacheEntry.getLogId() + "' to '" + destination.getDestination() + "' failed, the destination is unkown, the message rejected.";               log.warning(tmp);               throw new XmlBlasterException(serverScope, ErrorCode.USER_PTP_UNKNOWNDESTINATION, ME, tmp +                   " Client is not logged in and <destination forceQueuing='true'> is not set");            }            if (log.isLoggable(Level.FINE)) log.fine(ME+": Queuing PtP message '" + cacheEntry.getLogId() + "' for subject destination [" + destination.getDestination() + "], forceQueing="+forceQueing);            // We are responsible to call destinationClient.getLock().release()            final boolean returnLocked = true;            destinationClient = authenticate.getOrCreateSubjectInfoByName(destination.getDestination(), returnLocked, null, null);            try {               MsgQueueUpdateEntry msgEntrySubject = new MsgQueueUpdateEntry(serverScope, cacheEntry,                        destinationClient.getSubjectQueue().getStorageId(), destination.getDestination(),                        Constants.SUBSCRIPTIONID_PtP, false);               destinationClient.queueMessage(msgEntrySubject);               continue;            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -