⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consistentjoinprotocol.java

📁 pastry的java实现的2.0b版
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*************************************************************************

"FreePastry" Peer-to-Peer Application Development Substrate

Copyright 2002, Rice University. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

- Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.

- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.

- Neither  the name  of Rice  University (RICE) nor  the names  of its
contributors may be  used to endorse or promote  products derived from
this software without specific prior written permission.

This software is provided by RICE and the contributors on an "as is"
basis, without any representations or warranties of any kind, express
or implied including, but not limited to, representations or
warranties of non-infringement, merchantability or fitness for a
particular purpose.
In no event shall RICE or contributors be liablefor any direct, indirect, incidental, special, exemplary, orconsequential damages (including, but not limited to, procurement ofsubstitute goods or services; loss of use, data, or profits; orbusiness interruption) however caused and on any theory of liability,whether in contract, strict liability, or tort (including negligenceor otherwise) arising in any way out of the use of this software, evenif advised of the possibility of such damage.********************************************************************************//* *  Created on Apr 13, 2005 */package rice.pastry.standard;import java.io.IOException;import java.util.*;import rice.environment.logging.Logger;import rice.environment.params.Parameters;import rice.p2p.commonapi.rawserialization.*;import rice.pastry.*;import rice.pastry.leafset.LeafSet;import rice.pastry.messaging.Message;import rice.pastry.routing.RoutingTable;import rice.selector.LoopObserver;import rice.selector.SelectorManager;import rice.selector.TimerTask;/** * Does not setReady until contacting entire leafset which gossips new members. * Provides consistency as long as checkLiveness() never incorrectly reports a * node faulty. Based on MSR-TR-2003-94. The difference is that our assumption * that checkLiveness() is much stronger because we are using DSR rather than * checking ourself. Another difference is that we are unwilling to pull nodes * from our leafset without checkingLiveness() ourself. * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author Jeff Hoye */public class ConsistentJoinProtocol extends StandardJoinProtocol implements Observer, NodeSetListener, LoopObserver {  /**   * This variable is set to prevent the process from going to sleep or not   * being scheduled for too long.   */  protected final int MAX_TIME_TO_BE_SCHEDULED;  /**   * Suppresses sendTheMessage() if we are not ready to do this part of the join   * process, or we are already done.   
*/  protected boolean tryingToGoReady = false;  /**   * Set of NodeHandles that know about us. -> Object   */  WeakHashMap gotResponse;  /**   * Nodes that we think are dead. NodeHandle -> FailedTime   */  Hashtable failed;  TimerTask cleanupTask;//  static class TestNodeHandle extends NodeHandle {//    int num;//    public TestNodeHandle(int num) {//      this.num = num;//    }////    public NodeId getNodeId() {//      return null;//    }////    public int getLiveness() {//      return 0;//    }////    public int proximity() {//      return 0;//    }////    public boolean ping() {//      return false;//    }////    public boolean equals(Object obj) {//      return false;//    }////    public int hashCode() {//      return 0;//    }////    public void receiveMessage(Message msg) {//    }////    public String toString() {//      return "NH: "+num;//    }//  }////  /**//   * To test the comparator...//   * @param foo//   *///  public static void main(String[] foo) {//    // we want sort to return newest first, so the list should get flipped//    ArrayList l = new ArrayList();//    for (int i = 0; i < 20; i++) {//      l.add(new FailedTime(new TestNodeHandle(i), i));//    }//    Collections.sort(l);//    Iterator i = l.iterator();//    while(i.hasNext()) {//      System.out.println(i.next());//    }//  }  /**   * how long a node should remain in failed acquired from   * pastry_protocol_consistentJoin_failedRetentionTime   */  int failedNodeExpirationTime;  /**   * maximum number of failed entries to gossip sends only the most recent   * failures acquired from pastry_protocol_consistentJoin_maxFailedToSend   */  int maxFailedEntries;  /**   * NodeHandles I'm observing   */  HashSet observing;  /**   * Will retry sending ConsistentJoinMsg to all neighbors who have not   * responded on this interval. Only necessary if somehow the message was   * dropped.   */  public final int RETRY_INTERVAL;  TimerTask retryTask;  /**   * Constructor for ConsistentJoinProtocol.   
*   * @param ln DESCRIBE THE PARAMETER   * @param lh DESCRIBE THE PARAMETER   * @param rt DESCRIBE THE PARAMETER   * @param ls DESCRIBE THE PARAMETER   */  public ConsistentJoinProtocol(PastryNode ln, NodeHandle lh,                                RoutingTable rt, LeafSet ls) {    this(ln, lh, rt, ls, null);  }  /**   * Constructor takes in the usual suspects.   *   * @param ln DESCRIBE THE PARAMETER   * @param lh DESCRIBE THE PARAMETER   * @param rt DESCRIBE THE PARAMETER   * @param ls DESCRIBE THE PARAMETER   * @param md DESCRIBE THE PARAMETER   */  public ConsistentJoinProtocol(PastryNode ln, NodeHandle lh,                                RoutingTable rt, LeafSet ls, MessageDeserializer md) {    super(ln, lh, rt, ls, md != null ? md : new CJPDeserializer(ln));    gotResponse = new WeakHashMap();    failed = new Hashtable();    observing = new HashSet();    ls.addNodeSetListener(this);    ln.addObserver(this);    Parameters p = ln.getEnvironment().getParameters();    MAX_TIME_TO_BE_SCHEDULED = p.getInt("pastry_protocol_consistentJoin_max_time_to_be_scheduled");    RETRY_INTERVAL = p.getInt("pastry_protocol_consistentJoin_retry_interval");    failedNodeExpirationTime = p.getInt("pastry_protocol_consistentJoin_failedRetentionTime");    // 90000    maxFailedEntries = p.getInt("pastry_protocol_consistentJoin_maxFailedToSend");    // 20    int cleanupInterval = p.getInt("pastry_protocol_consistentJoin_cleanup_interval");    // 30000    ln.getEnvironment().getSelectorManager().addLoopObserver(this);    cleanupTask =      new TimerTask() {        public void run() {          if (logger.level <= Logger.FINE) {            logger.log("CJP: Cleanup task.");          }          synchronized (failed) {            long now = thePastryNode.getEnvironment().getTimeSource().currentTimeMillis();            long expiration = now - failedNodeExpirationTime;            Iterator i = failed.values().iterator();            while (i.hasNext()) {              FailedTime ft = (FailedTime) 
i.next();              if (ft.time < expiration) {                if (logger.level <= Logger.FINE) {                  logger.log("CJP: Removing " + ft.handle + " from failed set.");                }                i.remove();                ft.handle.deleteObserver(ConsistentJoinProtocol.this);              } else {                if (logger.level <= Logger.FINER) {                  logger.log("CJP: Not Removing " + ft.handle + " from failed set until " + (ft.time + failedNodeExpirationTime) + " which is another " + (ft.time + failedNodeExpirationTime - now) + " millis.");                }              }            }          }        }      };    ln.getEnvironment().getSelectorManager().schedule(cleanupTask, cleanupInterval, cleanupInterval);  }  /**   * This is where we start out, when the StandardJoinProtocol would call   * setReady();   */  protected void setReady() {    if (tryingToGoReady) {      return;    }    tryingToGoReady = true;    if (logger.level <= Logger.INFO) {      logger.log("ConsistentJonProtocol.setReady()");    }    gotResponse.clear();    //failed.clear(); // done by cleanup task as of March 6th, 2006    // send a probe to everyone in the leafset    Iterator i = leafSet.neighborSet(Integer.MAX_VALUE).iterator();    while (i.hasNext()) {      NodeHandle nh = (NodeHandle) i.next();      sendTheMessage(nh, false);    }    retryTask = thePastryNode.scheduleMsg(new RequestFromEveryoneMsg(getAddress()), RETRY_INTERVAL, RETRY_INTERVAL);  }  /**   * Observes all NodeHandles added to LeafSet   *   * @param nh the nodeHandle to add   */  public void addToLeafSet(NodeHandle nh) {    leafSet.put(nh);    if (!observing.contains(nh)) {      if (logger.level <= Logger.FINE) {        logger.log("CJP observing " + nh);      }      nh.addObserver(this);      observing.add(nh);    }  }  /**   * DESCRIBE THE METHOD   */  public void requestFromEveryoneWeHaventHeardFrom() {    if (thePastryNode.isReady()) {      retryTask.cancel();      return;    }    
Collection c = whoDoWeNeedAResponseFrom();    if (logger.level <= Logger.INFO) {      logger.log("CJP: timeout1, still waiting to hear from " + c.size() + " nodes.");    }    Iterator i = c.iterator();    while (i.hasNext()) {      NodeHandle nh = (NodeHandle) i.next();      if (logger.level <= Logger.FINE) {        logger.log("CJP: timeout2, still waiting to hear from " + nh);      }      //nh.checkLiveness();      sendTheMessage(nh, false);    }  }  /**   * Call this if there is an event such that you may have not received messages   * for long enough for other nodes to call you faulty. This method will call   * PastryNode.setReady(false) which will stop delivering messages, and then   * via the observer pattern will call this.update(PN, FALSE) which will call   * setReady() which will begin the join process again.   */  public void otherNodesMaySuspectFaulty() {    thePastryNode.setReady(false);  }  /**   * Returns all members of the leafset that are not in gotResponse   *   * @return   */  public Collection whoDoWeNeedAResponseFrom() {    HashSet ret = new HashSet();    for (int i = -leafSet.ccwSize(); i <= leafSet.cwSize(); i++) {      if (i != 0) {        NodeHandle nh = leafSet.get(i);        if (gotResponse.get(nh) == null) {          ret.add(nh);        }      }    }    return ret;  }  /**   * Handle the CJM as in the MSR-TR   *   * @param msg DESCRIBE THE PARAMETER   */  public void receiveMessage(Message msg) {    if (msg instanceof ConsistentJoinMsg) {      ConsistentJoinMsg cjm = (ConsistentJoinMsg) msg;      // identify node j, the sender of the message      NodeHandle j = cjm.ls.get(0);      // failed_i := failed_i - {j}      failed.remove(j);      if (thePastryNode.isReady()) {        if (cjm.request) {          sendTheMessage(j, true);        }        return;      }      // L_i.add(j);      addToLeafSet(j);      // for each n in L_i and failed do {probe}      // rather than removing everyone in the remote failedset,      // checkLiveness on them 
all      Iterator it = cjm.failed.iterator();      while (it.hasNext()) {        NodeHandle nh = (NodeHandle) it.next();        if (leafSet.member(nh)) {          if (nh.getLiveness() == NodeHandle.LIVENESS_DEAD) {            // if we already found them dead, don't bother            // hopefully this is redundant with the leafset protocol            leafSet.remove(nh);          } else {            if (logger.level <= Logger.FINE) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -