📄 consistentjoinprotocol.java
字号:
logger.log("CJP: checking liveness2 on " + nh); } nh.checkLiveness(); } } } // we don't do the L_i.remove() because we don't trust this info // L' stuff // this is so we don't probe too many dudez. For example: // my leafset is not complete, so I will add anybody, but then keep // getting closer and closer, each time knocking out the next guy LeafSet lprime = leafSet.copy(); for (int i = -cjm.ls.ccwSize(); i <= cjm.ls.cwSize(); i++) { NodeHandle nh = cjm.ls.get(i); if (!failed.containsKey(nh) && nh.getLiveness() < NodeHandle.LIVENESS_DEAD) { lprime.put(nh); } } HashSet addThese = new HashSet(); for (int i = -lprime.ccwSize(); i <= lprime.cwSize(); i++) { if (i != 0) { NodeHandle nh = lprime.get(i); if (!leafSet.member(nh)) { addThese.add(nh); } } } Iterator it2 = addThese.iterator(); while (it2.hasNext()) { NodeHandle nh = (NodeHandle) it2.next(); // he's not a member, but he could be if (!failed.containsKey(nh) && nh.getLiveness() < NodeHandle.LIVENESS_DEAD) { addToLeafSet(nh); // probe sendTheMessage(nh, false); } } if (cjm.request) { // send reply sendTheMessage(j, true); } //else { // done_probing: // mark that he knows about us gotResponse.put(j, new Object()); doneProbing();// } } else if (msg instanceof RequestFromEveryoneMsg) { requestFromEveryoneWeHaventHeardFrom(); } else { super.receiveMessage(msg); } } /** * Similar to the MSR-TR */ void doneProbing() { if (leafSet.isComplete()) { // here is where we see if we can go active HashSet toHearFrom = new HashSet(); HashSet seen = new HashSet(); String toHearFromStr = ""; int numToHearFrom = 0; for (int i = -leafSet.ccwSize(); i <= leafSet.cwSize(); i++) { if (i != 0) { NodeHandle nh = leafSet.get(i); if (!seen.contains(nh) && (gotResponse.get(nh) == null)) { numToHearFrom++; toHearFrom.add(nh); toHearFromStr += nh + ":" + nh.getLiveness() + ","; } seen.add(nh); } } if (numToHearFrom == 0) { if (!thePastryNode.isReady()) { // active_i = true; thePastryNode.setReady(); retryTask.cancel(); tryingToGoReady = false; } // failed_i = {} // gotResponse.clear(); //failed.clear(); // done by cleanup task as of March 6th, 2006// Iterator it2 = observing.iterator();// while(it2.hasNext()) {// NodeHandle nh = (NodeHandle)it2.next();// nh.deleteObserver(this);// it2.remove();// } } else { if (logger.level <= Logger.FINE) { logger.log("CJP: still need to hear from:" + toHearFromStr); } } } else { if (logger.level <= Logger.FINE) { logger.log("CJP: LS is not complete: " + leafSet); } // sendTheMessage to leftmost and rightmost? NodeHandle left = null; NodeHandle right = null; synchronized (leafSet) { int index = -leafSet.ccwSize(); if (index != -leafSet.maxSize() / 2) { left = leafSet.get(index); } index = leafSet.cwSize(); if (index != leafSet.maxSize() / 2) { right = leafSet.get(index); } } if (left != null) { sendTheMessage(left, true); } if (right != null) { sendTheMessage(right, true); } } } /** * Sends a consistent join protocol message. * * @param nh * @param reply if the reason we are sending this message is just as a * response */ public void sendTheMessage(NodeHandle nh, boolean reply) { if (!reply) { if (!tryingToGoReady) { return; }// logException(Logger.FINEST, "StackTrace", new Exception("Stack Trace")); } if (logger.level <= Logger.FINE) { logger.log("CJP: sendTheMessage(" + nh + "," + reply + ")"); } // todo, may want to repeat this message as long as the node is alive if we // are worried about rare message drops// if (thePastryNode.isReady()) { //failed.clear(); // done by cleanup task as of March 6th, 2006// } HashSet toSend; if (failed.size() < maxFailedEntries) { toSend = new HashSet(failed.keySet()); } else { ArrayList l = new ArrayList(failed.values()); Collections.sort(l); toSend = new HashSet(); for (int i = 0; i < maxFailedEntries; i++) { FailedTime tf = (FailedTime) l.get(i); toSend.add(tf.handle); } } nh.receiveMessage(new ConsistentJoinMsg(leafSet, toSend, !reply)); } /** * DESCRIBE THE METHOD * * @param set DESCRIBE THE PARAMETER * @param handle DESCRIBE THE PARAMETER * @param added DESCRIBE THE PARAMETER */ public void nodeSetUpdate(NodeSetEventSource set, NodeHandle handle, boolean added) { if (thePastryNode.isReady()) { return; } if (added) { if (gotResponse.get(handle) == null) { sendTheMessage(handle, false); } } else { doneProbing(); } return; } /** * Can be PastryNode updates, leafset updates, or nodehandle updates. * * @param arg0 DESCRIBE THE PARAMETER * @param arg DESCRIBE THE PARAMETER */ public void update(Observable arg0, Object arg) { if (logger.level <= Logger.FINEST) { logger.log("CJP: update(" + arg0 + "," + arg + ")" + arg.getClass().getName()); } // we went offline for whatever reason. Now we need to try to come back online. if (arg0 == thePastryNode) { if (((Boolean) arg).booleanValue() == false) { setReady(); } } if (arg0 instanceof NodeHandle) {// if (thePastryNode.isReady()) {// observing.remove(arg0);// arg0.deleteObserver(this);// return;// } // assume it's a NodeHandle, cause we // want to throw the exception if it is something we don't recognize NodeHandle nh = (NodeHandle) arg0; if (((Integer) arg) == NodeHandle.DECLARED_DEAD) { if (logger.level <= Logger.FINE) { logger.log("CJP:" + arg0 + " declared dead"); } if (!failed.containsKey(nh)) { failed.put(nh, new FailedTime(nh, thePastryNode.getEnvironment().getTimeSource().currentTimeMillis())); } leafSet.remove(nh); doneProbing(); } if (((Integer) arg) == NodeHandle.DECLARED_LIVE) { failed.remove(nh); if (!thePastryNode.isReady()) { if (leafSet.test(nh)) { leafSet.put(nh); sendTheMessage(nh, false); } } } } } /** * Part of the LoopObserver interface. Used to detect if we may have been * found faulty by other nodes. * * @return the minimum loop time we are interested in being notified about. */ public int delayInterest() { return MAX_TIME_TO_BE_SCHEDULED; } /** * If it took longer than the time to detect faultiness, then other nodes may * believe we are faulty. So we best rejoin. * * @param loopTime the time it took to do a single selection loop. */ public void loopTime(int loopTime) { if (loopTime > delayInterest()) { otherNodesMaySuspectFaulty(); } } /** * DESCRIBE THE METHOD */ public void destroy() { if (logger.level <= Logger.FINE) { logger.log("CJP: destroy() called"); } thePastryNode.getEnvironment().getSelectorManager().removeLoopObserver(this); cleanupTask.cancel(); } /** * Used to trigger timer events. * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author Jeff Hoye */ class RequestFromEveryoneMsg extends Message { /** * Constructor for RequestFromEveryoneMsg. * * @param address DESCRIBE THE PARAMETER */ public RequestFromEveryoneMsg(int address) { super(address); } } // cleans up failed /** * DESCRIBE THE CLASS * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ static class FailedTime implements Comparable { long time; NodeHandle handle; /** * Constructor for FailedTime. * * @param handle DESCRIBE THE PARAMETER * @param time DESCRIBE THE PARAMETER */ public FailedTime(NodeHandle handle, long time) { this.time = time; this.handle = handle; } /** * DESCRIBE THE METHOD * * @param arg0 DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE */ public int compareTo(Object arg0) { FailedTime ft = (FailedTime) arg0; // note this is backwards, because we want them sorted in reverse order return (int) (ft.time - this.time); } /** * DESCRIBE THE METHOD * * @return DESCRIBE THE RETURN VALUE */ public String toString() { return "FT:" + handle + " " + time; } } /** * DESCRIBE THE CLASS * * @version $Id: pretty.settings 2305 2005-03-11 20:22:33Z jeffh $ * @author jeffh */ static class CJPDeserializer extends SJPDeserializer { /** * Constructor for CJPDeserializer. * * @param pn DESCRIBE THE PARAMETER */ public CJPDeserializer(PastryNode pn) { super(pn); } /** * DESCRIBE THE METHOD * * @param buf DESCRIBE THE PARAMETER * @param type DESCRIBE THE PARAMETER * @param priority DESCRIBE THE PARAMETER * @param sender DESCRIBE THE PARAMETER * @return DESCRIBE THE RETURN VALUE * @exception IOException DESCRIBE THE EXCEPTION */ public Message deserialize(InputBuffer buf, short type, byte priority, NodeHandle sender) throws IOException { switch (type) { case ConsistentJoinMsg.TYPE: return new ConsistentJoinMsg(buf, pn, (NodeHandle) sender); } return super.deserialize(buf, type, priority, sender); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -