total_token.java
来自「JGRoups源码」· Java 代码 · 共 1,160 行 · 第 1/3 页
JAVA
1,160 行
//$Id: TOTAL_TOKEN.java,v 1.14 2006/04/28 15:25:00 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.blocks.GroupRequest;import org.jgroups.protocols.pbcast.Digest;import org.jgroups.protocols.ring.RingNodeFlowControl;import org.jgroups.protocols.ring.RingToken;import org.jgroups.protocols.ring.TokenLostException;import org.jgroups.protocols.ring.UdpRingNode;import org.jgroups.stack.IpAddress;import org.jgroups.stack.RpcProtocol;import org.jgroups.util.RspList;import org.jgroups.util.Util;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.util.*;/** * <p> * Total order implementation based on <a href="http://citeseer.nj.nec.com/amir95totem.html"> * The Totem Single-Ring Ordering and Membership Protocol</a>. * <p> * * <p> * However, this is an adaption of algorithm mentioned in the research paper above since we reuse * our own membership protocol and failure detectors. Somewhat different flow control mechanism is * also implemented. * * <p> * Token passing is done through reliable point-to-point udp channels provided by UNICAST layer. * Process groups nodes members are organized in a logical ring. * </p> * * <p> * Total token layer doesn't need NAKACK nor STABLE layer beneath it since it implements it's own * retransmission and tracks stability of the messages from the information piggybacked on the * token itself. * </p> * * <p> * For the typical protocol stack configuration used, see org.jgroups.demos.TotalTokenDemo and * total-token.xml configuration file provided with this distribution of JGroups. * </p> * * * *@author Vladimir Blagojevic vladimir@cs.yorku.ca *@version $Revision: 1.14 $ * *@see org.jgroups.protocols.ring.RingNodeFlowControl *@see org.jgroups.protocols.ring.RingNode *@see org.jgroups.protocols.ring.TcpRingNode *@see org.jgroups.protocols.ring.UdpRingNode * **/public class TOTAL_TOKEN extends RpcProtocol{ private static final Object[] NULL_OBJ=new Object[]{}; private static final Class[] NULL_TYPES=new Class[]{}; public static class TotalTokenHeader extends Header { /** * sequence number of the message */ private long seq; /** *used for externalization */ public TotalTokenHeader() { } public TotalTokenHeader(long seq) { this.seq = seq; } public TotalTokenHeader(Long seq) { this.seq = seq.longValue(); } /** *Returns sequence number of the message that owns this header *@return sequence number */ public long getSeq() { return seq; } /** *Returns size of the header * @return headersize in bytes */ public long size() { //calculated using Util.SizeOf(Object) return 121; } /** * Manual serialization * * */ public void writeExternal(ObjectOutput out) throws IOException { out.writeLong(seq); } /** * Manual deserialization * */ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { seq = in.readLong(); } public String toString() { return "[TotalTokenHeader=" + seq + ']'; } } public static class RingTokenHeader extends Header { public RingTokenHeader() { } public void writeExternal(ObjectOutput out) throws IOException { } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { } public long size() { //calculated using Util.SizeOf(Object) return 110; } } private static final int OPERATIONAL_STATE = 0; private static final int RECOVERY_STATE = 1; UdpRingNode node; RingNodeFlowControl flowControl; Address localAddress; private final TokenTransmitter tokenRetransmitter=new TokenTransmitter(); final List newMessagesQueue = Collections.synchronizedList(new ArrayList()); SortedSet liveMembersInRecovery,suspects; final Object mutex = new Object(); TreeMap receivedMessagesQueue; long myAru = 0; final Object threadCoordinationMutex = new Object(); final boolean tokenInStack = false; final boolean threadDeliveringMessage = false; boolean tokenSeen = false; volatile boolean isRecoveryLeader = false; volatile int state; volatile int sleepTime = 10; long highestSeenSeq = 0; long lastRoundTokensAru = 0; int lastRoundTransmitCount,lastRoundRebroadcastCount = 0; int blockSendingBacklogThreshold = Integer.MAX_VALUE; int unblockSendingBacklogThreshold = Integer.MIN_VALUE; boolean tokenCirculating = false; boolean senderBlocked = false; final Object block_sending=new Object(); public static final String prot_name = "TOTAL_TOKEN"; public String getName() { return prot_name; } private String getState() { if (state == OPERATIONAL_STATE) { return "OPERATIONAL"; } else return "RECOVERY"; } public void start() throws Exception { super.start(); receivedMessagesQueue = new TreeMap(); tokenRetransmitter.start(); } /** * Overrides @org.jgroups.stack.MessageProtocol#stop(). */ public void stop() { super.stop(); tokenRetransmitter.shutDown(); } /** * Setup the Protocol instance acording to the configuration string * */ public boolean setProperties(Properties props) { String str; super.setProperties(props); str = props.getProperty("block_sending"); if (str != null) { blockSendingBacklogThreshold = Integer.parseInt(str); props.remove("block_sending"); } str = props.getProperty("unblock_sending"); if (str != null) { unblockSendingBacklogThreshold = Integer.parseInt(str); props.remove("unblock_sending"); } if (props.size() > 0) { log.error("UDP.setProperties(): the following properties are not recognized: " + props); return false; } return true; } public IpAddress getTokenReceiverAddress() { return node != null? node.getTokenReceiverAddress() : null; } public Vector providedUpServices() { Vector retval = new Vector(); retval.addElement(new Integer(Event.GET_DIGEST)); retval.addElement(new Integer(Event.GET_DIGEST_STATE)); retval.addElement(new Integer(Event.SET_DIGEST)); return retval; } public boolean handleUpEvent(Event evt) { Message msg; Header h; switch (evt.getType()) { case Event.SET_LOCAL_ADDRESS: localAddress = (Address) evt.getArg(); node = new UdpRingNode(this, localAddress); flowControl = new RingNodeFlowControl(); break; case Event.SUSPECT: Address suspect = (Address) evt.getArg(); onSuspectMessage(suspect); break; case Event.MSG: msg = (Message) evt.getArg(); h = msg.getHeader(getName()); if (h instanceof TotalTokenHeader) { messageArrived(msg); return false; } else if (h instanceof RingTokenHeader) { if(node != null) { Object tmp=msg.getObject(); node.tokenArrived(tmp); } return false; } } return true; } public boolean handleDownEvent(Event evt) { switch (evt.getType()) { case Event.GET_DIGEST: case Event.GET_DIGEST_STATE: Digest d = new Digest(members.size()); Address sender = null; //all members have same digest :) for (int j = 0; j < members.size(); j++) { sender = (Address) members.elementAt(j); d.add(sender, highestSeenSeq, highestSeenSeq); } passUp(new Event(Event.GET_DIGEST_OK, d)); return false; case Event.SET_DIGEST: Digest receivedDigest = (Digest) evt.getArg(); // changed by bela July 12 2005, not sure if this is correct, don't know what the original author // intended to do here // myAru = receivedDigest.highSeqnoAt(0); myAru = receivedDigest.highSeqnoAt(localAddress); return false; case Event.VIEW_CHANGE: onViewChange(); return true; /* case Event.CLEANUP: // do not pass cleanup event //further down. This is a hack to enable // sucessfull leave from group when using pbcast.GMS. // It just buys us 5 seconds to imminent STOP // event following CLEANUP. We hope that the moment // this node disconnect up until new view is installed // at other members is less than 5 seconds. //The proper way would be to: //trap DISCONNECT event on the way down, do not pass it further. //wait for the new view to be installed (effectively excluding this node out of //ring) , wait for one token roundtrip time, and then send that trapped //DISCONNECT event down furhter to generate DISCONNECT_OK on the way up. // CLEANUP and STOP are generated after DISCONNECT. //However, as the things stand right now pbcast.GMS stops working immediately //when it receives DISCONNECT thus the new view is never generated in node that is //leaving the group. //pbcsat.GMS should still generate new view and stop working when //it receives STOP event. //In timeline DISCONNECT < CLEANUP < STOP return false; */ case Event.MSG: Message msg = (Message) evt.getArg(); //handle only multicasts if (msg == null) return false; if (msg.getDest() == null || msg.getDest().isMulticastAddress()) { newMessagesQueue.add(msg); return false; } } return true; } private void onViewChange() {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?