total_token.java

来自「JGRoups源码」· Java 代码 · 共 1,160 行 · 第 1/3 页

JAVA
1,160
字号
      isRecoveryLeader = false;      if (suspects != null)      {         suspects.clear();         suspects = null;      }      if (liveMembersInRecovery != null)      {         liveMembersInRecovery.clear();         liveMembersInRecovery = null;      }   }   private void onSuspectMessage(Address suspect)   {      state = RECOVERY_STATE;      if (suspects == null || suspects.size() == 0)      {         suspects = Collections.synchronizedSortedSet(new TreeSet());         liveMembersInRecovery = Collections.synchronizedSortedSet(new TreeSet(members));      }      suspects.add(suspect);      liveMembersInRecovery.removeAll(suspects);      isRecoveryLeader = isRecoveryLeader(liveMembersInRecovery);   }   /**    * Given a set of surviving members in the transitioanl view    * returns true if this stack is elected to be recovery leader.    *    */   private boolean isRecoveryLeader(SortedSet liveMembers)   {      boolean recoveryLeader = false;      if (liveMembers.size() > 0)      {         recoveryLeader = localAddress.equals(liveMembers.first());      }       if(log.isInfoEnabled()) log.info("live memebers are " + liveMembers);       if(log.isInfoEnabled()) log.info("I am recovery leader?" + recoveryLeader);       return recoveryLeader;   }   public long getAllReceivedUpTo()   {      return myAru;   }   public void installTransitionalView(Vector members)   {       if(node != null)           node.reconfigure(members);   }   /**    *  Total Token recovery protocol (TTRP)    *    *    *    *  Upon transition to recovery state, coordinator sends multiple reliable    *  unicasts message requesting each ring member to report it's allReceivedUpto    *  value. When all replies are received, a response list of allReceivedUpto    *  values is sorted and transformed into a set while dropping the lowest    *  allReceivedUpto value. For example , received response list  [4,4,5,6,7,7,8]    *  is transformed into [5,6,7,8] thus not including the lowest value 4.    *    *  The objective of the recovery protocol is to have each member receive all    *  messages up to maximum sequence value M from the response list, thus    *  satisfying virtual synchrony properties.    *    *  Note that if all members report the same allReceivedUpto values, then    *  virtual synchrony is satisfied (since all surviving members have seen    *  the same set of messages) and we can immediately inject operational    *  token which will enable installment of the new view.    *    *  Otherwise, a constructed set S of all allReceivedUpto values represent sequence ids    *  of messages that have to be received by all mebers prior to installing new    *  view thus satisfying virtual synchrony properties.    *    *  A transitional view, visible only to TOTAL_TOKEN layer is then installed    *  on the ring by a coordinator. Again a multiple unicast are used. A    *  transitional view is deduced from current view by excluding suspected members.    *  Coordinator creates a recovery token by appending the set S of sequence ids to    *  token retransmission request list. Recovery token is then inserted into    *  transitional ring view.    *    *  Upon reception of recovery token, ring members are not allowed to transmit    *  any additional new messages but only to retransmit messages from the    *  specified token retransmission request list.    *    *  When all member detect that they have received all messages upto sequence    *  value M , the next member that first receives token converts it to operatioanl    *  token and  normal operational state is restored in all nodes.    *    *  If token is lost during recovery stage, recovery protocol is restarted.    *    * */   private void recover()   {      if (isRecoveryLeader && state == RECOVERY_STATE)      {          if(log.isInfoEnabled()) log.info("I am starting recovery now");          Vector m = new Vector(liveMembersInRecovery);          RspList list=callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ, NULL_TYPES, GroupRequest.GET_ALL, 0);         //RspList list = callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);         Vector myAllReceivedUpTos = list.getResults();          callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ, NULL_TYPES, GroupRequest.GET_ALL, 0);         //callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);         Vector myAllReceivedUpTosConfirm = list.getResults();         while (!myAllReceivedUpTos.equals(myAllReceivedUpTosConfirm))         {            myAllReceivedUpTos = myAllReceivedUpTosConfirm;             callRemoteMethods(m, "getAllReceivedUpTo", NULL_OBJ, NULL_TYPES, GroupRequest.GET_ALL, 0);            // callRemoteMethods(m, "getAllReceivedUpTo", GroupRequest.GET_ALL, 0);            myAllReceivedUpTosConfirm = list.getResults();             if(log.isInfoEnabled()) log.info("myAllReceivedUpto values are" + myAllReceivedUpTos);             if(log.isInfoEnabled()) log.info("myAllReceivedUpto confirm values are " + myAllReceivedUpTosConfirm);         }          if(log.isInfoEnabled()) log.info("myAllReceivedUpto stabilized values are" + myAllReceivedUpTos);          if(log.isInfoEnabled()) log.info("installing transitional view to repair the ring...");          callRemoteMethods(m, "installTransitionalView", new Object[]{m}, new String[]{Vector.class.getName()},                            GroupRequest.GET_ALL, 0);         //callRemoteMethods(m, "installTransitionalView", m, GroupRequest.GET_ALL, 0);         Vector xmits = prepareRecoveryRetransmissionList(myAllReceivedUpTos);         RingToken injectToken = null;         if (xmits.size() > 1)         {             if(log.isInfoEnabled()) log.info("VS not satisfied, injecting recovery token...");             long aru = ((Long) xmits.firstElement()).longValue();            long highest = ((Long) xmits.lastElement()).longValue();            injectToken = new RingToken(RingToken.RECOVERY);            injectToken.setHighestSequence(highest);            injectToken.setAllReceivedUpto(aru);            Collection rtr = injectToken.getRetransmissionRequests();            rtr.addAll(xmits);         }         else         {             if(log.isInfoEnabled()) log.info("VS satisfied, injecting operational token...");             injectToken = new RingToken();            long sequence = ((Long) xmits.firstElement()).longValue();            injectToken.setHighestSequence(sequence);            injectToken.setAllReceivedUpto(sequence);         }          if(node != null)              node.passToken(injectToken);         tokenRetransmitter.resetTimeout();      }   }   /**    * Prepares a retransmissions list for recovery protocol    * given a collection of all myReceivedUpTo values as    * reported by polled surviving members.    *    *    *    */   private Vector prepareRecoveryRetransmissionList(Vector sequences)   {      Collections.sort(sequences);      Long first = (Long) sequences.firstElement();      Long last = (Long) sequences.lastElement();      Vector retransmissions = new Vector();      if (first.equals(last))      {         retransmissions.add(new Long(first.longValue()));      }      else      {         for (long j = first.longValue() + 1; j <= last.longValue(); j++)         {            retransmissions.add(new Long(j));         }      }      return retransmissions;   }   protected void updateView(View newMembers)   {      super.updateView(newMembers);      Vector newViewMembers = newMembers.getMembers();      flowControl.viewChanged(newViewMembers.size());       if(node != null)           node.reconfigure(newViewMembers);      boolean isCoordinator = localAddress.equals(newViewMembers.firstElement());      int memberSize = newViewMembers.size();      if (memberSize == 1 && isCoordinator && !tokenCirculating)      {         //create token for the first time , lets roll         tokenCirculating = true;         RingToken token = new RingToken();          if(node != null)              node.passToken(token);         tokenRetransmitter.resetTimeout();      }      sleepTime = (20/memberSize);   }   /**    * TOTAL_TOKEN's up-handler thread invokes this method after multicast    * message originating from some other TOTAL_TOKEN stack layer arrives at    * this stack layer.    *    * Up-handler thread coordinates it's access to a shared variables    * with TokenTransmitter thread.    *    * See tokenReceived() for details.    *    */   private void messageArrived(Message m)   {      TotalTokenHeader h = (TotalTokenHeader) m.getHeader(getName());      long seq = h.getSeq();      synchronized (mutex)      {         if ((myAru + 1) <= seq)         {            if (seq > highestSeenSeq)            {               highestSeenSeq = seq;            }            receivedMessagesQueue.put(new Long(seq), m);            if ((myAru + 1) == seq)            {               myAru = seq;               passUp(new Event(Event.MSG, m));            }            if (isReceiveQueueHolePlugged())            {               myAru = deliverMissingMessages();            }         }      }   }   /**    * Returns true if there is a hole in receive queue and at    * least one messages with sequence id consecutive to myAru.    *    *    */   private boolean isReceiveQueueHolePlugged()   {      return ((myAru < highestSeenSeq) && receivedMessagesQueue.containsKey(new Long(myAru + 1)));   }   /**    * Delivers as much as possible messages from receive    * message queue as long as they are consecutive with    * respect to their sequence ids.    *    */   private long deliverMissingMessages()   {      Map.Entry entry = null;      boolean inOrder = true;      long lastDelivered = myAru;      Set deliverySet = receivedMessagesQueue.tailMap(new Long(myAru + 1)).entrySet();       if(log.isInfoEnabled()) log.info("hole getting plugged, prior muAru " + myAru);       for (Iterator iterator = deliverySet.iterator();inOrder && iterator.hasNext();)       {          entry = (Map.Entry) iterator.next();          long nextInQueue = ((Long) entry.getKey()).longValue();          if (lastDelivered + 1 == nextInQueue)          {             Message m = (Message) entry.getValue();             passUp(new Event(Event.MSG, m));             lastDelivered++;          }          else          {             inOrder = false;          }       }       if(log.isInfoEnabled()) log.info("hole getting plugged, post muAru " + lastDelivered);       return lastDelivered;   }   /**    * Checks if the receivedMessageQueue has any missing sequence    * numbers in it, and if it does it finds holes in sequences from    * this stack's receivedMessageQueue and adds them to token retransmission    * list, thus informing other group members about messages missing    * from this stack.    *    *    */   private void updateTokenRtR(RingToken token)   {      long holeLowerBound = 0;      long holeUpperBound = 0;      Long missingSequence = null;      Collection retransmissionList = null;      //any holes?      if (myAru < token.getHighestSequence())      {         retransmissionList = token.getRetransmissionRequests();         Set received = receivedMessagesQueue.tailMap(new Long(myAru + 1)).keySet();         Iterator nonMissing = received.iterator();         holeLowerBound = myAru;            if(log.isDebugEnabled()) log.debug("retransmission request prior" + retransmissionList);         while (nonMissing.hasNext())         {            Long seq = (Long) nonMissing.next();            holeUpperBound = seq.longValue();            while (holeLowerBound < holeUpperBound)            {               missingSequence = new Long(++holeLowerBound);               retransmissionList.add(missingSequence);            }            holeLowerBound = holeUpperBound;         }         holeUpperBound = token.getHighestSequence();         while (holeLowerBound < holeUpperBound)         {            missingSequence = new Long(++holeLowerBound);            retransmissionList.add(missingSequence);         }            if(log.isDebugEnabled()) log.debug("retransmission request after" + retransmissionList);      }   }   /**    * Sends messages in this stacks's outgoing queue and    * saves a copy of each outgoing message in case they got lost.    * If messages get lost it is thus guaranteed that each stack    * that sent any message has a copy of it ready for retransmitting.    *    * Each sent message is stamped by monotonically increasing    * sequence number starting from the highest sequence "seen"    * on the ring.    *    * Returns number of messages actually sent.  The number of    * sent messages is bounded above by the flow control    * algorithm (allowedCount) and bounded below by the number    * of pending messages in newMessagesQueue.    */   private int broadcastMessages(int allowedCount, RingToken token)   {      List sendList = null;      synchronized (newMessagesQueue)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?