📄 scribeimpl.java
字号:
NodeHandle parent = manager.getParent(); if (parent != null) { endpoint.route(parent.getId(), new UnsubscribeMessage(handle, topic), parent); } } } else { Logger.log(endpoint.getId() + ": Attempt to unsubscribe client " + client + " from unknown topic " + topic,Logger.EVENT_LOG); } } /** * Publishes the given message to the topic. * * @param topic The topic to publish to * @param content The content to publish */ public void publish(Topic topic, ScribeContent content) { Logger.log(endpoint.getId() + ": Publishing content " + content + " to topic " + topic,Logger.EVENT_LOG); endpoint.route(topic.getId(), new PublishRequestMessage(handle, topic, content), null); } /** * Anycasts the given content to a member of the given topic * * @param topic The topic to anycast to * @param content The content to anycast */ public void anycast(Topic topic, ScribeContent content) { Logger.log(endpoint.getId() + ": Anycasting content " + content + " to topic " + topic,Logger.EVENT_LOG); endpoint.route(topic.getId(), new AnycastMessage(handle, topic, content), null); } /** * Adds a child to the given topic * * @param topic The topic to add the child to * @param child The child to add */ public void addChild(Topic topic, NodeHandle child) { Logger.log(endpoint.getId() + ": Adding child " + child + " to topic " + topic,Logger.EVENT_LOG); TopicManager manager = (TopicManager) topics.get(topic); // if we don't know about the topic, we subscribe, otherwise, // we simply add the child to the list if (manager == null) { manager = new TopicManager(topic, child); topics.put(topic, manager); Logger.log(endpoint.getId() + ": Implicitly subscribing to topic " + topic,Logger.EVENT_LOG); sendSubscribe(topic, null, null); } else { manager.addChild(child); } // we send a confirmation back to the child endpoint.route(child.getId(), new SubscribeAckMessage(handle, topic, manager.getPathToRoot(), Integer.MAX_VALUE), child); // and lastly notify all of the clients ScribeClient[] clients = manager.getClients(); for (int i = 0; i < clients.length; i++) { clients[i].childAdded(topic, child); } } /** * Removes a child from the given topic * * @param topic The topic to remove the child from * @param child The child to remove */ public void removeChild(Topic topic, NodeHandle child) { removeChild(topic, child, true); } /** * Removes a child from the given topic * * @param topic The topic to remove the child from * @param child The child to remove * @param sendDrop Whether or not to send a drop message to the chil */ protected void removeChild(Topic topic, NodeHandle child, boolean sendDrop) { Logger.log(endpoint.getId() + ": Removing child " + child + " from topic " + topic,Logger.EVENT_LOG); if (topics.get(topic) != null) { TopicManager manager = (TopicManager) topics.get(topic); // if this is the last child and there are no clients, then // we unsubscribe, if we are not the root if (manager.removeChild(child)) { topics.remove(topic); NodeHandle parent = manager.getParent(); Logger.log(endpoint.getId() + ": We no longer need topic " + topic + " - unsubscribing from parent " + parent,Logger.EVENT_LOG); if (parent != null) { endpoint.route(parent.getId(), new UnsubscribeMessage(handle, topic), parent); } } if ((sendDrop) && (child.isAlive())) { Logger.log(endpoint.getId() + ": Informing child " + child + " that he has been dropped from topic " + topic,Logger.EVENT_LOG); // now, we tell the child that he has been dropped endpoint.route(child.getId(), new DropMessage(handle, topic), child); } // and lastly notify all of the clients ScribeClient[] clients = manager.getClients(); for (int i = 0; i < clients.length; i++) { clients[i].childRemoved(topic, child); } } else { Logger.log(endpoint.getId() + ": Unexpected attempt to remove child " + child + " from unknown topic " + topic,Logger.EVENT_LOG); } } // ----- COMMON API METHODS ----- /** * This method is invoked on applications when the underlying node is about to forward the given * message with the provided target to the specified next hop. Applications can change the * contents of the message, specify a different nextHop (through re-routing), or completely * terminate the message. * * @param message The message being sent, containing an internal message along with a destination * key and nodeHandle next hop. * @return Whether or not to forward the message further */ public boolean forward(final Message message) { //System.out.println(endpoint.getId()+" forward "+message); Logger.log(endpoint.getId() + ": Forward called with " + message,Logger.EVENT_LOG); if (message instanceof AnycastMessage) { AnycastMessage aMessage = (AnycastMessage) message; // get the topic manager associated with this topic TopicManager manager = (TopicManager) topics.get(aMessage.getTopic()); // if it's a subscribe message, we must handle it differently if (message instanceof SubscribeMessage) { SubscribeMessage sMessage = (SubscribeMessage) message; // if this is our own subscribe message, ignore it if (sMessage.getSource().getId().equals(endpoint.getId())) { return true; } if (manager != null) { // first, we have to make sure that we don't create a loop, which would occur // if the subcribing node's previous parent is on our path to the root Id previousParent = sMessage.getPreviousParent(); List path = Arrays.asList(manager.getPathToRoot()); if (path.contains(previousParent)) { Logger.log(endpoint.getId() + ": Rejecting subscribe message from " + // System.out.println(endpoint.getId() + ": Rejecting subscribe message from " + sMessage.getSubscriber() + " for topic " + sMessage.getTopic() + " because we are on the subscriber's path to the root.",Logger.EVENT_LOG); return true; } } ScribeClient[] clients = new ScribeClient[0]; NodeHandle[] handles = new NodeHandle[0]; if (manager != null) { clients = manager.getClients(); handles = manager.getChildren(); } // check if child is already there if (Arrays.asList(handles).contains(sMessage.getSubscriber())){ return false; } // see if the policy will allow us to take on this child if (policy.allowSubscribe(sMessage, clients, handles)) { Logger.log(endpoint.getId() + ": Hijacking subscribe message from " + sMessage.getSubscriber() + " for topic " + sMessage.getTopic(),Logger.EVENT_LOG); // System.out.println(endpoint.getId() + ": Hijacking subscribe message from " + // sMessage.getSubscriber().getId() + " for topic " + sMessage.getTopic()); // if so, add the child addChild(sMessage.getTopic(), sMessage.getSubscriber()); return false; } // otherwise, we are effectively rejecting the child Logger.log(endpoint.getId() + ": Rejecting subscribe message from " + //System.out.println(endpoint.getId() + ": Rejecting subscribe message from " + sMessage.getSubscriber() + " for topic " + sMessage.getTopic(),Logger.EVENT_LOG); // if we are not associated with this topic at all, we simply let the subscribe go // closer to the root if (manager == null) { return true; } } else { // if we are not associated with this topic at all, let the // anycast continue if (manager == null) { return true; } ScribeClient[] clients = manager.getClients(); // see if one of our clients will accept the anycast for (int i = 0; i < clients.length; i++) { if (clients[i].anycast(aMessage.getTopic(), aMessage.getContent())) { Logger.log(endpoint.getId() + ": Accepting anycast message from " + aMessage.getSource() + " for topic " + aMessage.getTopic(),Logger.EVENT_LOG); return false; } } // if we are the orginator for this anycast and it already has a destination, // we let it go ahead if (aMessage.getSource().getId().equals(endpoint.getId()) && ( aMessage.getNext() != null) && (! handle.equals(aMessage.getNext()))) { //&& ( message.getNextHopHandle() != null) && (! handle.equals(message.getNextHopHandle()))) return true; } Logger.log(endpoint.getId() + ": Rejecting anycast message from " + aMessage.getSource() + " for topic " + aMessage.getTopic(),Logger.EVENT_LOG); } // add the local node to the visited list aMessage.addVisited(endpoint.getLocalNodeHandle()); // allow the policy to select the order in which the nodes are visited policy.directAnycast(aMessage, manager.getParent(), manager.getChildren()); // reset the source of the message to be us aMessage.setSource(endpoint.getLocalNodeHandle()); // get the next hop NodeHandle handle = aMessage.getNext(); // make sure that the next node is alive while ((handle != null) && (!handle.isAlive())) { handle = aMessage.getNext(); } Logger.log(endpoint.getId() + ": Forwarding anycast message for topic " + aMessage.getTopic() + "on to " + handle,Logger.EVENT_LOG); if (handle == null) { Logger.log(endpoint.getId() + ": Anycast " + aMessage + " failed.",Logger.EVENT_LOG); // if it's a subscribe message, send a subscribe failed message back // as a courtesy if (aMessage instanceof SubscribeMessage) { SubscribeMessage sMessage = (SubscribeMessage) aMessage; Logger.log(endpoint.getId() + ": Sending SubscribeFailedMessage to " + sMessage.getSubscriber(),Logger.EVENT_LOG); endpoint.route(sMessage.getSubscriber().getId(), new SubscribeFailedMessage(handle, sMessage.getTopic(), sMessage.getId()), sMessage.getSubscriber()); } } else { endpoint.route(handle.getId(), aMessage, handle); } return false; } //else //System.out.println(endpoint.getId()+" forward "+message); return true; } /** * This method is called on the application at the destination node for the given id. * * @param id The destination id of the message * @param message The message being sent */ public void deliver(Id id, Message message) { //System.out.println(endpoint.getId()+" deliver "+message); Logger.log(endpoint.getId() + ": Deliver called with " + id + " " + message,Logger.EVENT_LOG); if (message instanceof AnycastMessage) { AnycastMessage aMessage = (AnycastMessage) message; // if we are the recipient to someone else's subscribe, then we should have processed // this message in the forward() method. // Otherwise, we received our own subscribe message, which means that we are // the root if (aMessage.getSource().getId().equals(endpoint.getId())) { if (aMessage instanceof SubscribeMessage) { SubscribeMessage sMessage = (SubscribeMessage) message; // System.out.println(endpoint.getId()+" deliver "+sMessage); outstanding.remove(new Integer(sMessage.getId())); Logger.log(endpoint.getId() + ": Received our own subscribe message " + aMessage + " for topic " + aMessage.getTopic() + " - we are the root.",Logger.EVENT_LOG); } else { Logger.log(endpoint.getId() + ": Received unexpected delivered anycast message " + aMessage + " for topic " + aMessage.getTopic() + " - was generated by us.",Logger.EVENT_LOG); } } else { // here, we have had a subscribe message delivered, which means that we are the root, but // our policy says that we cannot take on this child if (aMessage instanceof SubscribeMessage) { SubscribeMessage sMessage = (SubscribeMessage) aMessage; Logger.log(endpoint.getId() + ": Sending SubscribeFailedMessage (at root) to " + sMessage.getSubscriber(),Logger.EVENT_LOG); endpoint.route(sMessage.getSubscriber().getId(), new SubscribeFailedMessage(handle, sMessage.getTopic(), sMessage.getId()), sMessage.getSubscriber()); } else { Logger.log(endpoint.getId() + ": Received unexpected delivered anycast message " + aMessage + " for topic " + aMessage.getTopic() + " - not generated by us, but was expected to be.",Logger.EVENT_LOG); } } } else if (message instanceof SubscribeAckMessage) { //System.out.println(endpoint.getId()+" deliver "+message); SubscribeAckMessage saMessage = (SubscribeAckMessage) message; TopicManager manager = (TopicManager) topics.get(saMessage.getTopic()); ackMessageReceived(saMessage); Logger.log(endpoint.getId() + ": Received subscribe ack message from " + saMessage.getSource() + " for topic " + saMessage.getTopic(),Logger.EVENT_LOG); if (! saMessage.getSource().isAlive()) { Logger.log(endpoint.getId() + ": Received subscribe ack message from " + saMessage.getSource() + " for topic " + saMessage.getTopic(),Logger.EVENT_LOG); (new Exception()).printStackTrace(); System.exit(-1); } // if we're the root, reject the ack message if (isRoot(saMessage.getTopic())) { Logger.log(endpoint.getId() + ": Received unexpected subscribe ack message (we are the root) from " + saMessage.getSource() + " for topic " + saMessage.getTopic(),Logger.EVENT_LOG); endpoint.route(saMessage.getSource().getId(), new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource()); } else { // if we don't know about this topic, then we unsubscribe // if we already have a parent, then this is either an errorous // subscribe ack, or our path to the root has changed. if (manager != null) { if (manager.getParent() == null) { manager.setParent(saMessage.getSource()); } if (manager.getParent().equals(saMessage.getSource())) { manager.setPathToRoot(saMessage.getPathToRoot()); } else { Logger.log(endpoint.getId() + ": Received unexpected subscribe ack message (already have parent " + manager.getParent() + ") from " + saMessage.getSource() + " for topic " + saMessage.getTopic(),Logger.EVENT_LOG); endpoint.route(saMessage.getSource().getId(), new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource()); } } else { Logger.log(endpoint.getId() + ": Received unexpected subscribe ack message from " + saMessage.getSource() + " for unknown topic " + saMessage.getTopic(),Logger.EVENT_LOG); endpoint.route(saMessage.getSource().getId(), new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource()); } } } else if (message instanceof SubscribeLostMessage) { SubscribeLostMessage slMessage = (SubscribeLostMessage) message; lostMessageReceived(slMessage); } else if (message instanceof SubscribeFailedMessage) { SubscribeFailedMessage sfMessage = (SubscribeFailedMessage) message; failedMessageReceived(sfMessage); } else if (message instanceof PublishRequestMessage) { PublishRequestMessage prMessage = (PublishRequestMessage) message; TopicManager manager = (TopicManager) topics.get(prMessage.getTopic()); Logger.log(endpoint.getId() + ": Received publish request message with data " + prMessage.getContent() + " for topic " + prMessage.getTopic(),Logger.EVENT_LOG);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -