⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clustermanager.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
   public final void removeConnection(NodeId nodeId) {      ClusterNode info = getClusterNode(nodeId);      if (info == null) {         log.error(ME, "Unknown node id = " + nodeId.toString() + ", can't remove xmlBlasterConnection");         return;      }      info.resetI_XmlBlasterAccess();   }   */   /**    * Usually the connection is established on demand (a message wants to travel to a node).     * <p />    * Here you can force to establish connections to all known cluster nodes.    */   private void initConnections() throws XmlBlasterException {      Iterator it = getClusterNodeMap().values().iterator();      // for each cluster node ...      while (it.hasNext()) {         ClusterNode clusterNode = (ClusterNode)it.next();         // force a connect (not allowed and local node are checked to do nothing) ...         clusterNode.getXmlBlasterAccess();    // should we check for Exception and proceed with other nodes ?      }   }   public final NodeDomainInfo getConnection(SessionInfo publisherSession, MsgUnit msgUnit) throws XmlBlasterException {      return getConnection(publisherSession, msgUnit, null);   }   /**    * Get connection to the master node (or a node at a closer stratum to the master).     * @param destination For PtP, else null    * @return null if local node, otherwise access other node with <code>nodeDomainInfo.getClusterNode().getI_XmlBlasterAccess()</code>    */   public final NodeDomainInfo getConnection(SessionInfo publisherSession, MsgUnit msgUnit, Destination destination) throws XmlBlasterException {      if (!postInitialized) {         // !!! we need proper run level initialization         if (log.isLoggable(Level.FINE)) log.fine("Entering getConnection(" + msgUnit.getLogId() + "), but clustering is not ready, handling in local node");         return null;      }      if (log.isLoggable(Level.FINER)) log.finer("Entering getConnection(" + msgUnit.getLogId() + "), testing " + getClusterNodeMap().size() + " known cluster nodes ...");      // e.g. unSubscribe(__subId:heron-55) shall be forwarded      if (msgUnit.getQosData().isPublish() && msgUnit.getKeyData().isInternal()) {         // key oid can be null for XPath subscription         // internal system messages are handled locally         String keyOid = msgUnit.getKeyOid();         if (keyOid.startsWith(Constants.INTERNAL_OID_CLUSTER_PREFIX))            log.severe("Forwarding of '" + msgUnit.getLogId() + "' implementation is missing");            // !!! TODO: forward system messages with cluster info of foreign nodes!         return null;      }      // Search all other cluster nodes to find the masters of this message ...      // NOTE: If no filters are used, the masterSet=f(msgUnit) could be cached for performance gain      //       Cache implementation is currently missing      Set masterSet = new TreeSet(); // Contains the NodeDomainInfo objects which match this message                                     // Sorted by stratum (0 is the first entry) -> see NodeDomainInfo.compareTo      int numRulesFound = 0;                             // For nicer logging of warnings      QosData publishQos = msgUnit.getQosData();      if (publishQos.count(glob.getNodeId()) > 1) { // Checked in RequestBroker as well with warning         log.warning("Warning, message '" + msgUnit.getLogId() +            "' passed my node id='" + glob.getId() + "' before, we have a circular routing problem, keeping message locally");         return null;      }      Iterator it = getClusterNodeMap().values().iterator();      // for each cluster node ...      while (it.hasNext()) {         ClusterNode clusterNode = (ClusterNode)it.next();         if (clusterNode.getDomainInfoMap().size() < 1)            continue;         if (clusterNode.isAllowed() == false) {            if (log.isLoggable(Level.FINE)) log.fine("Ignoring master node id='" + clusterNode.getId() + "' because it is not available");            continue;         }         if (!clusterNode.isLocalNode() && publishQos.count(clusterNode.getNodeId()) > 0) {            if (log.isLoggable(Level.FINE)) log.fine("Ignoring node id='" + clusterNode.getId() + "' for routing, message '" +                            msgUnit.getLogId() + "' has been there already");            continue;         }         Iterator domains = clusterNode.getDomainInfoMap().values().iterator();         if (log.isLoggable(Level.FINE)) log.fine("Testing " + clusterNode.getDomainInfoMap().size() + " domains rules of node " +                                  clusterNode.getId() + " for " + msgUnit.getLogId());         numRulesFound += clusterNode.getDomainInfoMap().size();         // for each domain mapping rule ...         while (domains.hasNext()) {            NodeDomainInfo nodeDomainInfo = (NodeDomainInfo)domains.next();            I_MapMsgToMasterId domainMapper = this.mapMsgToMasterPluginManager.getMapMsgToMasterId(                                 nodeDomainInfo.getType(), nodeDomainInfo.getVersion(), // "DomainToMaster", "1.0"                                 msgUnit.getContentMime(), msgUnit.getContentMimeExtended());            if (domainMapper == null) {               log.warning("No domain mapping plugin type='" + nodeDomainInfo.getType() + "' version='" + nodeDomainInfo.getVersion() +                              "' found for message mime='" + msgUnit.getContentMime() + "' and '" + msgUnit.getContentMimeExtended() +                              "' ignoring rules " + nodeDomainInfo.toXml());               continue;            }            // Now invoke the plugin to find out who is the master ...            nodeDomainInfo = domainMapper.getMasterId(nodeDomainInfo, msgUnit);            if (nodeDomainInfo != null) {               masterSet.add(nodeDomainInfo);               break; // found one            }         }      }      if (masterSet.size() < 1) {         if (numRulesFound == 0) {            if (log.isLoggable(Level.FINE)) log.fine("Using local node for message, no master mapping rules are known.");         }         else {            if (destination == null) {               log.info("No master found for message '" + msgUnit.getLogId() + "' mime='" +                         msgUnit.getContentMime() + "' domain='" + msgUnit.getDomain() + "', using local node.");            }            else {               if (log.isLoggable(Level.FINE)) log.fine("No master found for PtP message '" + msgUnit.getLogId() + "' mime='" +                         msgUnit.getContentMime() + "' domain='" + msgUnit.getDomain() + "', using local node.");                        }         }         return null;      }      if (masterSet.size() > 1) {         if (log.isLoggable(Level.FINE)) log.fine(masterSet.size() + " masters found for message '" + msgUnit.getLogId() +                                      "' domain='" + msgUnit.getDomain() + "'");      }      NodeDomainInfo nodeDomainInfo = loadBalancer.getClusterNode(masterSet); // Invoke for masterSet.size()==1 as well, the balancer may choose to ignore it      /*      if (nodeDomainInfo == null) {         log.error(ME, "Message '" + msgUnit.getLogId() + "' domain='" + msgUnit.getDomain() + "'" +                   "has no master, message is lost (implementation to handle this case is missing)!");         return null;      }      */      if (nodeDomainInfo == null || nodeDomainInfo.getClusterNode().isLocalNode()) {         if (log.isLoggable(Level.FINE)) log.fine("Using local node '" + getMyClusterNode().getId() + "' as master for message '"               + msgUnit.getLogId() + "' domain='" + msgUnit.getDomain() + "'");         if (log.isLoggable(Level.FINEST)) log.finest("Received message at master node: " + msgUnit.toXml());         return null;      }      else {         if (log.isLoggable(Level.FINE)) log.fine("Using master node '" + nodeDomainInfo.getClusterNode().getId() + "' for message '"               + msgUnit.getLogId() + "' domain='" + msgUnit.getDomain() + "'");      }      return nodeDomainInfo;   }   public final I_XmlBlasterAccess getConnection(NodeId nodeId) {      log.severe("getConnection() is not implemented");      return null;      /*      ClusterNode clusterNode = getClusterNode(nodeId);      return (I_XmlBlasterAccess)connectionMap.get(nodeId.getId());      */   }   public void shutdown() {      if (this.clusterNodeMap != null && this.clusterNodeMap.size() > 0) {         ClusterNode[] clusterNodes = getClusterNodes();         for(int i=0; i<clusterNodes.length; i++) {            clusterNodes[i].shutdown();         }         this.clusterNodesCache = null;         this.clusterNodeMap.clear();      }      if (this.glob != null)         this.glob.unregisterMBean(this.mbeanHandle);   }   /**    * Dump state of this object into a XML ASCII string.    */   public final String toXml() {      return toXml((String)null);   }   /**    * Dump state of this object into a XML ASCII string.    * @param extraOffset indenting of tags for nice output    */   public final String toXml(String extraOffset) {      StringBuffer sb = new StringBuffer(1024);      if (extraOffset == null) extraOffset = "";      String offset = Constants.OFFSET + extraOffset;      sb.append(offset).append("<clusterManager>");      if (this.clusterNodeMap != null && this.clusterNodeMap.size() > 0) {         ClusterNode[] clusterNodes = getClusterNodes();         for(int i=0; i<clusterNodes.length; i++) {            sb.append(clusterNodes[i].toXml(extraOffset + Constants.INDENT, (Properties)null));         }      }      sb.append(offset).append("</clusterManager>");      return sb.toString();   }   /**    * Sorts the cluster nodes for the clusterNodeMap    * <ol>    *   <li>First is the local node</li>    *   <li>Others by node id</li>    * </ol>    */   class NodeComparator implements Comparator   {      /**       * We compare the cluster node id string.        */      public final int compare(Object o1, Object o2) {         String id1 = (String)o1;         String id2 = (String)o2;         //log.info("NodeComparator", "Compare " + id1 + " to " + id2);         if (id1.equals(id2))            return 0;         if (id1.equals(glob.getId())) // id1 is local node            return -1;         if (id2.equals(glob.getId())) // id2 is local node            return 1;         return id1.compareTo(id2);      }   }   /**    * Sorts the cluster nodes for the masterSet    * <ol>    *   <li>First is the local node</li>    *   <li>Others by node id</li>    * </ol>    */    /*   class MasterNodeComparator implements Comparator   {      public final int compare(Object o1, Object o2) {         NodeDomainInfo id1 = (NodeDomainInfo)o1;         NodeDomainInfo id2 = (NodeDomainInfo)o2;         //log.info("MasterNodeComparator", "Compare " + id1 + " to " + id2);         if (id1.equals(id2))            return 0;         if (id1.equals(glob.getId())) // id1 is local node            return -1;         if (id2.equals(glob.getId())) // id2 is local node            return 1;         return id1.compareTo(id2);      }   }  */   /**    * A human readable name of the listener for logging.     * <p />    * Enforced by I_RunlevelListener    */   public String getName() {      return ME;   }   /**    * Invoked on run level change, see RunlevelManager.RUNLEVEL_HALTED and RunlevelManager.RUNLEVEL_RUNNING    * <p />    * Enforced by I_RunlevelListener    */   public void runlevelChange(int from, int to, boolean force) throws org.xmlBlaster.util.XmlBlasterException {      //if (log.isLoggable(Level.FINER)) log.call(ME, "Changing from run level=" + from + " to level=" + to + " with force=" + force);      if (to == from)         return;      if (this.glob.useCluster() == false)         return;      if (to > from) { // startup         if (to == RunlevelManager.RUNLEVEL_STANDBY_POST) {            if (this.pluginInfo == null) { // Old style: Instantiate hard coded by RequestBroker.java               postInit(); // Assuming the protocol drivers are initialized to deliver their addresses, currently they are started at run level 3            }         }      }      if (to < from) { // shutdown         if (to == RunlevelManager.RUNLEVEL_STANDBY) {            if (this.pluginInfo == null) { // Old style: Instantiate hard coded by RequestBroker.java               shutdown();            }         }      }   }   /**    * @return A link for JMX usage    */   public java.lang.String getUsageUrl() {      return ServerScope.getJavadocUrl(this.getClass().getName(), null);   }   /* dummy to have a copy/paste functionality in jconsole */   public void setUsageUrl(java.lang.String url) {   }   /**    * @return For JMX usage    */   public String usage() {      return staticUsage();   }   public boolean isShutdown() {      return this.clusterNodeMap.size() == 0;   }   /**    * Command line usage.    * <p />    * These variables may be set in your property file as well.    * Don't use the "-" prefix there.    * <p />    * Set the verbosity when loading properties (outputs with System.out).    * <p />    * 0=nothing, 1=info, 2=trace, configure with    * <pre>    * java -Dproperty.verbose 2 ...    *    * java org.xmlBlaster.Main -property.verbose 2    * </pre>    */   public static String staticUsage()   {      StringBuffer sb = new StringBuffer(512);      sb.append("Cluster support (activated in xmlBlasterPlugins.xml):\n");      sb.append("   -cluster.node.id    A unique name for this xmlBlaster instance, e.g. 'com.myCompany.myHost'.\n");      sb.append("                       If not specified a unique name is chosen and displayed on command line.\n");      sb.append("   ...                 See http://www.xmlBlaster.org/xmlBlaster/doc/requirements/cluster.html\n");      return sb.toString();   }} // class ClusterManager

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -