📄 clustermanager.java
字号:
public final void removeConnection(NodeId nodeId) {
      ClusterNode info = getClusterNode(nodeId);
      if (info == null) {
         log.error(ME, "Unknown node id = " + nodeId.toString() + ", can't remove xmlBlasterConnection");
         return;
      }
      info.resetI_XmlBlasterAccess();
   }
   */

   /**
    * Usually the connection is established on demand (a message wants to travel to a node).
    * <p />
    * Here you can force to establish connections to all known cluster nodes.
    *
    * @throws XmlBlasterException if accessing a node's connection fails; the
    *         remaining nodes are then not visited
    */
   private void initConnections() throws XmlBlasterException {
      Iterator it = getClusterNodeMap().values().iterator();
      // for each cluster node ...
      while (it.hasNext()) {
         ClusterNode clusterNode = (ClusterNode)it.next();
         // force a connect (not allowed and local node are checked to do nothing) ...
         clusterNode.getXmlBlasterAccess(); // should we check for Exception and proceed with other nodes ?
      }
   }

   /**
    * Convenience variant of
    * {@link #getConnection(SessionInfo, MsgUnit, Destination)} which passes
    * <code>null</code> as destination.
    *
    * @param publisherSession forwarded unchanged
    * @param msgUnit the message to route
    * @return see {@link #getConnection(SessionInfo, MsgUnit, Destination)}
    */
   public final NodeDomainInfo getConnection(SessionInfo publisherSession, MsgUnit msgUnit) throws XmlBlasterException {
      return getConnection(publisherSession, msgUnit, null);
   }

   /**
    * Get connection to the master node (or a node at a closer stratum to the master).
    * <p />
    * <code>null</code> (meaning "handle in the local node") is returned when
    * <ul>
    *   <li>the cluster manager is not yet post-initialized,</li>
    *   <li>the message is a publish with an internal key,</li>
    *   <li>the publish QoS counts the local node id more than once (circular routing),</li>
    *   <li>no domain mapping rule of any usable node matches the message,</li>
    *   <li>the load balancer returns <code>null</code> or chooses the local node.</li>
    * </ul>
    *
    * @param publisherSession not evaluated by this method
    * @param msgUnit the message for which a master is searched
    * @param destination For PtP, else null; here it only selects the log level
    *        of the "No master found" message
    * @return null if local node, otherwise access other node with <code>nodeDomainInfo.getClusterNode().getI_XmlBlasterAccess()</code>
    */
   public final NodeDomainInfo getConnection(SessionInfo publisherSession, MsgUnit msgUnit, Destination destination) throws XmlBlasterException {
      if (!postInitialized) {
         // !!! we need proper run level initialization
         if (log.isLoggable(Level.FINE)) log.fine("Entering getConnection(" + msgUnit.getLogId() + "), but clustering is not ready, handling in local node");
         return null;
      }

      if (log.isLoggable(Level.FINER)) log.finer("Entering getConnection(" + msgUnit.getLogId() + "), testing " + getClusterNodeMap().size() + " known cluster nodes ...");

      // e.g. unSubscribe(__subId:heron-55) shall be forwarded
      if (msgUnit.getQosData().isPublish() && msgUnit.getKeyData().isInternal()) {
         // key oid can be null for XPath subscription, so guard before startsWith()
         // internal system messages are handled locally
         String keyOid = msgUnit.getKeyOid();
         if (keyOid != null && keyOid.startsWith(Constants.INTERNAL_OID_CLUSTER_PREFIX)) {
            // !!! TODO: forward system messages with cluster info of foreign nodes!
            log.severe("Forwarding of '" + msgUnit.getLogId() + "' implementation is missing");
         }
         // Returned for every internal publish, not only for the cluster prefix case
         return null;
      }

      // Search all other cluster nodes to find the masters of this message ...
      // NOTE: If no filters are used, the masterSet=f(msgUnit) could be cached for performance gain
      //       Cache implementation is currently missing

      // Contains the NodeDomainInfo objects which match this message
      // Sorted by stratum (0 is the first entry) -> see NodeDomainInfo.compareTo
      Set masterSet = new TreeSet();
      int numRulesFound = 0; // For nicer logging of warnings

      QosData publishQos = msgUnit.getQosData();
      if (publishQos.count(glob.getNodeId()) > 1) { // Checked in RequestBroker as well with warning
         log.warning("Warning, message '" + msgUnit.getLogId() + "' passed my node id='" + glob.getId() + "' before, we have a circular routing problem, keeping message locally");
         return null;
      }

      Iterator it = getClusterNodeMap().values().iterator();
      // for each cluster node ...
      while (it.hasNext()) {
         ClusterNode clusterNode = (ClusterNode)it.next();
         if (clusterNode.getDomainInfoMap().size() < 1) {
            continue; // node has no mapping rules at all
         }
         if (!clusterNode.isAllowed()) {
            if (log.isLoggable(Level.FINE)) log.fine("Ignoring master node id='" + clusterNode.getId() + "' because it is not available");
            continue;
         }
         if (!clusterNode.isLocalNode() && publishQos.count(clusterNode.getNodeId()) > 0) {
            if (log.isLoggable(Level.FINE)) log.fine("Ignoring node id='" + clusterNode.getId() + "' for routing, message '" + msgUnit.getLogId() + "' has been there already");
            continue;
         }

         Iterator domains = clusterNode.getDomainInfoMap().values().iterator();
         if (log.isLoggable(Level.FINE)) log.fine("Testing " + clusterNode.getDomainInfoMap().size() + " domains rules of node " + clusterNode.getId() + " for " + msgUnit.getLogId());
         numRulesFound += clusterNode.getDomainInfoMap().size();

         // for each domain mapping rule ...
         while (domains.hasNext()) {
            NodeDomainInfo nodeDomainInfo = (NodeDomainInfo)domains.next();
            I_MapMsgToMasterId domainMapper = this.mapMsgToMasterPluginManager.getMapMsgToMasterId(
                     nodeDomainInfo.getType(), nodeDomainInfo.getVersion(), // "DomainToMaster", "1.0"
                     msgUnit.getContentMime(), msgUnit.getContentMimeExtended());
            if (domainMapper == null) {
               log.warning("No domain mapping plugin type='" + nodeDomainInfo.getType() + "' version='" + nodeDomainInfo.getVersion() +
                           "' found for message mime='" + msgUnit.getContentMime() + "' and '" + msgUnit.getContentMimeExtended() +
                           "' ignoring rules " + nodeDomainInfo.toXml());
               continue;
            }

            // Now invoke the plugin to find out who is the master ...
            nodeDomainInfo = domainMapper.getMasterId(nodeDomainInfo, msgUnit);
            if (nodeDomainInfo != null) {
               masterSet.add(nodeDomainInfo);
               break; // found one, the remaining rules of this node are skipped
            }
         }
      }

      if (masterSet.size() < 1) {
         if (numRulesFound == 0) {
            if (log.isLoggable(Level.FINE)) log.fine("Using local node for message, no master mapping rules are known.");
         }
         else {
            if (destination == null) {
               log.info("No master found for message '" + msgUnit.getLogId() + "' mime='" + msgUnit.getContentMime() + "' domain='" + msgUnit.getDomain() + "', using local node.");
            }
            else {
               if (log.isLoggable(Level.FINE)) log.fine("No master found for PtP message '" + msgUnit.getLogId() + "' mime='" + msgUnit.getContentMime() + "' domain='" + msgUnit.getDomain() + "', using local node.");
            }
         }
         return null;
      }

      if (masterSet.size() > 1) {
         if (log.isLoggable(Level.FINE)) log.fine(masterSet.size() + " masters found for message '" + msgUnit.getLogId() + "' domain='" + msgUnit.getDomain() + "'");
      }

      // Invoke for masterSet.size()==1 as well, the balancer may choose to ignore it
      NodeDomainInfo nodeDomainInfo = loadBalancer.getClusterNode(masterSet);

      /*
      if (nodeDomainInfo == null) {
         log.error(ME, "Message '" + msgUnit.getLogId() + "' domain='" + msgUnit.getDomain() + "'" +
                   "has no master, message is lost (implementation to handle this case is missing)!");
         return null;
      }
      */

      if (nodeDomainInfo == null || nodeDomainInfo.getClusterNode().isLocalNode()) {
         if (log.isLoggable(Level.FINE)) log.fine("Using local node '" + getMyClusterNode().getId() + "' as master for message '" + msgUnit.getLogId() + "' domain='" + msgUnit.getDomain() + "'");
         if (log.isLoggable(Level.FINEST)) log.finest("Received message at master node: " + msgUnit.toXml());
         return null;
      }

      if (log.isLoggable(Level.FINE)) log.fine("Using master node '" + nodeDomainInfo.getClusterNode().getId() + "' for message '" + msgUnit.getLogId() + "' domain='" + msgUnit.getDomain() + "'");
      return nodeDomainInfo;
   }

   /**
    * Not implemented: logs a severe message and returns <code>null</code>.
    *
    * @param nodeId currently ignored
    * @return always <code>null</code>
    */
   public final I_XmlBlasterAccess getConnection(NodeId nodeId) {
      log.severe("getConnection() is not implemented");
      return null;
      /*
      ClusterNode clusterNode = getClusterNode(nodeId);
      return (I_XmlBlasterAccess)connectionMap.get(nodeId.getId());
      */
   }

   /**
    * Shuts down every known cluster node, drops the node cache, empties the
    * node map and unregisters the MBean handle.
    * <p />
    * Nothing is done for the nodes if the node map is <code>null</code> or empty;
    * the MBean is unregistered whenever <code>glob</code> is available.
    */
   public void shutdown() {
      if (this.clusterNodeMap != null && this.clusterNodeMap.size() > 0) {
         ClusterNode[] clusterNodes = getClusterNodes();
         for (int i = 0; i < clusterNodes.length; i++) {
            clusterNodes[i].shutdown();
         }
         this.clusterNodesCache = null;
         this.clusterNodeMap.clear();
      }
      if (this.glob != null) {
         this.glob.unregisterMBean(this.mbeanHandle);
      }
   }

   /**
    * Dump state of this object into a XML ASCII string.
    *
    * @return the XML dump without extra indentation
    */
   public final String toXml() {
      return toXml((String)null);
   }

   /**
    * Dump state of this object into a XML ASCII string.
    *
    * @param extraOffset indenting of tags for nice output, <code>null</code> is treated as ""
    * @return a <code>&lt;clusterManager&gt;</code> element containing the dump of each cluster node
    */
   public final String toXml(String extraOffset) {
      StringBuffer sb = new StringBuffer(1024);
      if (extraOffset == null) {
         extraOffset = "";
      }
      String offset = Constants.OFFSET + extraOffset;

      sb.append(offset).append("<clusterManager>");
      if (this.clusterNodeMap != null && this.clusterNodeMap.size() > 0) {
         ClusterNode[] clusterNodes = getClusterNodes();
         for (int i = 0; i < clusterNodes.length; i++) {
            sb.append(clusterNodes[i].toXml(extraOffset + Constants.INDENT, (Properties)null));
         }
      }
      sb.append(offset).append("</clusterManager>");

      return sb.toString();
   }

   /**
    * Sorts the cluster nodes for the clusterNodeMap
    * <ol>
    *   <li>First is the local node</li>
    *   <li>Others by node id</li>
    * </ol>
    */
   class NodeComparator implements Comparator {
      /**
       * We compare the cluster node id string.
       *
       * @param o1 first node id, must be a String
       * @param o2 second node id, must be a String
       * @return 0 for equal ids, negative if o1 is the local node id, positive
       *         if o2 is the local node id, otherwise the String ordering of the ids
       */
      public final int compare(Object o1, Object o2) {
         String id1 = (String)o1;
         String id2 = (String)o2;
         //log.info("NodeComparator", "Compare " + id1 + " to " + id2);
         if (id1.equals(id2)) {
            return 0;
         }
         if (id1.equals(glob.getId())) { // id1 is local node
            return -1;
         }
         if (id2.equals(glob.getId())) { // id2 is local node
            return 1;
         }
         return id1.compareTo(id2);
      }
   }

   /**
    * Sorts the cluster nodes for the masterSet
    * <ol>
    *   <li>First is the local node</li>
    *   <li>Others by node id</li>
    * </ol>
    */
   /*
   class MasterNodeComparator implements Comparator
   {
      public final int compare(Object o1, Object o2) {
         NodeDomainInfo id1 = (NodeDomainInfo)o1;
         NodeDomainInfo id2 = (NodeDomainInfo)o2;
         //log.info("MasterNodeComparator", "Compare " + id1 + " to " + id2);
         if (id1.equals(id2))
            return 0;
         if (id1.equals(glob.getId())) // id1 is local node
            return -1;
         if (id2.equals(glob.getId())) // id2 is local node
            return 1;
         return id1.compareTo(id2);
      }
   }
   */

   /**
    * A human readable name of the listener for logging.
    * <p />
    * Enforced by I_RunlevelListener
    */
   public String getName() {
      return ME;
   }

   /**
    * Invoked on run level change, see RunlevelManager.RUNLEVEL_HALTED and RunlevelManager.RUNLEVEL_RUNNING
    * <p />
    * Enforced by I_RunlevelListener
    * <p />
    * Only acts when clustering is switched on and this instance has no
    * <code>pluginInfo</code>: calls <code>postInit()</code> when rising to
    * RUNLEVEL_STANDBY_POST and <code>shutdown()</code> when falling to
    * RUNLEVEL_STANDBY.
    *
    * @param from the run level we come from
    * @param to the run level we change to
    * @param force not evaluated here
    */
   public void runlevelChange(int from, int to, boolean force) throws org.xmlBlaster.util.XmlBlasterException {
      //if (log.isLoggable(Level.FINER)) log.call(ME, "Changing from run level=" + from + " to level=" + to + " with force=" + force);
      if (to == from) {
         return;
      }
      if (!this.glob.useCluster()) {
         return;
      }

      if (to > from) { // startup
         if (to == RunlevelManager.RUNLEVEL_STANDBY_POST) {
            if (this.pluginInfo == null) { // Old style: Instantiate hard coded by RequestBroker.java
               postInit(); // Assuming the protocol drivers are initialized to deliver their addresses, currently they are started at run level 3
            }
         }
      }

      if (to < from) { // shutdown
         if (to == RunlevelManager.RUNLEVEL_STANDBY) {
            if (this.pluginInfo == null) { // Old style: Instantiate hard coded by RequestBroker.java
               shutdown();
            }
         }
      }
   }

   /**
    * @return A link for JMX usage
    */
   public java.lang.String getUsageUrl() {
      return ServerScope.getJavadocUrl(this.getClass().getName(), null);
   }

   /* dummy to have a copy/paste functionality in jconsole */
   public void setUsageUrl(java.lang.String url) {
   }

   /**
    * @return For JMX usage
    */
   public String usage() {
      return staticUsage();
   }

   /**
    * Reports whether no cluster nodes are registered.
    * <p />
    * A missing node map counts as shut down, consistent with the null checks
    * in <code>shutdown()</code> and <code>toXml(String)</code>.
    *
    * @return true if the cluster node map is <code>null</code> or empty
    */
   public boolean isShutdown() {
      return this.clusterNodeMap == null || this.clusterNodeMap.size() == 0;
   }

   /**
    * Command line usage.
    * <p />
    * These variables may be set in your property file as well.
    * Don't use the "-" prefix there.
    * <p />
    * Set the verbosity when loading properties (outputs with System.out).
    * <p />
    * 0=nothing, 1=info, 2=trace, configure with
    * <pre>
    * java -Dproperty.verbose 2 ...
    *
    * java org.xmlBlaster.Main -property.verbose 2
    * </pre>
    *
    * @return the usage text for the cluster command line options
    */
   public static String staticUsage() {
      StringBuffer sb = new StringBuffer(512);
      sb.append("Cluster support (activated in xmlBlasterPlugins.xml):\n");
      sb.append(" -cluster.node.id A unique name for this xmlBlaster instance, e.g. 'com.myCompany.myHost'.\n");
      sb.append(" If not specified a unique name is chosen and displayed on command line.\n");
      sb.append(" ... See http://www.xmlBlaster.org/xmlBlaster/doc/requirements/cluster.html\n");
      return sb.toString();
   }
} // class ClusterManager
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -