📄 total.java
字号:
// $Id: TOTAL.java,v 1.13 2006/01/03 14:11:29 belaban Exp $package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;import org.jgroups.*;import org.jgroups.stack.AckSenderWindow;import org.jgroups.stack.Protocol;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Streamable;import java.io.*;import java.util.*;/** * Implements the total ordering layer using a message sequencer * <p/> * <p/> * The protocol guarantees that all bcast sent messages will be delivered in * the same order to all members. For that it uses a sequencer which assignes * monotonically increasing sequence ID to broadcasts. Then all group members * deliver the bcasts in ascending sequence ID order. * <p/> * <ul> * <li> * When a bcast message comes down to this layer, it is placed in the pending * down queue. A bcast request is sent to the sequencer.</li> * <li> * When the sequencer receives a bcast request, it creates a bcast reply * message and assigns to it a monotonically increasing seqID and sends it back * to the source of the bcast request.</li> * <li> * When a broadcast reply is received, the corresponding bcast message is * assigned the received seqID. Then it is broadcasted.</li> * <li> * Received bcasts are placed in the up queue. The queue is sorted according * to the seqID of the bcast. Any message at the head of the up queue with a * seqID equal to the next expected seqID is delivered to the layer above.</li> * <li> * Unicast messages coming from the layer below are forwarded above.</li> * <li> * Unicast messages coming from the layer above are forwarded below.</li> * </ul> * <p/> * <i>Please note that once a <code>BLOCK_OK</code> is acknowledged messages * coming from above are discarded!</i> Either the application must stop * sending messages when a <code>BLOCK</code> event is received from the * channel or a QUEUE layer should be placed above this one. Received messages * are still delivered above though. * <p/> * bcast requests are retransmitted periodically until a bcast reply is * received. In case a BCAST_REP is on its way during a BCAST_REQ * retransmission, then the next BCAST_REP will be to a non-existing * BCAST_REQ. So, a null BCAST message is sent to fill the created gap in * the seqID of all members. * * @author i.georgiadis@doc.ic.ac.uk * @author Bela Ban */public class TOTAL extends Protocol { /** * The header processed by the TOTAL layer and intended for TOTAL * inter-stack communication */ public static class Header extends org.jgroups.Header implements Streamable { // Header types /** * Null value for the tag */ public static final int NULL_TYPE=-1; /** * Request to broadcast by the source */ public static final int REQ=0; /** * Reply to broadcast request. */ public static final int REP=1; /** * Unicast message */ public static final int UCAST=2; /** * Broadcast Message */ public static final int BCAST=3; /** * The header's type tag */ public int type; /** * The ID used by the message source to match replies from the * sequencer */ public long localSequenceID; /** * The ID imposing the total order of messages */ public long sequenceID; /** * used for externalization */ public Header() { } /** * Create a header for the TOTAL layer * * @param type the header's type * @param localSeqID the ID used by the sender of broadcasts to match * requests with replies from the sequencer * @param seqID the ID imposing the total order of messages * @throws IllegalArgumentException if the provided header type is * unknown */ public Header(int type, long localSeqID, long seqID) { super(); switch(type) { case REQ: case REP: case UCAST: case BCAST: this.type=type; break; default: this.type=NULL_TYPE; throw new IllegalArgumentException("type"); } this.localSequenceID=localSeqID; this.sequenceID=seqID; } /** * For debugging purposes */ public String toString() { StringBuffer buffer=new StringBuffer(); String typeName; buffer.append("[TOTAL.Header"); switch(type) { case REQ: typeName="REQ"; break; case REP: typeName="REP"; break; case UCAST: typeName="UCAST"; break; case BCAST: typeName="BCAST"; break; case NULL_TYPE: typeName="NULL_TYPE"; break; default: typeName=""; break; } buffer.append(", type=" + typeName); buffer.append(", " + "localID=" + localSequenceID); buffer.append(", " + "seqID=" + sequenceID); buffer.append(']'); return (buffer.toString()); } /** * Manual serialization */ public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(type); out.writeLong(localSequenceID); out.writeLong(sequenceID); } /** * Manual deserialization */ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { type=in.readInt(); localSequenceID=in.readLong(); sequenceID=in.readLong(); } public void writeTo(DataOutputStream out) throws IOException { out.writeInt(type); out.writeLong(localSequenceID); out.writeLong(sequenceID); } public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException { type=in.readInt(); localSequenceID=in.readLong(); sequenceID=in.readLong(); } public long size() { return Global.INT_SIZE + Global.LONG_SIZE *2; } } /** * The retransmission listener - It is called by the * <code>AckSenderWindow</code> when a retransmission should occur */ private class Command implements AckSenderWindow.RetransmitCommand { Command() { } public void retransmit(long seqNo, Message msg) { _retransmitBcastRequest(seqNo); } } /** * Protocol name */ private static final String PROT_NAME="TOTAL"; /** * Property names */ private static final String TRACE_PROP="trace"; /** * Average time between broadcast request retransmissions */ private final long[] AVG_RETRANSMIT_INTERVAL=new long[]{1000, 2000, 3000, 4000}; /** * Null value for the IDs */ private static final long NULL_ID=-1; // Layer sending states /** * No group has been joined yet */ private static final int NULL_STATE=-1; /** * When set, all messages are sent/received */ private static final int RUN=0; /** * When set, only session-specific messages are sent/received, i.e. only * messages essential to the session's integrity */ private static final int FLUSH=1; /** * No message is sent to the layer below */ private static final int BLOCK=2; /** * The state lock allowing multiple reads or a single write */ private final ReadWriteLock stateLock=new WriterPreferenceReadWriteLock(); /** * Protocol layer message-sending state */ private int state=NULL_STATE; /** * The address of this stack */ private Address addr=null; /** * The address of the sequencer */ private Address sequencerAddr=null; /** * The sequencer's seq ID. The ID of the most recently broadcast reply * message */ private long sequencerSeqID=NULL_ID; /** * The local sequence ID, i.e. the ID sent with the last broadcast request * message. This is increased with every broadcast request sent to the * sequencer and it's used to match the requests with the sequencer's * replies */ private long localSeqID=NULL_ID; /** * The total order sequence ID. This is the ID of the most recently * delivered broadcast message. As the sequence IDs are increasing without * gaps, this is used to detect missing broadcast messages */ private long seqID=NULL_ID; /** * The list of unanswered broadcast requests to the sequencer. The entries * are stored in increasing local sequence ID, i.e. in the order they were * <p/> * sent localSeqID -> Broadcast msg to be sent. */ private SortedMap reqTbl; /** * The list of received broadcast messages that haven't yet been delivered * to the layer above. The entries are stored in increasing sequence ID, * i.e. in the order they must be delivered above * <p/> * seqID -> Received broadcast msg */ private SortedMap upTbl; /** * Retranmitter for pending broadcast requests */ private AckSenderWindow retransmitter; /** * Print addresses in host_ip:port form to bypass DNS */ private String addrToString(Object addr) { return ( addr == null ? "<null>" : ((addr instanceof org.jgroups.stack.IpAddress) ? (((org.jgroups.stack.IpAddress)addr).getIpAddress( ).getHostAddress() + ':' + ((org.jgroups.stack.IpAddress)addr).getPort()) : addr.toString()) ); } /** * @return this protocol's name */ public String getName() { return PROT_NAME; } /** * Configure the protocol based on the given list of properties * * @param properties the list of properties to use to setup this layer * @return false if there was any unrecognized property or a property with * an invalid value */ public boolean setProperties(Properties properties) { String value; // trace // Parse & remove property but ignore it; use Trace.trace instead value=properties.getProperty(TRACE_PROP); if(value != null) properties.remove(TRACE_PROP); if(properties.size() > 0) { if(log.isErrorEnabled()) log.error("The following properties are not recognized: " + properties); return (false); } return (true); } /** * Events that some layer below must handle * * @return the set of <code>Event</code>s that must be handled by some layer * below */ public Vector requiredDownServices() { return new Vector(); } /** * Events that some layer above must handle * * @return the set of <code>Event</code>s that must be handled by some * layer above */ public Vector requiredUpServices() { return new Vector(); } /** * Extract as many messages as possible from the pending up queue and send * them to the layer above */ private void _deliverBcast() { Message msg; Header header; synchronized(upTbl) { while((msg=(Message)upTbl.remove(new Long(seqID + 1))) != null) { header=(Header)msg.removeHeader(getName()); if(header.localSequenceID != NULL_ID) passUp(new Event(Event.MSG, msg)); ++seqID; } } // synchronized(upTbl) } /** * Add all undelivered bcasts sent by this member in the req queue and then * replay this queue */ private void _replayBcast() { Iterator it; Message msg; Header header; // i. Remove all undelivered bcasts sent by this member and place them // again in the pending bcast req queue synchronized(upTbl) { if(upTbl.size() > 0) if(log.isInfoEnabled()) log.info("Replaying undelivered bcasts"); it=upTbl.entrySet().iterator(); while(it.hasNext()) { msg=(Message)((Map.Entry)it.next()).getValue(); it.remove(); if(!msg.getSrc().equals(addr)) { if(log.isInfoEnabled()) log.info("During replay: " + "discarding BCAST[" + ((TOTAL.Header)msg.getHeader(getName())).sequenceID + "] from " + addrToString(msg.getSrc())); continue; } header=(Header)msg.removeHeader(getName()); if(header.localSequenceID == NULL_ID) continue; _sendBcastRequest(msg, header.localSequenceID); } } // synchronized(upTbl) } /** * Send a unicast message: Add a <code>UCAST</code> header * * @param msg the message to unicast * @return the message to send */ private Message _sendUcast(Message msg) { msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID, NULL_ID)); return (msg); } /** * Replace the original message with a broadcast request sent to the * sequencer. The original bcast message is stored locally until a reply to * bcast is received from the sequencer. This function has the side-effect * of increasing the <code>localSeqID</code> * * @param msg the message to broadcast */ private void _sendBcastRequest(Message msg) { _sendBcastRequest(msg, ++localSeqID); } /** * Replace the original message with a broadcast request sent to the * sequencer. The original bcast message is stored locally until a reply * to bcast is received from the sequencer * * @param msg the message to broadcast * @param id the local sequence ID to use */ private void _sendBcastRequest(Message msg, long id) { // i. Store away the message while waiting for the sequencer's reply // ii. Send a bcast request immediatelly and also schedule a // retransmission synchronized(reqTbl) { reqTbl.put(new Long(id), msg); } _transmitBcastRequest(id); retransmitter.add(id, msg); } /** * Send the bcast request with the given localSeqID * * @param seqID the local sequence id of the */ private void _transmitBcastRequest(long seqID) { Message reqMsg; // i. If NULL_STATE, then ignore, just transient state before // shutting down the retransmission thread // ii. If blocked, be patient - reschedule // iii. If the request is not pending any more, acknowledge it // iv. Create a broadcast request and send it to the sequencer if(state == NULL_STATE) { if(log.isInfoEnabled()) log.info("Transmit BCAST_REQ[" + seqID + "] in NULL_STATE"); return; } if(state == BLOCK) return; synchronized(reqTbl) { if(!reqTbl.containsKey(new Long(seqID))) { retransmitter.ack(seqID); return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -