total_token.java
来自「JGRoups源码」· Java 代码 · 共 1,160 行 · 第 1/3 页
JAVA
1,160 行
isRecoveryLeader = false; if (suspects != null) { suspects.clear(); suspects = null; } if (liveMembersInRecovery != null) { liveMembersInRecovery.clear(); liveMembersInRecovery = null; } } private void onSuspectMessage(Address suspect) { state = RECOVERY_STATE; if (suspects == null || suspects.size() == 0) { suspects = Collections.synchronizedSortedSet(new TreeSet()); liveMembersInRecovery = Collections.synchronizedSortedSet(new TreeSet(members)); } suspects.add(suspect); liveMembersInRecovery.removeAll(suspects); isRecoveryLeader = isRecoveryLeader(liveMembersInRecovery); } /** * Given a set of surviving members in the transitioanl view * returns true if this stack is elected to be recovery leader. * */ private boolean isRecoveryLeader(SortedSet liveMembers) { boolean recoveryLeader = false; if (liveMembers.size() > 0) { recoveryLeader = localAddress.equals(liveMembers.first()); } if(log.isInfoEnabled()) log.info("live memebers are " + liveMembers); if(log.isInfoEnabled()) log.info("I am recovery leader?" + recoveryLeader); return recoveryLeader; } public long getAllReceivedUpTo() { return myAru; } public void installTransitionalView(Vector members) { if(node != null) node.reconfigure(members); } /** * Total Token recovery protocol (TTRP) * * * * Upon transition to recovery state, coordinator sends multiple reliable * unicasts message requesting each ring member to report it's allReceivedUpto * value. When all replies are received, a response list of allReceivedUpto * values is sorted and transformed into a set while dropping the lowest * allReceivedUpto value. For example , received response list [4,4,5,6,7,7,8] * is transformed into [5,6,7,8] thus not including the lowest value 4. * * The objective of the recovery protocol is to have each member receive all * messages up to maximum sequence value M from the response list, thus * satisfying virtual synchrony properties. * * Note that if all members report the same allReceivedUpto values, then * virtual synchrony is satisfied (since all surviving members have seen * the same set of messages) and we can immediately inject operational * token which will enable installment of the new view. * * Otherwise, a constructed set S of all allReceivedUpto values represent sequence ids * of messages that have to be received by all mebers prior to installing new * view thus satisfying virtual synchrony properties. * * A transitional view, visible only to TOTAL_TOKEN layer is then installed * on the ring by a coordinator. Again a multiple unicast are used. A * transitional view is deduced from current view by excluding suspected members. * Coordinator creates a recovery token by appending the set S of sequence ids to * token retransmission request list. Recovery token is then inserted into * transitional ring view. * * Upon reception of recovery token, ring members are not allowed to transmit * any additional new messages but only to retransmit messages from the * specified token retransmission request list. * * When all member detect that they have received all messages upto sequence * value M , the next member that first receives token converts it to operatioanl * token and normal operational state is restored in all nodes. * * If token is lost during recovery stage, recovery protocol is restarted. * * */ private void recover() { if (isRecoveryLeader && state == RECOVERY_STATE) { if(log.isInfoEnabled()) log.info("I am starting recovery now"); Vector m = new Vector(liveMembersInRecovery); RspList list=callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ, NULL_TYPES, GroupRequest.GET_ALL, 0); //RspList list = callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0); Vector myAllReceivedUpTos = list.getResults(); callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ, NULL_TYPES, GroupRequest.GET_ALL, 0); //callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0); Vector myAllReceivedUpTosConfirm = list.getResults(); while (!myAllReceivedUpTos.equals(myAllReceivedUpTosConfirm)) { myAllReceivedUpTos = myAllReceivedUpTosConfirm; callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ, NULL_TYPES, GroupRequest.GET_ALL, 0); // callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0); myAllReceivedUpTosConfirm = list.getResults(); if(log.isInfoEnabled()) log.info("myAllReceivedUpto values are" + myAllReceivedUpTos); if(log.isInfoEnabled()) log.info("myAllReceivedUpto confirm values are " + myAllReceivedUpTosConfirm); } if(log.isInfoEnabled()) log.info("myAllReceivedUpto stabilized values are" + myAllReceivedUpTos); if(log.isInfoEnabled()) log.info("installing transitional view to repair the ring..."); callRemoteMethods(m, "installTransitionalView", new Object[]{m}, new String[]{Vector.class.getName()}, GroupRequest.GET_ALL, 0); //callRemoteMethods(m, "installTransitionalView", m, GroupRequest.GET_ALL, 0); Vector xmits = prepareRecoveryRetransmissionList(myAllReceivedUpTos); RingToken injectToken = null; if (xmits.size() > 1) { if(log.isInfoEnabled()) log.info("VS not satisfied, injecting recovery token..."); long aru = ((Long) xmits.firstElement()).longValue(); long highest = ((Long) xmits.lastElement()).longValue(); injectToken = new RingToken(RingToken.RECOVERY); injectToken.setHighestSequence(highest); injectToken.setAllReceivedUpto(aru); Collection rtr = injectToken.getRetransmissionRequests(); rtr.addAll(xmits); } else { if(log.isInfoEnabled()) log.info("VS satisfied, injecting operational token..."); injectToken = new RingToken(); long sequence = ((Long) xmits.firstElement()).longValue(); injectToken.setHighestSequence(sequence); injectToken.setAllReceivedUpto(sequence); } if(node != null) node.passToken(injectToken); tokenRetransmitter.resetTimeout(); } } /** * Prepares a retransmissions list for recovery protocol * given a collection of all myReceivedUpTo values as * reported by polled surviving members. * * * */ private Vector prepareRecoveryRetransmissionList(Vector sequences) { Collections.sort(sequences); Long first = (Long) sequences.firstElement(); Long last = (Long) sequences.lastElement(); Vector retransmissions = new Vector(); if (first.equals(last)) { retransmissions.add(new Long(first.longValue())); } else { for (long j = first.longValue() + 1; j <= last.longValue(); j++) { retransmissions.add(new Long(j)); } } return retransmissions; } protected void updateView(View newMembers) { super.updateView(newMembers); Vector newViewMembers = newMembers.getMembers(); flowControl.viewChanged(newViewMembers.size()); if(node != null) node.reconfigure(newViewMembers); boolean isCoordinator = localAddress.equals(newViewMembers.firstElement()); int memberSize = newViewMembers.size(); if (memberSize == 1 && isCoordinator && !tokenCirculating) { //create token for the first time , lets roll tokenCirculating = true; RingToken token = new RingToken(); if(node != null) node.passToken(token); tokenRetransmitter.resetTimeout(); } sleepTime = (20/memberSize); } /** * TOTAL_TOKEN's up-handler thread invokes this method after multicast * message originating from some other TOTAL_TOKEN stack layer arrives at * this stack layer. * * Up-handler thread coordinates it's access to a shared variables * with TokenTransmitter thread. * * See tokenReceived() for details. * */ private void messageArrived(Message m) { TotalTokenHeader h = (TotalTokenHeader) m.getHeader(getName()); long seq = h.getSeq(); synchronized (mutex) { if ((myAru + 1) <= seq) { if (seq > highestSeenSeq) { highestSeenSeq = seq; } receivedMessagesQueue.put(new Long(seq), m); if ((myAru + 1) == seq) { myAru = seq; passUp(new Event(Event.MSG, m)); } if (isReceiveQueueHolePlugged()) { myAru = deliverMissingMessages(); } } } } /** * Returns true if there is a hole in receive queue and at * least one messages with sequence id consecutive to myAru. * * */ private boolean isReceiveQueueHolePlugged() { return ((myAru < highestSeenSeq) && receivedMessagesQueue.containsKey(new Long(myAru + 1))); } /** * Delivers as much as possible messages from receive * message queue as long as they are consecutive with * respect to their sequence ids. * */ private long deliverMissingMessages() { Map.Entry entry = null; boolean inOrder = true; long lastDelivered = myAru; Set deliverySet = receivedMessagesQueue.tailMap(new Long(myAru + 1)).entrySet(); if(log.isInfoEnabled()) log.info("hole getting plugged, prior muAru " + myAru); for (Iterator iterator = deliverySet.iterator();inOrder && iterator.hasNext();) { entry = (Map.Entry) iterator.next(); long nextInQueue = ((Long) entry.getKey()).longValue(); if (lastDelivered + 1 == nextInQueue) { Message m = (Message) entry.getValue(); passUp(new Event(Event.MSG, m)); lastDelivered++; } else { inOrder = false; } } if(log.isInfoEnabled()) log.info("hole getting plugged, post muAru " + lastDelivered); return lastDelivered; } /** * Checks if the receivedMessageQueue has any missing sequence * numbers in it, and if it does it finds holes in sequences from * this stack's receivedMessageQueue and adds them to token retransmission * list, thus informing other group members about messages missing * from this stack. * * */ private void updateTokenRtR(RingToken token) { long holeLowerBound = 0; long holeUpperBound = 0; Long missingSequence = null; Collection retransmissionList = null; //any holes? if (myAru < token.getHighestSequence()) { retransmissionList = token.getRetransmissionRequests(); Set received = receivedMessagesQueue.tailMap(new Long(myAru + 1)).keySet(); Iterator nonMissing = received.iterator(); holeLowerBound = myAru; if(log.isDebugEnabled()) log.debug("retransmission request prior" + retransmissionList); while (nonMissing.hasNext()) { Long seq = (Long) nonMissing.next(); holeUpperBound = seq.longValue(); while (holeLowerBound < holeUpperBound) { missingSequence = new Long(++holeLowerBound); retransmissionList.add(missingSequence); } holeLowerBound = holeUpperBound; } holeUpperBound = token.getHighestSequence(); while (holeLowerBound < holeUpperBound) { missingSequence = new Long(++holeLowerBound); retransmissionList.add(missingSequence); } if(log.isDebugEnabled()) log.debug("retransmission request after" + retransmissionList); } } /** * Sends messages in this stacks's outgoing queue and * saves a copy of each outgoing message in case they got lost. * If messages get lost it is thus guaranteed that each stack * that sent any message has a copy of it ready for retransmitting. * * Each sent message is stamped by monotonically increasing * sequence number starting from the highest sequence "seen" * on the ring. * * Returns number of messages actually sent. The number of * sent messages is bounded above by the flow control * algorithm (allowedCount) and bounded below by the number * of pending messages in newMessagesQueue. */ private int broadcastMessages(int allowedCount, RingToken token) { List sendList = null; synchronized (newMessagesQueue)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?