📄 requestbroker.java
字号:
/*------------------------------------------------------------------------------Name: RequestBroker.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: Handling the Client dataAuthor: xmlBlaster@marcelruff.info------------------------------------------------------------------------------*/package org.xmlBlaster.engine;import java.io.ByteArrayOutputStream;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStream;import java.util.logging.LogRecord;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.FileLocator;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.ThreadLister;import org.xmlBlaster.util.Timeout;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.Timestamp;import org.xmlBlaster.util.qos.StatusQosData;import org.xmlBlaster.util.key.KeyData;import org.xmlBlaster.util.key.MsgKeyData;import org.xmlBlaster.util.key.QueryKeyData;import org.xmlBlaster.util.log.I_LogListener;import org.xmlBlaster.util.log.XbNotifyHandler;import org.xmlBlaster.util.qos.QueryQosData;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.util.def.MethodName;import org.xmlBlaster.util.SessionName;import org.xmlBlaster.util.queue.I_Entry;import org.xmlBlaster.util.queue.I_EntryFilter;import org.xmlBlaster.util.queue.I_Queue;import org.xmlBlaster.util.queue.I_Storage;import org.xmlBlaster.util.queue.StorageId;import org.xmlBlaster.util.queue.jdbc.JdbcManagerCommonTable;import org.xmlBlaster.util.queuemsg.MsgQueueEntry;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.dispatch.DispatchStatistic;import org.xmlBlaster.util.qos.MsgQosData;import org.xmlBlaster.util.qos.TopicProperty;import org.xmlBlaster.util.qos.storage.HistoryQueueProperty;import org.xmlBlaster.util.qos.storage.MsgUnitStoreProperty;import org.xmlBlaster.util.qos.storage.TopicStoreProperty;import org.xmlBlaster.util.qos.AccessFilterQos;import org.xmlBlaster.util.checkpoint.I_Checkpoint;import org.xmlBlaster.util.cluster.RouteInfo;import org.xmlBlaster.client.key.UpdateKey;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.qos.EraseReturnQos;import org.xmlBlaster.client.key.PublishKey;import org.xmlBlaster.client.qos.PublishQos;import org.xmlBlaster.engine.queuemsg.ReferenceEntry;import org.xmlBlaster.engine.queuemsg.MsgQueueHistoryEntry;import org.xmlBlaster.engine.queuemsg.MsgQueueUpdateEntry;import org.xmlBlaster.engine.queuemsg.TopicEntry;import org.xmlBlaster.engine.mime.I_AccessFilter;import org.xmlBlaster.engine.mime.AccessPluginManager;import org.xmlBlaster.engine.mime.I_PublishFilter;import org.xmlBlaster.engine.mime.PublishPluginManager;import org.xmlBlaster.engine.xml2java.XmlKey;import org.xmlBlaster.engine.qos.PublishQosServer;import org.xmlBlaster.engine.qos.SubscribeQosServer;import org.xmlBlaster.engine.qos.UnSubscribeQosServer;import org.xmlBlaster.engine.qos.EraseQosServer;import org.xmlBlaster.engine.qos.GetQosServer;import org.xmlBlaster.engine.qos.GetReturnQosServer;import org.xmlBlaster.engine.cluster.PublishRetQosWrapper;import org.xmlBlaster.engine.msgstore.I_Map;import org.xmlBlaster.engine.msgstore.I_MapEntry;import org.xmlBlaster.engine.msgstore.cache.PersistenceCachePlugin;import org.xmlBlaster.authentication.Authenticate;import org.xmlBlaster.authentication.I_ClientListener;import org.xmlBlaster.authentication.ClientEvent;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.engine.runlevel.I_RunlevelListener;import org.xmlBlaster.engine.runlevel.RunlevelManager;// import org.xmlBlaster.util.log.LogNotifierDeviceFactory;import org.xmlBlaster.util.admin.extern.JmxMBeanHandle;import java.util.*;import javax.management.NotificationBroadcasterSupport;import javax.management.AttributeChangeNotification;import javax.management.MBeanNotificationInfo;/** * This is the central message broker, all requests are routed through this singleton. * <p> * The interface I_ClientListener informs about Client login/logout<br /> * <p> * Most events are fired from the RequestBroker * <p> * See <a href="http://www.xmlBlaster.org/xmlBlaster/src/java/org/xmlBlaster/protocol/corba/xmlBlaster.idl">xmlBlaster.idl</a>, * the CORBA access interface on how clients can access xmlBlaster. * * @author <a href="mailto:xmlBlaster@marcelruff.info">Marcel Ruff</a> */public final class RequestBroker extends NotificationBroadcasterSupport implements I_ClientListener, /*I_AdminNode,*/ RequestBrokerMBean, I_RunlevelListener, I_LogListener{ private String ME = "RequestBroker"; private final ServerScope glob; private static Logger log = Logger.getLogger(RequestBroker.class.getName()); /** * Contains total count of published messages and get() invocations. */ private DispatchStatistic dispatchStatistic = new DispatchStatistic(); private String lastWarning = ""; private String lastError = ""; /** the authentication service */ private Authenticate authenticate; private final Set remotePropertiesListeners = new TreeSet(); /** * Store configuration of all topics in xmlBlaster for recovery */ private I_Map topicStore; /** * This client is only for internal use, it is un secure to pass it outside because * there is no authentication.<br /> * The login name "__RequestBroker_internal__" is reserved!<br /> * TODO: security discussion */ private final SessionInfo unsecureSessionInfo; private final SessionName myselfLoginName; // "__RequestBroker_internal[heron]"; public final static String internalLoginNamePrefix = "__RequestBroker_internal"; /** * Helper to handle the subscriptions */ private final ClientSubscriptions clientSubscriptions; /** * For listeners who want to be informed about subscribe/unsubscribe events. * The key is an Integer number where the lowest is the first invoked on subscribe and the * last invoked on unsubscribe. */ private final Map subscriptionListenerMap = new TreeMap(); /** * This is a handle on the big DOM tree with all XmlKey keys (all message meta data) */ private BigXmlKeyDOM bigXmlKeyDOM = null; /** * This Interface allows to hook in you own persistence driver, configure it through xmlBlaster.properties */ // private I_PersistenceDriver persistenceDriver = null; /** Flag for performance reasons only */ private boolean useOldStylePersistence; /** The messageUnit for a login event */ private boolean publishLoginEvent = true; private MsgKeyData xmlKeyLoginEvent = null; private org.xmlBlaster.client.qos.PublishQos publishQosForEvents; private PublishQosServer publishQosLoginEvent; /** Initialize a messageUnit for a userList event */ private boolean publishUserList = true; private MsgKeyData xmlKeyUserListEvent = null; /** Initialize a messageUnit for a logout event */ private boolean publishLogoutEvent = true; private MsgKeyData xmlKeyLogoutEvent = null; private PublishQosServer publishQosLogoutEvent; //private Timeout burstModeTimer; private AccessPluginManager accessPluginManager = null; private PublishPluginManager publishPluginManager = null; // Enforced by I_AdminNode /** Incarnation time of this object instance in millis */ private long startupTime; /** State during construction */ private static final int UNDEF = -1; private static final int ALIVE = 0; private int state = UNDEF; /** My JMX registration */ private JmxMBeanHandle mbeanHandle; /** * One instance of this represents one xmlBlaster server. * @param authenticate The authentication service */ public RequestBroker(Authenticate authenticate) throws XmlBlasterException { this.authenticate = authenticate; this.glob = this.authenticate.getGlobal(); glob.setRequestBroker(this); glob.setTopicAccessor(new TopicAccessor(this.glob)); this.startupTime = System.currentTimeMillis(); this.mbeanHandle = this.glob.registerMBean(this.glob.getContextNode(), this); XbNotifyHandler.instance().register(Level.WARNING.intValue(), this); XbNotifyHandler.instance().register(Level.SEVERE.intValue(), this); this.useOldStylePersistence = glob.getProperty().get("useOldStylePersistence", false); if (this.useOldStylePersistence) { log.warning("Old style fielstorage is switched on which is deprecated (-useOldStylePersistence true)."); } glob.getRunlevelManager().addRunlevelListener(this); //this.burstModeTimer = new Timeout("BurstmodeTimer"); myselfLoginName = new SessionName(glob, glob.getNodeId(), internalLoginNamePrefix + "[" + glob.getId() + "]/1"); initHelperQos(); org.xmlBlaster.client.qos.ConnectQos connectQos = new org.xmlBlaster.client.qos.ConnectQos(glob); connectQos.setSessionName(myselfLoginName); connectQos.getSessionQos().setSessionTimeout(0L); // Lasts forever this.unsecureSessionInfo = authenticate.unsecureCreateSession(connectQos); this.glob.setInternalSessionInfo(this.unsecureSessionInfo); try { glob.getCommandManager(this.unsecureSessionInfo); } catch(XmlBlasterException e) { log.severe(e.toString()); } this.ME = "RequestBroker" + glob.getLogPrefixDashed(); accessPluginManager = new AccessPluginManager(glob); publishPluginManager = new PublishPluginManager(glob); this.clientSubscriptions = new ClientSubscriptions(glob, this, authenticate); this.bigXmlKeyDOM = new BigXmlKeyDOM(this, authenticate); authenticate.addClientListener(this); this.state = ALIVE; if (log.isLoggable(Level.FINER)) log.finer("Server " + glob.getInstanceId() + " instance is created"); } Authenticate getAuthenticate() { return this.authenticate; } /** * Put this code in a generic internal message producer class (future release) */ private void initHelperQos() throws XmlBlasterException { // Create properties with infinite life time, forceUpdate and historySize=1 org.xmlBlaster.client.qos.PublishQos publishQos = new org.xmlBlaster.client.qos.PublishQos(glob); publishQos.setLifeTime(-1L); publishQos.setForceUpdate(true); TopicProperty topicProperty = new TopicProperty(glob);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -