⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribeimpl.java

📁 p2p仿真器。开发者可以工作在覆盖层中进行创造和测试逻辑算法或者创建和测试新的服务。PlanetSim还可以将仿真代码平稳转换为在Internet上的实验代码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
      // if message is for a non-existant topic, drop it on the floor (we are the root, after all)      // otherwise, turn it into a publish message, and forward it on      if (manager == null) {        Logger.log(endpoint.getId() + ": Received publish request message for non-existent topic " +          prMessage.getTopic() + " - dropping on floor.",Logger.EVENT_LOG);      } else {        deliver(prMessage.getTopic().getId(), new PublishMessage(prMessage.getSource(), prMessage.getTopic(), prMessage.getContent()));      }    } else if (message instanceof PublishMessage) {      PublishMessage pMessage = (PublishMessage) message;      TopicManager manager = (TopicManager) topics.get(pMessage.getTopic());      Logger.log(endpoint.getId() + ": Received publish message with data " + pMessage.getContent() + " for topic " + pMessage.getTopic(),Logger.EVENT_LOG);      // if we don't know about this topic, send an unsubscribe message      // otherwise, we deliver the message to all clients and forward the      // message to all children      if (manager != null) {        pMessage.setSource(handle);        ScribeClient[] clients = manager.getClients();        for (int i = 0; i < clients.length; i++) {          Logger.log(endpoint.getId() + ": Delivering publish message with data " + pMessage.getContent() + " for topic " +            pMessage.getTopic() + " to client " + clients[i],Logger.EVENT_LOG);          clients[i].deliver(pMessage.getTopic(), pMessage.getContent());        }        NodeHandle[] handles = manager.getChildren();        for (int i = 0; i < handles.length; i++) {          Logger.log(endpoint.getId() + ": Forwarding publish message with data " + pMessage.getContent() + " for topic " +            pMessage.getTopic() + " to child " + handles[i],Logger.EVENT_LOG);          endpoint.route(handles[i].getId(), pMessage, handles[i]);        }      } else {        Logger.log(endpoint.getId() + ": Received unexpected publish message from " +          
pMessage.getSource() + " for unknown topic " + pMessage.getTopic(),Logger.EVENT_LOG);        endpoint.route(pMessage.getSource().getId(), new UnsubscribeMessage(handle, pMessage.getTopic()), pMessage.getSource());      }    } else if (message instanceof UnsubscribeMessage) {      UnsubscribeMessage uMessage = (UnsubscribeMessage) message;      Logger.log(endpoint.getId() + ": Received unsubscribe message from " +        uMessage.getSource() + " for topic " + uMessage.getTopic(),Logger.EVENT_LOG);      removeChild(uMessage.getTopic(), uMessage.getSource(), false);    } else if (message instanceof DropMessage) {      DropMessage dMessage = (DropMessage) message;      Logger.log(endpoint.getId() + ": Received drop message from " + dMessage.getSource() + " for topic " + dMessage.getTopic(),Logger.EVENT_LOG);            TopicManager manager = (TopicManager) topics.get(dMessage.getTopic());      if (manager != null) {        if ((manager.getParent() != null) && manager.getParent().equals(dMessage.getSource())) {          // we set the parent to be null, and then send out another subscribe message          manager.setParent(null);          ScribeClient[] clients = manager.getClients();          if (clients.length > 0)            sendSubscribe(dMessage.getTopic(), clients[0], null);          else            sendSubscribe(dMessage.getTopic(), null, null);        } else {          Logger.log(endpoint.getId() + ": Received unexpected drop message from non-parent " +                      dMessage.getSource() + " for topic " + dMessage.getTopic() + " - ignoring",Logger.EVENT_LOG);        }      } else {        Logger.log(endpoint.getId() + ": Received unexpected drop message from " +                    dMessage.getSource() + " for unknown topic " + dMessage.getTopic() + " - ignoring",Logger.EVENT_LOG);      }    } else if (message instanceof ReplicaSetMessage) {    	    	ReplicaSetMessage mrs = (ReplicaSetMessage) message;       if (mrs.isRequest()){       	  int max = 
mrs.getMaxRank();       	  Id sourceId = mrs.getMessageKey();       	  mrs = new ReplicaSetMessage(endpoint.getId(),endpoint.replicaSet(endpoint.getId(),max));    	  try {    	  		endpoint.route(endpoint.getId(),mrs,GenericFactory.buildNodeHandle(sourceId,true));    	  } catch (InitializationException e) {    	  		e.printStackTrace();    	  }       }       else {         mrs.getReplicaSet();       }        } else {      Logger.log(endpoint.getId() + ": Received unknown message " + message + " - dropping on floor.",Logger.EVENT_LOG);    }  }  /**   * This method is invoked to inform the application that the given node has either joined or left   * the neighbor set of the local node, as the set would be returned by the neighborSet call.   *   * @param handle The handle that has joined/left   * @param joined Whether the node has joined or left   */  public void update(NodeHandle handle, boolean joined) {    Set set = topics.keySet();    Iterator e = set.iterator();    TopicManager manager;    Topic topic;    while(e.hasNext()){      topic = (Topic)e.next();      manager = (TopicManager)topics.get(topic);      if (joined){        // check if new guy is root, we were old root, then subscribe        if (manager.getParent() == null){          // send subscribe message          sendSubscribe(topic, null, null);        }      } else {        if (isRoot(topic) && (manager.getParent() != null)) {          endpoint.route(manager.getParent().getId(), new UnsubscribeMessage(handle, topic), manager.getParent());          manager.setParent(null);        }      }    }      }  /**   * Sets the name for this applicaton.   * @param applicationName Name for this application.   * @return The same instance, once it has been updated.   
*/
  public Application setValues(String applicationName) {
    this.appId = applicationName;
    return this;
  }

  /**
   * Class which keeps track of a given topic: the local clients subscribed to it, the
   * child nodes it is forwarded to, the parent node, and the path from the root to this node.
   * It observes its parent and its children so that it can react when one is declared dead.
   *
   * @version $Id: ScribeImpl.java,v 1.14 2003/10/22 03:16:40 amislove Exp $
   * @author amislove
   */
  public class TopicManager implements Observer {

    /**
     * The topic this manager is responsible for
     */
    protected Topic topic;

    /**
     * The current path to the root for this node; the last element is always the
     * local endpoint's id (see setPathToRoot)
     */
    protected Id[] pathToRoot;

    /**
     * The local ScribeClients subscribed to this topic (no duplicates are added)
     */
    protected Vector clients;

    /**
     * The NodeHandles of the children for this topic (no duplicates are added)
     */
    protected Vector children;

    /**
     * The current parent for this topic, or null if there is none
     */
    protected NodeHandle parent;

    /**
     * Constructor for TopicManager which registers an initial client.
     *
     * @param topic The topic to manage
     * @param client The first client to add
     */
    public TopicManager(Topic topic, ScribeClient client) {
      this(topic);

      addClient(client);
    }

    /**
     * Constructor for TopicManager which registers an initial child.
     *
     * @param topic The topic to manage
     * @param child The first child to add (only added if it is alive)
     */
    public TopicManager(Topic topic, NodeHandle child) {
      this(topic);

      addChild(child);
    }

    /**
     * Constructor for TopicManager. Starts with no clients, no children, no parent,
     * and a path to the root consisting only of the local endpoint's id.
     *
     * @param topic The topic to manage
     */
    protected TopicManager(Topic topic) {
      this.topic = topic;
      this.clients = new Vector();
      this.children = new Vector();

      setPathToRoot(new Id[0]);
    }

    /**
     * Gets the Parent attribute of the TopicManager object
     *
     * @return The Parent value, or null if there is no parent
     */
    public NodeHandle getParent() {
      return parent;
    }

    /**
     * Gets the Clients attribute of the TopicManager object
     *
     * @return A newly allocated array holding the current clients
     */
    public ScribeClient[] getClients() {
      return (ScribeClient[]) clients.toArray(new ScribeClient[0]);
    }

    /**
     * Gets the Children attribute of the TopicManager object
     *
     * @return A newly allocated array holding the current children
     */
    public NodeHandle[] getChildren() {
      return (NodeHandle[]) children.toArray(new NodeHandle[0]);
    }

    /**
     * Gets the PathToRoot attribute of the TopicManager object
     *
     * @return The PathToRoot value (the internal array, not a copy)
     */
    public Id[] getPathToRoot() {
      return pathToRoot;
    }

    /**
     * Sets the PathToRoot attribute of the TopicManager object. The stored path is the
     * supplied path with the local endpoint's id appended. Every child is then notified:
     * a child whose id already appears on the new path is sent a DropMessage and removed,
     * every other child is sent a SubscribeAckMessage carrying the new path.
     *
     * @param pathToRoot The new PathToRoot value, not including the local node
     */
    public void setPathToRoot(Id[] pathToRoot) {
      // build the path to the root for the new node
      this.pathToRoot = new Id[pathToRoot.length + 1];
      System.arraycopy(pathToRoot, 0, this.pathToRoot, 0, pathToRoot.length);
      this.pathToRoot[pathToRoot.length] = endpoint.getId();

      // now send the information out to our children; getChildren() returns a snapshot,
      // so removing a child inside the loop is safe
      NodeHandle[] currentChildren = getChildren();
      for (int i = 0; i < currentChildren.length; i++) {
        NodeHandle child = currentChildren[i];

        if (Arrays.asList(this.pathToRoot).contains(child.getId())) {
          endpoint.route(child.getId(), new DropMessage(handle, topic), child);
          removeChild(child);
        } else {
          // NOTE(review): meaning of the Integer.MAX_VALUE argument is not visible here - confirm
          endpoint.route(child.getId(), new SubscribeAckMessage(handle, topic, getPathToRoot(), Integer.MAX_VALUE), child);
        }
      }
    }

    /**
     * Sets the Parent attribute of the TopicManager object. Stops observing the old
     * parent (if any), resets the path to the root to just the local node, and starts
     * observing the new parent if it is non-null and alive.
     *
     * @param handle The new Parent value, or null to clear the parent
     */
    public void setParent(NodeHandle handle) {
      if ((handle != null) && (parent != null)) {
        Logger.log(endpoint.getId() + ": Unexpectedly changing parents for topic " + topic,Logger.EVENT_LOG);
      }

      if (parent != null) {
        parent.deleteObserver(this);
      }

      parent = handle;
      setPathToRoot(new Id[0]);

      if ((parent != null) && parent.isAlive()) {
        parent.addObserver(this);
      }
    }

    /**
     * Observer callback. Only NodeHandle.DECLARED_DEAD notifications are acted upon:
     * a dead child is removed from the topic, a dead parent triggers a resubscribe,
     * and a notification from any other source causes this manager to stop observing it.
     *
     * @param o The observable which changed
     * @param arg The notification argument; null is ignored
     */
    public void update(Observable o, Object arg) {
      // Observable.notifyObservers() without an argument delivers null; ignore it
      // instead of failing with a NullPointerException
      if ((arg == null) || !arg.equals(NodeHandle.DECLARED_DEAD)) {
        return;
      }

      if (children.contains(o)) {
        Logger.log(endpoint.getId() + ": Child " + o + " for topic " + topic + " has died - removing.",Logger.EVENT_LOG);

        ScribeImpl.this.removeChild(topic, (NodeHandle) o);
      } else if (o.equals(parent)) {
        // if our parent has died, then we must resubscribe to the topic
        Logger.log(endpoint.getId() + ": Parent " + parent + " for topic " + topic + " has died - resubscribing.",Logger.EVENT_LOG);

        setParent(null);

        if (clients.size() > 0)
          sendSubscribe(topic, (ScribeClient) clients.elementAt(0), null, ((NodeHandle) o).getId());
        else
          sendSubscribe(topic, null, null, ((NodeHandle) o).getId());
      } else {
        Logger.log(endpoint.getId() + ": Received unexpected update from " + o,Logger.EVENT_LOG);
        o.deleteObserver(this);
      }
    }

    /**
     * Adds a client to this topic, unless it is already registered.
     *
     * @param client The client to add
     */
    public void addClient(ScribeClient client) {
      if (!clients.contains(client)) {
        clients.add(client);
      }
    }

    /**
     * Removes a client from this topic.
     *
     * @param client The client to remove
     * @return true if no clients and no children remain afterwards
     */
    public boolean removeClient(ScribeClient client) {
      clients.remove(client);

      return stopObservingParentIfUnused();
    }

    /**
     * Adds a child to this topic and starts observing it, unless it is already
     * registered or is not alive.
     *
     * @param child The child to add
     */
    public void addChild(NodeHandle child) {
      if ((!children.contains(child)) && child.isAlive()) {
        children.add(child);
        child.addObserver(this);
      }
    }

    /**
     * Removes a child from this topic and stops observing it.
     *
     * @param child The child to remove
     * @return true if no clients and no children remain afterwards
     */
    public boolean removeChild(NodeHandle child) {
      children.remove(child);
      child.deleteObserver(this);

      return stopObservingParentIfUnused();
    }

    /**
     * Shared tail of removeClient/removeChild: reports whether this topic has no
     * clients and no children left, and if so stops observing the parent.
     *
     * @return true if no clients and no children remain
     */
    private boolean stopObservingParentIfUnused() {
      boolean unsub = ((clients.size() == 0) && (children.size() == 0));

      // if we're going to unsubscribe, then we remove ourself as
      // as observer
      if (unsub && (parent != null)) {
        parent.deleteObserver(this);
      }

      return unsub;
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -