⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consistentjoinprotocol.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
              logger.log("CJP: checking liveness2 on " + nh);            }            nh.checkLiveness();          }        }      }      // we don't do the L_i.remove() because we don't trust this info      // L' stuff      // this is so we don't probe too many dudez.  For example:      // my leafset is not complete, so I will add anybody, but then keep      // getting closer and closer, each time knocking out the next guy      LeafSet lprime = leafSet.copy();      for (int i = -cjm.ls.ccwSize(); i <= cjm.ls.cwSize(); i++) {        NodeHandle nh = cjm.ls.get(i);        if (!failed.containsKey(nh) && nh.getLiveness() < NodeHandle.LIVENESS_DEAD) {          lprime.put(nh);        }      }      HashSet addThese = new HashSet();      for (int i = -lprime.ccwSize(); i <= lprime.cwSize(); i++) {        if (i != 0) {          NodeHandle nh = lprime.get(i);          if (!leafSet.member(nh)) {            addThese.add(nh);          }        }      }      Iterator it2 = addThese.iterator();      while (it2.hasNext()) {        NodeHandle nh = (NodeHandle) it2.next();        // he's not a member, but he could be        if (!failed.containsKey(nh) && nh.getLiveness() < NodeHandle.LIVENESS_DEAD) {          addToLeafSet(nh);          // probe          sendTheMessage(nh, false);        }      }      if (cjm.request) {        // send reply        sendTheMessage(j, true);      }      //else {      // done_probing:      // mark that he knows about us      gotResponse.put(j, new Object());      doneProbing();//      }    } else if (msg instanceof RequestFromEveryoneMsg) {      requestFromEveryoneWeHaventHeardFrom();    } else {      super.receiveMessage(msg);    }  }  /**   * Similar to the MSR-TR   */  void doneProbing() {    if (leafSet.isComplete()) {      // here is where we see if we can go active      HashSet toHearFrom = new HashSet();      HashSet seen = new HashSet();      String toHearFromStr = "";      int numToHearFrom = 0;      for (int i = -leafSet.ccwSize(); i <= 
leafSet.cwSize(); i++) {        if (i != 0) {          NodeHandle nh = leafSet.get(i);          if (!seen.contains(nh) && (gotResponse.get(nh) == null)) {            numToHearFrom++;            toHearFrom.add(nh);            toHearFromStr += nh + ":" + nh.getLiveness() + ",";          }          seen.add(nh);        }      }      if (numToHearFrom == 0) {        if (!thePastryNode.isReady()) {          // active_i = true;          thePastryNode.setReady();          retryTask.cancel();          tryingToGoReady = false;        }        // failed_i = {}        //      gotResponse.clear();        //failed.clear(); // done by cleanup task as of March 6th, 2006//        Iterator it2 = observing.iterator();//        while(it2.hasNext()) {//          NodeHandle nh = (NodeHandle)it2.next();//          nh.deleteObserver(this);//          it2.remove();//        }      } else {        if (logger.level <= Logger.FINE) {          logger.log("CJP: still need to hear from:" + toHearFromStr);        }      }    } else {      if (logger.level <= Logger.FINE) {        logger.log("CJP: LS is not complete: " + leafSet);      }      // sendTheMessage to leftmost and rightmost?      NodeHandle left = null;      NodeHandle right = null;      synchronized (leafSet) {        int index = -leafSet.ccwSize();        if (index != -leafSet.maxSize() / 2) {          left = leafSet.get(index);        }        index = leafSet.cwSize();        if (index != leafSet.maxSize() / 2) {          right = leafSet.get(index);        }      }      if (left != null) {        sendTheMessage(left, true);      }      if (right != null) {        sendTheMessage(right, true);      }    }  }  /**   * Sends a consistent join protocol message.   
*   * @param nh   * @param reply if the reason we are sending this message is just as a   *      response   */  public void sendTheMessage(NodeHandle nh, boolean reply) {    if (!reply) {      if (!tryingToGoReady) {        return;      }//      logException(Logger.FINEST, "StackTrace", new Exception("Stack Trace"));    }    if (logger.level <= Logger.FINE) {      logger.log("CJP:  sendTheMessage(" + nh + "," + reply + ")");    }    // todo, may want to repeat this message as long as the node is alive if we    // are worried about rare message drops//    if (thePastryNode.isReady()) {    //failed.clear(); // done by cleanup task as of March 6th, 2006//    }    HashSet toSend;    if (failed.size() < maxFailedEntries) {      toSend = new HashSet(failed.keySet());    } else {      ArrayList l = new ArrayList(failed.values());      Collections.sort(l);      toSend = new HashSet();      for (int i = 0; i < maxFailedEntries; i++) {        FailedTime tf = (FailedTime) l.get(i);        toSend.add(tf.handle);      }    }    nh.receiveMessage(new ConsistentJoinMsg(leafSet, toSend, !reply));  }  /**   * DESCRIBE THE METHOD   *   * @param set DESCRIBE THE PARAMETER   * @param handle DESCRIBE THE PARAMETER   * @param added DESCRIBE THE PARAMETER   */  public void nodeSetUpdate(NodeSetEventSource set, NodeHandle handle, boolean added) {    if (thePastryNode.isReady()) {      return;    }    if (added) {      if (gotResponse.get(handle) == null) {        sendTheMessage(handle, false);      }    } else {      doneProbing();    }    return;  }  /**   * Can be PastryNode updates, leafset updates, or nodehandle updates.   *   * @param arg0 DESCRIBE THE PARAMETER   * @param arg DESCRIBE THE PARAMETER   */  public void update(Observable arg0, Object arg) {    if (logger.level <= Logger.FINEST) {      logger.log("CJP: update(" + arg0 + "," + arg + ")" + arg.getClass().getName());    }    // we went offline for whatever reason.  Now we need to try to come back online.    
if (arg0 == thePastryNode) {      if (((Boolean) arg).booleanValue() == false) {        setReady();      }    }    if (arg0 instanceof NodeHandle) {//      if (thePastryNode.isReady()) {//        observing.remove(arg0);//        arg0.deleteObserver(this);//        return;//      }      // assume it's a NodeHandle, cause we      // want to throw the exception if it is something we don't recognize      NodeHandle nh = (NodeHandle) arg0;      if (((Integer) arg) == NodeHandle.DECLARED_DEAD) {        if (logger.level <= Logger.FINE) {          logger.log("CJP:" + arg0 + " declared dead");        }        if (!failed.containsKey(nh)) {          failed.put(nh, new FailedTime(nh, thePastryNode.getEnvironment().getTimeSource().currentTimeMillis()));        }        leafSet.remove(nh);        doneProbing();      }      if (((Integer) arg) == NodeHandle.DECLARED_LIVE) {        failed.remove(nh);        if (!thePastryNode.isReady()) {          if (leafSet.test(nh)) {            leafSet.put(nh);            sendTheMessage(nh, false);          }        }      }    }  }  /**   * Part of the LoopObserver interface. Used to detect if we may have been   * found faulty by other nodes.   *   * @return the minimum loop time we are interested in being notified about.   */  public int delayInterest() {    return MAX_TIME_TO_BE_SCHEDULED;  }  /**   * If it took longer than the time to detect faultiness, then other nodes may   * believe we are faulty. So we best rejoin.   *   * @param loopTime the time it took to do a single selection loop.   */  public void loopTime(int loopTime) {    if (loopTime > delayInterest()) {      otherNodesMaySuspectFaulty();    }  }  /**   * DESCRIBE THE METHOD   */  public void destroy() {    if (logger.level <= Logger.FINE) {      logger.log("CJP: destroy() called");    }    thePastryNode.getEnvironment().getSelectorManager().removeLoopObserver(this);    cleanupTask.cancel();  }  /**   * Used to trigger timer events.   
*
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author Jeff Hoye
   */
  class RequestFromEveryoneMsg extends Message {
    /**
     * Constructor for RequestFromEveryoneMsg.
     *
     * @param address passed straight through to the {@code Message}
     *      constructor
     */
    public RequestFromEveryoneMsg(int address) {
      super(address);
    }
  }

  /**
   * Pairs a node handle with a timestamp. The natural ordering is by
   * descending time, so sorting a list puts the largest timestamps first.
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  static class FailedTime implements Comparable {
    long time;
    NodeHandle handle;

    /**
     * Constructor for FailedTime.
     *
     * @param handle the handle this entry refers to
     * @param time the timestamp to order by
     */
    public FailedTime(NodeHandle handle, long time) {
      this.time = time;
      this.handle = handle;
    }

    /**
     * Orders entries by time, largest first.
     *
     * @param arg0 the other {@code FailedTime}
     * @return a negative value if this entry has the larger time, a positive
     *      value if the other entry has the larger time, 0 if they are equal
     * @throws ClassCastException if {@code arg0} is not a {@code FailedTime}
     */
    public int compareTo(Object arg0) {
      FailedTime ft = (FailedTime) arg0;
      // note this is backwards, because we want them sorted in reverse order.
      // Compare explicitly instead of narrowing (ft.time - this.time) to int:
      // the narrowed difference can change sign or collapse to 0 for
      // timestamps that are far apart, which would corrupt the sort order.
      if (ft.time > this.time) {
        return 1;
      }
      if (ft.time < this.time) {
        return -1;
      }
      return 0;
    }

    /**
     * @return {@code "FT:"} followed by the handle, a space and the time
     */
    public String toString() {
      return "FT:" + handle + " " + time;
    }
  }

  /**
   * Deserializer that handles {@code ConsistentJoinMsg} itself and hands
   * every other message type to {@code SJPDeserializer}.
   *
   * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $
   * @author jeffh
   */
  static class CJPDeserializer extends SJPDeserializer {
    /**
     * Constructor for CJPDeserializer.
     *
     * @param pn the pastry node, passed to the superclass constructor
     */
    public CJPDeserializer(PastryNode pn) {
      super(pn);
    }

    /**
     * Builds a message from its serialized form.
     *
     * @param buf the buffer to read the message from
     * @param type the message type code
     * @param priority the message priority (only forwarded to the superclass)
     * @param sender the handle of the sending node
     * @return a {@code ConsistentJoinMsg} when {@code type} is
     *      {@code ConsistentJoinMsg.TYPE}, otherwise whatever the superclass
     *      returns
     * @exception IOException declared for the buffer read and the superclass
     *      call
     */
    public Message deserialize(InputBuffer buf, short type, byte priority, NodeHandle sender) throws IOException {
      if (type == ConsistentJoinMsg.TYPE) {
        return new ConsistentJoinMsg(buf, pn, sender);
      }
      return super.deserialize(buf, type, priority, sender);
    }
  }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -