⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribeimpl.java

📁 p2p仿真器。开发者可以工作在覆盖层中进行创造和测试逻辑算法或者创建和测试新的服务。PlanetSim还可以将仿真代码平稳转换为在Internet上的实验代码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        NodeHandle parent = manager.getParent();        if (parent != null) {          endpoint.route(parent.getId(), new UnsubscribeMessage(handle, topic), parent);        }      }    } else {    	Logger.log(endpoint.getId() + ": Attempt to unsubscribe client " + client + " from unknown topic " + topic,Logger.EVENT_LOG);    }  }  /**   * Publishes the given message to the topic.   *   * @param topic The topic to publish to   * @param content The content to publish   */  public void publish(Topic topic, ScribeContent content) {    Logger.log(endpoint.getId() + ": Publishing content " + content + " to topic " + topic,Logger.EVENT_LOG);    endpoint.route(topic.getId(), new PublishRequestMessage(handle, topic, content), null);  }  /**   * Anycasts the given content to a member of the given topic   *   * @param topic The topic to anycast to   * @param content The content to anycast   */  public void anycast(Topic topic, ScribeContent content) {    Logger.log(endpoint.getId() + ": Anycasting content " + content + " to topic " + topic,Logger.EVENT_LOG);    endpoint.route(topic.getId(), new AnycastMessage(handle, topic, content), null);  }  /**   * Adds a child to the given topic   *   * @param topic The topic to add the child to   * @param child The child to add   */  public void addChild(Topic topic, NodeHandle child) {    Logger.log(endpoint.getId() + ": Adding child " + child + " to topic " + topic,Logger.EVENT_LOG);    TopicManager manager = (TopicManager) topics.get(topic);    // if we don't know about the topic, we subscribe, otherwise,    // we simply add the child to the list    if (manager == null) {      manager = new TopicManager(topic, child);      topics.put(topic, manager);      Logger.log(endpoint.getId() + ": Implicitly subscribing to topic " + topic,Logger.EVENT_LOG);      sendSubscribe(topic, null, null);    } else {      manager.addChild(child);    }    // we send a confirmation back to the child    endpoint.route(child.getId(), new SubscribeAckMessage(handle, topic, manager.getPathToRoot(), Integer.MAX_VALUE), child);    // and lastly notify all of the clients    ScribeClient[] clients = manager.getClients();    for (int i = 0; i < clients.length; i++) {      clients[i].childAdded(topic, child);    }  }  /**   * Removes a child from the given topic   *   * @param topic The topic to remove the child from   * @param child The child to remove   */  public void removeChild(Topic topic, NodeHandle child) {    removeChild(topic, child, true);  }  /**   * Removes a child from the given topic   *   * @param topic The topic to remove the child from   * @param child The child to remove   * @param sendDrop Whether or not to send a drop message to the chil   */  protected void removeChild(Topic topic, NodeHandle child, boolean sendDrop) {    Logger.log(endpoint.getId() + ": Removing child " + child + " from topic " + topic,Logger.EVENT_LOG);    if (topics.get(topic) != null) {      TopicManager manager = (TopicManager) topics.get(topic);      // if this is the last child and there are no clients, then      // we unsubscribe, if we are not the root      if (manager.removeChild(child)) {        topics.remove(topic);        NodeHandle parent = manager.getParent();        Logger.log(endpoint.getId() + ": We no longer need topic " + topic + " - unsubscribing from parent " + parent,Logger.EVENT_LOG);        if (parent != null) {          endpoint.route(parent.getId(), new UnsubscribeMessage(handle, topic), parent);        }      }      if ((sendDrop) && (child.isAlive())) {        Logger.log(endpoint.getId() + ": Informing child " + child + " that he has been dropped from topic " + topic,Logger.EVENT_LOG);                // now, we tell the child that he has been dropped        endpoint.route(child.getId(), new DropMessage(handle, topic), child);      }      // and lastly notify all of the clients      ScribeClient[] clients = manager.getClients();      for (int i = 0; i < clients.length; i++) {        clients[i].childRemoved(topic, child);      }    } else {      Logger.log(endpoint.getId() + ": Unexpected attempt to remove child " + child + " from unknown topic " + topic,Logger.EVENT_LOG);    }  }  // ----- COMMON API METHODS -----  /**   * This method is invoked on applications when the underlying node is about to forward the given   * message with the provided target to the specified next hop. Applications can change the   * contents of the message, specify a different nextHop (through re-routing), or completely   * terminate the message.   *   * @param message The message being sent, containing an internal message along with a destination   *      key and nodeHandle next hop.   * @return Whether or not to forward the message further   */  public boolean forward(final Message message) {  	  	//System.out.println(endpoint.getId()+" forward "+message);     Logger.log(endpoint.getId() + ": Forward called with " + message,Logger.EVENT_LOG);            if (message instanceof AnycastMessage) {      AnycastMessage aMessage = (AnycastMessage) message;                       // get the topic manager associated with this topic      TopicManager manager = (TopicManager) topics.get(aMessage.getTopic());      // if it's a subscribe message, we must handle it differently      if (message instanceof SubscribeMessage) {        SubscribeMessage sMessage = (SubscribeMessage) message;                               // if this is our own subscribe message, ignore it        if (sMessage.getSource().getId().equals(endpoint.getId())) {          return true;        }        if (manager != null) {          // first, we have to make sure that we don't create a loop, which would occur          // if the subcribing node's previous parent is on our path to the root          Id previousParent = sMessage.getPreviousParent();          List path = Arrays.asList(manager.getPathToRoot());          if (path.contains(previousParent)) {            Logger.log(endpoint.getId() + ": Rejecting subscribe message from " +          	//  System.out.println(endpoint.getId() + ": Rejecting subscribe message from " +                      sMessage.getSubscriber() + " for topic " + sMessage.getTopic() +                      " because we are on the subscriber's path to the root.",Logger.EVENT_LOG);            return true;          }        }                  ScribeClient[] clients = new ScribeClient[0];        NodeHandle[] handles = new NodeHandle[0];        if (manager != null) {          clients = manager.getClients();          handles = manager.getChildren();        }        // check if child is already there        if (Arrays.asList(handles).contains(sMessage.getSubscriber())){          return false;        }        // see if the policy will allow us to take on this child        if (policy.allowSubscribe(sMessage, clients, handles)) {          Logger.log(endpoint.getId() + ": Hijacking subscribe message from " +        	sMessage.getSubscriber() + " for topic " + sMessage.getTopic(),Logger.EVENT_LOG);        //	 System.out.println(endpoint.getId() + ": Hijacking subscribe message from " +        //    sMessage.getSubscriber().getId() + " for topic " + sMessage.getTopic());          // if so, add the child          addChild(sMessage.getTopic(), sMessage.getSubscriber());          return false;        }        // otherwise, we are effectively rejecting the child        Logger.log(endpoint.getId() + ": Rejecting subscribe message from " +        //System.out.println(endpoint.getId() + ": Rejecting subscribe message from " +        sMessage.getSubscriber() + " for topic " + sMessage.getTopic(),Logger.EVENT_LOG);        // if we are not associated with this topic at all, we simply let the subscribe go        // closer to the root        if (manager == null) {          return true;        }      } else {        // if we are not associated with this topic at all, let the        // anycast continue        if (manager == null) {          return true;        }        ScribeClient[] clients = manager.getClients();        // see if one of our clients will accept the anycast        for (int i = 0; i < clients.length; i++) {          if (clients[i].anycast(aMessage.getTopic(), aMessage.getContent())) {            Logger.log(endpoint.getId() + ": Accepting anycast message from " +              aMessage.getSource() + " for topic " + aMessage.getTopic(),Logger.EVENT_LOG);            return false;          }        }        // if we are the orginator for this anycast and it already has a destination,        // we let it go ahead        if (aMessage.getSource().getId().equals(endpoint.getId())          && ( aMessage.getNext() != null) &&  (! handle.equals(aMessage.getNext()))) { 	          //&& ( message.getNextHopHandle() != null) &&  (! handle.equals(message.getNextHopHandle())))                              return true;        }        Logger.log(endpoint.getId() + ": Rejecting anycast message from " +          aMessage.getSource() + " for topic " + aMessage.getTopic(),Logger.EVENT_LOG);      }      // add the local node to the visited list      aMessage.addVisited(endpoint.getLocalNodeHandle());      // allow the policy to select the order in which the nodes are visited      policy.directAnycast(aMessage, manager.getParent(), manager.getChildren());      // reset the source of the message to be us      aMessage.setSource(endpoint.getLocalNodeHandle());      // get the next hop      NodeHandle handle = aMessage.getNext();      // make sure that the next node is alive      while ((handle != null) && (!handle.isAlive())) {        handle = aMessage.getNext();      }      Logger.log(endpoint.getId() + ": Forwarding anycast message for topic " + aMessage.getTopic() + "on to " + handle,Logger.EVENT_LOG);      if (handle == null) {        Logger.log(endpoint.getId() + ": Anycast " + aMessage + " failed.",Logger.EVENT_LOG);        // if it's a subscribe message, send a subscribe failed message back        // as a courtesy        if (aMessage instanceof SubscribeMessage) {          SubscribeMessage sMessage = (SubscribeMessage) aMessage;                    Logger.log(endpoint.getId() + ": Sending SubscribeFailedMessage to " + sMessage.getSubscriber(),Logger.EVENT_LOG);          endpoint.route(sMessage.getSubscriber().getId(),                         new SubscribeFailedMessage(handle, sMessage.getTopic(), sMessage.getId()),                         sMessage.getSubscriber());        }      } else {        endpoint.route(handle.getId(), aMessage, handle);                }      return false;    }    //else      //System.out.println(endpoint.getId()+" forward "+message);                 return true;  }  /**   * This method is called on the application at the destination node for the given id.   *   * @param id The destination id of the message   * @param message The message being sent   */  public void deliver(Id id, Message message)  {  	//System.out.println(endpoint.getId()+" deliver "+message);    Logger.log(endpoint.getId() + ": Deliver called with " + id + " " + message,Logger.EVENT_LOG);        if (message instanceof AnycastMessage) {      AnycastMessage aMessage = (AnycastMessage) message;            // if we are the recipient to someone else's subscribe, then we should have processed      // this message in the forward() method.      // Otherwise, we received our own subscribe message, which means that we are      // the root      if (aMessage.getSource().getId().equals(endpoint.getId())) {        if (aMessage instanceof SubscribeMessage) {          SubscribeMessage sMessage = (SubscribeMessage) message;         // System.out.println(endpoint.getId()+" deliver "+sMessage);                    outstanding.remove(new Integer(sMessage.getId()));          Logger.log(endpoint.getId() + ": Received our own subscribe message " + aMessage + " for topic " +            aMessage.getTopic() + " - we are the root.",Logger.EVENT_LOG);        } else {          Logger.log(endpoint.getId() + ": Received unexpected delivered anycast message " + aMessage + " for topic " +            aMessage.getTopic() + " - was generated by us.",Logger.EVENT_LOG);        }      } else {        // here, we have had a subscribe message delivered, which means that we are the root, but        // our policy says that we cannot take on this child        if (aMessage instanceof SubscribeMessage) {          SubscribeMessage sMessage = (SubscribeMessage) aMessage;          Logger.log(endpoint.getId() + ": Sending SubscribeFailedMessage (at root) to " + sMessage.getSubscriber(),Logger.EVENT_LOG);          endpoint.route(sMessage.getSubscriber().getId(),                         new SubscribeFailedMessage(handle, sMessage.getTopic(), sMessage.getId()),                         sMessage.getSubscriber());        } else {          Logger.log(endpoint.getId() + ": Received unexpected delivered anycast message " + aMessage + " for topic " +                      aMessage.getTopic() + " - not generated by us, but was expected to be.",Logger.EVENT_LOG);        }      }    } else if (message instanceof SubscribeAckMessage) {    	//System.out.println(endpoint.getId()+" deliver "+message);       SubscribeAckMessage saMessage = (SubscribeAckMessage) message;      TopicManager manager = (TopicManager) topics.get(saMessage.getTopic());      ackMessageReceived(saMessage);      Logger.log(endpoint.getId() + ": Received subscribe ack message from " + saMessage.getSource() + " for topic " + saMessage.getTopic(),Logger.EVENT_LOG);      if (! saMessage.getSource().isAlive()) {        Logger.log(endpoint.getId() + ": Received subscribe ack message from " + saMessage.getSource() + " for topic " + saMessage.getTopic(),Logger.EVENT_LOG);        (new Exception()).printStackTrace();        System.exit(-1);      }                    // if we're the root, reject the ack message      if (isRoot(saMessage.getTopic())) {        Logger.log(endpoint.getId() + ": Received unexpected subscribe ack message (we are the root) from " +                 saMessage.getSource() + " for topic " + saMessage.getTopic(),Logger.EVENT_LOG);        endpoint.route(saMessage.getSource().getId(), new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource());      } else {        // if we don't know about this topic, then we unsubscribe        // if we already have a parent, then this is either an errorous        // subscribe ack, or our path to the root has changed.        if (manager != null) {          if (manager.getParent() == null) {            manager.setParent(saMessage.getSource());          }          if (manager.getParent().equals(saMessage.getSource())) {            manager.setPathToRoot(saMessage.getPathToRoot());          } else {            Logger.log(endpoint.getId() + ": Received unexpected subscribe ack message (already have parent " + manager.getParent() +                        ") from " + saMessage.getSource() + " for topic " + saMessage.getTopic(),Logger.EVENT_LOG);            endpoint.route(saMessage.getSource().getId(), new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource());          }        } else {          Logger.log(endpoint.getId() + ": Received unexpected subscribe ack message from " +                      saMessage.getSource() + " for unknown topic " + saMessage.getTopic(),Logger.EVENT_LOG);          endpoint.route(saMessage.getSource().getId(), new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource());        }      }    } else if (message instanceof SubscribeLostMessage) {      SubscribeLostMessage slMessage = (SubscribeLostMessage) message;      lostMessageReceived(slMessage);    } else if (message instanceof SubscribeFailedMessage) {      SubscribeFailedMessage sfMessage = (SubscribeFailedMessage) message;      failedMessageReceived(sfMessage);    } else if (message instanceof PublishRequestMessage) {      PublishRequestMessage prMessage = (PublishRequestMessage) message;      TopicManager manager = (TopicManager) topics.get(prMessage.getTopic());      Logger.log(endpoint.getId() + ": Received publish request message with data " +        prMessage.getContent() + " for topic " + prMessage.getTopic(),Logger.EVENT_LOG);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -