⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 topichandler.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
         long maxEntriesStore = this.topicProperty.getMsgUnitStoreProperty().getMaxEntries();         String store = (maxEntriesStore > 0) ? "persistence/msgUnitStore/maxEntries="+maxEntriesStore : "message storage is switched off with persistence/msgUnitStore/maxEntries=0";         log.info(ME+": New topic '" + this.msgKeyData.getOid() + "' is ready, " + hist + ", " + store);      }      this.administrativeInitialize = true;   }   /**    * This cache stores the 'real meat' (the MsgUnit data struct)    */   private void startupMsgstore() throws XmlBlasterException   {      MsgUnitStoreProperty msgUnitStoreProperty = this.topicProperty.getMsgUnitStoreProperty();      if (this.msgUnitCache == null) {         String type = msgUnitStoreProperty.getType();         String version = msgUnitStoreProperty.getVersion();         // ContextNode syntax: "/node/heron/topic/hello" (similar to callback queue)         // instead of "msgUnitStore:heron_hello"         // This change would be nice but then existing entries on restart wouldn't be found         // This syntax is also used in RequestBroker:checkConsistency to reverse lookup the TopicHandler by a given I_Map         StorageId msgUnitStoreId = new StorageId(Constants.RELATING_MSGUNITSTORE, serverScope.getNodeId()+"/"+getUniqueKey());         this.msgUnitCache = serverScope.getStoragePluginManager().getPlugin(type, version, msgUnitStoreId, msgUnitStoreProperty); //this.msgUnitCache = new org.xmlBlaster.engine.msgstore.ram.MapPlugin();         if (this.msgUnitCache == null) {            throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_UNKNOWN, ME, "Can't load msgUnitStore persistence plugin [" + type + "][" + version + "]");         }      }      else {         log.info(ME+": Reconfiguring message store.");         this.msgUnitCache.setProperties(msgUnitStoreProperty);      }   }   /**    * Should be invoked delayed as soon as TopicHandler instance is created an registered everywhere    * as we ask the 
msgUnitStore for the real messages if some history entries existed.
    * <p>
    * NOTE: queue can be null if maxEntries=0 is configured
    * </p>
    * <p>
    * This history queue entries hold weak references to the msgUnitCache entries
    * </p>
    */
   private void startupHistoryQueue() throws XmlBlasterException {
      I_Queue configured = initQueue(this.historyQueue, "history");
      this.historyQueue = configured;
   }

   /**
    * Creates, reconfigures or destroys a queue according to the historyQueueProperty
    * of the current topicProperty.
    * <ul>
    *   <li>no queue yet, maxEntries &gt; 0: a new queue plugin is loaded</li>
    *   <li>no queue yet, maxEntries = 0: nothing is created, null is returned</li>
    *   <li>queue exists, maxEntries &gt; 0: the new properties are applied to it</li>
    *   <li>queue exists, maxEntries = 0: the queue is cleared, shut down and null is returned</li>
    * </ul>
    * @param queue the queue instance (if already existing or null otherwise)
    * @param queueName The name to be given as Id to this queue
    * @return the queue instance to use from now on, null if queuing is switched off
    * @throws XmlBlasterException
    */
   private I_Queue initQueue(I_Queue queue, String queueName) throws XmlBlasterException {
      QueuePropertyBase prop = this.topicProperty.getHistoryQueueProperty();
      boolean queuingEnabled = prop.getMaxEntries() > 0L;

      if (queue == null) {
         if (!queuingEnabled) {
            if (log.isLoggable(Level.FINE)) {
               log.fine(ME+": " + queueName + " queuing of this topic is switched off with maxEntries=0");
            }
            return null;
         }
         String pluginType = prop.getType();
         String pluginVersion = prop.getVersion();
         StorageId storageId = new StorageId(queueName, serverScope.getNodeId()+"/"+getUniqueKey());
         I_Queue created = serverScope.getQueuePluginManager().getPlugin(pluginType, pluginVersion, storageId, prop);
         // Entries are notified to support reference counting
         created.setNotifiedAboutAddOrRemove(true);
         return created;
      }

      if (queuingEnabled) {
         log.info(ME+": Reconfiguring " + queueName + " queue.");
         queue.setProperties(prop);
         return queue;
      }

      // The new configuration switches queuing off: drop what we have
      log.warning(ME+": Destroying " + queueName + " queue with " + queue.getNumOfEntries() +
                  " entries because of new configuration with maxEntries=0");
      queue.clear();
      queue.shutdown();
      return null;
   }

   // JMX
does not allow hasXY   public boolean getDomTreeExists() {      return hasDomTree();   }   /**    * @return false if topicProperty.isCreateDomEntry() was configured to false    */   public boolean hasDomTree() {      if (this.topicProperty == null) {         return false;      }      return this.topicProperty.createDomEntry();   }   public void finalize() {      if (log.isLoggable(Level.FINE)) log.fine("finalize - garbage collect " + getId());   }   public RequestBroker getRequestBroker() {      return this.requestBroker;   }   /**    * Check if there is a valid DOM parsed XML key available    * @return false in state UNCONFIGURED    */   public final boolean hasXmlKey() {      return this.msgKeyData != null;   }   // JMX   public final boolean getTopicXmlExists() {      return hasXmlKey();   }   // JMX   public final String getTopicXml() throws org.xmlBlaster.util.XmlBlasterException {      return getXmlKey().literal();   }   /**    * Accessing the DOM parsed key of this message.    * @return Never null    * @exception XmlBlasterException in state UNCONFIGURED or on DOM parse problems    */   public final XmlKey getXmlKey() throws XmlBlasterException {      if (this.msgKeyData == null) { // isUnconfigured()) {         throw new XmlBlasterException(serverScope, ErrorCode.INTERNAL_UNKNOWN, getId(), "In state '" + getStateStr() + "' no XmlKey object is available");      }      if (this.xmlKey == null) {  // expensive DOM parse         this.xmlKey = new XmlKey(serverScope, this.msgKeyData);      }      return this.xmlKey;   }   /**    * Create or access the cached persistence storage entry of this topic.    
* @return null If no PublishQos is available to create persistent information    */   private TopicEntry persistTopicEntry() throws XmlBlasterException {      if (this.topicEntry == null) {         boolean isNew = false;         if (this.topicEntry == null) {            if (log.isLoggable(Level.FINE)) log.fine(ME+": Creating TopicEntry to make topic persistent");            if (this.topicProperty==null || this.msgKeyData==null) {               log.severe(ME+": Can't create useful TopicEntry in state=" + getStateStr() + " no QoS is available");               return null;            }            MsgQosData msgQosData = new MsgQosData(serverScope, MethodName.PUBLISH);            msgQosData.setTopicProperty(this.topicProperty);            msgQosData.setAdministrative(true);            msgQosData.touchRcvTimestamp();            msgQosData.setPersistent(true);            msgQosData.setSender(creatorSessionName);            MsgUnit msgUnit = new MsgUnit(this.msgKeyData, null, msgQosData);            this.topicEntry = new TopicEntry(serverScope, msgUnit);            isNew = true;            if (log.isLoggable(Level.FINE)) log.fine(ME+": Created persistent topicEntry '" + this.topicEntry.getUniqueId() + "'"); //: " + this.topicEntry.toXml());         }         if (isNew) {            persistTopic(this.topicEntry);         }      }      return this.topicEntry;   }   /**    * @return true if this topicEntry was made persistent    */   private boolean persistTopic(TopicEntry entry) {      try {         if (log.isLoggable(Level.FINE)) log.fine(ME+": Making topicHandler persistent, topicEntry=" + topicEntry.getUniqueId());         int numAdded = this.requestBroker.addPersistentTopicHandler(entry);         if (log.isLoggable(Level.FINE)) log.fine(ME+": Persisted " + numAdded + " TopicHandler");         return numAdded>0;      }      catch (XmlBlasterException e) {         log.severe(ME+": Persisting TopicHandler failed, we continue memory based: " + e.getMessage());      }      
return false;   }   /**    * Currently we support growing and shrinking, note that shrinking is not thoroughly tested.    * @param msgQosData The new configuration, can be adjusted by this method if limits are reached    * @return true if the change is accepted    */   private final boolean allowedToReconfigureTopicAndFixWrongLimits(MsgQosData msgQosData) {      if (this.topicProperty == null)         return true;      TopicProperty topicProps = msgQosData.getTopicProperty();      if (topicProps == null) {         log.warning("The TopicProperty is null, not reconfiguring anything");         return false;      }      MsgUnitStoreProperty msgUnitStoreProps = topicProps.getMsgUnitStoreProperty();      if (msgUnitStoreProps == null) {         log.warning("The msgUnitStoreProps are null, not reconfiguring anything");         return false;      }      MsgUnitStoreProperty currentMsgUnitStoreProps = this.topicProperty.getMsgUnitStoreProperty();      long currentMaxBytes = currentMsgUnitStoreProps.getMaxBytes();      long currentMaxBytesCache = currentMsgUnitStoreProps.getMaxBytesCache();      long currentMaxEntries = currentMsgUnitStoreProps.getMaxEntries();      long currentMaxEntriesCache = currentMsgUnitStoreProps.getMaxEntriesCache();      StringBuffer report = new StringBuffer(1024);      // msgUnitStoreProps.getMaxBytesProp().isSet() checks if the value is explicitely set by a client      if (msgUnitStoreProps.getMaxBytes() == 0 || !msgUnitStoreProps.getMaxBytesProp().isSet() && currentMaxBytes > msgUnitStoreProps.getMaxBytes()) {         report.append("msgUnitStore: 'currentMaxBytes='" + currentMaxBytes + "' > than what publish proposed: '" + msgUnitStoreProps.getMaxBytes() + "' will leave it to '" + currentMaxBytes + "'\n");         msgUnitStoreProps.setMaxBytes(currentMaxBytes);      }      if (msgUnitStoreProps.getMaxBytesCache() == 0 || !msgUnitStoreProps.getMaxBytesCacheProp().isSet() && currentMaxBytesCache > msgUnitStoreProps.getMaxBytesCache()) {         
report.append("msgUnitStore: 'currentMaxBytesCache='" + currentMaxBytesCache + "' > than what publish proposed: '" + msgUnitStoreProps.getMaxBytesCache() + "' will leave it to '" + currentMaxBytesCache + "'\n");         msgUnitStoreProps.setMaxBytesCache(currentMaxBytesCache);      }      if (msgUnitStoreProps.getMaxEntries() == 0 || !msgUnitStoreProps.getMaxEntriesProp().isSet() && currentMaxEntries > msgUnitStoreProps.getMaxEntries()) {         report.append("msgUnitStore: 'currentMaxEntries='" + currentMaxEntries + "' > than what publish proposed: '" + msgUnitStoreProps.getMaxEntries() + "' will leave it to '" + currentMaxEntries + "'\n");         msgUnitStoreProps.setMaxEntries(currentMaxEntries);      }      if (msgUnitStoreProps.getMaxEntriesCache() == 0 || !msgUnitStoreProps.getMaxEntriesCacheProp().isSet() && currentMaxEntriesCache > msgUnitStoreProps.getMaxEntriesCache()) {         report.append("msgUnitStore: 'currentMaxEntriesCache='" + currentMaxEntriesCache + "' > than what publish proposed: '" + msgUnitStoreProps.getMaxEntriesCache() + "' will leave it to '" + currentMaxEntriesCache + "'\n");         msgUnitStoreProps.setMaxEntriesCache(currentMaxEntriesCache);      }      log.info("new msgUnitStore Props: " + msgUnitStoreProps.toXml());      HistoryQueueProperty historyProps = topicProps.getHistoryQueueProperty();      if (historyProps != null) {         HistoryQueueProperty currentHistoryProps = this.topicProperty.getHistoryQueueProperty();         currentMaxBytes = currentHistoryProps.getMaxBytes();         currentMaxBytesCache = currentHistoryProps.getMaxBytesCache();         currentMaxEntries = currentHistoryProps.getMaxEntries();         currentMaxEntriesCache = currentHistoryProps.getMaxEntriesCache();         if (!historyProps.getMaxBytesProp().isSet() && currentMaxBytes > historyProps.getMaxBytes()) {            report.append("history: 'currentMaxBytes='" + currentMaxBytes + "' > than what publish proposed: '" + historyProps.getMaxBytes() + 
"' will leave it to '" + currentMaxBytes + "'\n");            historyProps.setMaxBytes(currentMaxBytes);         }         if (!historyProps.getMaxBytesCacheProp().isSet() && currentMaxBytesCache > historyProps.getMaxBytesCache()) {            report.append("history: 'currentMaxBytesCache='" + currentMaxBytesCache + "' > than what publish proposed: '" + historyProps.getMaxBytesCache() + "' will leave it to '" + currentMaxBytesCache + "'\n");            historyProps.setMaxBytesCache(currentMaxBytesCache);         }         if (!historyProps.getMaxEntriesProp().isSet() && currentMaxEntries > historyProps.getMaxEntries()) {            report.append("history: 'currentMaxEntries='" + currentMaxEntries + "' > than what publish proposed: '" + historyProps.getMaxEntries() + "' will leave it to '" + currentMaxEntries + "'\n");            historyProps.setMaxEntries(currentMaxEntries);         }         if (!historyProps.getMaxEntriesCacheProp().isSet() && currentMaxEntriesCache > historyProps.getMaxEntriesCache()) {            report.append("history: 'currentMaxEntriesCache='" + currentMaxEntriesCache + "' > than what publish proposed: '" + historyProps.getMaxEntriesCache() + "' will leave it to '" + currentMaxEntriesCache + "'\n");            historyProps.setMaxEntriesCache(currentMaxEntriesCache);         }         log.info("new history Props: " + historyProps.toXml());      }      log.info(report.toString());      return true;   }   /**    * A new publish event (PubSub or PtP) arrives.    * <br />    * Publish filter plugin checks are done already<br />    * Cluster forwards are done already.    
*    * @param publisherSessionInfo  The publisher    * @param msgUnit     The new message    * @param publishQosServer  The decorator for msgUnit.getQosData()    *    * @return not null for PtP messages    */   public PublishReturnQos publish(SessionInfo publisherSessionInfo, MsgUnit msgUnit, PublishQosServer publishQosServer) throws XmlBlasterException   {      if (log.isLoggable(Level.FINE)) log.fine(ME+": publish() publisherSessionInfo '" + publisherSessionInfo.getId() + "', message '" + msgUnit.getLogId() + "' ...");      PublishReturnQos publishReturnQos = null;      MsgQosData msgQosData = null;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -