📄 scribeimpl.java
字号:
// if message is for a non-existant topic, drop it on the floor (we are the root, after all) // otherwise, turn it into a publish message, and forward it on if (manager == null) { Logger.log(endpoint.getId() + ": Received publish request message for non-existent topic " + prMessage.getTopic() + " - dropping on floor.",Logger.EVENT_LOG); } else { deliver(prMessage.getTopic().getId(), new PublishMessage(prMessage.getSource(), prMessage.getTopic(), prMessage.getContent())); } } else if (message instanceof PublishMessage) { PublishMessage pMessage = (PublishMessage) message; TopicManager manager = (TopicManager) topics.get(pMessage.getTopic()); Logger.log(endpoint.getId() + ": Received publish message with data " + pMessage.getContent() + " for topic " + pMessage.getTopic(),Logger.EVENT_LOG); // if we don't know about this topic, send an unsubscribe message // otherwise, we deliver the message to all clients and forward the // message to all children if (manager != null) { pMessage.setSource(handle); ScribeClient[] clients = manager.getClients(); for (int i = 0; i < clients.length; i++) { Logger.log(endpoint.getId() + ": Delivering publish message with data " + pMessage.getContent() + " for topic " + pMessage.getTopic() + " to client " + clients[i],Logger.EVENT_LOG); clients[i].deliver(pMessage.getTopic(), pMessage.getContent()); } NodeHandle[] handles = manager.getChildren(); for (int i = 0; i < handles.length; i++) { Logger.log(endpoint.getId() + ": Forwarding publish message with data " + pMessage.getContent() + " for topic " + pMessage.getTopic() + " to child " + handles[i],Logger.EVENT_LOG); endpoint.route(handles[i].getId(), pMessage, handles[i]); } } else { Logger.log(endpoint.getId() + ": Received unexpected publish message from " + pMessage.getSource() + " for unknown topic " + pMessage.getTopic(),Logger.EVENT_LOG); endpoint.route(pMessage.getSource().getId(), new UnsubscribeMessage(handle, pMessage.getTopic()), pMessage.getSource()); } } else if (message instanceof UnsubscribeMessage) { UnsubscribeMessage uMessage = (UnsubscribeMessage) message; Logger.log(endpoint.getId() + ": Received unsubscribe message from " + uMessage.getSource() + " for topic " + uMessage.getTopic(),Logger.EVENT_LOG); removeChild(uMessage.getTopic(), uMessage.getSource(), false); } else if (message instanceof DropMessage) { DropMessage dMessage = (DropMessage) message; Logger.log(endpoint.getId() + ": Received drop message from " + dMessage.getSource() + " for topic " + dMessage.getTopic(),Logger.EVENT_LOG); TopicManager manager = (TopicManager) topics.get(dMessage.getTopic()); if (manager != null) { if ((manager.getParent() != null) && manager.getParent().equals(dMessage.getSource())) { // we set the parent to be null, and then send out another subscribe message manager.setParent(null); ScribeClient[] clients = manager.getClients(); if (clients.length > 0) sendSubscribe(dMessage.getTopic(), clients[0], null); else sendSubscribe(dMessage.getTopic(), null, null); } else { Logger.log(endpoint.getId() + ": Received unexpected drop message from non-parent " + dMessage.getSource() + " for topic " + dMessage.getTopic() + " - ignoring",Logger.EVENT_LOG); } } else { Logger.log(endpoint.getId() + ": Received unexpected drop message from " + dMessage.getSource() + " for unknown topic " + dMessage.getTopic() + " - ignoring",Logger.EVENT_LOG); } } else if (message instanceof ReplicaSetMessage) { ReplicaSetMessage mrs = (ReplicaSetMessage) message; if (mrs.isRequest()){ int max = mrs.getMaxRank(); Id sourceId = mrs.getMessageKey(); mrs = new ReplicaSetMessage(endpoint.getId(),endpoint.replicaSet(endpoint.getId(),max)); try { endpoint.route(endpoint.getId(),mrs,GenericFactory.buildNodeHandle(sourceId,true)); } catch (InitializationException e) { e.printStackTrace(); } } else { mrs.getReplicaSet(); } } else { Logger.log(endpoint.getId() + ": Received unknown message " + message + " - dropping on floor.",Logger.EVENT_LOG); } } /** * This method is invoked to inform the application that the given node has either joined or left * the neighbor set of the local node, as the set would be returned by the neighborSet call. * * @param handle The handle that has joined/left * @param joined Whether the node has joined or left */ public void update(NodeHandle handle, boolean joined) { Set set = topics.keySet(); Iterator e = set.iterator(); TopicManager manager; Topic topic; while(e.hasNext()){ topic = (Topic)e.next(); manager = (TopicManager)topics.get(topic); if (joined){ // check if new guy is root, we were old root, then subscribe if (manager.getParent() == null){ // send subscribe message sendSubscribe(topic, null, null); } } else { if (isRoot(topic) && (manager.getParent() != null)) { endpoint.route(manager.getParent().getId(), new UnsubscribeMessage(handle, topic), manager.getParent()); manager.setParent(null); } } } } /** * Sets the name for this applicaton. * @param applicationName Name for this application. * @return The same instance, once it has been updated. */ public Application setValues(String applicationName) { this.appId = applicationName; return this; } /** * Class which keeps track of a given topic * * @version $Id: ScribeImpl.java,v 1.14 2003/10/22 03:16:40 amislove Exp $ * @author amislove */ public class TopicManager implements Observer { /** * DESCRIBE THE FIELD */ protected Topic topic; /** * The current path to the root for this node */ protected Id[] pathToRoot; /** * DESCRIBE THE FIELD */ protected Vector clients; /** * DESCRIBE THE FIELD */ protected Vector children; /** * DESCRIBE THE FIELD */ protected NodeHandle parent; /** * Constructor for TopicManager. * * @param topic DESCRIBE THE PARAMETER * @param client DESCRIBE THE PARAMETER */ public TopicManager(Topic topic, ScribeClient client) { this(topic); addClient(client); } /** * Constructor for TopicManager. * * @param topic DESCRIBE THE PARAMETER * @param child DESCRIBE THE PARAMETER */ public TopicManager(Topic topic, NodeHandle child) { this(topic); addChild(child); } /** * Constructor for TopicManager. * * @param topic DESCRIBE THE PARAMETER */ protected TopicManager(Topic topic) { this.topic = topic; this.clients = new Vector(); this.children = new Vector(); setPathToRoot(new Id[0]); } /** * Gets the Parent attribute of the TopicManager object * * @return The Parent value */ public NodeHandle getParent() { return parent; } /** * Gets the Clients attribute of the TopicManager object * * @return The Clients value */ public ScribeClient[] getClients() { return (ScribeClient[]) clients.toArray(new ScribeClient[0]); } /** * Gets the Children attribute of the TopicManager object * * @return The Children value */ public NodeHandle[] getChildren() { return (NodeHandle[]) children.toArray(new NodeHandle[0]); } /** * Gets the PathToRoot attribute of the TopicManager object * * @return The PathToRoot value */ public Id[] getPathToRoot() { return pathToRoot; } /** * Sets the PathToRoot attribute of the TopicManager object * * @param pathToRoot The new PathToRoot value */ public void setPathToRoot(Id[] pathToRoot) { // build the path to the root for the new node this.pathToRoot = new Id[pathToRoot.length + 1]; System.arraycopy(pathToRoot, 0, this.pathToRoot, 0, pathToRoot.length); this.pathToRoot[pathToRoot.length] = endpoint.getId(); // now send the information out to our children NodeHandle[] children = getChildren(); for (int i=0; i<children.length; i++) { if (Arrays.asList(this.pathToRoot).contains(children[i].getId())) { endpoint.route(children[i].getId(), new DropMessage(handle, topic), children[i]); removeChild(children[i]); } else { endpoint.route(children[i].getId(), new SubscribeAckMessage(handle, topic, getPathToRoot(), Integer.MAX_VALUE), children[i]); } } } /** * Sets the Parent attribute of the TopicManager object * * @param handle The new Parent value */ public void setParent(NodeHandle handle) { if ((handle != null) && (parent != null)) { Logger.log(endpoint.getId() + ": Unexpectedly changing parents for topic " + topic,Logger.EVENT_LOG); } if (parent != null) { parent.deleteObserver(this); } parent = handle; setPathToRoot(new Id[0]); if ((parent != null) && parent.isAlive()) { parent.addObserver(this); } } /** * DESCRIBE THE METHOD * * @param o DESCRIBE THE PARAMETER * @param arg DESCRIBE THE PARAMETER */ public void update(Observable o, Object arg) { if (arg.equals(NodeHandle.DECLARED_DEAD)) { if (children.contains(o)) { Logger.log(endpoint.getId() + ": Child " + o + " for topic " + topic + " has died - removing.",Logger.EVENT_LOG); ScribeImpl.this.removeChild(topic, (NodeHandle) o); } else if (o.equals(parent)) { // if our parent has died, then we must resubscribe to the topic Logger.log(endpoint.getId() + ": Parent " + parent + " for topic " + topic + " has died - resubscribing.",Logger.EVENT_LOG); setParent(null); if (clients.size() > 0) sendSubscribe(topic, (ScribeClient) clients.elementAt(0), null, ((NodeHandle) o).getId()); else sendSubscribe(topic, null, null, ((NodeHandle) o).getId()); } else { Logger.log(endpoint.getId() + ": Received unexpected update from " + o,Logger.EVENT_LOG); o.deleteObserver(this); } } } /** * Adds a feature to the Client attribute of the TopicManager object * * @param client The feature to be added to the Client attribute */ public void addClient(ScribeClient client) { if (!clients.contains(client)) { clients.add(client); } } /** * DESCRIBE THE METHOD * * @param client DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public boolean removeClient(ScribeClient client) { clients.remove(client); boolean unsub = ((clients.size() == 0) && (children.size() == 0)); // if we're going to unsubscribe, then we remove ourself as // as observer if (unsub && (parent != null)) { parent.deleteObserver(this); } return unsub; } /** * Adds a feature to the Child attribute of the TopicManager object * * @param child The feature to be added to the Child attribute */ public void addChild(NodeHandle child) { if ((!children.contains(child)) && child.isAlive()) { children.add(child); child.addObserver(this); } } /** * DESCRIBE THE METHOD * * @param child DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public boolean removeChild(NodeHandle child) { children.remove(child); child.deleteObserver(this); boolean unsub = ((clients.size() == 0) && (children.size() == 0)); // if we're going to unsubscribe, then we remove ourself as // as observer if (unsub && (parent != null)) { parent.deleteObserver(this); } return unsub; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -