📄 topichandler.java
字号:
long maxEntriesStore = this.topicProperty.getMsgUnitStoreProperty().getMaxEntries(); String store = (maxEntriesStore > 0) ? "persistence/msgUnitStore/maxEntries="+maxEntriesStore : "message storage is switched off with persistence/msgUnitStore/maxEntries=0"; log.info(ME+": New topic '" + this.msgKeyData.getOid() + "' is ready, " + hist + ", " + store); } this.administrativeInitialize = true; } /** * This cache stores the 'real meat' (the MsgUnit data struct) */ private void startupMsgstore() throws XmlBlasterException { MsgUnitStoreProperty msgUnitStoreProperty = this.topicProperty.getMsgUnitStoreProperty(); if (this.msgUnitCache == null) { String type = msgUnitStoreProperty.getType(); String version = msgUnitStoreProperty.getVersion(); // ContextNode syntax: "/node/heron/topic/hello" (similar to callback queue) // instead of "msgUnitStore:heron_hello" // This change would be nice but then existing entries on restart wouldn't be found // This syntax is also used in RequestBroker:checkConsistency to reverse lookup the TopicHandler by a given I_Map StorageId msgUnitStoreId = new StorageId(Constants.RELATING_MSGUNITSTORE, serverScope.getNodeId()+"/"+getUniqueKey()); this.msgUnitCache = serverScope.getStoragePluginManager().getPlugin(type, version, msgUnitStoreId, msgUnitStoreProperty); //this.msgUnitCache = new org.xmlBlaster.engine.msgstore.ram.MapPlugin(); if (this.msgUnitCache == null) { throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_UNKNOWN, ME, "Can't load msgUnitStore persistence plugin [" + type + "][" + version + "]"); } } else { log.info(ME+": Reconfiguring message store."); this.msgUnitCache.setProperties(msgUnitStoreProperty); } } /** * Should be invoked delayed as soon as TopicHandler instance is created an registered everywhere * as we ask the msgUnitStore for the real messages if some history entries existed. * <p> * NOTE: queue can be null if maxEntries=0 is configured * </p> * <p> * This history queue entries hold weak references to the msgUnitCache entries * </p> */ private void startupHistoryQueue() throws XmlBlasterException { this.historyQueue = initQueue(this.historyQueue, "history"); } /** * Creates a queue with the properties specified in the historyQueueProperty * @param queue the queue instance (if already existing or null otherwise) * @param queueName The name to be given as Id to this queue * @return returns the instance of the queue * @throws XmlBlasterException */ private I_Queue initQueue(I_Queue queue, String queueName) throws XmlBlasterException { QueuePropertyBase prop = this.topicProperty.getHistoryQueueProperty(); if (queue == null) { if (prop.getMaxEntries() > 0L) { String type = prop.getType(); String version = prop.getVersion(); StorageId queueId = new StorageId(queueName, serverScope.getNodeId()+"/"+getUniqueKey()); queue = serverScope.getQueuePluginManager().getPlugin(type, version, queueId, prop); queue.setNotifiedAboutAddOrRemove(true); // Entries are notified to support reference counting } else { if (log.isLoggable(Level.FINE)) log.fine(ME+": " + queueName + " queuing of this topic is switched off with maxEntries=0"); } } else { if (prop.getMaxEntries() > 0L) { log.info(ME+": Reconfiguring " + queueName + " queue."); queue.setProperties(prop); } else { log.warning(ME+": Destroying " + queueName + " queue with " + queue.getNumOfEntries() + " entries because of new configuration with maxEntries=0"); queue.clear(); queue.shutdown(); queue = null; } } return queue; } // JMX does not allow hasXY public boolean getDomTreeExists() { return hasDomTree(); } /** * @return false if topicProperty.isCreateDomEntry() was configured to false */ public boolean hasDomTree() { if (this.topicProperty == null) { return false; } return this.topicProperty.createDomEntry(); } public void finalize() { if (log.isLoggable(Level.FINE)) log.fine("finalize - garbage collect " + getId()); } public RequestBroker getRequestBroker() { return this.requestBroker; } /** * Check if there is a valid DOM parsed XML key available * @return false in state UNCONFIGURED */ public final boolean hasXmlKey() { return this.msgKeyData != null; } // JMX public final boolean getTopicXmlExists() { return hasXmlKey(); } // JMX public final String getTopicXml() throws org.xmlBlaster.util.XmlBlasterException { return getXmlKey().literal(); } /** * Accessing the DOM parsed key of this message. * @return Never null * @exception XmlBlasterException in state UNCONFIGURED or on DOM parse problems */ public final XmlKey getXmlKey() throws XmlBlasterException { if (this.msgKeyData == null) { // isUnconfigured()) { throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_UNKNOWN, getId(), "In state '" + getStateStr() + "' no XmlKey object is available"); } if (this.xmlKey == null) { // expensive DOM parse this.xmlKey = new XmlKey(serverScope, this.msgKeyData); } return this.xmlKey; } /** * Create or access the cached persistence storage entry of this topic. * @return null If no PublishQos is available to create persistent information */ private TopicEntry persistTopicEntry() throws XmlBlasterException { if (this.topicEntry == null) { boolean isNew = false; if (this.topicEntry == null) { if (log.isLoggable(Level.FINE)) log.fine(ME+": Creating TopicEntry to make topic persistent"); if (this.topicProperty==null || this.msgKeyData==null) { log.severe(ME+": Can't create useful TopicEntry in state=" + getStateStr() + " no QoS is available"); return null; } MsgQosData msgQosData = new MsgQosData(serverScope, MethodName.PUBLISH); msgQosData.setTopicProperty(this.topicProperty); msgQosData.setAdministrative(true); msgQosData.touchRcvTimestamp(); msgQosData.setPersistent(true); msgQosData.setSender(creatorSessionName); MsgUnit msgUnit = new MsgUnit(this.msgKeyData, null, msgQosData); this.topicEntry = new TopicEntry(serverScope, msgUnit); isNew = true; if (log.isLoggable(Level.FINE)) log.fine(ME+": Created persistent topicEntry '" + this.topicEntry.getUniqueId() + "'"); //: " + this.topicEntry.toXml()); } if (isNew) { persistTopic(this.topicEntry); } } return this.topicEntry; } /** * @return true if this topicEntry was made persistent */ private boolean persistTopic(TopicEntry entry) { try { if (log.isLoggable(Level.FINE)) log.fine(ME+": Making topicHandler persistent, topicEntry=" + topicEntry.getUniqueId()); int numAdded = this.requestBroker.addPersistentTopicHandler(entry); if (log.isLoggable(Level.FINE)) log.fine(ME+": Persisted " + numAdded + " TopicHandler"); return numAdded>0; } catch (XmlBlasterException e) { log.severe(ME+": Persisting TopicHandler failed, we continue memory based: " + e.getMessage()); } return false; } /** * Currently we support growing and shrinking, note that shrinking is not thoroughly tested. * @param msgQosData The new configuration, can be adjusted by this method if limits are reached * @return true if the change is accepted */ private final boolean allowedToReconfigureTopicAndFixWrongLimits(MsgQosData msgQosData) { if (this.topicProperty == null) return true; TopicProperty topicProps = msgQosData.getTopicProperty(); if (topicProps == null) { log.warning("The TopicProperty is null, not reconfiguring anything"); return false; } MsgUnitStoreProperty msgUnitStoreProps = topicProps.getMsgUnitStoreProperty(); if (msgUnitStoreProps == null) { log.warning("The msgUnitStoreProps are null, not reconfiguring anything"); return false; } MsgUnitStoreProperty currentMsgUnitStoreProps = this.topicProperty.getMsgUnitStoreProperty(); long currentMaxBytes = currentMsgUnitStoreProps.getMaxBytes(); long currentMaxBytesCache = currentMsgUnitStoreProps.getMaxBytesCache(); long currentMaxEntries = currentMsgUnitStoreProps.getMaxEntries(); long currentMaxEntriesCache = currentMsgUnitStoreProps.getMaxEntriesCache(); StringBuffer report = new StringBuffer(1024); // msgUnitStoreProps.getMaxBytesProp().isSet() checks if the value is explicitely set by a client if (msgUnitStoreProps.getMaxBytes() == 0 || !msgUnitStoreProps.getMaxBytesProp().isSet() && currentMaxBytes > msgUnitStoreProps.getMaxBytes()) { report.append("msgUnitStore: 'currentMaxBytes='" + currentMaxBytes + "' > than what publish proposed: '" + msgUnitStoreProps.getMaxBytes() + "' will leave it to '" + currentMaxBytes + "'\n"); msgUnitStoreProps.setMaxBytes(currentMaxBytes); } if (msgUnitStoreProps.getMaxBytesCache() == 0 || !msgUnitStoreProps.getMaxBytesCacheProp().isSet() && currentMaxBytesCache > msgUnitStoreProps.getMaxBytesCache()) { report.append("msgUnitStore: 'currentMaxBytesCache='" + currentMaxBytesCache + "' > than what publish proposed: '" + msgUnitStoreProps.getMaxBytesCache() + "' will leave it to '" + currentMaxBytesCache + "'\n"); msgUnitStoreProps.setMaxBytesCache(currentMaxBytesCache); } if (msgUnitStoreProps.getMaxEntries() == 0 || !msgUnitStoreProps.getMaxEntriesProp().isSet() && currentMaxEntries > msgUnitStoreProps.getMaxEntries()) { report.append("msgUnitStore: 'currentMaxEntries='" + currentMaxEntries + "' > than what publish proposed: '" + msgUnitStoreProps.getMaxEntries() + "' will leave it to '" + currentMaxEntries + "'\n"); msgUnitStoreProps.setMaxEntries(currentMaxEntries); } if (msgUnitStoreProps.getMaxEntriesCache() == 0 || !msgUnitStoreProps.getMaxEntriesCacheProp().isSet() && currentMaxEntriesCache > msgUnitStoreProps.getMaxEntriesCache()) { report.append("msgUnitStore: 'currentMaxEntriesCache='" + currentMaxEntriesCache + "' > than what publish proposed: '" + msgUnitStoreProps.getMaxEntriesCache() + "' will leave it to '" + currentMaxEntriesCache + "'\n"); msgUnitStoreProps.setMaxEntriesCache(currentMaxEntriesCache); } log.info("new msgUnitStore Props: " + msgUnitStoreProps.toXml()); HistoryQueueProperty historyProps = topicProps.getHistoryQueueProperty(); if (historyProps != null) { HistoryQueueProperty currentHistoryProps = this.topicProperty.getHistoryQueueProperty(); currentMaxBytes = currentHistoryProps.getMaxBytes(); currentMaxBytesCache = currentHistoryProps.getMaxBytesCache(); currentMaxEntries = currentHistoryProps.getMaxEntries(); currentMaxEntriesCache = currentHistoryProps.getMaxEntriesCache(); if (!historyProps.getMaxBytesProp().isSet() && currentMaxBytes > historyProps.getMaxBytes()) { report.append("history: 'currentMaxBytes='" + currentMaxBytes + "' > than what publish proposed: '" + historyProps.getMaxBytes() + "' will leave it to '" + currentMaxBytes + "'\n"); historyProps.setMaxBytes(currentMaxBytes); } if (!historyProps.getMaxBytesCacheProp().isSet() && currentMaxBytesCache > historyProps.getMaxBytesCache()) { report.append("history: 'currentMaxBytesCache='" + currentMaxBytesCache + "' > than what publish proposed: '" + historyProps.getMaxBytesCache() + "' will leave it to '" + currentMaxBytesCache + "'\n"); historyProps.setMaxBytesCache(currentMaxBytesCache); } if (!historyProps.getMaxEntriesProp().isSet() && currentMaxEntries > historyProps.getMaxEntries()) { report.append("history: 'currentMaxEntries='" + currentMaxEntries + "' > than what publish proposed: '" + historyProps.getMaxEntries() + "' will leave it to '" + currentMaxEntries + "'\n"); historyProps.setMaxEntries(currentMaxEntries); } if (!historyProps.getMaxEntriesCacheProp().isSet() && currentMaxEntriesCache > historyProps.getMaxEntriesCache()) { report.append("history: 'currentMaxEntriesCache='" + currentMaxEntriesCache + "' > than what publish proposed: '" + historyProps.getMaxEntriesCache() + "' will leave it to '" + currentMaxEntriesCache + "'\n"); historyProps.setMaxEntriesCache(currentMaxEntriesCache); } log.info("new history Props: " + historyProps.toXml()); } log.info(report.toString()); return true; } /** * A new publish event (PubSub or PtP) arrives. * <br /> * Publish filter plugin checks are done already<br /> * Cluster forwards are done already. * * @param publisherSessionInfo The publisher * @param msgUnit The new message * @param publishQosServer The decorator for msgUnit.getQosData() * * @return not null for PtP messages */ public PublishReturnQos publish(SessionInfo publisherSessionInfo, MsgUnit msgUnit, PublishQosServer publishQosServer) throws XmlBlasterException { if (log.isLoggable(Level.FINE)) log.fine(ME+": publish() publisherSessionInfo '" + publisherSessionInfo.getId() + "', message '" + msgUnit.getLogId() + "' ..."); PublishReturnQos publishReturnQos = null; MsgQosData msgQosData = null;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -