⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 total.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: TOTAL.java,v 1.13 2006/01/03 14:11:29 belaban Exp $package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;import org.jgroups.*;import org.jgroups.stack.AckSenderWindow;import org.jgroups.stack.Protocol;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Streamable;import java.io.*;import java.util.*;/** * Implements the total ordering layer using a message sequencer * <p/> * <p/> * The protocol guarantees that all bcast sent messages will be delivered in * the same order to all members. For that it uses a sequencer which assignes * monotonically increasing sequence ID to broadcasts. Then all group members * deliver the bcasts in ascending sequence ID order. * <p/> * <ul> * <li> * When a bcast message comes down to this layer, it is placed in the pending * down queue. A bcast request is sent to the sequencer.</li> * <li> * When the sequencer receives a bcast request, it creates a bcast reply * message and assigns to it a monotonically increasing seqID and sends it back * to the source of the bcast request.</li> * <li> * When a broadcast reply is received, the corresponding bcast message is * assigned the received seqID. Then it is broadcasted.</li> * <li> * Received bcasts are placed in the up queue. The queue is sorted according * to the seqID of the bcast. Any message at the head of the up queue with a * seqID equal to the next expected seqID is delivered to the layer above.</li> * <li> * Unicast messages coming from the layer below are forwarded above.</li> * <li> * Unicast messages coming from the layer above are forwarded below.</li> * </ul> * <p/> * <i>Please note that once a <code>BLOCK_OK</code> is acknowledged messages * coming from above are discarded!</i> Either the application must stop * sending messages when a <code>BLOCK</code> event is received from the * channel or a QUEUE layer should be placed above this one. Received messages * are still delivered above though. * <p/> * bcast requests are retransmitted periodically until a bcast reply is * received. In case a BCAST_REP is on its way during a BCAST_REQ * retransmission, then the next BCAST_REP will be to a non-existing * BCAST_REQ. So, a null BCAST message is sent to fill the created gap in * the seqID of all members. * * @author i.georgiadis@doc.ic.ac.uk * @author Bela Ban */public class TOTAL extends Protocol {    /**     * The header processed by the TOTAL layer and intended for TOTAL     * inter-stack communication     */    public static class Header extends org.jgroups.Header implements Streamable {        // Header types        /**         * Null value for the tag         */        public static final int NULL_TYPE=-1;        /**         * Request to broadcast by the source         */        public static final int REQ=0;        /**         * Reply to broadcast request.         */        public static final int REP=1;        /**         * Unicast message         */        public static final int UCAST=2;        /**         * Broadcast Message         */        public static final int BCAST=3;        /**         * The header's type tag         */        public int type;        /**         * The ID used by the message source to match replies from the         * sequencer         */        public long localSequenceID;        /**         * The ID imposing the total order of messages         */        public long sequenceID;        /**         * used for externalization         */        public Header() {        }        /**         * Create a header for the TOTAL layer         *         * @param type       the header's type         * @param localSeqID the ID used by the sender of broadcasts to match         *                   requests with replies from the sequencer         * @param seqID      the ID imposing the total order of messages         * @throws IllegalArgumentException if the provided header type is         *                                  unknown         */        public Header(int type, long localSeqID, long seqID) {            super();            switch(type) {            case REQ:            case REP:            case UCAST:            case BCAST:                this.type=type;                break;            default:                this.type=NULL_TYPE;                throw new IllegalArgumentException("type");            }            this.localSequenceID=localSeqID;            this.sequenceID=seqID;        }        /**         * For debugging purposes         */        public String toString() {            StringBuffer buffer=new StringBuffer();            String typeName;            buffer.append("[TOTAL.Header");            switch(type) {            case REQ:                typeName="REQ";                break;            case REP:                typeName="REP";                break;            case UCAST:                typeName="UCAST";                break;            case BCAST:                typeName="BCAST";                break;            case NULL_TYPE:                typeName="NULL_TYPE";                break;            default:                typeName="";                break;            }            buffer.append(", type=" + typeName);            buffer.append(", " + "localID=" + localSequenceID);            buffer.append(", " + "seqID=" + sequenceID);            buffer.append(']');            return (buffer.toString());        }        /**         * Manual serialization         */        public void writeExternal(ObjectOutput out) throws IOException {            out.writeInt(type);            out.writeLong(localSequenceID);            out.writeLong(sequenceID);        }        /**         * Manual deserialization         */        public void readExternal(ObjectInput in) throws IOException,                                                        ClassNotFoundException {            type=in.readInt();            localSequenceID=in.readLong();            sequenceID=in.readLong();        }        public void writeTo(DataOutputStream out) throws IOException {            out.writeInt(type);            out.writeLong(localSequenceID);            out.writeLong(sequenceID);        }        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {            type=in.readInt();            localSequenceID=in.readLong();            sequenceID=in.readLong();        }        public long size() {            return Global.INT_SIZE + Global.LONG_SIZE *2;        }    }    /**     * The retransmission listener - It is called by the     * <code>AckSenderWindow</code> when a retransmission should occur     */    private class Command implements AckSenderWindow.RetransmitCommand {        Command() {        }        public void retransmit(long seqNo, Message msg) {            _retransmitBcastRequest(seqNo);        }    }    /**     * Protocol name     */    private static final String PROT_NAME="TOTAL";    /**     * Property names     */    private static final String TRACE_PROP="trace";    /**     * Average time between broadcast request retransmissions     */    private final long[] AVG_RETRANSMIT_INTERVAL=new long[]{1000, 2000, 3000, 4000};    /**     * Null value for the IDs     */    private static final long NULL_ID=-1;    // Layer sending states    /**     * No group has been joined yet     */    private static final int NULL_STATE=-1;    /**     * When set, all messages are sent/received     */    private static final int RUN=0;    /**     * When set, only session-specific messages are sent/received, i.e. only     * messages essential to the session's integrity     */    private static final int FLUSH=1;    /**     * No message is sent to the layer below     */    private static final int BLOCK=2;    /**     * The state lock allowing multiple reads or a single write     */    private final ReadWriteLock stateLock=new WriterPreferenceReadWriteLock();    /**     * Protocol layer message-sending state     */    private int state=NULL_STATE;    /**     * The address of this stack     */    private Address addr=null;    /**     * The address of the sequencer     */    private Address sequencerAddr=null;    /**     * The sequencer's seq ID. The ID of the most recently broadcast reply     * message     */    private long sequencerSeqID=NULL_ID;    /**     * The local sequence ID, i.e. the ID sent with the last broadcast request     * message. This is increased with every broadcast request sent to the     * sequencer and it's used to match the requests with the sequencer's     * replies     */    private long localSeqID=NULL_ID;    /**     * The total order sequence ID. This is the ID of the most recently     * delivered broadcast message. As the sequence IDs are increasing without     * gaps, this is used to detect missing broadcast messages     */    private long seqID=NULL_ID;    /**     * The list of unanswered broadcast requests to the sequencer. The entries     * are stored in increasing local sequence ID, i.e. in the order they were     * <p/>     * sent localSeqID -> Broadcast msg to be sent.     */    private SortedMap reqTbl;    /**     * The list of received broadcast messages that haven't yet been delivered     * to the layer above. The entries are stored in increasing sequence ID,     * i.e. in the order they must be delivered above     * <p/>     * seqID -> Received broadcast msg     */    private SortedMap upTbl;    /**     * Retranmitter for pending broadcast requests     */    private AckSenderWindow retransmitter;    /**     * Print addresses in host_ip:port form to bypass DNS     */    private String addrToString(Object addr) {        return (                addr == null ? "<null>" :                        ((addr instanceof org.jgroups.stack.IpAddress) ?                                (((org.jgroups.stack.IpAddress)addr).getIpAddress(                                ).getHostAddress() + ':' +                                        ((org.jgroups.stack.IpAddress)addr).getPort()) :                                addr.toString())        );    }    /**     * @return this protocol's name     */    public String getName() {        return PROT_NAME;    }    /**     * Configure the protocol based on the given list of properties     *     * @param properties the list of properties to use to setup this layer     * @return false if there was any unrecognized property or a property with     *         an invalid value     */    public boolean setProperties(Properties properties) {        String value;        // trace        // Parse & remove property but ignore it; use Trace.trace instead        value=properties.getProperty(TRACE_PROP);        if(value != null) properties.remove(TRACE_PROP);        if(properties.size() > 0) {            if(log.isErrorEnabled())                log.error("The following properties are not recognized: " + properties);            return (false);        }        return (true);    }    /**     * Events that some layer below must handle     *     * @return the set of <code>Event</code>s that must be handled by some layer     *         below     */    public Vector requiredDownServices() {        return new Vector();    }    /**     * Events that some layer above must handle     *     * @return the set of <code>Event</code>s that must be handled by some     *         layer above     */    public Vector requiredUpServices() {        return new Vector();    }    /**     * Extract as many messages as possible from the pending up queue and send     * them to the layer above     */    private void _deliverBcast() {        Message msg;        Header header;        synchronized(upTbl) {            while((msg=(Message)upTbl.remove(new Long(seqID + 1))) != null) {                header=(Header)msg.removeHeader(getName());                if(header.localSequenceID != NULL_ID) passUp(new Event(Event.MSG, msg));                ++seqID;            }        } // synchronized(upTbl)    }    /**     * Add all undelivered bcasts sent by this member in the req queue and then     * replay this queue     */    private void _replayBcast() {        Iterator it;        Message msg;        Header header;        // i. Remove all undelivered bcasts sent by this member and place them        // again in the pending bcast req queue        synchronized(upTbl) {            if(upTbl.size() > 0)                if(log.isInfoEnabled()) log.info("Replaying undelivered bcasts");            it=upTbl.entrySet().iterator();            while(it.hasNext()) {                msg=(Message)((Map.Entry)it.next()).getValue();                it.remove();                if(!msg.getSrc().equals(addr)) {                    if(log.isInfoEnabled())                        log.info("During replay: " +                                 "discarding BCAST[" +                                 ((TOTAL.Header)msg.getHeader(getName())).sequenceID +                                 "] from " + addrToString(msg.getSrc()));                    continue;                }                header=(Header)msg.removeHeader(getName());                if(header.localSequenceID == NULL_ID) continue;                _sendBcastRequest(msg, header.localSequenceID);            }        } // synchronized(upTbl)    }    /**     * Send a unicast message: Add a <code>UCAST</code> header     *     * @param msg the message to unicast     * @return the message to send     */    private Message _sendUcast(Message msg) {        msg.putHeader(getName(), new Header(Header.UCAST, NULL_ID, NULL_ID));        return (msg);    }    /**     * Replace the original message with a broadcast request sent to the     * sequencer. The original bcast message is stored locally until a reply to     * bcast is received from the sequencer. This function has the side-effect     * of increasing the <code>localSeqID</code>     *     * @param msg the message to broadcast     */    private void _sendBcastRequest(Message msg) {        _sendBcastRequest(msg, ++localSeqID);    }    /**     * Replace the original message with a broadcast request sent to the     * sequencer. The original bcast message is stored locally until a reply     * to bcast is received from the sequencer     *     * @param msg the message to broadcast     * @param id  the local sequence ID to use     */    private void _sendBcastRequest(Message msg, long id) {        // i. Store away the message while waiting for the sequencer's reply        // ii. Send a bcast request immediatelly and also schedule a        // retransmission        synchronized(reqTbl) {            reqTbl.put(new Long(id), msg);        }        _transmitBcastRequest(id);        retransmitter.add(id, msg);    }    /**     * Send the bcast request with the given localSeqID     *     * @param seqID the local sequence id of the     */    private void _transmitBcastRequest(long seqID) {        Message reqMsg;        // i. If NULL_STATE, then ignore, just transient state before        // shutting down the retransmission thread        // ii. If blocked, be patient - reschedule        // iii. If the request is not pending any more, acknowledge it        // iv. Create a broadcast request and send it to the sequencer        if(state == NULL_STATE) {            if(log.isInfoEnabled()) log.info("Transmit BCAST_REQ[" + seqID + "] in NULL_STATE");            return;        }        if(state == BLOCK) return;        synchronized(reqTbl) {            if(!reqTbl.containsKey(new Long(seqID))) {                retransmitter.ack(seqID);                return;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -