📄 requestbroker.java
字号:
HistoryQueueProperty historyQueueProperty = new HistoryQueueProperty(glob, glob.getId()); historyQueueProperty.setMaxEntriesCache(2); historyQueueProperty.setMaxEntries(2); topicProperty.setHistoryQueueProperty(historyQueueProperty); publishQos.setTopicProperty(topicProperty); this.publishQosForEvents = publishQos; // Should we configure historyQueue and msgUnitStore to be RAM based only? this.publishLoginEvent = glob.getProperty().get("loginEvent", true); if (this.publishLoginEvent) { // Key '__sys__Login' for login event (allows you to subscribe on new clients which do a login) org.xmlBlaster.client.key.PublishKey publishKey = new org.xmlBlaster.client.key.PublishKey(glob, Constants.EVENT_OID_LOGIN/*"__sys__Login"*/, "text/plain"); this.xmlKeyLoginEvent = publishKey.getData(); this.publishQosLoginEvent = new PublishQosServer(glob, publishQos.getData().toXml(), false); // take copy } this.publishLogoutEvent = glob.getProperty().get("logoutEvent", true); if (this.publishLogoutEvent) { // Key '__sys__Logout' for logout event (allows you to subscribe on clients which do a logout) org.xmlBlaster.client.key.PublishKey publishKey = new org.xmlBlaster.client.key.PublishKey(glob, Constants.EVENT_OID_LOGOUT/*"__sys__Logout"*/, "text/plain"); this.xmlKeyLogoutEvent = publishKey.getData(); this.publishQosLogoutEvent = new PublishQosServer(glob, publishQos.getData().toXml(), false); } this.publishUserList = glob.getProperty().get("userListEvent", true); if (this.publishUserList) { // Key '__sys__UserList' for login/logout event org.xmlBlaster.client.key.PublishKey publishKey = new org.xmlBlaster.client.key.PublishKey(glob, Constants.EVENT_OID_USERLIST/*"__sys__UserList"*/, "text/plain"); publishKey.setClientTags("<__sys__internal/>"); this.xmlKeyUserListEvent = publishKey.getData(); } } /** * A human readable name of the listener for logging. * <p /> * Enforced by I_RunlevelListener */ public String getName() { return ME; } BigXmlKeyDOM getBigXmlKeyDOM() { return this.bigXmlKeyDOM; } /** * Holds all subscriptions. * @return Is never null */ public ClientSubscriptions getClientSubscriptions() { return this.clientSubscriptions; } /** * Invoked on run level change, see RunlevelManager.RUNLEVEL_HALTED and RunlevelManager.RUNLEVEL_RUNNING * <p /> * Enforced by I_RunlevelListener */ public void runlevelChange(int from, int to, boolean force) throws org.xmlBlaster.util.XmlBlasterException { //if (log.isLoggable(Level.FINER)) log.call(ME, "Changing from run level=" + from + " to level=" + to + " with force=" + force); if (to == from) return; if (to > from) { // startup if (to == RunlevelManager.RUNLEVEL_STANDBY_PRE) { startupTopicStore(); }// else if (to == RunlevelManager.RUNLEVEL_CLEANUP_PRE) { // Load all persistent topics from persistent storage// loadPersistentMessages();// } } if (to < from) { // shutdown if (to == RunlevelManager.RUNLEVEL_HALTED) { XbNotifyHandler.instance().unregister(Level.WARNING.intValue(), this); XbNotifyHandler.instance().unregister(Level.SEVERE.intValue(), this); this.glob.unregisterMBean(this.mbeanHandle); } } } /** * Access the ServerScope handle. * @return The ServerScope instance of this xmlBlaster server */ public final ServerScope getServerScope() { return this.glob; } /** * @return The handle on the persistence storage for all topics */ I_Map getTopicStore() { return this.topicStore; } /** * This stores the topics configuration (the publish administrative message - the MsgUnit data struct) */ private void startupTopicStore() throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering startupTopicStore(), looking for persisted topics"); boolean wipeOutJdbcDB = glob.getProperty().get("wipeOutJdbcDB", false); if (wipeOutJdbcDB) { this.glob.setWipeOutDB(wipeOutJdbcDB); // it is now the responsability of the QueuePlugin (see JdbcQueueCommonTable) to // really perform the wipeout, the request broker only gives him an order to do so./* String tableNamePrefix = "XMLBLASTER"; tableNamePrefix = glob.getProperty().get("queue.persistent.tableNamePrefix", tableNamePrefix).toUpperCase(); log.warn(ME, "You have set '-wipeOutJdbcDB true', we will destroy now the complete JDBC persistence store entries of prefix="+tableNamePrefix); try { java.util.Properties prop = new java.util.Properties(); prop.put("tableNamePrefix", tableNamePrefix); JdbcManagerCommonTable.wipeOutDB(glob, "JDBC", "1.0", prop, true); } catch (XmlBlasterException e) { log.error(ME, "Wipe out of JDBC database entries failed: " + e.getMessage()); }*/ } boolean useTopicStore = glob.getProperty().get("useTopicStore", true); if (!useTopicStore) { log.warning("Persistent and recoverable topics are switched off with '-useTopicStore false', topics are handled RAM based only."); return; } synchronized (this) { // TODO: get TopicStoreProperty from administrator //TopicStoreProperty topicStoreProperty = this.topicProperty.getTopicStoreProperty(); TopicStoreProperty topicStoreProperty = new TopicStoreProperty(glob, glob.getStrippedId()); if (this.topicStore == null) { String type = topicStoreProperty.getType(); String version = topicStoreProperty.getVersion(); // e.g. "topicStore:/node/heron" is the unique name of the data store: StorageId topicStoreId = new StorageId("topicStore", glob.getStrippedId()); this.topicStore = glob.getStoragePluginManager().getPlugin(type, version, topicStoreId, topicStoreProperty); //this.topicStore = new org.xmlBlaster.engine.msgstore.ram.MapPlugin(); log.info("Activated storage '" + this.topicStore.getStorageId() + "' for persistent topics, found " + this.topicStore.getNumOfEntries() + " topics to recover."); MsgUnitStoreProperty limitM = new MsgUnitStoreProperty(glob, null); // The current limit from xmlBlaster.properties HistoryQueueProperty limitH = new HistoryQueueProperty(glob, null); // The current limit I_MapEntry[] mapEntryArr = this.topicStore.getAll(null); boolean fromPersistenceStore = true; for(int i=0; i<mapEntryArr.length; i++) { TopicEntry topicEntry = (TopicEntry)mapEntryArr[i]; PublishQosServer publishQosServer = new PublishQosServer(glob, (MsgQosData)topicEntry.getMsgUnit().getQosData(), fromPersistenceStore); publishQosServer.setTopicEntry(topicEntry); // Misuse PublishQosServer to transport the topicEntry boolean existsAlready = (glob.getTopicAccessor().accessDirtyRead(topicEntry.getKeyOid()) != null); if (existsAlready) { log.warning("Removing duplicate of topic '" + topicEntry.getLogId() + "' from persistence store"); try { this.topicStore.remove(topicEntry); } catch (XmlBlasterException e) { log.severe("Removing duplicate of topic '" + topicEntry.getLogId() + "' from persistence store makes problems: " + e.getMessage()); } continue; } try { // Check limits TopicProperty topicProps = ((MsgQosData)topicEntry.getMsgUnit().getQosData()).getTopicProperty(); if (topicProps != null && topicProps.getMsgUnitStoreProperty() != null) { MsgUnitStoreProperty p = topicProps.getMsgUnitStoreProperty(); if (p.getMaxBytes() > limitM.getMaxBytes()) { // How to prevent a smaller limit than actual bytes on HD? p.setMaxBytes(limitM.getMaxBytes()); } if (p.getMaxBytesCache() > limitM.getMaxBytesCache()) { p.setMaxBytesCache(limitM.getMaxBytesCache()); } if (p.getMaxEntries() > limitM.getMaxEntries()) { // How to prevent a smaller limit than actual entries on HD? p.setMaxEntries(limitM.getMaxEntries()); } if (p.getMaxEntriesCache() > limitM.getMaxEntriesCache()) { p.setMaxEntriesCache(limitM.getMaxEntriesCache()); } } if (topicProps != null && topicProps.getHistoryQueueProperty() != null) { HistoryQueueProperty h = topicProps.getHistoryQueueProperty(); if (h.getMaxBytes() > limitH.getMaxBytes()) { h.setMaxBytes(limitH.getMaxBytes()); } if (h.getMaxBytesCache() > limitH.getMaxBytesCache()) { h.setMaxBytesCache(limitH.getMaxBytesCache()); } if (h.getMaxEntries() > limitH.getMaxEntries()) { h.setMaxEntries(limitH.getMaxEntries()); } if (h.getMaxEntriesCache() > limitH.getMaxEntriesCache()) { h.setMaxEntriesCache(limitH.getMaxEntriesCache()); } } publish(unsecureSessionInfo, topicEntry.getMsgUnit(), publishQosServer); // Called after sessions/subscriptions are recovered from SessionPersistencePlugin: // glob.getTopicAccessor().spanTopicDestroyTimeout(); } catch (XmlBlasterException e) { log.severe("Restoring topic '" + topicEntry.getMsgUnit().getKeyOid() + "' from persistency failed: " + e.getMessage()); } } } else { log.info("Reconfiguring topics store."); this.topicStore.setProperties(topicStoreProperty); } } } public final AccessPluginManager getAccessPluginManager() { return this.accessPluginManager; } public final PublishPluginManager getPublishPluginManager() { return this.publishPluginManager; } final SessionInfo getInternalSessionInfo() { // Note: We could change to 'public' as the CommandManager transports it to public scope already // with glob.getCommandManager().getSessionInfo() // and serverScope.getInternalSessionInfo() return this.unsecureSessionInfo; } /** * Publish dead letters, expired letters should be filtered away before. * <p /> * The key contains an attribute with the oid of the lost message: * <pre> * <key oid='__sys__deadMessage'> * <oid>aMessage</oid> * <key> * </pre> * <p> * The usual sources to send dead letters are: * </p> * <ol> * <li>A publish of a message fails, the message is lost but you can handle it * if you subscribe to dead messages * </li> * <li>A subscribe fails because a mime plugin throws an exception * </li> * <li>A callback fails * </li> * </ol> * @param entries The message to send as dead letters * @param queue The belonging queue or null * @param reason A human readable text describing the problem * @return State information returned from the publish call (is never null) */ public String[] deadMessage(MsgQueueEntry[] entries, I_Queue queue, String reason) { if (log.isLoggable(Level.FINER)) log.finer("Publishing " + entries.length + " dead messages."); if (entries == null) { log.severe("deadMessage() with null argument"); Thread.dumpStack(); return new String[0]; } try { if (log.isLoggable(Level.FINE)) log.fine("Publishing " + entries.length + " volatile dead messages"); String[] retArr = new String[entries.length]; PublishQos pubQos = new PublishQos(glob); pubQos.setVolatile(true); for (int ii=0; ii<entries.length; ii++) { MsgQueueEntry entry = entries[ii]; if (entry == null) { log.severe("Didn't expect null element in MsgQueueEntry[], ignoring it"); continue; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -