📄 clustermanager.java
字号:
/** * Return myself */ public ClusterNode getMyClusterNode() { return this.myClusterNode; } /** * Access the unique cluster node id (as NodeId object). */ public final NodeId getNodeId() { return this.glob.getNodeId(); } /** * Access the unique cluster node id (as a String). * @return The name of this xmlBlaster instance, e.g. "heron.mycompany.com" */ public final String getId() { return this.glob.getId(); } /** * The plugin loader instance to map messages to their master node. */ public MapMsgToMasterPluginManager getMapMsgToMasterPluginManager() { return this.mapMsgToMasterPluginManager; } /** * @return null if no forwarding is done and we are the master of this message ourself<br /> * <pre><qos><state id='OK' info='QUEUED[bilbo]'/></qos></pre> if message is * tailed back because cluster node is temporary not available. The message will * be flushed on reconnect.<br /> * Otherwise the normal publish return value of the remote cluster node * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller<br /> * ErrorCode.USER_PTP_UNKNOWNDESTINATION if destination cluster node is not found<br /> * ErrorCode.RESOURCE_CLUSTER_NOTAVAILABLE is destination cluster node is known but down */ public PublishReturnQos forwardPtpPublish(SessionInfo publisherSession, MsgUnit msgUnit, Destination destination) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering forwardPtpPublish(" + msgUnit.getLogId() + ", " + destination.getDestination() + ")"); if (destination.getDestination().getNodeId() == null) return null; // First check if a specific not local nodeId is given ClusterNode clusterNode = getClusterNode(destination.getDestination().getNodeId()); if (log.isLoggable(Level.FINE)) log.fine("PtP message '" + msgUnit.getLogId() + "' destination " + destination.getDestination() + " trying node " + ((clusterNode==null)?"null":clusterNode.getId()) + " isNodeIdExplicitlyGiven=" + destination.getDestination().isNodeIdExplicitlyGiven()); if (clusterNode != null && clusterNode.isLocalNode()) { if (destination.getDestination().isNodeIdExplicitlyGiven()) { if (log.isLoggable(Level.FINE)) log.fine("PtP message '" + msgUnit.getLogId() + "' destination " + destination.getDestination() + " destination cluster node reached"); return null; // handle locally } } if (clusterNode != null && destination.getDestination().isNodeIdExplicitlyGiven()) { if (log.isLoggable(Level.FINE)) log.fine("PtP message '" + msgUnit.getLogId() + "' destination " + destination.getDestination() + " remote destination cluster node found"); } else { // Ask the plugin NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit, destination); if (nodeDomainInfo == null) return null; clusterNode = nodeDomainInfo.getClusterNode(); } if (clusterNode.isLocalNode()) return null; I_XmlBlasterAccess con = clusterNode.getXmlBlasterAccess(); if (con == null) { String text = "Cluster node '" + destination.getDestination() + "' is known but not reachable, message '" + msgUnit.getLogId() + "' is lost"; log.warning(text); throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_CLUSTER_NOTAVAILABLE, ME, text); } if (log.isLoggable(Level.FINE)) log.fine("PtP message '" + msgUnit.getLogId() + "' destination " + destination.getDestination() + " is now forwarded to node " + clusterNode.getId()); // To be on the save side we clone the message return con.publish(msgUnit.getClone()); } /** * @return null if no forwarding is done, if we are the master of this message ourself<br /> * <pre><qos><state id='OK' info='QUEUED[bilbo]'/></qos></pre> if message is * tailed back because cluster node is temporary not available. The message will * be flushed on reconnect.<br /> * Otherwise the normal publish return value of the remote cluster node and the responsible * NodeDomainInfo instance. * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller */ public PublishRetQosWrapper forwardPublish(SessionInfo publisherSession, MsgUnit msgUnit) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering forwardPublish(" + msgUnit.getLogId() + ")"); NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit); if (nodeDomainInfo == null) return null; I_XmlBlasterAccess con = nodeDomainInfo.getClusterNode().getXmlBlasterAccess(); if (con == null) return null; QosData publishQos = msgUnit.getQosData(); if (nodeDomainInfo.getDirtyRead() == true) { // mark QoS of published message that we dirty read the message: RouteInfo[] ris = publishQos.getRouteNodes(); if (ris == null || ris.length < 1) { log.severe("The route info for '" + msgUnit.getLogId() + "' is missing"); Thread.dumpStack(); } else { ris[ris.length-1].setDirtyRead(true); } } // Set the new qos ... MsgUnit msgUnitShallowClone = new MsgUnit(msgUnit, null, null, publishQos); return new PublishRetQosWrapper(nodeDomainInfo, con.publish(msgUnitShallowClone)); } /** * @return null if no forwarding is done, if we are the master of this message ourself<br /> * <pre><qos><state id='OK' info='QUEUED[bilbo]'/></qos></pre> if message is * tailed back because cluster node is temporary not available. The message will * be flushed on reconnect.<br /> * Otherwise the normal subscribe return value of the remote cluster node. * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller */ public SubscribeReturnQos forwardSubscribe(SessionInfo publisherSession, QueryKeyData xmlKey, SubscribeQosServer subscribeQos) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering forwardSubscribe(" + xmlKey.getOid() + ")"); MsgUnit msgUnit = new MsgUnit(xmlKey, (byte[])null, subscribeQos.getData()); NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit); if (nodeDomainInfo == null) return null; I_XmlBlasterAccess con = nodeDomainInfo.getClusterNode().getXmlBlasterAccess(); if (con == null) { if (log.isLoggable(Level.FINE)) log.fine("forwardSubscribe - Nothing to forward"); return null; } SubscribeQos subscribeQos2 = new SubscribeQos(this.glob, subscribeQos.getData()); // The cluster master needs to accept our "__subId:heron-3456646466" ClientProperty clientProperty = subscribeQos2.getClientProperty(Constants.PERSISTENCE_ID); if (clientProperty != null) { // remove marker that this is from persistent store, the other node would react wrong subscribeQos2 = new SubscribeQos(this.glob, (QueryQosData)subscribeQos.getData().clone()); subscribeQos2.getData().getClientProperties().remove(Constants.PERSISTENCE_ID); } // As we forward many subscribes probably accessing the // same message but only want one update. // We cache this update and distribute to all our clients // TODO: As an unSubscribe() deletes all subscribes() at once // we have not yet activated the new desired use of multiSubscribe // We need to add some sort of subscribe reference counting // preferably in the server implementation (see RequestBroker.java) // TODO: As soon we have implemented it here we need to remove // data.setDuplicateUpdates(false); in NodeInfo.java //subscribeQos2.setMultiSubscribe(false); return con.subscribe(new SubscribeKey(this.glob, xmlKey), subscribeQos2); } /** * @return null if no forwarding is done, if we are the master of this message ourself<br /> * <pre><qos><state id='OK' info='QUEUED[bilbo]'/></qos></pre> if message is * tailed back because cluster node is temporary not available. The message will * be flushed on reconnect.<br /> * Otherwise the normal unSubscribe return value of the remote cluster node. * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller */ public UnSubscribeReturnQos[] forwardUnSubscribe(SessionInfo publisherSession, QueryKeyData xmlKey, UnSubscribeQosServer unSubscribeQos) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering forwardUnSubscribe(" + xmlKey.getOid() + ")"); MsgUnit msgUnit = new MsgUnit(xmlKey, (byte[])null, unSubscribeQos.getData()); NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit); if (nodeDomainInfo == null) return null; I_XmlBlasterAccess con = nodeDomainInfo.getClusterNode().getXmlBlasterAccess(); if (con == null) { if (log.isLoggable(Level.FINE)) log.fine("forwardUnSubscribe - Nothing to forward"); return null; } return con.unSubscribe(new UnSubscribeKey(this.glob, xmlKey), new UnSubscribeQos(this.glob, unSubscribeQos.getData())); } /** * @return null if no forwarding is done, if we are the master of this message ourself<br /> * msgUnit.length==0 if message is * tailed back because cluster node is temporary not available. The command will * be flushed on reconnect.<br /> * Otherwise the normal get return value of the remote cluster node. * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller */ public MsgUnit[] forwardGet(SessionInfo publisherSession, QueryKeyData xmlKey, GetQosServer getQos) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering forwardGet(" + xmlKey.getOid() + ")"); MsgUnit msgUnit = new MsgUnit(xmlKey, new byte[0], getQos.getData()); NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit); if (nodeDomainInfo == null) return null; I_XmlBlasterAccess con = nodeDomainInfo.getClusterNode().getXmlBlasterAccess(); if (con == null) { if (log.isLoggable(Level.FINE)) log.fine("forwardGet - Nothing to forward"); return null; } return con.get(new GetKey(glob, xmlKey), new GetQos(glob, getQos.getData())); } /** * @return null if no forwarding is done, if we are the master of this message ourself<br /> * <pre><qos><state id='OK' info='QUEUED[bilbo]/></qos></pre> if message is * tailed back because cluster node is temporary not available. The command will * be flushed on reconnect.<br /> * Otherwise the normal erase return value of the remote cluster node. * @exception XmlBlasterException and RuntimeExceptions are just forwarded to the caller */ public EraseReturnQos[] forwardErase(SessionInfo publisherSession, QueryKeyData xmlKey, EraseQosServer eraseQos) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering forwardErase(" + xmlKey.getOid() + ")"); MsgUnit msgUnit = new MsgUnit(xmlKey, new byte[0], eraseQos.getData()); NodeDomainInfo nodeDomainInfo = getConnection(publisherSession, msgUnit); if (nodeDomainInfo == null) return null; I_XmlBlasterAccess con = nodeDomainInfo.getClusterNode().getXmlBlasterAccess(); if (con == null) { if (log.isLoggable(Level.FINE)) log.fine("forwardErase - Nothing to forward"); return null; } return con.erase(new EraseKey(glob, xmlKey), new EraseQos(glob, eraseQos.getData())); } /** * Add a new node info object or overwrite an existing one. * @param The ClusterNode instance * @exception IllegalArgumentException */ public final void addClusterNode(ClusterNode clusterNode) { if (clusterNode == null || clusterNode.getNodeId() == null) { Thread.dumpStack(); log.severe("Illegal argument in addClusterNode()"); throw new IllegalArgumentException("Illegal argument in addClusterNode()"); } this.clusterNodesCache = null; // reset cache this.clusterNodeMap.put(clusterNode.getId(), clusterNode); } /** * Return the map containing all known cluster nodes. * @return never null, map contains ClusterNode objects, please treat as read only. */ public Map getClusterNodeMap() { return this.clusterNodeMap; } /** * @return ClusterNode[] which is a snapshot copy of our map */ public ClusterNode[] getClusterNodes() { if (this.clusterNodesCache == null) { if (this.clusterNodeMap == null) { this.clusterNodesCache = new ClusterNode[0]; } else { this.clusterNodesCache = (ClusterNode[])this.clusterNodeMap.values().toArray(new ClusterNode[this.clusterNodeMap.size()]); } } return this.clusterNodesCache; } public int getNumNodes() { if (this.clusterNodeMap == null) return 1; // The caller is a single node return this.clusterNodeMap.size(); } /** * Access a list of known cluster nodes e.g. "heron,avalon,bilbo,frodo" * @return If cluster is switched off just our node */ public final String getNodeList() { int numNodes = getNumNodes(); if (numNodes <= 1) return glob.getId(); StringBuffer sb = new StringBuffer(numNodes * 30); ClusterNode[] clusterNodes = getClusterNodes(); for(int i=0; i<clusterNodes.length; i++) { if (sb.length() > 0) sb.append(","); sb.append(clusterNodes[i].getId()); } return sb.toString(); } /** * Access a list of known cluster nodes e.g. "heron","avalon","bilbo","frodo" * @return If cluster is switched off just our node */ public final String[] getNodes() { ClusterNode[] clusterNodes = getClusterNodes(); if (clusterNodes == null || clusterNodes.length == 0) { return new String[0]; } String[] nodes = new String[clusterNodes.length]; for(int i=0; i<clusterNodes.length; i++) { nodes[i] = clusterNodes[i].getId(); } return nodes; } /** * Access the informations belonging to a node id * @return The ClusterNode instance or null if unknown */ public final ClusterNode getClusterNode(NodeId nodeId) { return getClusterNode(nodeId.getId()); } /** * Access the informations belonging to a node id * @param The cluster node id as a string * @return The ClusterNode instance or null if unknown */ public final ClusterNode getClusterNode(String id) { if (this.clusterNodeMap == null) return null; return (ClusterNode)this.clusterNodeMap.get(id); } /* public final void addConnection(NodeId nodeId, I_XmlBlasterAccess connection) throws XmlBlasterException { ClusterNode info = getClusterNode(nodeId); if (info == null) throw new XmlBlasterException(ME, "Unknown node id = " + nodeId.toString() + ", can't add xmlBlasterConnection"); info.setI_XmlBlasterAccess(connection); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -