⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 requestbroker.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
      HistoryQueueProperty historyQueueProperty = new HistoryQueueProperty(glob, glob.getId());      historyQueueProperty.setMaxEntriesCache(2);      historyQueueProperty.setMaxEntries(2);      topicProperty.setHistoryQueueProperty(historyQueueProperty);      publishQos.setTopicProperty(topicProperty);      this.publishQosForEvents = publishQos;      // Should we configure historyQueue and msgUnitStore to be RAM based only?      this.publishLoginEvent = glob.getProperty().get("loginEvent", true);      if (this.publishLoginEvent) {         // Key '__sys__Login' for login event (allows you to subscribe on new clients which do a login)         org.xmlBlaster.client.key.PublishKey publishKey = new org.xmlBlaster.client.key.PublishKey(glob, Constants.EVENT_OID_LOGIN/*"__sys__Login"*/, "text/plain");         this.xmlKeyLoginEvent = publishKey.getData();         this.publishQosLoginEvent = new PublishQosServer(glob, publishQos.getData().toXml(), false); // take copy      }      this.publishLogoutEvent = glob.getProperty().get("logoutEvent", true);      if (this.publishLogoutEvent) {         // Key '__sys__Logout' for logout event (allows you to subscribe on clients which do a logout)         org.xmlBlaster.client.key.PublishKey publishKey = new org.xmlBlaster.client.key.PublishKey(glob, Constants.EVENT_OID_LOGOUT/*"__sys__Logout"*/, "text/plain");         this.xmlKeyLogoutEvent = publishKey.getData();         this.publishQosLogoutEvent = new PublishQosServer(glob, publishQos.getData().toXml(), false);      }      this.publishUserList = glob.getProperty().get("userListEvent", true);      if (this.publishUserList) {         // Key '__sys__UserList' for login/logout event         org.xmlBlaster.client.key.PublishKey publishKey = new org.xmlBlaster.client.key.PublishKey(glob, Constants.EVENT_OID_USERLIST/*"__sys__UserList"*/, "text/plain");         publishKey.setClientTags("<__sys__internal/>");         this.xmlKeyUserListEvent = publishKey.getData();      }   }   /**    * A 
human readable name of the listener for logging.    * <p />    * Enforced by I_RunlevelListener    */   public String getName() {      return ME;   }   BigXmlKeyDOM getBigXmlKeyDOM() {      return  this.bigXmlKeyDOM;   }   /**    * Holds all subscriptions.    * @return Is never null    */   public ClientSubscriptions getClientSubscriptions() {      return this.clientSubscriptions;   }   /**    * Invoked on run level change, see RunlevelManager.RUNLEVEL_HALTED and RunlevelManager.RUNLEVEL_RUNNING    * <p />    * Enforced by I_RunlevelListener    */   public void runlevelChange(int from, int to, boolean force) throws org.xmlBlaster.util.XmlBlasterException {      //if (log.isLoggable(Level.FINER)) log.call(ME, "Changing from run level=" + from + " to level=" + to + " with force=" + force);      if (to == from)         return;      if (to > from) { // startup         if (to == RunlevelManager.RUNLEVEL_STANDBY_PRE) {           startupTopicStore();         }//         else if (to == RunlevelManager.RUNLEVEL_CLEANUP_PRE) {           // Load all persistent topics from persistent storage//           loadPersistentMessages();//         }      }      if (to < from) { // shutdown         if (to == RunlevelManager.RUNLEVEL_HALTED) {            XbNotifyHandler.instance().unregister(Level.WARNING.intValue(), this);            XbNotifyHandler.instance().unregister(Level.SEVERE.intValue(), this);            this.glob.unregisterMBean(this.mbeanHandle);         }      }   }   /**    * Access the ServerScope handle.    
* @return The ServerScope instance of this xmlBlaster server
    */
   public final ServerScope getServerScope() {
      return this.glob;
   }

   /**
    * @return The handle on the persistence storage for all topics
    */
   I_Map getTopicStore() {
      return this.topicStore;
   }

   /**
    * This stores the topics configuration (the publish administrative message - the MsgUnit data struct)
    * <p />
    * Steps, in order:
    * <ol>
    *   <li>If property "wipeOutJdbcDB" is true the wipe out flag is set on the ServerScope</li>
    *   <li>If property "useTopicStore" is false nothing else happens (RAM based topics only)</li>
    *   <li>On first invocation the topic store plugin is loaded and every entry found in it
    *       is re-published with the internal session; entries whose key oid is already
    *       known to the topic accessor are removed from the store instead</li>
    *   <li>On later invocations the existing store is only reconfigured</li>
    * </ol>
    *
    * @throws XmlBlasterException if loading or reading the topic store fails
    */
   private void startupTopicStore() throws XmlBlasterException {
      if (log.isLoggable(Level.FINER)) log.finer("Entering startupTopicStore(), looking for persisted topics");

      final boolean wipeRequested = glob.getProperty().get("wipeOutJdbcDB", false);
      if (wipeRequested) {
         // It is the responsibility of the QueuePlugin (see JdbcQueueCommonTable) to
         // really perform the wipeout, the request broker only gives the order to do so.
         this.glob.setWipeOutDB(wipeRequested);
      }

      final boolean topicStoreEnabled = glob.getProperty().get("useTopicStore", true);
      if (!topicStoreEnabled) {
         log.warning("Persistent and recoverable topics are switched off with '-useTopicStore false', topics are handled RAM based only.");
         return;
      }

      synchronized (this) {
         // TODO: get TopicStoreProperty from administrator
         TopicStoreProperty storeProperty = new TopicStoreProperty(glob, glob.getStrippedId());

         if (this.topicStore != null) {
            log.info("Reconfiguring topics store.");
            this.topicStore.setProperties(storeProperty);
            return;
         }

         String pluginType = storeProperty.getType();
         String pluginVersion = storeProperty.getVersion();
         // e.g. "topicStore:/node/heron" is the unique name of the data store:
         StorageId storeId = new StorageId("topicStore", glob.getStrippedId());
         this.topicStore = glob.getStoragePluginManager().getPlugin(pluginType, pluginVersion, storeId, storeProperty);
         log.info("Activated storage '" + this.topicStore.getStorageId() + "' for persistent topics, found " + this.topicStore.getNumOfEntries() + " topics to recover.");

         MsgUnitStoreProperty storeLimit = new MsgUnitStoreProperty(glob, null); // The current limit from xmlBlaster.properties
         HistoryQueueProperty historyLimit = new HistoryQueueProperty(glob, null); // The current limit

         I_MapEntry[] persisted = this.topicStore.getAll(null);
         final boolean fromPersistenceStore = true;
         for (int idx = 0; idx < persisted.length; idx++) {
            TopicEntry topicEntry = (TopicEntry) persisted[idx];
            PublishQosServer restoreQos = new PublishQosServer(glob,
                    (MsgQosData) topicEntry.getMsgUnit().getQosData(), fromPersistenceStore);
            restoreQos.setTopicEntry(topicEntry); // Misuse PublishQosServer to transport the topicEntry

            if (glob.getTopicAccessor().accessDirtyRead(topicEntry.getKeyOid()) != null) {
               dropDuplicatePersistedTopic(topicEntry);
               continue;
            }

            try {
               TopicProperty topicProps = ((MsgQosData) topicEntry.getMsgUnit().getQosData()).getTopicProperty();
               capTopicLimits(topicProps, storeLimit, historyLimit);
               publish(unsecureSessionInfo, topicEntry.getMsgUnit(), restoreQos);
               // Called after sessions/subscriptions are recovered from SessionPersistencePlugin:
               //   glob.getTopicAccessor().spanTopicDestroyTimeout();
            }
            catch (XmlBlasterException e) {
               log.severe("Restoring topic '" + topicEntry.getMsgUnit().getKeyOid() + "' from persistency failed: " + e.getMessage());
            }
         }
      }
   }

   /**
    * Removes a persisted topic entry which exists already in the running server,
    * a failure of the removal is logged and not propagated.
    *
    * @param topicEntry The duplicate entry to remove from the topic store
    */
   private void dropDuplicatePersistedTopic(TopicEntry topicEntry) {
      log.warning("Removing duplicate of topic '" + topicEntry.getLogId() + "' from persistence store");
      try {
         this.topicStore.remove(topicEntry);
      }
      catch (XmlBlasterException e) {
         log.severe("Removing duplicate of topic '" + topicEntry.getLogId() + "' from persistence store makes problems: " + e.getMessage());
      }
   }

   /**
    * Check limits: lowers every msgUnitStore and history queue limit of a recovered topic
    * which is bigger than the currently configured limit. Values which do not exceed
    * the limit are not touched (no setter is invoked for them).
    *
    * @param topicProps The properties of the recovered topic, null is ignored
    * @param storeLimit The current msgUnitStore limits
    * @param historyLimit The current history queue limits
    * @throws XmlBlasterException declared so that an exception from the property accessors
    *         is still handled by the caller
    */
   private void capTopicLimits(TopicProperty topicProps, MsgUnitStoreProperty storeLimit,
                               HistoryQueueProperty historyLimit) throws XmlBlasterException {
      if (topicProps == null) {
         return;
      }

      if (topicProps.getMsgUnitStoreProperty() != null) {
         MsgUnitStoreProperty store = topicProps.getMsgUnitStoreProperty();
         if (store.getMaxBytes() > storeLimit.getMaxBytes()) { // How to prevent a smaller limit than actual bytes on HD?
            store.setMaxBytes(storeLimit.getMaxBytes());
         }
         if (store.getMaxBytesCache() > storeLimit.getMaxBytesCache()) {
            store.setMaxBytesCache(storeLimit.getMaxBytesCache());
         }
         if (store.getMaxEntries() > storeLimit.getMaxEntries()) { // How to prevent a smaller limit than actual entries on HD?
            store.setMaxEntries(storeLimit.getMaxEntries());
         }
         if (store.getMaxEntriesCache() > storeLimit.getMaxEntriesCache()) {
            store.setMaxEntriesCache(storeLimit.getMaxEntriesCache());
         }
      }

      if (topicProps.getHistoryQueueProperty() != null) {
         HistoryQueueProperty history = topicProps.getHistoryQueueProperty();
         if (history.getMaxBytes() > historyLimit.getMaxBytes()) {
            history.setMaxBytes(historyLimit.getMaxBytes());
         }
         if (history.getMaxBytesCache() > historyLimit.getMaxBytesCache()) {
            history.setMaxBytesCache(historyLimit.getMaxBytesCache());
         }
         if (history.getMaxEntries() > historyLimit.getMaxEntries()) {
            history.setMaxEntries(historyLimit.getMaxEntries());
         }
         if (history.getMaxEntriesCache() > historyLimit.getMaxEntriesCache()) {
            history.setMaxEntriesCache(historyLimit.getMaxEntriesCache());
         }
      }
   }

   /**
    * @return The AccessPluginManager referenced by this instance
    */
   public final AccessPluginManager getAccessPluginManager() {
      return this.accessPluginManager;
   }

   /**
    * @return The PublishPluginManager referenced by this instance
    */
   public final PublishPluginManager getPublishPluginManager() {
      return this.publishPluginManager;
   }

   final SessionInfo
getInternalSessionInfo() {      // Note: We could change to 'public' as the CommandManager transports it to public scope already      //       with glob.getCommandManager().getSessionInfo()      // and   serverScope.getInternalSessionInfo()      return this.unsecureSessionInfo;   }   /**    * Publish dead letters, expired letters should be filtered away before.    * <p />    * The key contains an attribute with the oid of the lost message:    * <pre>    *   &lt;key oid='__sys__deadMessage'>    *      &lt;oid>aMessage&lt;/oid>    *   &lt;key>    * </pre>    * <p>    * The usual sources to send dead letters are:    * </p>    * <ol>    *   <li>A publish of a message fails, the message is lost but you can handle it    *       if you subscribe to dead messages    *   </li>    *   <li>A subscribe fails because a mime plugin throws an exception    *   </li>    *   <li>A callback fails    *   </li>    * </ol>    * @param entries The message to send as dead letters    * @param queue The belonging queue or null    * @param reason A human readable text describing the problem    * @return State information returned from the publish call (is never null)    */   public String[] deadMessage(MsgQueueEntry[] entries, I_Queue queue, String reason)   {      if (log.isLoggable(Level.FINER)) log.finer("Publishing " + entries.length + " dead messages.");      if (entries == null) {         log.severe("deadMessage() with null argument");         Thread.dumpStack();         return new String[0];      }      try {         if (log.isLoggable(Level.FINE)) log.fine("Publishing " + entries.length + " volatile dead messages");         String[] retArr = new String[entries.length];         PublishQos pubQos = new PublishQos(glob);         pubQos.setVolatile(true);         for (int ii=0; ii<entries.length; ii++) {            MsgQueueEntry entry = entries[ii];            if (entry == null) {               log.severe("Didn't expect null element in MsgQueueEntry[], ignoring it");               
continue;            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -