⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribeimpl.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
    CancellableTask task = endpoint.scheduleMessage(new SubscribeLostMessage(handle, topic, id), MESSAGE_TIMEOUT);    lost.put(new Integer(id), task);  }  /**   * Internal method which processes an ack message   *   * @param message The ackMessage   */  private void ackMessageReceived(SubscribeAckMessage message) {    ScribeClient client = (ScribeClient) outstanding.remove(new Integer(message.getId()));    if (logger.level <= Logger.FINER) {      logger.log(endpoint.getId() + ": Removing client " + client + " from list of outstanding for ack " + message.getId());    }    CancellableTask task = (CancellableTask) lost.remove(new Integer(message.getId()));    if (task != null) {      task.cancel();    }  }  /**   * Internal method which processes a subscribe failed message   *   * @param message THe lost message   */  private void failedMessageReceived(SubscribeFailedMessage message) {    ScribeClient client = (ScribeClient) outstanding.remove(new Integer(message.getId()));    lost.remove(new Integer(message.getId()));    if (logger.level <= Logger.FINER) {      logger.log(endpoint.getId() + ": Telling client " + client + " about FAILURE for outstanding ack " + message.getId());    }    if (client != null) {      client.subscribeFailed(message.getTopic());    }  }  /**   * Internal method which processes a subscribe lost message   *   * @param message THe lost message   */  private void lostMessageReceived(SubscribeLostMessage message) {    ScribeClient client = (ScribeClient) outstanding.remove(new Integer(message.getId()));    lost.remove(new Integer(message.getId()));    if (logger.level <= Logger.FINER) {      logger.log(endpoint.getId() + ": Telling client " + client + " about LOSS for outstanding ack " + message.getId());    }    if (client != null) {      client.subscribeFailed(message.getTopic());    }  }  // ----- SCRIBE METHODS -----  /**   * Subscribes the given client to the provided topic. Any message published to   * the topic will be delivered to the Client via the deliver() method.   *   * @param topic The topic to subscribe to   * @param client The client to give messages to   */  public void subscribe(Topic topic, ScribeClient client) {    subscribe(topic, client, null);  }  /**   * Subscribes the given client to the provided topic. Any message published to   * the topic will be delivered to the Client via the deliver() method.   *   * @param topic The topic to subscribe to   * @param client The client to give messages to   * @param content DESCRIBE THE PARAMETER   */  public void subscribe(Topic topic, ScribeClient client, ScribeContent content) {    subscribe(topic, client, content instanceof RawScribeContent ? (RawScribeContent) content : new JavaSerializedScribeContent(content));  }  /**   * DESCRIBE THE METHOD   *   * @param topic DESCRIBE THE PARAMETER   * @param client DESCRIBE THE PARAMETER   * @param content DESCRIBE THE PARAMETER   */  public void subscribe(Topic topic, ScribeClient client, RawScribeContent content) {    if (logger.level <= Logger.FINER) {      logger.log(endpoint.getId() + ": Subscribing client " + client + " to topic " + topic);    }    // if we don't know about this topic, subscribe    // otherwise, we simply add the client to the list    if (topics.get(topic) == null) {      topics.put(topic, new TopicManager(topic, client));      sendSubscribe(topic, client, content);    } else {      TopicManager manager = (TopicManager) topics.get(topic);      manager.addClient(client);      if ((manager.getParent() == null) && (!isRoot(topic))) {        sendSubscribe(topic, client, content);      }    }  }  /**   * Unsubscribes the given client from the provided topic.   *   * @param topic The topic to unsubscribe from   * @param client The client to unsubscribe   */  public void unsubscribe(Topic topic, ScribeClient client) {    if (logger.level <= Logger.FINER) {      logger.log(endpoint.getId() + ": Unsubscribing client " + client + " from topic " + topic);    }    if (topics.get(topic) != null) {      TopicManager manager = (TopicManager) topics.get(topic);      // if this is the last client and there are no children,      // then we unsubscribe from the topic      if (manager.removeClient(client)) {        topics.remove(topic);        NodeHandle parent = manager.getParent();        if (parent != null) {          endpoint.route(null, new UnsubscribeMessage(handle, topic), parent);        }      }    } else {      if (logger.level <= Logger.WARNING) {        logger.log(endpoint.getId() + ": Attempt to unsubscribe client " + client + " from unknown topic " + topic);      }    }  }  /**   * Publishes the given message to the topic.   *   * @param topic The topic to publish to   * @param content The content to publish   */  public void publish(Topic topic, ScribeContent content) {    publish(topic, content instanceof RawScribeContent ? (RawScribeContent) content : new JavaSerializedScribeContent(content));  }  /**   * DESCRIBE THE METHOD   *   * @param topic DESCRIBE THE PARAMETER   * @param content DESCRIBE THE PARAMETER   */  public void publish(Topic topic, RawScribeContent content) {    if (logger.level <= Logger.FINER) {      logger.log(endpoint.getId() + ": Publishing content " + content + " to topic " + topic);    }    endpoint.route(topic.getId(), new PublishRequestMessage(handle, topic, content), null);  }  /**   * Anycasts the given content to a member of the given topic   *   * @param topic The topic to anycast to   * @param content The content to anycast   */  public void anycast(Topic topic, ScribeContent content) {    if (content instanceof RawScribeContent) {      anycast(topic, (RawScribeContent) content);    } else {      anycast(topic, new JavaSerializedScribeContent(content));    }  }  /**   * DESCRIBE THE METHOD   *   * @param topic DESCRIBE THE PARAMETER   * @param content DESCRIBE THE PARAMETER   */  public void anycast(Topic topic, RawScribeContent content) {    if (logger.level <= Logger.FINER) {      logger.log(endpoint.getId() + ": Anycasting content " + content + " to topic " + topic);    }    endpoint.route(topic.getId(), new AnycastMessage(handle, topic, content), null);  }  /**   * Adds a child to the given topic   *   * @param topic The topic to add the child to   * @param child The child to add   */  public void addChild(Topic topic, NodeHandle child) {    addChild(topic, child, Integer.MAX_VALUE);  }  /**   * Adds a child to the given topic, using the specified sequence number in the   * ack message sent to the child.   *   * @param topic The topic   * @param child THe child to add   * @param id THe seuqnce number   */  protected void addChild(Topic topic, NodeHandle child, int id) {    if (logger.level <= Logger.FINER) {      logger.log(endpoint.getId() + ": Adding child " + child + " to topic " + topic);    }    TopicManager manager = (TopicManager) topics.get(topic);    // if we don't know about the topic, we subscribe, otherwise,    // we simply add the child to the list    if (manager == null) {      manager = new TopicManager(topic, child);      topics.put(topic, manager);      if (logger.level <= Logger.FINER) {        logger.log(endpoint.getId() + ": Implicitly subscribing to topic " + topic);      }      sendSubscribe(topic, null, null);    } else {      manager.addChild(child);    }    // we send a confirmation back to the child    endpoint.route(null, new SubscribeAckMessage(handle, topic, manager.getPathToRoot(), id), child);    // and lastly notify the policy and all of the clients    policy.childAdded(topic, child);    ScribeClient[] clients = manager.getClients();    for (int i = 0; i < clients.length; i++) {      clients[i].childAdded(topic, child);    }  }  /**   * Removes a child from the given topic   *   * @param topic The topic to remove the child from   * @param child The child to remove   */  public void removeChild(Topic topic, NodeHandle child) {    removeChild(topic, child, true);  }  /**   * Removes a child from the given topic   *   * @param topic The topic to remove the child from   * @param child The child to remove   * @param sendDrop Whether or not to send a drop message to the chil   */  protected void removeChild(Topic topic, NodeHandle child, boolean sendDrop) {    if (logger.level <= Logger.FINE) {      logger.log(endpoint.getId() + ": Removing child " + child + " from topic " + topic);    }    if (topics.get(topic) != null) {      TopicManager manager = (TopicManager) topics.get(topic);      // if this is the last child and there are no clients, then      // we unsubscribe, if we are not the root      if (manager.removeChild(child)) {        topics.remove(topic);        NodeHandle parent = manager.getParent();        if (logger.level <= Logger.FINE) {          logger.log(endpoint.getId() + ": We no longer need topic " + topic + " - unsubscribing from parent " + parent);        }        if (parent != null) {          endpoint.route(null, new UnsubscribeMessage(handle, topic), parent);        }      }      if ((sendDrop) && (child.isAlive())) {        if (logger.level <= Logger.FINE) {          logger.log(endpoint.getId() + ": Informing child " + child + " that he has been dropped from topic " + topic);        }        // now, we tell the child that he has been dropped        endpoint.route(null, new DropMessage(handle, topic), child);      }      // and lastly notify the policy and all of the clients      policy.childRemoved(topic, child);      ScribeClient[] clients = manager.getClients();      for (int i = 0; i < clients.length; i++) {        clients[i].childRemoved(topic, child);      }    } else {      if (logger.level <= Logger.WARNING) {        logger.log(endpoint.getId() + ": Unexpected attempt to remove child " + child + " from unknown topic " + topic);      }    }  }  // ----- COMMON API METHODS -----  /**   * This method is invoked on applications when the underlying node is about to   * forward the given message with the provided target to the specified next   * hop. Applications can change the contents of the message, specify a   * different nextHop (through re-routing), or completely terminate the   * message.   *   * @param message The message being sent, containing an internal message along   *      with a destination key and nodeHandle next hop.   * @return Whether or not to forward the message further   */  public boolean forward(final RouteMessage message) {    Message internalMessage;    try {      internalMessage = message.getMessage(endpoint.getDeserializer());    } catch (IOException ioe) {      throw new RuntimeException(ioe);    }    if (logger.level <= Logger.FINEST) {      logger.log(endpoint.getId() + ": Forward called with " + internalMessage);    }    if (internalMessage instanceof AnycastMessage) {      AnycastMessage aMessage = (AnycastMessage) internalMessage;      // get the topic manager associated with this topic      TopicManager manager = (TopicManager) topics.get(aMessage.getTopic());      // if it's a subscribe message, we must handle it differently      if (internalMessage instanceof SubscribeMessage) {        SubscribeMessage sMessage = (SubscribeMessage) internalMessage;        // if this is our own subscribe message, ignore it        if (sMessage.getSource().getId().equals(endpoint.getId())) {          return true;        }        if (manager != null) {          // first, we have to make sure that we don't create a loop, which would occur          // if the subcribing node's previous parent is on our path to the root          Id previousParent = sMessage.getPreviousParent();          List path = Arrays.asList(manager.getPathToRoot());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -