total_token.java

来自「JGRoups源码」· Java 代码 · 共 1,160 行 · 第 1/3 页

JAVA
1,160
字号
//$Id: TOTAL_TOKEN.java,v 1.14 2006/04/28 15:25:00 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.blocks.GroupRequest;import org.jgroups.protocols.pbcast.Digest;import org.jgroups.protocols.ring.RingNodeFlowControl;import org.jgroups.protocols.ring.RingToken;import org.jgroups.protocols.ring.TokenLostException;import org.jgroups.protocols.ring.UdpRingNode;import org.jgroups.stack.IpAddress;import org.jgroups.stack.RpcProtocol;import org.jgroups.util.RspList;import org.jgroups.util.Util;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.util.*;/** * <p> * Total order implementation based on <a href="http://citeseer.nj.nec.com/amir95totem.html"> * The Totem Single-Ring Ordering and Membership Protocol</a>. * <p> * * <p> * However, this is an adaption of algorithm mentioned in the research paper above since we reuse * our own membership protocol and failure detectors. Somewhat different flow control mechanism is * also implemented. * * <p> * Token passing is done through reliable point-to-point udp channels provided by UNICAST layer. * Process groups nodes members are organized in a logical ring. * </p> * * <p> * Total token layer doesn't need NAKACK nor STABLE layer beneath it since it implements it's own * retransmission and tracks stability of the messages from the information piggybacked on the * token itself. * </p> * * <p> * For the typical protocol stack configuration used, see org.jgroups.demos.TotalTokenDemo and * total-token.xml configuration file provided with this distribution of JGroups. * </p> * * * *@author Vladimir Blagojevic vladimir@cs.yorku.ca *@version $Revision: 1.14 $ * *@see org.jgroups.protocols.ring.RingNodeFlowControl *@see org.jgroups.protocols.ring.RingNode *@see org.jgroups.protocols.ring.TcpRingNode *@see org.jgroups.protocols.ring.UdpRingNode * **/public class TOTAL_TOKEN extends RpcProtocol{    private static final Object[] NULL_OBJ=new Object[]{};    private static final Class[] NULL_TYPES=new Class[]{};    public static class TotalTokenHeader extends Header    {       /**        * sequence number of the message        */       private long seq;       /**        *used for externalization        */       public TotalTokenHeader()       {       }       public TotalTokenHeader(long seq)       {          this.seq = seq;       }       public TotalTokenHeader(Long seq)       {          this.seq = seq.longValue();       }       /**        *Returns sequence number of the message that owns this header        *@return sequence number        */       public long getSeq()       {          return seq;       }       /**        *Returns size of the header        * @return headersize in bytes        */       public long size()       {          //calculated using Util.SizeOf(Object)          return 121;       }       /**        * Manual serialization        *        *        */       public void writeExternal(ObjectOutput out) throws IOException       {          out.writeLong(seq);       }       /**        * Manual deserialization        *        */       public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException       {          seq = in.readLong();       }       public String toString()       {          return "[TotalTokenHeader=" + seq + ']';       }    }   public static class RingTokenHeader extends Header   {      public RingTokenHeader()      {      }      public void writeExternal(ObjectOutput out) throws IOException      {      }      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException      {      }      public long size()      {         //calculated using Util.SizeOf(Object)         return 110;      }   }   private static final int OPERATIONAL_STATE = 0;   private static final int RECOVERY_STATE = 1;   UdpRingNode node;   RingNodeFlowControl flowControl;   Address localAddress;   private final TokenTransmitter tokenRetransmitter=new TokenTransmitter();   final List newMessagesQueue = Collections.synchronizedList(new ArrayList());   SortedSet liveMembersInRecovery,suspects;   final Object mutex = new Object();   TreeMap receivedMessagesQueue;   long myAru = 0;   final Object threadCoordinationMutex = new Object();   final boolean tokenInStack = false;   final boolean threadDeliveringMessage = false;   boolean tokenSeen = false;   volatile boolean isRecoveryLeader = false;   volatile int state;   volatile int sleepTime = 10;   long highestSeenSeq = 0;   long lastRoundTokensAru = 0;   int lastRoundTransmitCount,lastRoundRebroadcastCount = 0;   int blockSendingBacklogThreshold = Integer.MAX_VALUE;   int unblockSendingBacklogThreshold = Integer.MIN_VALUE;   boolean tokenCirculating = false;   boolean senderBlocked = false;    final Object block_sending=new Object();   public static final String prot_name = "TOTAL_TOKEN";   public String getName()   {      return prot_name;   }   private String getState()   {      if (state == OPERATIONAL_STATE)      {         return "OPERATIONAL";      }      else         return "RECOVERY";   }    public void start() throws Exception {        super.start();        receivedMessagesQueue = new TreeMap();        tokenRetransmitter.start();    }   /**    * Overrides @org.jgroups.stack.MessageProtocol#stop().    */   public void stop()   {       super.stop();       tokenRetransmitter.shutDown();   }   /**    * Setup the Protocol instance acording to the configuration string    *    */   public boolean setProperties(Properties props)   {      String str;       super.setProperties(props);      str = props.getProperty("block_sending");      if (str != null)      {         blockSendingBacklogThreshold = Integer.parseInt(str);         props.remove("block_sending");      }      str = props.getProperty("unblock_sending");      if (str != null)      {         unblockSendingBacklogThreshold = Integer.parseInt(str);         props.remove("unblock_sending");      }      if (props.size() > 0)      {         log.error("UDP.setProperties(): the following properties are not recognized: " + props);         return false;      }      return true;   }   public IpAddress getTokenReceiverAddress()   {      return node != null? node.getTokenReceiverAddress() : null;   }   public Vector providedUpServices()   {      Vector retval = new Vector();      retval.addElement(new Integer(Event.GET_DIGEST));      retval.addElement(new Integer(Event.GET_DIGEST_STATE));      retval.addElement(new Integer(Event.SET_DIGEST));      return retval;   }   public boolean handleUpEvent(Event evt)   {      Message msg;      Header h;      switch (evt.getType())      {         case Event.SET_LOCAL_ADDRESS:            localAddress = (Address) evt.getArg();            node = new UdpRingNode(this, localAddress);            flowControl = new RingNodeFlowControl();            break;         case Event.SUSPECT:            Address suspect = (Address) evt.getArg();            onSuspectMessage(suspect);            break;         case Event.MSG:            msg = (Message) evt.getArg();            h = msg.getHeader(getName());            if (h instanceof TotalTokenHeader)            {               messageArrived(msg);               return false;            }            else if (h instanceof RingTokenHeader)            {                if(node != null) {                    Object tmp=msg.getObject();                    node.tokenArrived(tmp);                }               return false;            }      }      return true;   }   public boolean handleDownEvent(Event evt)   {      switch (evt.getType())      {         case Event.GET_DIGEST:         case Event.GET_DIGEST_STATE:            Digest d = new Digest(members.size());            Address sender = null;            //all members have same digest :)            for (int j = 0; j < members.size(); j++)            {               sender = (Address) members.elementAt(j);               d.add(sender, highestSeenSeq, highestSeenSeq);            }            passUp(new Event(Event.GET_DIGEST_OK, d));            return false;         case Event.SET_DIGEST:            Digest receivedDigest = (Digest) evt.getArg();          // changed by bela July 12 2005, not sure if this is correct, don't know what the original author          // intended to do here            // myAru = receivedDigest.highSeqnoAt(0);            myAru = receivedDigest.highSeqnoAt(localAddress);            return false;         case Event.VIEW_CHANGE:            onViewChange();            return true;            /*         case Event.CLEANUP:            // do not pass cleanup event            //further down. This is a hack to enable            // sucessfull leave from group when using pbcast.GMS.            // It just buys us 5 seconds to imminent STOP            // event following CLEANUP. We hope that the moment            // this node disconnect up until new view is installed            // at other members is less than 5 seconds.            //The proper way would be to:            //trap DISCONNECT event on the way down, do not pass it further.            //wait for the new view to be installed (effectively excluding this node out of            //ring) , wait for one token roundtrip time, and then send that trapped            //DISCONNECT event down furhter to generate DISCONNECT_OK on the way up.            // CLEANUP and STOP are generated after DISCONNECT.            //However, as the things stand right now pbcast.GMS stops working immediately            //when it receives DISCONNECT thus the new view is never generated in node that is            //leaving the group.            //pbcsat.GMS should still generate new view and stop working when            //it receives STOP event.            //In timeline DISCONNECT < CLEANUP < STOP            return false;            */         case Event.MSG:            Message msg = (Message) evt.getArg();            //handle only multicasts            if (msg == null) return false;            if (msg.getDest() == null || msg.getDest().isMulticastAddress())            {               newMessagesQueue.add(msg);               return false;            }      }      return true;   }   private void onViewChange()   {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?