📄 scribeimpl.java
字号:
if (path.contains(previousParent)) { if (logger.level <= Logger.INFO) { logger.log(endpoint.getId() + ": Rejecting subscribe message from " + sMessage.getSubscriber() + " for topic " + sMessage.getTopic() + " because we are on the subscriber's path to the root."); } return true; } } ScribeClient[] clients = new ScribeClient[0]; NodeHandle[] handles = new NodeHandle[0]; if (manager != null) { clients = manager.getClients(); handles = manager.getChildren(); } // check if child is already there if (Arrays.asList(handles).contains(sMessage.getSubscriber())) { return false; } // see if the policy will allow us to take on this child if (policy.allowSubscribe(sMessage, clients, handles)) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Hijacking subscribe message from " + sMessage.getSubscriber() + " for topic " + sMessage.getTopic()); } // if so, add the child addChild(sMessage.getTopic(), sMessage.getSubscriber(), sMessage.getId()); return false; } // otherwise, we are effectively rejecting the child if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Rejecting subscribe message from " + sMessage.getSubscriber() + " for topic " + sMessage.getTopic()); } // if we are not associated with this topic at all, we simply let the subscribe go // closer to the root if (manager == null) { return true; } } else { // if we are not associated with this topic at all, let the // anycast continue if (manager == null) { return true; } ScribeClient[] clients = manager.getClients(); // see if one of our clients will accept the anycast for (int i = 0; i < clients.length; i++) { if (clients[i].anycast(aMessage.getTopic(), aMessage.getContent())) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Accepting anycast message from " + aMessage.getSource() + " for topic " + aMessage.getTopic()); } return false; } } // if we are the orginator for this anycast and it already has a destination, // we let it go ahead if (aMessage.getSource().getId().equals(endpoint.getId()) && (message.getNextHopHandle() != null) && (!handle.equals(message.getNextHopHandle()))) { return true; } if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Rejecting anycast message from " + aMessage.getSource() + " for topic " + aMessage.getTopic()); } } // add the local node to the visited list aMessage.addVisited(endpoint.getLocalNodeHandle()); // allow the policy to select the order in which the nodes are visited policy.directAnycast(aMessage, manager.getParent(), manager.getChildren()); // reset the source of the message to be us aMessage.setSource(endpoint.getLocalNodeHandle()); // get the next hop NodeHandle handle = aMessage.getNext(); // make sure that the next node is alive while ((handle != null) && (!handle.isAlive())) { handle = aMessage.getNext(); } if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Forwarding anycast message for topic " + aMessage.getTopic() + "on to " + handle); } if (handle == null) { if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Anycast " + aMessage + " failed."); } // if it's a subscribe message, send a subscribe failed message back // as a courtesy if (aMessage instanceof SubscribeMessage) { SubscribeMessage sMessage = (SubscribeMessage) aMessage; if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Sending SubscribeFailedMessage to " + sMessage.getSubscriber()); } endpoint.route(null, new SubscribeFailedMessage(handle, sMessage.getTopic(), sMessage.getId()), sMessage.getSubscriber()); } } else { endpoint.route(null, aMessage, handle); } return false; } return true; } /** * This method is called on the application at the destination node for the * given id. * * @param id The destination id of the message * @param message The message being sent */ public void deliver(Id id, Message message) { if (logger.level <= Logger.FINEST) { logger.log(endpoint.getId() + ": Deliver called with " + id + " " + message); } if (message instanceof AnycastMessage) { AnycastMessage aMessage = (AnycastMessage) message; // if we are the recipient to someone else's subscribe, then we should have processed // this message in the forward() method. // Otherwise, we received our own subscribe message, which means that we are // the root if (aMessage.getSource().getId().equals(endpoint.getId())) { if (aMessage instanceof SubscribeMessage) { SubscribeMessage sMessage = (SubscribeMessage) message; outstanding.remove(new Integer(sMessage.getId())); if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Received our own subscribe message " + aMessage + " for topic " + aMessage.getTopic() + " - we are the root."); } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received unexpected delivered anycast message " + aMessage + " for topic " + aMessage.getTopic() + " - was generated by us."); } } } else { // here, we have had a subscribe message delivered, which means that we are the root, but // our policy says that we cannot take on this child if (aMessage instanceof SubscribeMessage) { SubscribeMessage sMessage = (SubscribeMessage) aMessage; if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Sending SubscribeFailedMessage (at root) to " + sMessage.getSubscriber()); } endpoint.route(null, new SubscribeFailedMessage(handle, sMessage.getTopic(), sMessage.getId()), sMessage.getSubscriber()); } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received unexpected delivered anycast message " + aMessage + " for topic " + aMessage.getTopic() + " - not generated by us, but was expected to be."); } } } } else if (message instanceof SubscribeAckMessage) { SubscribeAckMessage saMessage = (SubscribeAckMessage) message; TopicManager manager = (TopicManager) topics.get(saMessage.getTopic()); ackMessageReceived(saMessage); if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Received subscribe ack message from " + saMessage.getSource() + " for topic " + saMessage.getTopic()); } if (!saMessage.getSource().isAlive()) { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received subscribe ack message from " + saMessage.getSource() + " for topic " + saMessage.getTopic()); } } // if we're the root, reject the ack message if (isRoot(saMessage.getTopic())) { if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Received unexpected subscribe ack message (we are the root) from " + saMessage.getSource() + " for topic " + saMessage.getTopic()); } endpoint.route(null, new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource()); } else { // if we don't know about this topic, then we unsubscribe // if we already have a parent, then this is either an errorous // subscribe ack, or our path to the root has changed. if (manager != null) { if (manager.getParent() == null) { manager.setParent(saMessage.getSource()); } if (manager.getParent().equals(saMessage.getSource())) { manager.setPathToRoot(saMessage.getPathToRoot()); } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received somewhat unexpected subscribe ack message (already have parent " + manager.getParent() + ") from " + saMessage.getSource() + " for topic " + saMessage.getTopic() + " - the new policy is now to accept the message"); } NodeHandle parent = manager.getParent(); manager.setParent(saMessage.getSource()); manager.setPathToRoot(saMessage.getPathToRoot()); endpoint.route(null, new UnsubscribeMessage(handle, saMessage.getTopic()), parent); } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received unexpected subscribe ack message from " + saMessage.getSource() + " for unknown topic " + saMessage.getTopic()); } endpoint.route(null, new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource()); } } } else if (message instanceof SubscribeLostMessage) { SubscribeLostMessage slMessage = (SubscribeLostMessage) message; lostMessageReceived(slMessage); } else if (message instanceof SubscribeFailedMessage) { SubscribeFailedMessage sfMessage = (SubscribeFailedMessage) message; failedMessageReceived(sfMessage); } else if (message instanceof PublishRequestMessage) { PublishRequestMessage prMessage = (PublishRequestMessage) message; TopicManager manager = (TopicManager) topics.get(prMessage.getTopic()); if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Received publish request message with data " + prMessage.getContent() + " for topic " + prMessage.getTopic()); } // if message is for a non-existant topic, drop it on the floor (we are the root, after all) // otherwise, turn it into a publish message, and forward it on if (manager == null) { if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Received publish request message for non-existent topic " + prMessage.getTopic() + " - dropping on floor."); } } else { deliver(prMessage.getTopic().getId(), new PublishMessage(prMessage.getSource(), prMessage.getTopic(), prMessage.getContent())); } } else if (message instanceof PublishMessage) { PublishMessage pMessage = (PublishMessage) message; TopicManager manager = (TopicManager) topics.get(pMessage.getTopic()); if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Received publish message with data " + pMessage.getContent() + " for topic " + pMessage.getTopic()); } // if we don't know about this topic, or this message did // not come from our parent, send an unsubscribe message // otherwise, we deliver the message to all clients and forward the // message to all children if ((manager != null) && ((manager.getParent() == null) || (manager.getParent().equals(pMessage.getSource())))) { pMessage.setSource(handle); ScribeClient[] clients = manager.getClients(); for (int i = 0; i < clients.length; i++) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Delivering publish message with data " + pMessage.getContent() + " for topic " + pMessage.getTopic() + " to client " + clients[i]); } clients[i].deliver(pMessage.getTopic(), pMessage.getContent()); } NodeHandle[] handles = manager.getChildren(); for (int i = 0; i < handles.length; i++) { if (logger.level <= Logger.FINER) { logger.log(endpoint.getId() + ": Forwarding publish message with data " + pMessage.getContent() + " for topic " + pMessage.getTopic() + " to child " + handles[i]); } endpoint.route(null, new PublishMessage(endpoint.getLocalNodeHandle(), pMessage.getTopic(), pMessage.getContent()), handles[i]); } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received unexpected publish message from " + pMessage.getSource() + " for unknown topic " + pMessage.getTopic()); } endpoint.route(null, new UnsubscribeMessage(handle, pMessage.getTopic()), pMessage.getSource()); } } else if (message instanceof UnsubscribeMessage) { UnsubscribeMessage uMessage = (UnsubscribeMessage) message; if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Received unsubscribe message from " + uMessage.getSource() + " for topic " + uMessage.getTopic()); } removeChild(uMessage.getTopic(), uMessage.getSource(), false); } else if (message instanceof DropMessage) { DropMessage dMessage = (DropMessage) message; if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Received drop message from " + dMessage.getSource() + " for topic " + dMessage.getTopic()); } TopicManager manager = (TopicManager) topics.get(dMessage.getTopic()); if (manager != null) { if ((manager.getParent() != null) && manager.getParent().equals(dMessage.getSource())) { // we set the parent to be null, and then send out another subscribe message manager.setParent(null); ScribeClient[] clients = manager.getClients(); if (clients.length > 0) { sendSubscribe(dMessage.getTopic(), clients[0], null); } else { sendSubscribe(dMessage.getTopic(), null, null); } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received unexpected drop message from non-parent " + dMessage.getSource() + " for topic " + dMessage.getTopic() + " - ignoring"); } } } else { if (logger.level <= Logger.WARNING) { logger.log(endpoint.getId() + ": Received unexpected drop message from " + dMessage.getSource() + " for unknown topic " + dMessage.getTopic() + " - ignoring"); } } } else if (message instanceof MaintenanceMessage) { if (logger.level <= Logger.FINE) { logger.log(endpoint.getId() + ": Received maintenance message"); } Iterator i = topics.values().iterator(); // for each topic, make sure our parent is still alive while (i.hasNext()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -