📄 scribeimpl.java
字号:
TopicManager manager = (TopicManager) i.next(); NodeHandle parent = manager.getParent(); // also send a reverse heartbeat message, which should make sure we are still subscribed if (parent != null) { endpoint.route(manager.getTopic().getId(), new SubscribeMessage(handle, manager.getTopic(), handle.getId(), -1, null), parent); parent.checkLiveness(); } } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received unknown message " + message + " - dropping on floor."); } } } /** * This method is invoked to inform the application that the given node has * either joined or left the neighbor set of the local node, as the set would * be returned by the neighborSet call. * * @param handle The handle that has joined/left * @param joined Whether the node has joined or left */ public void update(NodeHandle handle, boolean joined) { Set set = topics.keySet(); Iterator e; synchronized (set) { e = new ArrayList(set).iterator(); } TopicManager manager; Topic topic; while (e.hasNext()) { topic = (Topic) e.next(); manager = (TopicManager) topics.get(topic); if (joined) { // check if new guy is root, we were old root, then subscribe if (manager.getParent() == null) { // send subscribe message sendSubscribe(topic, null, null); } } else { if (isRoot(topic) && (manager.getParent() != null)) { endpoint.route(null, new UnsubscribeMessage(handle, topic), manager.getParent()); manager.setParent(null); } } } } /** * DESCRIBE THE METHOD */ public void destroy() { Iterator topicIter = topics.values().iterator(); while (topicIter.hasNext()) { TopicManager topicManager = (TopicManager) topicIter.next(); topicManager.destroy(); } } /** * Class which keeps track of a given topic * * @version $Id: ScribeImpl.java 3274 2006-05-15 16:17:47Z jeffh $ * @author amislove */ public class TopicManager implements Observer, Destructable { /** * DESCRIBE THE FIELD */ protected Topic topic; /** * The current path to the root for this node */ protected Id[] pathToRoot; /** * DESCRIBE THE FIELD */ protected Vector clients; /** * DESCRIBE THE FIELD */ protected Vector children; /** * DESCRIBE THE FIELD */ protected NodeHandle parent; /** * Constructor for TopicManager. * * @param topic DESCRIBE THE PARAMETER * @param client DESCRIBE THE PARAMETER */ public TopicManager(Topic topic, ScribeClient client) { this(topic); addClient(client); } /** * Constructor for TopicManager. * * @param topic DESCRIBE THE PARAMETER * @param child DESCRIBE THE PARAMETER */ public TopicManager(Topic topic, NodeHandle child) { this(topic); addChild(child); } /** * Constructor for TopicManager. * * @param topic DESCRIBE THE PARAMETER */ protected TopicManager(Topic topic) { this.topic = topic; this.clients = new Vector(); this.children = new Vector(); setPathToRoot(new Id[0]); } /** * Gets the topic of the TopicManager object * * @return The Parent value */ public Topic getTopic() { return topic; } /** * Gets the Parent attribute of the TopicManager object * * @return The Parent value */ public NodeHandle getParent() { return parent; } /** * Gets the Clients attribute of the TopicManager object * * @return The Clients value */ public ScribeClient[] getClients() { return (ScribeClient[]) clients.toArray(new ScribeClient[0]); } /** * Gets the Children attribute of the TopicManager object * * @return The Children value */ public NodeHandle[] getChildren() { return (NodeHandle[]) children.toArray(new NodeHandle[0]); } /** * Gets the PathToRoot attribute of the TopicManager object * * @return The PathToRoot value */ public Id[] getPathToRoot() { return pathToRoot; } /** * Sets the PathToRoot attribute of the TopicManager object * * @param pathToRoot The new PathToRoot value */ public void setPathToRoot(Id[] pathToRoot) { // build the path to the root for the new node this.pathToRoot = new Id[pathToRoot.length + 1]; System.arraycopy(pathToRoot, 0, this.pathToRoot, 0, pathToRoot.length); this.pathToRoot[pathToRoot.length] = endpoint.getId(); // now send the information out to our children NodeHandle[] children = getChildren(); for (int i = 0; i < children.length; i++) { if (Arrays.asList(this.pathToRoot).contains(children[i].getId())) { endpoint.route(null, new DropMessage(handle, topic), children[i]); removeChild(children[i]); } else { endpoint.route(null, new SubscribeAckMessage(handle, topic, getPathToRoot(), Integer.MAX_VALUE), children[i]); } } } /** * Sets the Parent attribute of the TopicManager object * * @param handle The new Parent value */ public void setParent(NodeHandle handle) { if ((handle != null) && (parent != null)) { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Unexpectedly changing parents for topic " + topic); } } if (parent != null) { parent.deleteObserver(this); } parent = handle; setPathToRoot(new Id[0]); if ((parent != null) && parent.isAlive()) { parent.addObserver(this); } } /** * Returns whether or not this topic manager contains the given client. * * @param client The client in question * @return Whether or not this manager contains the client */ public boolean containsClient(ScribeClient client) { return clients.contains(client); } /** * DESCRIBE THE METHOD * * @param o DESCRIBE THE PARAMETER * @param arg DESCRIBE THE PARAMETER */ public void update(Observable o, Object arg) { if (arg.equals(NodeHandle.DECLARED_DEAD)) { if (children.contains(o)) { if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Child " + o + " for topic " + topic + " has died - removing."); } ScribeImpl.this.removeChild(topic, (NodeHandle) o); } if (o.equals(parent)) { // if our parent has died, then we must resubscribe to the topic if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Parent " + parent + " for topic " + topic + " has died - resubscribing."); } setParent(null); if (clients.size() > 0) { sendSubscribe(topic, (ScribeClient) clients.elementAt(0), null, ((NodeHandle) o).getId()); } else { sendSubscribe(topic, null, null, ((NodeHandle) o).getId()); } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received unexpected update from " + o); } o.deleteObserver(this); } } } /** * Adds a feature to the Client attribute of the TopicManager object * * @param client The feature to be added to the Client attribute */ public void addClient(ScribeClient client) { if (!clients.contains(client)) { clients.add(client); } } /** * DESCRIBE THE METHOD * * @param client DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public boolean removeClient(ScribeClient client) { clients.remove(client); boolean unsub = ((clients.size() == 0) && (children.size() == 0)); // if we're going to unsubscribe, then we remove ourself as // as observer if (unsub && (parent != null)) { parent.deleteObserver(this); } return unsub; } /** * Adds a feature to the Child attribute of the TopicManager object * * @param child The feature to be added to the Child attribute */ public void addChild(NodeHandle child) { if ((!children.contains(child)) && child.isAlive()) { children.add(child); child.addObserver(this); } } /** * DESCRIBE THE METHOD * * @param child DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public boolean removeChild(NodeHandle child) { children.remove(child); child.deleteObserver(this); boolean unsub = ((clients.size() == 0) && (children.size() == 0)); // if we're going to unsubscribe, then we remove ourself as // as observer if (unsub && (parent != null)) { parent.deleteObserver(this); } return unsub; } /** * DESCRIBE THE METHOD */ public void destroy() { if (parent != null) { parent.deleteObserver(this); } Iterator i = children.iterator(); while (i.hasNext()) { NodeHandle child = (NodeHandle) i.next(); child.deleteObserver(this); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -