⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 scribeimpl.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
          if (path.contains(previousParent)) {            if (logger.level <= Logger.INFO) {              logger.log(endpoint.getId() + ": Rejecting subscribe message from " +                sMessage.getSubscriber() + " for topic " + sMessage.getTopic() +                " because we are on the subscriber's path to the root.");            }            return true;          }        }        ScribeClient[] clients = new ScribeClient[0];        NodeHandle[] handles = new NodeHandle[0];        if (manager != null) {          clients = manager.getClients();          handles = manager.getChildren();        }        // check if child is already there        if (Arrays.asList(handles).contains(sMessage.getSubscriber())) {          return false;        }        // see if the policy will allow us to take on this child        if (policy.allowSubscribe(sMessage, clients, handles)) {          if (logger.level <= Logger.FINER) {            logger.log(endpoint.getId() + ": Hijacking subscribe message from " +              sMessage.getSubscriber() + " for topic " + sMessage.getTopic());          }          // if so, add the child          addChild(sMessage.getTopic(), sMessage.getSubscriber(), sMessage.getId());          return false;        }        // otherwise, we are effectively rejecting the child        if (logger.level <= Logger.FINER) {          logger.log(endpoint.getId() + ": Rejecting subscribe message from " +            sMessage.getSubscriber() + " for topic " + sMessage.getTopic());        }        // if we are not associated with this topic at all, we simply let the subscribe go        // closer to the root        if (manager == null) {          return true;        }      } else {        // if we are not associated with this topic at all, let the        // anycast continue        if (manager == null) {          return true;        }        ScribeClient[] clients = manager.getClients();        // see if one of our clients will accept the anycast        for (int i = 0; i < clients.length; i++) {          if (clients[i].anycast(aMessage.getTopic(), aMessage.getContent())) {            if (logger.level <= Logger.FINER) {              logger.log(endpoint.getId() + ": Accepting anycast message from " +                aMessage.getSource() + " for topic " + aMessage.getTopic());            }            return false;          }        }        // if we are the orginator for this anycast and it already has a destination,        // we let it go ahead        if (aMessage.getSource().getId().equals(endpoint.getId()) &&          (message.getNextHopHandle() != null) &&          (!handle.equals(message.getNextHopHandle()))) {          return true;        }        if (logger.level <= Logger.FINER) {          logger.log(endpoint.getId() + ": Rejecting anycast message from " +            aMessage.getSource() + " for topic " + aMessage.getTopic());        }      }      // add the local node to the visited list      aMessage.addVisited(endpoint.getLocalNodeHandle());      // allow the policy to select the order in which the nodes are visited      policy.directAnycast(aMessage, manager.getParent(), manager.getChildren());      // reset the source of the message to be us      aMessage.setSource(endpoint.getLocalNodeHandle());      // get the next hop      NodeHandle handle = aMessage.getNext();      // make sure that the next node is alive      while ((handle != null) && (!handle.isAlive())) {        handle = aMessage.getNext();      }      if (logger.level <= Logger.FINER) {        logger.log(endpoint.getId() + ": Forwarding anycast message for topic " + aMessage.getTopic() + "on to " + handle);      }      if (handle == null) {        if (logger.level <= Logger.FINE) {          logger.log(endpoint.getId() + ": Anycast " + aMessage + " failed.");        }        // if it's a subscribe message, send a subscribe failed message back        // as a courtesy        if (aMessage instanceof SubscribeMessage) {          SubscribeMessage sMessage = (SubscribeMessage) aMessage;          if (logger.level <= Logger.FINER) {            logger.log(endpoint.getId() + ": Sending SubscribeFailedMessage to " + sMessage.getSubscriber());          }          endpoint.route(null,            new SubscribeFailedMessage(handle, sMessage.getTopic(), sMessage.getId()),            sMessage.getSubscriber());        }      } else {        endpoint.route(null, aMessage, handle);      }      return false;    }    return true;  }  /**   * This method is called on the application at the destination node for the   * given id.   *   * @param id The destination id of the message   * @param message The message being sent   */  public void deliver(Id id, Message message) {    if (logger.level <= Logger.FINEST) {      logger.log(endpoint.getId() + ": Deliver called with " + id + " " + message);    }    if (message instanceof AnycastMessage) {      AnycastMessage aMessage = (AnycastMessage) message;      // if we are the recipient to someone else's subscribe, then we should have processed      // this message in the forward() method.      // Otherwise, we received our own subscribe message, which means that we are      // the root      if (aMessage.getSource().getId().equals(endpoint.getId())) {        if (aMessage instanceof SubscribeMessage) {          SubscribeMessage sMessage = (SubscribeMessage) message;          outstanding.remove(new Integer(sMessage.getId()));          if (logger.level <= Logger.FINE) {            logger.log(endpoint.getId() + ": Received our own subscribe message " + aMessage + " for topic " +              aMessage.getTopic() + " - we are the root.");          }        } else {          if (logger.level <= Logger.WARNING) {            logger.log(endpoint.getId() + ": Received unexpected delivered anycast message " + aMessage + " for topic " +              aMessage.getTopic() + " - was generated by us.");          }        }      } else {        // here, we have had a subscribe message delivered, which means that we are the root, but        // our policy says that we cannot take on this child        if (aMessage instanceof SubscribeMessage) {          SubscribeMessage sMessage = (SubscribeMessage) aMessage;          if (logger.level <= Logger.FINE) {            logger.log(endpoint.getId() + ": Sending SubscribeFailedMessage (at root) to " + sMessage.getSubscriber());          }          endpoint.route(null,            new SubscribeFailedMessage(handle, sMessage.getTopic(), sMessage.getId()),            sMessage.getSubscriber());        } else {          if (logger.level <= Logger.WARNING) {            logger.log(endpoint.getId() + ": Received unexpected delivered anycast message " + aMessage + " for topic " +              aMessage.getTopic() + " - not generated by us, but was expected to be.");          }        }      }    } else if (message instanceof SubscribeAckMessage) {      SubscribeAckMessage saMessage = (SubscribeAckMessage) message;      TopicManager manager = (TopicManager) topics.get(saMessage.getTopic());      ackMessageReceived(saMessage);      if (logger.level <= Logger.FINE) {        logger.log(endpoint.getId() + ": Received subscribe ack message from " + saMessage.getSource() + " for topic " + saMessage.getTopic());      }      if (!saMessage.getSource().isAlive()) {        if (logger.level <= Logger.WARNING) {          logger.log(endpoint.getId() + ": Received subscribe ack message from " + saMessage.getSource() + " for topic " + saMessage.getTopic());        }      }      // if we're the root, reject the ack message      if (isRoot(saMessage.getTopic())) {        if (logger.level <= Logger.FINE) {          logger.log(endpoint.getId() + ": Received unexpected subscribe ack message (we are the root) from " +            saMessage.getSource() + " for topic " + saMessage.getTopic());        }        endpoint.route(null, new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource());      } else {        // if we don't know about this topic, then we unsubscribe        // if we already have a parent, then this is either an errorous        // subscribe ack, or our path to the root has changed.        if (manager != null) {          if (manager.getParent() == null) {            manager.setParent(saMessage.getSource());          }          if (manager.getParent().equals(saMessage.getSource())) {            manager.setPathToRoot(saMessage.getPathToRoot());          } else {            if (logger.level <= Logger.WARNING) {              logger.log(endpoint.getId() + ": Received somewhat unexpected subscribe ack message (already have parent " + manager.getParent() +                ") from " + saMessage.getSource() + " for topic " + saMessage.getTopic() + " - the new policy is now to accept the message");            }            NodeHandle parent = manager.getParent();            manager.setParent(saMessage.getSource());            manager.setPathToRoot(saMessage.getPathToRoot());            endpoint.route(null, new UnsubscribeMessage(handle, saMessage.getTopic()), parent);          }        } else {          if (logger.level <= Logger.WARNING) {            logger.log(endpoint.getId() + ": Received unexpected subscribe ack message from " +              saMessage.getSource() + " for unknown topic " + saMessage.getTopic());          }          endpoint.route(null, new UnsubscribeMessage(handle, saMessage.getTopic()), saMessage.getSource());        }      }    } else if (message instanceof SubscribeLostMessage) {      SubscribeLostMessage slMessage = (SubscribeLostMessage) message;      lostMessageReceived(slMessage);    } else if (message instanceof SubscribeFailedMessage) {      SubscribeFailedMessage sfMessage = (SubscribeFailedMessage) message;      failedMessageReceived(sfMessage);    } else if (message instanceof PublishRequestMessage) {      PublishRequestMessage prMessage = (PublishRequestMessage) message;      TopicManager manager = (TopicManager) topics.get(prMessage.getTopic());      if (logger.level <= Logger.FINER) {        logger.log(endpoint.getId() + ": Received publish request message with data " +          prMessage.getContent() + " for topic " + prMessage.getTopic());      }      // if message is for a non-existant topic, drop it on the floor (we are the root, after all)      // otherwise, turn it into a publish message, and forward it on      if (manager == null) {        if (logger.level <= Logger.FINE) {          logger.log(endpoint.getId() + ": Received publish request message for non-existent topic " +            prMessage.getTopic() + " - dropping on floor.");        }      } else {        deliver(prMessage.getTopic().getId(), new PublishMessage(prMessage.getSource(), prMessage.getTopic(), prMessage.getContent()));      }    } else if (message instanceof PublishMessage) {      PublishMessage pMessage = (PublishMessage) message;      TopicManager manager = (TopicManager) topics.get(pMessage.getTopic());      if (logger.level <= Logger.FINER) {        logger.log(endpoint.getId() + ": Received publish message with data " + pMessage.getContent() + " for topic " + pMessage.getTopic());      }      // if we don't know about this topic, or this message did      // not come from our parent, send an unsubscribe message      // otherwise, we deliver the message to all clients and forward the      // message to all children      if ((manager != null) && ((manager.getParent() == null) || (manager.getParent().equals(pMessage.getSource())))) {        pMessage.setSource(handle);        ScribeClient[] clients = manager.getClients();        for (int i = 0; i < clients.length; i++) {          if (logger.level <= Logger.FINER) {            logger.log(endpoint.getId() + ": Delivering publish message with data " + pMessage.getContent() + " for topic " +              pMessage.getTopic() + " to client " + clients[i]);          }          clients[i].deliver(pMessage.getTopic(), pMessage.getContent());        }        NodeHandle[] handles = manager.getChildren();        for (int i = 0; i < handles.length; i++) {          if (logger.level <= Logger.FINER) {            logger.log(endpoint.getId() + ": Forwarding publish message with data " + pMessage.getContent() + " for topic " +              pMessage.getTopic() + " to child " + handles[i]);          }          endpoint.route(null, new PublishMessage(endpoint.getLocalNodeHandle(), pMessage.getTopic(), pMessage.getContent()), handles[i]);        }      } else {        if (logger.level <= Logger.WARNING) {          logger.log(endpoint.getId() + ": Received unexpected publish message from " +            pMessage.getSource() + " for unknown topic " + pMessage.getTopic());        }        endpoint.route(null, new UnsubscribeMessage(handle, pMessage.getTopic()), pMessage.getSource());      }    } else if (message instanceof UnsubscribeMessage) {      UnsubscribeMessage uMessage = (UnsubscribeMessage) message;      if (logger.level <= Logger.FINE) {        logger.log(endpoint.getId() + ": Received unsubscribe message from " +          uMessage.getSource() + " for topic " + uMessage.getTopic());      }      removeChild(uMessage.getTopic(), uMessage.getSource(), false);    } else if (message instanceof DropMessage) {      DropMessage dMessage = (DropMessage) message;      if (logger.level <= Logger.FINE) {        logger.log(endpoint.getId() + ": Received drop message from " + dMessage.getSource() + " for topic " + dMessage.getTopic());      }      TopicManager manager = (TopicManager) topics.get(dMessage.getTopic());      if (manager != null) {        if ((manager.getParent() != null) && manager.getParent().equals(dMessage.getSource())) {          // we set the parent to be null, and then send out another subscribe message          manager.setParent(null);          ScribeClient[] clients = manager.getClients();          if (clients.length > 0) {            sendSubscribe(dMessage.getTopic(), clients[0], null);          } else {            sendSubscribe(dMessage.getTopic(), null, null);          }        } else {          if (logger.level <= Logger.WARNING) {            logger.log(endpoint.getId() + ": Received unexpected drop message from non-parent " +              dMessage.getSource() + " for topic " + dMessage.getTopic() + " - ignoring");          }        }      } else {        if (logger.level <= Logger.WARNING) {          logger.log(endpoint.getId() + ": Received unexpected drop message from " +            dMessage.getSource() + " for unknown topic " + dMessage.getTopic() + " - ignoring");        }      }    } else if (message instanceof MaintenanceMessage) {      if (logger.level <= Logger.FINE) {        logger.log(endpoint.getId() + ": Received maintenance message");      }      Iterator i = topics.values().iterator();      // for each topic, make sure our parent is still alive      while (i.hasNext()) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -