📄 clustermanager.java
字号:
/*------------------------------------------------------------------------------Name: ClusterManager.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: Main manager class for clustering------------------------------------------------------------------------------*/package org.xmlBlaster.engine.cluster;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.key.QueryKeyData;import org.xmlBlaster.util.plugin.I_Plugin;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.QosData;import org.xmlBlaster.util.qos.QueryQosData;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.runlevel.I_RunlevelListener;import org.xmlBlaster.engine.runlevel.RunlevelManager;import org.xmlBlaster.util.qos.address.Address;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.cluster.NodeId;import org.xmlBlaster.util.cluster.RouteInfo;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.engine.qos.SubscribeQosServer;import org.xmlBlaster.engine.qos.UnSubscribeQosServer;import org.xmlBlaster.engine.qos.GetQosServer;import org.xmlBlaster.engine.qos.EraseQosServer;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.protocol.I_Driver;import org.xmlBlaster.client.key.GetKey;import org.xmlBlaster.client.qos.GetQos;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.key.EraseKey;import org.xmlBlaster.client.qos.EraseQos;import org.xmlBlaster.client.qos.EraseReturnQos;import org.xmlBlaster.client.I_XmlBlasterAccess;import java.util.Properties;import java.util.Set;import java.util.TreeSet;import java.util.Map;import java.util.TreeMap;import java.util.Iterator;import java.util.Comparator;/** * The manager instance for a cluster node. * <p /> * Each xmlBlaster server instance has one instance * of this class to manage its behavior in the cluster. * <p /> * Note: Our own node id is available via glob.getNodeId() * <p /> * See the <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/cluster.html">cluster requirement</a> * for a detailed description. * @author xmlBlaster@marcelruff.info * @since 0.79e */public final class ClusterManager implements I_RunlevelListener, I_Plugin, ClusterManagerMBean{ private String ME; // The following 3 declarations are 'final' but the SUN JDK 1.3.1 does not like it private ServerScope glob; private static Logger log = Logger.getLogger(ClusterManager.class.getName()); private SessionInfo sessionInfo; private MapMsgToMasterPluginManager mapMsgToMasterPluginManager; private LoadBalancerPluginManager loadBalancerPluginManager; private I_LoadBalancer loadBalancer; public String pluginLoadBalancerType; public String pluginLoadBalancerVersion; private PluginInfo pluginInfo; private ContextNode contextNode; /** My JMX registration */ private Object mbeanHandle; /** * Map containing ClusterNode objects, the key is a 'node Id' * The entries are sorted to contain the local node as first entry. */ private Map clusterNodeMap; private ClusterNode[] clusterNodesCache = new ClusterNode[0]; /** Info about myself */ private ClusterNode myClusterNode = null; private boolean postInitialized = false; /** * Usually connecting on demand is enough (e.g. connecting when a message needs to be delivered). * <p /> * If you want to immediately resend tail back messages on server startup we can * force to establish the connections to all nodes immediately.<br /> * The I_XmlBlasterAccess checks then for tailed back messages which where not yet delivered * and sends them. */ private boolean lazyConnect = false; /** * If loaded by RunlevelManager. */ public ClusterManager() { } /** * You need to call postInit() after all drivers are loaded. * Loaded by RequestBroker.java (hard coded) * * @param sessionInfo Internal handle to be used directly with RequestBroker * NOTE: We (the cluster code) are responsible for security checks * as we directly write into RequestBroker. */ public ClusterManager(ServerScope glob, SessionInfo sessionInfo) { this.glob = glob; this.sessionInfo = sessionInfo; this.ME = "ClusterManager" + this.glob.getLogPrefixDashed(); this.glob.getRunlevelManager().addRunlevelListener(this); this.glob.setUseCluster(true); } /** * Enforced by I_Plugin * @return The configured type in xmlBlaster.properties, defaults to "SOCKET" */ public String getType() { return (this.pluginInfo == null) ? "cluster" : this.pluginInfo.getType(); } /* * The command line key prefix * @return The configured type in xmlBlasterPlugins.xml, defaults to "plugin/cluster" public String getEnvPrefix() { return (addressServer != null) ? addressServer.getEnvPrefix() : "plugin/"+getType().toLowerCase(); } */ /** Enforced by I_Plugin */ public String getVersion() { return (this.pluginInfo == null) ? "1.0" : this.pluginInfo.getVersion(); } /** * This method is called by the PluginManager (enforced by I_Plugin). * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global,org.xmlBlaster.util.plugin.PluginInfo) */ public void init(org.xmlBlaster.util.Global globUtil, PluginInfo pluginInfo) throws XmlBlasterException { this.pluginInfo = pluginInfo; this.ME = "ClusterManager"; this.glob = (org.xmlBlaster.engine.ServerScope)globUtil.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope); if (this.glob == null) throw new XmlBlasterException(globUtil, ErrorCode.INTERNAL_UNKNOWN, ME + ".init", "could not retreive the ServerNodeScope. Am I really on the server side ?"); if (!this.glob.useCluster()) { log.info("Activating cluster is switched off with '-cluster false'"); return; } this.sessionInfo = this.glob.getInternalSessionInfo(); this.glob.getRunlevelManager().addRunlevelListener(this); this.glob.setClusterManager(this); // For JMX instanceName may not contain "," String vers = ("1.0".equals(getVersion())) ? "" : getVersion(); this.contextNode = new ContextNode(ContextNode.SERVICE_MARKER_TAG, "ClusterManager[" + getType() + vers + "]", this.glob.getContextNode()); this.ME = this.contextNode.getRelativeName(); this.mbeanHandle = this.glob.registerMBean(this.contextNode, this); try { postInit(); } catch (XmlBlasterException ex) { throw ex; } catch (Throwable ex) { throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME + ".init", "init. Could'nt initialize ClusterManager.", ex); } } /** * To initialize ClusterNode we need the addresses from the protocol drivers. */ public void postInit() throws XmlBlasterException { this.pluginLoadBalancerType = this.glob.getProperty().get("cluster.loadBalancer.type", "RoundRobin"); this.pluginLoadBalancerVersion = this.glob.getProperty().get("cluster.loadBalancer.version", "1.0"); this.loadBalancerPluginManager = new LoadBalancerPluginManager(this.glob, this); loadBalancer = loadBalancerPluginManager.getPlugin( this.pluginLoadBalancerType, this.pluginLoadBalancerVersion); // "RoundRobin", "1.0" if (loadBalancer == null) { String tmp = "No load balancer plugin type='" + this.pluginLoadBalancerType + "' version='" + this.pluginLoadBalancerVersion + "' found, clustering switched off"; log.severe(tmp); //Thread.currentThread().dumpStack(); throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED, ME, tmp); // is caught in RequestBroker.java } this.clusterNodeMap = new TreeMap(new NodeComparator()); this.mapMsgToMasterPluginManager = new MapMsgToMasterPluginManager(this.glob, this); if (this.glob.getNodeId() == null) log.severe("Node ID is still unknown, please set '-cluster.node.id' to a unique name."); else initClusterNode(); // Look for environment settings to configure startup clustering String[] env = { "cluster.node", "cluster.node.info", "cluster.node.master" }; for (int ii=0; ii<env.length; ii++) { Map nodeMap = this.glob.getProperty().get(env[ii], (Map)null); if (nodeMap != null) { Iterator iter = nodeMap.keySet().iterator(); if (log.isLoggable(Level.FINE)) log.fine("Found -" + env[ii] + " with " + nodeMap.size() + " array size, ii=" + ii); while (iter.hasNext()) { String nodeIdName = (String)iter.next(); // e.g. "heron" from "cluster.node.master[heron]=..." String xml = (String)nodeMap.get(nodeIdName); // The "<clusternode>..." xml ASCII string for heron if (xml == null || xml.length() < 1) { log.info("Ignoring environment setting -" + env[ii]); continue; } if (log.isLoggable(Level.FINE)) log.fine("Parsing environment -" + env[ii] + " for node '" + nodeIdName + "' ..."); /*NodeParser nodeParser =*/ new NodeParser(this.glob, this, xml, sessionInfo); // fills the info to ClusterManager log.info("Environment for node '" + nodeIdName + "' parsed."); } } } publish(); subscribe(); if (log.isLoggable(Level.FINEST)) log.finest(toXml()); log.info("Initialized and ready"); postInitialized = true; if (!lazyConnect) initConnections(); } /** * On xmlBlaster startup we need to wait for incoming messages until clusterManager is ready. * NOTE: This should be resolved in future by the runlevel manager * @return false on timeout (manager was never ready) */ public boolean blockUntilReady() { if (this.postInitialized) return true; for (int i=0; i<2000; i++) { try { Thread.sleep(10L); } catch( InterruptedException ie) {} if (this.postInitialized) return true; } log.severe("Waited for " + (2000*10L) + " millis for cluster manager to be ready, giving up"); return false; } public boolean isReady() { return this.postInitialized; } /** * TODO: not implemented yet * You can't currently configure the cluster setup with messages, only statically * on startup */ private void publish() { if (log.isLoggable(Level.FINE)) log.fine("publish() of cluster internal messages is missing"); /* StringBuffer keyBuf = new StringBuffer(256); keyBuf.append("<key oid='").append(Constants.OID_CLUSTER_INFO).append("[").append(getId()).append("]").append("'><").append(Constants.OID_CLUSTER_INFO)("/></key>"); String qos = pubQos.toXml()); XmlKey xmlKey = new XmlKey(msgUnit.getXmlKey(), true); clone msgUnit retArr[ii] = publish(unsecureSessionInfo, xmlKey, msgUnit, new PublishQosServer(this.glob, msgUnit.getQos())); */ } /** * TODO: not implemented yet * You can't currently configure the cluster setup with messages, only statically * on startup */ private void subscribe() { if (log.isLoggable(Level.FINE)) log.fine("subscribe() of cluster internal messages is missing"); } /** * Initialize ClusterNode object, containing all informations about myself. */ private void initClusterNode() throws XmlBlasterException { this.myClusterNode = new ClusterNode(this.glob, this.glob.getNodeId(), this.sessionInfo); this.addClusterNode(this.myClusterNode);/* I_Driver[] drivers = glob.getProtocolManager().getPublicProtocolDrivers(); for (int ii=0; ii<drivers.length; ii++) { I_Driver driver = drivers[ii]; Address addr = new Address(glob, driver.getProtocolId(), glob.getId()); addr.setRawAddress(driver.getRawAddress()); this.myClusterNode.getNodeInfo().addAddress(addr); } if (drivers.length > 0) { if (log.isLoggable(Level.FINE)) log.trace(ME, "Setting " + drivers.length + " addresses for cluster node '" + getId() + "'"); }*/ //java.util.Vector drivers = glob.getPluginRegistry().getPluginsOfGroup("protocol"); I_Driver[] drivers = this.glob.getPluginRegistry().getPluginsOfInterfaceI_Driver(); for (int i=0; i < drivers.length; i++) { I_Driver driver = drivers[i]; String rawAddr = driver.getRawAddress(); if (rawAddr != null) { Address addr = new Address(this.glob, driver.getProtocolId(), this.glob.getId()); addr.setRawAddress(rawAddr); this.myClusterNode.getNodeInfo().addAddress(addr); } } if (drivers.length > 0) { if (log.isLoggable(Level.FINE)) log.fine("Setting " + drivers.length + " addresses for cluster node '" + getId() + "'"); } else { log.severe("ClusterNode is not properly initialized, no protocol pluging - no local xmlBlaster (node=" + getId() + ") address available"); Thread.dumpStack(); } } /** * Check if supplied address would connect to our own node. */ public final boolean isLocalAddress(Address other) { return getMyClusterNode().getNodeInfo().contains(other); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -