⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clustermanager.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*------------------------------------------------------------------------------Name:      ClusterManager.javaProject:   xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment:   Main manager class for clustering------------------------------------------------------------------------------*/package org.xmlBlaster.engine.cluster;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.key.QueryKeyData;import org.xmlBlaster.util.plugin.I_Plugin;import org.xmlBlaster.util.plugin.PluginInfo;import org.xmlBlaster.util.qos.ClientProperty;import org.xmlBlaster.util.qos.QosData;import org.xmlBlaster.util.qos.QueryQosData;import org.xmlBlaster.util.MsgUnit;import org.xmlBlaster.engine.ServerScope;import org.xmlBlaster.engine.runlevel.I_RunlevelListener;import org.xmlBlaster.engine.runlevel.RunlevelManager;import org.xmlBlaster.util.qos.address.Address;import org.xmlBlaster.util.qos.address.Destination;import org.xmlBlaster.util.def.Constants;import org.xmlBlaster.util.cluster.NodeId;import org.xmlBlaster.util.cluster.RouteInfo;import org.xmlBlaster.util.context.ContextNode;import org.xmlBlaster.engine.qos.SubscribeQosServer;import org.xmlBlaster.engine.qos.UnSubscribeQosServer;import org.xmlBlaster.engine.qos.GetQosServer;import org.xmlBlaster.engine.qos.EraseQosServer;import org.xmlBlaster.authentication.SessionInfo;import org.xmlBlaster.protocol.I_Driver;import org.xmlBlaster.client.key.GetKey;import org.xmlBlaster.client.qos.GetQos;import org.xmlBlaster.client.key.SubscribeKey;import org.xmlBlaster.client.qos.SubscribeQos;import org.xmlBlaster.client.qos.SubscribeReturnQos;import org.xmlBlaster.client.key.UnSubscribeKey;import org.xmlBlaster.client.qos.UnSubscribeQos;import org.xmlBlaster.client.qos.UnSubscribeReturnQos;import org.xmlBlaster.client.qos.PublishReturnQos;import org.xmlBlaster.client.key.EraseKey;import org.xmlBlaster.client.qos.EraseQos;import org.xmlBlaster.client.qos.EraseReturnQos;import org.xmlBlaster.client.I_XmlBlasterAccess;import java.util.Properties;import java.util.Set;import java.util.TreeSet;import java.util.Map;import java.util.TreeMap;import java.util.Iterator;import java.util.Comparator;/** * The manager instance for a cluster node.  * <p /> * Each xmlBlaster server instance has one instance * of this class to manage its behavior in the cluster.  * <p /> * Note: Our own node id is available via glob.getNodeId() * <p /> * See the <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/cluster.html">cluster requirement</a> * for a detailed description. * @author xmlBlaster@marcelruff.info * @since 0.79e */public final class ClusterManager implements I_RunlevelListener, I_Plugin, ClusterManagerMBean{   private String ME;   // The following 3 declarations are 'final' but the SUN JDK 1.3.1 does not like it   private ServerScope glob;   private static Logger log = Logger.getLogger(ClusterManager.class.getName());   private SessionInfo sessionInfo;   private MapMsgToMasterPluginManager mapMsgToMasterPluginManager;   private LoadBalancerPluginManager loadBalancerPluginManager;   private I_LoadBalancer loadBalancer;   public String pluginLoadBalancerType;   public String pluginLoadBalancerVersion;   private PluginInfo pluginInfo;   private ContextNode contextNode;   /** My JMX registration */   private Object mbeanHandle;   /**    * Map containing ClusterNode objects, the key is a 'node Id'    * The entries are sorted to contain the local node as first entry.    */   private Map clusterNodeMap;   private ClusterNode[] clusterNodesCache = new ClusterNode[0];   /** Info about myself */   private ClusterNode myClusterNode = null;   private boolean postInitialized = false;   /**    * Usually connecting on demand is enough (e.g. connecting when a message needs to be delivered).     * <p />    * If you want to immediately resend tail back messages on server startup we can    * force to establish the connections to all nodes immediately.<br />    * The I_XmlBlasterAccess checks then for tailed back messages which where not yet delivered    * and sends them.    */   private boolean lazyConnect = false;   /**    * If loaded by RunlevelManager.     */   public ClusterManager() {   }   /**    * You need to call postInit() after all drivers are loaded.     * Loaded by RequestBroker.java (hard coded)    *    * @param sessionInfo Internal handle to be used directly with RequestBroker    *                    NOTE: We (the cluster code) are responsible for security checks    *                    as we directly write into RequestBroker.    */   public ClusterManager(ServerScope glob, SessionInfo sessionInfo) {      this.glob = glob;      this.sessionInfo = sessionInfo;      this.ME = "ClusterManager" + this.glob.getLogPrefixDashed();      this.glob.getRunlevelManager().addRunlevelListener(this);      this.glob.setUseCluster(true);   }   /**    * Enforced by I_Plugin    * @return The configured type in xmlBlaster.properties, defaults to "SOCKET"    */   public String getType() {      return (this.pluginInfo == null) ? "cluster" : this.pluginInfo.getType();   }   /*    * The command line key prefix    * @return The configured type in xmlBlasterPlugins.xml, defaults to "plugin/cluster"   public String getEnvPrefix() {      return (addressServer != null) ? addressServer.getEnvPrefix() : "plugin/"+getType().toLowerCase();   }    */   /** Enforced by I_Plugin */   public String getVersion() {      return (this.pluginInfo == null) ? "1.0" : this.pluginInfo.getVersion();   }    /**    * This method is called by the PluginManager (enforced by I_Plugin).    * @see org.xmlBlaster.util.plugin.I_Plugin#init(org.xmlBlaster.util.Global,org.xmlBlaster.util.plugin.PluginInfo)    */   public void init(org.xmlBlaster.util.Global globUtil, PluginInfo pluginInfo)      throws XmlBlasterException {      this.pluginInfo = pluginInfo;      this.ME = "ClusterManager";      this.glob = (org.xmlBlaster.engine.ServerScope)globUtil.getObjectEntry(Constants.OBJECT_ENTRY_ServerScope);      if (this.glob == null)         throw new XmlBlasterException(globUtil, ErrorCode.INTERNAL_UNKNOWN, ME + ".init", "could not retreive the ServerNodeScope. Am I really on the server side ?");            if (!this.glob.useCluster()) {         log.info("Activating cluster is switched off with '-cluster false'");         return;      }      this.sessionInfo = this.glob.getInternalSessionInfo();      this.glob.getRunlevelManager().addRunlevelListener(this);      this.glob.setClusterManager(this);            // For JMX instanceName may not contain ","      String vers = ("1.0".equals(getVersion())) ? "" : getVersion();      this.contextNode = new ContextNode(ContextNode.SERVICE_MARKER_TAG,            "ClusterManager[" + getType() + vers + "]", this.glob.getContextNode());      this.ME = this.contextNode.getRelativeName();      this.mbeanHandle = this.glob.registerMBean(this.contextNode, this);            try {         postInit();      }      catch (XmlBlasterException ex) {         throw ex;      }      catch (Throwable ex) {         throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME + ".init", "init. Could'nt initialize ClusterManager.", ex);      }   }      /**    * To initialize ClusterNode we need the addresses from the protocol drivers.    */   public void postInit() throws XmlBlasterException {      this.pluginLoadBalancerType = this.glob.getProperty().get("cluster.loadBalancer.type", "RoundRobin");      this.pluginLoadBalancerVersion = this.glob.getProperty().get("cluster.loadBalancer.version", "1.0");      this.loadBalancerPluginManager = new LoadBalancerPluginManager(this.glob, this);      loadBalancer = loadBalancerPluginManager.getPlugin(                this.pluginLoadBalancerType, this.pluginLoadBalancerVersion); // "RoundRobin", "1.0"      if (loadBalancer == null) {         String tmp = "No load balancer plugin type='" + this.pluginLoadBalancerType + "' version='" + this.pluginLoadBalancerVersion + "' found, clustering switched off";         log.severe(tmp);         //Thread.currentThread().dumpStack();         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_CONFIGURATION_PLUGINFAILED,            ME, tmp); // is caught in RequestBroker.java      }      this.clusterNodeMap = new TreeMap(new NodeComparator());      this.mapMsgToMasterPluginManager = new MapMsgToMasterPluginManager(this.glob, this);      if (this.glob.getNodeId() == null)         log.severe("Node ID is still unknown, please set '-cluster.node.id' to a unique name.");      else         initClusterNode();      // Look for environment settings to configure startup clustering      String[] env = { "cluster.node", "cluster.node.info", "cluster.node.master" };      for (int ii=0; ii<env.length; ii++) {         Map nodeMap = this.glob.getProperty().get(env[ii], (Map)null);         if (nodeMap != null) {            Iterator iter = nodeMap.keySet().iterator();            if (log.isLoggable(Level.FINE)) log.fine("Found -" + env[ii] + " with " + nodeMap.size() + " array size, ii=" + ii);            while (iter.hasNext()) {               String nodeIdName = (String)iter.next();       // e.g. "heron" from "cluster.node.master[heron]=..."               String xml = (String)nodeMap.get(nodeIdName);  // The "<clusternode>..." xml ASCII string for heron               if (xml == null || xml.length() < 1) {                  log.info("Ignoring environment setting -" + env[ii]);                  continue;               }               if (log.isLoggable(Level.FINE)) log.fine("Parsing environment -" + env[ii] + " for node '" + nodeIdName + "' ...");               /*NodeParser nodeParser =*/ new NodeParser(this.glob, this, xml, sessionInfo); // fills the info to ClusterManager               log.info("Environment for node '" + nodeIdName + "' parsed.");            }         }      }      publish();      subscribe();      if (log.isLoggable(Level.FINEST)) log.finest(toXml());      log.info("Initialized and ready");      postInitialized = true;      if (!lazyConnect)         initConnections();   }   /**    * On xmlBlaster startup we need to wait for incoming messages until clusterManager is ready.     * NOTE: This should be resolved in future by the runlevel manager    * @return false on timeout (manager was never ready)    */   public boolean blockUntilReady() {      if (this.postInitialized)         return true;      for (int i=0; i<2000; i++) {         try { Thread.sleep(10L); } catch( InterruptedException ie) {}         if (this.postInitialized)            return true;      }      log.severe("Waited for " + (2000*10L) + " millis for cluster manager to be ready, giving up");      return false;   }   public boolean isReady() {      return this.postInitialized;   }   /**    * TODO: not implemented yet    * You can't currently configure the cluster setup with messages, only statically    * on startup    */   private void publish() {      if (log.isLoggable(Level.FINE)) log.fine("publish() of cluster internal messages is missing");   /*      StringBuffer keyBuf = new StringBuffer(256);      keyBuf.append("<key oid='").append(Constants.OID_CLUSTER_INFO).append("[").append(getId()).append("]").append("'><").append(Constants.OID_CLUSTER_INFO)("/></key>");      String qos = pubQos.toXml());      XmlKey xmlKey = new XmlKey(msgUnit.getXmlKey(), true);      clone msgUnit      retArr[ii] = publish(unsecureSessionInfo, xmlKey, msgUnit, new PublishQosServer(this.glob, msgUnit.getQos()));   */   }   /**    * TODO: not implemented yet    * You can't currently configure the cluster setup with messages, only statically    * on startup    */   private void subscribe() {      if (log.isLoggable(Level.FINE)) log.fine("subscribe() of cluster internal messages is missing");   }   /**    * Initialize ClusterNode object, containing all informations about myself.     */   private void initClusterNode() throws XmlBlasterException {      this.myClusterNode = new ClusterNode(this.glob, this.glob.getNodeId(), this.sessionInfo);      this.addClusterNode(this.myClusterNode);/*      I_Driver[] drivers = glob.getProtocolManager().getPublicProtocolDrivers();      for (int ii=0; ii<drivers.length; ii++) {         I_Driver driver = drivers[ii];         Address addr = new Address(glob, driver.getProtocolId(), glob.getId());         addr.setRawAddress(driver.getRawAddress());         this.myClusterNode.getNodeInfo().addAddress(addr);      }      if (drivers.length > 0) {         if (log.isLoggable(Level.FINE)) log.trace(ME, "Setting " + drivers.length + " addresses for cluster node '" + getId() + "'");      }*/      //java.util.Vector drivers = glob.getPluginRegistry().getPluginsOfGroup("protocol");      I_Driver[] drivers = this.glob.getPluginRegistry().getPluginsOfInterfaceI_Driver();      for (int i=0; i < drivers.length; i++) {         I_Driver driver = drivers[i];         String rawAddr = driver.getRawAddress();         if (rawAddr != null) {            Address addr = new Address(this.glob, driver.getProtocolId(), this.glob.getId());            addr.setRawAddress(rawAddr);            this.myClusterNode.getNodeInfo().addAddress(addr);         }      }      if (drivers.length > 0) {         if (log.isLoggable(Level.FINE)) log.fine("Setting " + drivers.length + " addresses for cluster node '" + getId() + "'");      }      else {         log.severe("ClusterNode is not properly initialized, no protocol pluging - no local xmlBlaster (node=" + getId() + ") address available");         Thread.dumpStack();      }   }   /**    * Check if supplied address would connect to our own node.     */   public final boolean isLocalAddress(Address other) {      return getMyClusterNode().getNodeInfo().contains(other);   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -