📄 topichandler.java
字号:
StatusQosData qos = new StatusQosData(serverScope, MethodName.PUBLISH); qos.setKeyOid(this.uniqueKey); qos.setState(Constants.STATE_OK); qos.setRcvTimestamp(publishQosServer.getRcvTimestamp()); publishReturnQos = new PublishReturnQos(serverScope, qos); MsgKeyData msgKeyData = (MsgKeyData)msgUnit.getKeyData(); msgQosData = (MsgQosData)msgUnit.getQosData(); /* Happens in RequestBroker already if (msgQosData.getSender() == null) { msgQosData.setSender(publisherSessionInfo.getSessionName()); } */ // Do a log.warning if topic meta XML is different if (!clientTagLog) { try { XmlKey xmlKey = this.xmlKey; if (xmlKey != null) { String newTags = msgKeyData.getClientTags(); if (newTags != null && newTags.length() > 0) { String oldTags = ((MsgKeyData)xmlKey.getKeyData()).getClientTags(); if (!newTags.equals(oldTags)) { log.warning(ME+": Changing topic meta information from '" + oldTags + "' to '" + newTags + "' is not supported and this change is ignored, please check your publisher."); clientTagLog = true; } } } } catch (Throwable e) { e.printStackTrace(); log.severe(ME+": Ignoring unexpected exception during meta info check:" + e.toString()); } } if (msgQosData.isAdministrative()) { if ( isUnconfigured() || isSoftErased() || allowedToReconfigureTopicAndFixWrongLimits(msgQosData)) { administrativeInitialize(msgKeyData, msgQosData, publishQosServer); if (!msgQosData.isFromPersistenceStore()) { msgQosData.setAdministrative(true); msgQosData.setRcvTimestamp(this.topicEntry.getMsgQosData().getRcvTimestamp()); msgQosData.setPersistent(true); this.topicEntry.setMsgUnit(msgUnit); this.requestBroker.changePersistentTopicHandler(this.topicEntry); } } else { log.warning(ME+": Sorry we are in state '" + getStateStr() + "', reconfiguring TopicHandler is not yet supported, we ignore the reconfiguration request"); } if (this.handlerIsNewCreated) { this.handlerIsNewCreated = false; // Check all known query subscriptions if the new message fits as well (does it only if TopicHandler is new) serverScope.getRequestBroker().checkExistingSubscriptions(publisherSessionInfo, this, publishQosServer); } if (msgQosData.isFromPersistenceStore()) { log.info(ME+": Topic is successfully recovered from persistency to state " + getStateStr() + //((requestBroker.getTopicStore()!=null) ? (" '" + requestBroker.getTopicStore().getStorageId() + "'") : "") + " with " + getNumOfHistoryEntries() + " history entries (" + getNumOfCacheEntries() + " currently referenced msgUnits are loaded)."); } else { log.info(ME+": Topic is successfully configured by administrative message."); } publishReturnQos.getData().setStateInfo("Administrative configuration request handled"); return publishReturnQos; } if (!this.administrativeInitialize) { administrativeInitialize(msgKeyData, msgQosData, publishQosServer); } if (!isAlive()) { toAlive(); } if (this.handlerIsNewCreated) { // Check all known query subscriptions if the new message fits as well (does it only if TopicHandler is new) serverScope.getRequestBroker().checkExistingSubscriptions(publisherSessionInfo, this, publishQosServer); this.handlerIsNewCreated = false; } int initialCounter = 1; // Force referenceCount until update queues are filled (volatile messages) MsgUnitWrapper msgUnitWrapper = null; try { // finally boolean changed = true; final boolean isInvisiblePtp = publishQosServer.isPtp() && !publishQosServer.isSubscribable(); final boolean addToHistoryQueue = this.historyQueue != null && !isInvisiblePtp; if (!isInvisiblePtp) { // readonly is only checked for Pub/Sub? if (this.topicProperty.isReadonly() && hasHistoryEntries()) { log.warning(ME+": Sorry, published message '" + msgKeyData.getOid() + "' rejected, topic is readonly."); throw new XmlBlasterException(serverScope, ErrorCode.USER_PUBLISH_READONLY, ME, "Sorry, published message '" + msgKeyData.getOid() + "' rejected, topic is readonly."); } } msgUnitWrapper = new MsgUnitWrapper(serverScope, msgUnit, this.msgUnitCache, initialCounter, 0, -1); if (!isAlive()) { toAlive(); } // Forcing RAM entry temporary (reset in finally below) to avoid performance critical harddisk IO during initialization, every callback/subject/history queue put()/take() is changing the reference counter of MsgUnitWrapper. For persistent messages this needs to be written to harddisk // If the server crashed during this RAM operation it is not critical as the publisher didn't get an ACK yet synchronized(this.msgUnitWrapperUnderConstructionMutex) { // A queue (e.g. callback queue) could swap its entry and reload it during this initialization phase, // in this case we need to assure that it receives our RAM based MsgUnitWrapper (with all current settings) // in case it changes the referenceCounter this.msgUnitWrapperUnderConstruction = msgUnitWrapper; } try { // marker if we are working on the history queue // A historyQueue.take() could trigger entry.remove() -> topicHandler.entryDestroyed() -> toUnreferenced() ==> The topic would be invalid for the current publish this.isHistoryHandling = true; if (addToHistoryQueue && msgUnitWrapper.hasRemainingLife()) { // no sense to remember if (msgQosData.isForceUpdate() == false && hasHistoryEntries()) { MsgQueueHistoryEntry entry = (MsgQueueHistoryEntry)this.historyQueue.peek(); if (entry != null) { MsgUnitWrapper old = entry.getMsgUnitWrapper(); if (old != null) { changed = !old.getMsgUnit().sameContent(msgUnit.getContent()); } } } try { // Cleanup if persistent queue was temporary unavailable long numHist = getNumOfHistoryEntries(); if (numHist > 1L && numHist > this.historyQueue.getMaxNumOfEntries()) { long count = numHist-this.historyQueue.getMaxNumOfEntries(); // TODO: Implement count>1 in takeLowest(): ArrayList entryList = this.historyQueue.takeLowest((int)count, -1L, null, false); if (entryList.size() != count) { log.severe(ME+": Can't remove expected entry, entryList.size()=" + entryList.size() + ": " + this.historyQueue.toXml("")); } } } catch (XmlBlasterException e) { log.severe(ME+": History queue take() problem: " + e.getMessage()); } try { // increments reference counter += 1 this.historyQueue.put(new MsgQueueHistoryEntry(serverScope, msgUnitWrapper, this.historyQueue.getStorageId()), I_Queue.USE_PUT_INTERCEPTOR); } catch (XmlBlasterException e) { log.severe(ME+": History queue put() problem: " + e.getMessage()); } try { long numHist = getNumOfHistoryEntries(); if (numHist > 1L && numHist > this.historyQueue.getMaxNumOfEntries()) { ArrayList entryList = this.historyQueue.takeLowest(1, -1L, null, false); if (entryList.size() != 1) { throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_UNKNOWN, ME, "Can't remove expected entry, entryList.size()=" + entryList.size() + ": " + this.historyQueue.toXml("")); } MsgQueueHistoryEntry entry = (MsgQueueHistoryEntry)entryList.get(0); if (log.isLoggable(Level.FINE)) { if (!entry.isInternal()) log.fine(ME+": Removed oldest entry in history queue."); } } } catch (XmlBlasterException e) { log.severe(ME+": History queue take() problem: " + e.getMessage()); } } } finally { this.isHistoryHandling = false; } // NOTE: Putting entries into callback queues must be outside of a synchronized(topicHandler) to avoid deadlock // The DispatchWorker removes a MsgUnitWrapper entry from the msgstore (see entryDestroyed()) and would deadlock // This is currently addressed as the MsgUnitWrapper.lookup is a dirty read on the topicHandler //----- 2a. now we can send updates to all destination clients: if (publishQosServer.isPtp()) { /*publishReturnQos =*/ forwardToDestinations(publisherSessionInfo, msgUnitWrapper, publishQosServer); if (!publishQosServer.isSubscribable()) { publishReturnQos.getData().setStateInfo("PtP request handled"); return publishReturnQos; } } //----- 2b. now we can send updates to all subscribed clients: if (log.isLoggable(Level.FINE)) log.fine(ME+": Message " + msgUnit.getLogId() + " handled, now we can send updates to all interested clients."); if (changed || msgQosData.isForceUpdate()) { // if the content changed of the publisher forces updates ... invokeCallbackAndHandleFailure(publisherSessionInfo, msgUnitWrapper); } msgUnitWrapper.startExpiryTimer(); } catch (XmlBlasterException e) { log.severe(ME+": "+e.getMessage()); throw e; } catch (Throwable e) { log.severe(ME+": "+e.toString()); e.printStackTrace(); throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_UNKNOWN, "TopicHandler", "", e); } finally { if (msgUnitWrapper != null) { try { // Event to check if counter == 0 to remove cache entry again (happens e.g. for volatile msg without a no subscription) // MsgUnitWrapper calls topicEntry.destroyed(MsgUnitWrapper) if it is in destroyed state if (initialCounter != 0) { msgUnitWrapper.setReferenceCounter((-1)*initialCounter); } if (!msgUnitWrapper.isDestroyed()) { this.msgUnitCache.put(msgUnitWrapper); } } finally { synchronized(this.msgUnitWrapperUnderConstructionMutex) { this.msgUnitWrapperUnderConstruction = null; } } } } return publishReturnQos; } /** * Check if the MsgUnitWrapper is owned by the TopicHandler (during construction). * NOTE: You need to synchronize this call over msgUnitCache */ boolean isInMsgStore(MsgUnitWrapper msgUnitWrapper) { synchronized(this.msgUnitWrapperUnderConstructionMutex) { return this.msgUnitWrapperUnderConstruction == null || this.msgUnitWrapperUnderConstruction.getUniqueId() != msgUnitWrapper.getUniqueId(); //return !this.msgUnitWrapperUnderConstruction.containsKey(new Long(msgUnitWrapper.getUniqueId())); } } /** * Forward PtP messages. * TODO: On exception continue to other destinations and return the * successful/not-successful destinations in PublishReturnQos!!! */ private void forwardToDestinations(SessionInfo publisherSessionInfo, MsgUnitWrapper cacheEntry, PublishQosServer publishQos) throws XmlBlasterException { // NOTE: cluster forwarded PtP destinations are removed already from this list: Destination[] destinationArr = publishQos.getDestinationArr(); // !!! add XPath client query here !!! Authenticate authenticate = this.requestBroker.getAuthenticate(); //----- Send message to every destination client for (int ii = 0; ii < destinationArr.length; ii++) { Destination destination = destinationArr[ii]; if (log.isLoggable(Level.FINE)) log.fine(ME+": Working on PtP message for destination [" + destination.getDestination() + "]"); SessionName destinationSessionName = destination.getDestination(); boolean destinationIsSession = destinationSessionName.isSession(); boolean forceQueing = destination.forceQueuing(); boolean wantsPtP = true; // TODO if destination never has logged in spam would be possible! SubjectInfo destinationClient = null; // Handle PtP to subject in a thread safe manner if (!destinationIsSession) { // -> subject // 3 + 6 (force queing ignored since same reaction for both) destinationClient = authenticate.getSubjectInfoByName(destination.getDestination()); if (!forceQueing && destinationClient==null) { String tmp = ME+": Sending PtP message '" + cacheEntry.getLogId() + "' to '" + destination.getDestination() + "' failed, the destination is unkown, the message rejected."; log.warning(tmp); throw new XmlBlasterException(serverScope, ErrorCode.USER_PTP_UNKNOWNDESTINATION, ME, tmp + " Client is not logged in and <destination forceQueuing='true'> is not set"); } if (log.isLoggable(Level.FINE)) log.fine(ME+": Queuing PtP message '" + cacheEntry.getLogId() + "' for subject destination [" + destination.getDestination() + "], forceQueing="+forceQueing); // We are responsible to call destinationClient.getLock().release() final boolean returnLocked = true; destinationClient = authenticate.getOrCreateSubjectInfoByName(destination.getDestination(), returnLocked, null, null); try { MsgQueueUpdateEntry msgEntrySubject = new MsgQueueUpdateEntry(serverScope, cacheEntry, destinationClient.getSubjectQueue().getStorageId(), destination.getDestination(), Constants.SUBSCRIPTIONID_PtP, false); destinationClient.queueMessage(msgEntrySubject); continue; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -