⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clustermanager.java

📁 java开源的企业总线.xmlBlaster
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
   /**    * Return myself    */   public ClusterNode getMyClusterNode() {      return this.myClusterNode;   }   /**    * Access the unique cluster node id (as NodeId object).     */   public final NodeId getNodeId() {      return this.glob.getNodeId();   }   /**    * Access the unique cluster node id (as a String).     * @return The name of this xmlBlaster instance, e.g. "heron.mycompany.com"    */   public final String getId() {      return this.glob.getId();   }   /**    * The plugin loader instance to map messages to their master node.     */   public MapMsgToMasterPluginManager getMapMsgToMasterPluginManager() {      return this.mapMsgToMasterPluginManager;   }   /**    * @return null if no forwarding is done and we are the master of this message ourself<br />    *         <pre>&lt;qos>&lt;state id='OK' info='QUEUED[bilbo]'/>&lt;/qos></pre> if message is    *         tailed back because cluster node is temporary not available. The message will    *         be flushed on reconnect.<br />    *         Otherwise the normal publish return value of the remote cluster node    * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller<br />    *         ErrorCode.USER_PTP_UNKNOWNDESTINATION if destination cluster node is not found<br />    *         ErrorCode.RESOURCE_CLUSTER_NOTAVAILABLE is destination cluster node is known but down       */   public PublishReturnQos forwardPtpPublish(SessionInfo publisherSession, MsgUnit msgUnit, Destination destination) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering forwardPtpPublish(" + msgUnit.getLogId() + ", " + destination.getDestination() + ")");      if (destination.getDestination().getNodeId() == null)         return null;      // First check if a specific not local nodeId is given      ClusterNode clusterNode = getClusterNode(destination.getDestination().getNodeId());      if (log.isLoggable(Level.FINE)) log.fine("PtP message '" + msgUnit.getLogId() + "' destination " + destination.getDestination() +                   " trying node " + ((clusterNode==null)?"null":clusterNode.getId()) +                   " isNodeIdExplicitlyGiven=" + destination.getDestination().isNodeIdExplicitlyGiven());      if (clusterNode != null && clusterNode.isLocalNode()) {         if (destination.getDestination().isNodeIdExplicitlyGiven()) {            if (log.isLoggable(Level.FINE)) log.fine("PtP message '" + msgUnit.getLogId() +                         "' destination " + destination.getDestination() + " destination cluster node reached");            return null; // handle locally         }      }      if (clusterNode != null && destination.getDestination().isNodeIdExplicitlyGiven()) {         if (log.isLoggable(Level.FINE)) log.fine("PtP message '" + msgUnit.getLogId() +                        "' destination " + destination.getDestination() + " remote destination cluster node found");      }      else {         // Ask the plugin         NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit, destination);         if (nodeDomainInfo == null)            return null;         clusterNode =  nodeDomainInfo.getClusterNode();      }      if (clusterNode.isLocalNode())         return null;      I_XmlBlasterAccess con = clusterNode.getXmlBlasterAccess();      if (con == null) {         String text = "Cluster node '" + destination.getDestination() + "' is known but not reachable, message '" + msgUnit.getLogId() + "' is lost";         log.warning(text);         throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_CLUSTER_NOTAVAILABLE, ME, text);      }      if (log.isLoggable(Level.FINE)) log.fine("PtP message '" + msgUnit.getLogId() + "' destination " + destination.getDestination() +                   " is now forwarded to node " + clusterNode.getId());      // To be on the save side we clone the message      return con.publish(msgUnit.getClone());   }   /**    * @return null if no forwarding is done, if we are the master of this message ourself<br />    *         <pre>&lt;qos>&lt;state id='OK' info='QUEUED[bilbo]'/>&lt;/qos></pre> if message is    *         tailed back because cluster node is temporary not available. The message will    *         be flushed on reconnect.<br />    *         Otherwise the normal publish return value of the remote cluster node and the responsible    *         NodeDomainInfo instance.      * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller    */   public PublishRetQosWrapper forwardPublish(SessionInfo publisherSession, MsgUnit msgUnit) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering forwardPublish(" + msgUnit.getLogId() + ")");      NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit);      if (nodeDomainInfo == null)         return null;      I_XmlBlasterAccess con =  nodeDomainInfo.getClusterNode().getXmlBlasterAccess();      if (con == null)         return null;      QosData publishQos = msgUnit.getQosData();      if (nodeDomainInfo.getDirtyRead() == true) {         // mark QoS of published message that we dirty read the message:         RouteInfo[] ris = publishQos.getRouteNodes();         if (ris == null || ris.length < 1) {            log.severe("The route info for '" + msgUnit.getLogId() + "' is missing");            Thread.dumpStack();         }         else {            ris[ris.length-1].setDirtyRead(true);         }      }      // Set the new qos ...      MsgUnit msgUnitShallowClone = new MsgUnit(msgUnit, null, null, publishQos);      return new PublishRetQosWrapper(nodeDomainInfo, con.publish(msgUnitShallowClone));   }   /**    * @return null if no forwarding is done, if we are the master of this message ourself<br />    *         <pre>&lt;qos>&lt;state id='OK' info='QUEUED[bilbo]'/>&lt;/qos></pre> if message is    *         tailed back because cluster node is temporary not available. The message will    *         be flushed on reconnect.<br />    *         Otherwise the normal subscribe return value of the remote cluster node.      * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller    */   public SubscribeReturnQos forwardSubscribe(SessionInfo publisherSession, QueryKeyData xmlKey, SubscribeQosServer subscribeQos) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering forwardSubscribe(" + xmlKey.getOid() + ")");      MsgUnit msgUnit = new MsgUnit(xmlKey, (byte[])null, subscribeQos.getData());      NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit);      if (nodeDomainInfo == null)         return null;      I_XmlBlasterAccess con =  nodeDomainInfo.getClusterNode().getXmlBlasterAccess();      if (con == null) {         if (log.isLoggable(Level.FINE)) log.fine("forwardSubscribe - Nothing to forward");         return null;      }      SubscribeQos subscribeQos2 = new SubscribeQos(this.glob, subscribeQos.getData());      // The cluster master needs to accept our "__subId:heron-3456646466"            ClientProperty clientProperty = subscribeQos2.getClientProperty(Constants.PERSISTENCE_ID);      if (clientProperty != null) {         // remove marker that this is from persistent store, the other node would react wrong         subscribeQos2 = new SubscribeQos(this.glob, (QueryQosData)subscribeQos.getData().clone());         subscribeQos2.getData().getClientProperties().remove(Constants.PERSISTENCE_ID);      }            // As we forward many subscribes probably accessing the      // same message but only want one update.      // We cache this update and distribute to all our clients      // TODO: As an unSubscribe() deletes all subscribes() at once      //       we have not yet activated the new desired use of multiSubscribe      //       We need to add some sort of subscribe reference counting      //       preferably in the server implementation (see RequestBroker.java)      // TODO: As soon we have implemented it here we need to remove       //       data.setDuplicateUpdates(false); in NodeInfo.java      //subscribeQos2.setMultiSubscribe(false);      return con.subscribe(new SubscribeKey(this.glob, xmlKey), subscribeQos2);   }   /**    * @return null if no forwarding is done, if we are the master of this message ourself<br />    *         <pre>&lt;qos>&lt;state id='OK' info='QUEUED[bilbo]'/>&lt;/qos></pre> if message is    *         tailed back because cluster node is temporary not available. The message will    *         be flushed on reconnect.<br />    *         Otherwise the normal unSubscribe return value of the remote cluster node.      * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller    */   public UnSubscribeReturnQos[] forwardUnSubscribe(SessionInfo publisherSession, QueryKeyData xmlKey, UnSubscribeQosServer unSubscribeQos) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering forwardUnSubscribe(" + xmlKey.getOid() + ")");      MsgUnit msgUnit = new MsgUnit(xmlKey, (byte[])null, unSubscribeQos.getData());      NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit);      if (nodeDomainInfo == null)         return null;      I_XmlBlasterAccess con =  nodeDomainInfo.getClusterNode().getXmlBlasterAccess();      if (con == null) {         if (log.isLoggable(Level.FINE)) log.fine("forwardUnSubscribe - Nothing to forward");         return null;      }      return con.unSubscribe(new UnSubscribeKey(this.glob, xmlKey), new UnSubscribeQos(this.glob, unSubscribeQos.getData()));   }   /**    * @return null if no forwarding is done, if we are the master of this message ourself<br />    *         msgUnit.length==0 if message is    *         tailed back because cluster node is temporary not available. The command will    *         be flushed on reconnect.<br />    *         Otherwise the normal get return value of the remote cluster node.      * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller    */   public MsgUnit[] forwardGet(SessionInfo publisherSession, QueryKeyData xmlKey, GetQosServer getQos) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering forwardGet(" + xmlKey.getOid() + ")");      MsgUnit msgUnit = new MsgUnit(xmlKey, new byte[0], getQos.getData());      NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit);      if (nodeDomainInfo == null)         return null;      I_XmlBlasterAccess con =  nodeDomainInfo.getClusterNode().getXmlBlasterAccess();      if (con == null) {         if (log.isLoggable(Level.FINE)) log.fine("forwardGet - Nothing to forward");         return null;      }      return con.get(new GetKey(glob, xmlKey), new GetQos(glob, getQos.getData()));   }   /**    * @return null if no forwarding is done, if we are the master of this message ourself<br />    *         <pre>&lt;qos>&lt;state id='OK' info='QUEUED[bilbo]/>&lt;/qos></pre> if message is    *         tailed back because cluster node is temporary not available. The command will    *         be flushed on reconnect.<br />    *         Otherwise the normal erase return value of the remote cluster node.      * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller    */   public EraseReturnQos[] forwardErase(SessionInfo publisherSession, QueryKeyData xmlKey, EraseQosServer eraseQos) throws XmlBlasterException {      if (log.isLoggable(Level.FINER)) log.finer("Entering forwardErase(" + xmlKey.getOid() + ")");      MsgUnit msgUnit = new MsgUnit(xmlKey, new byte[0], eraseQos.getData());      NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit);      if (nodeDomainInfo == null)         return null;      I_XmlBlasterAccess con =  nodeDomainInfo.getClusterNode().getXmlBlasterAccess();      if (con == null) {         if (log.isLoggable(Level.FINE)) log.fine("forwardErase - Nothing to forward");         return null;      }      return con.erase(new EraseKey(glob, xmlKey), new EraseQos(glob, eraseQos.getData()));   }   /**    * Add a new node info object or overwrite an existing one.     * @param The ClusterNode instance    * @exception  IllegalArgumentException    */   public final void addClusterNode(ClusterNode clusterNode) {      if (clusterNode == null || clusterNode.getNodeId() == null) {         Thread.dumpStack();         log.severe("Illegal argument in addClusterNode()");         throw new IllegalArgumentException("Illegal argument in addClusterNode()");      }      this.clusterNodesCache = null; // reset cache      this.clusterNodeMap.put(clusterNode.getId(), clusterNode);   }   /**    * Return the map containing all known cluster nodes.     * @return never null, map contains ClusterNode objects, please treat as read only.    */   public Map getClusterNodeMap() {      return this.clusterNodeMap;   }   /**    * @return ClusterNode[] which is a snapshot copy of our map    */   public ClusterNode[] getClusterNodes() {      if (this.clusterNodesCache == null) {         if (this.clusterNodeMap == null) {            this.clusterNodesCache = new ClusterNode[0];         }         else {            this.clusterNodesCache = (ClusterNode[])this.clusterNodeMap.values().toArray(new ClusterNode[this.clusterNodeMap.size()]);         }      }      return this.clusterNodesCache;   }   public int getNumNodes() {      if (this.clusterNodeMap == null) return 1; // The caller is a single node      return this.clusterNodeMap.size();   }   /**    * Access a list of known cluster nodes e.g. "heron,avalon,bilbo,frodo"    * @return If cluster is switched off just our node    */   public final String getNodeList() {      int numNodes = getNumNodes();      if (numNodes <= 1)         return glob.getId();      StringBuffer sb = new StringBuffer(numNodes * 30);      ClusterNode[] clusterNodes = getClusterNodes();      for(int i=0; i<clusterNodes.length; i++) {         if (sb.length() > 0)            sb.append(",");         sb.append(clusterNodes[i].getId());      }      return sb.toString();   }   /**    * Access a list of known cluster nodes e.g. "heron","avalon","bilbo","frodo"    * @return If cluster is switched off just our node    */   public final String[] getNodes() {      ClusterNode[] clusterNodes = getClusterNodes();      if (clusterNodes == null || clusterNodes.length == 0) {         return new String[0];      }      String[] nodes = new String[clusterNodes.length];      for(int i=0; i<clusterNodes.length; i++) {         nodes[i] = clusterNodes[i].getId();      }      return nodes;   }   /**    * Access the informations belonging to a node id    * @return The ClusterNode instance or null if unknown    */   public final ClusterNode getClusterNode(NodeId nodeId) {      return getClusterNode(nodeId.getId());   }   /**    * Access the informations belonging to a node id    * @param The cluster node id as a string    * @return The ClusterNode instance or null if unknown    */   public final ClusterNode getClusterNode(String id) {      if (this.clusterNodeMap == null) return null;      return (ClusterNode)this.clusterNodeMap.get(id);   }   /*   public final void addConnection(NodeId nodeId, I_XmlBlasterAccess connection) throws XmlBlasterException {      ClusterNode info = getClusterNode(nodeId);      if (info == null)         throw new XmlBlasterException(ME, "Unknown node id = " + nodeId.toString() + ", can't add xmlBlasterConnection");      info.setI_XmlBlasterAccess(connection);   }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -