total_old.java

来自「JGRoups源码」· Java 代码 · 共 1,369 行 · 第 1/4 页

JAVA
1,369
字号
                        return this_entry;                    }                }                else {                    log.error("Error: (TOTAL_OLD) MessageAcks.getEntry() - could not cast element of \"acks\" to an Entry");                } // if ( temp_obj instanceof Entry )            }            // if we get here, we didn't find this Address            return null;        }    }    /**     * sets the sequence id for the given Address to the given value     * note: if the current sequence value for this host is greater than     * the given value, the sequence for this member is NOT changed     * (i.e. it will only set it to a larger value)     * if the given Address is not found in the member list,     * nothing is changed     */    public void setSeq(Address addr, long seq) {        Entry this_entry=getEntry(addr);        if((this_entry != null) && (this_entry.seq < seq)) {            this_entry.seq=seq;            // try to remove any messages that we don't need anymore            truncateHistory();        }    }    /**     * returns the sequence id of the "latest" cumulative acknowledgement     * for the specified Address     * if the Address is not found in the member list, a negative value     * is returned     * note: the value returned may also be negative if their have been     * no acknowledgements from the given address     */    public long getSeq(Address addr) {        Entry this_entry=getEntry(addr);        if(this_entry == null) {            return -2;  // TODO: change this to something else (e.g. 
constant) later  (maybe)        }        else {            return this_entry.seq;        }    }    /**     * returns the message in the history that matches the given sequence id     * returns null if no message exists in the history with this sequence id     */    public Message getMessage(long seq) {        return message_history.peekMessage(seq);    }    /**     * adds the given message (with the specified sequence id) to the     * message history     * if the given sequence id already exists in the message history,     * the message is NOT added     */    public void addMessage(Message msg, long seq) {        message_history.insertMessage(msg, seq);    }    /**     * returns the minimum cumulative acknowledged sequence id from all the members     * (i.e. the greatest sequence id cumulatively acknowledged by all members)     */    private long getLowestSeqAck() {        synchronized(acks) {            long ret_val=-10;  // start with a negative value            int size=acks.size();            for(int i=0; i < size; i++) {                Object temp_obj=acks.elementAt(i);                if(temp_obj instanceof Entry) {                    long this_seq=((Entry)temp_obj).seq;                    if(this_seq < ret_val) {                        ret_val=this_seq;                    }                }                else {                    log.error("Error: (TOTAL_OLD) MessageAcks.getLowestSeqAck() - could not cast element of \"acks\" to an Entry (index=" + i + ')');                    return -1;                }            }            return ret_val;        }    }    /**     * removes messages from the history that have been acknowledged     * by all the members of the group     */    private synchronized void truncateHistory() {        long lowest_ack_seq=getLowestSeqAck();        if(lowest_ack_seq < 0) {            // either no members, or someone has not received any messages yet            //   either way, do nothing            return;        }        // don't 
want message_history being altered during this operation        synchronized(message_history) {            long lowest_stored_seq;            // keep deleting the oldest stored message for as long as we can            while(((lowest_stored_seq=message_history.getFirstSeq()) >= 0) &&                    (lowest_stored_seq > lowest_ack_seq)) {                // we can delete the oldest stored message                message_history.getFirstMessage();            }        } // synchronized( message_history )    }} // class MessageAcks/** * ************************************************************************** * class TOTAL_OLD extends Protocol * <p/> * TODO: (more comments) * Sequencer based total ordering protocol layer * - requires the following layers "below" it in the stack * (or layers with equivalent functionality): * GMS, FD, PING, UDP, ... * * @author Manish Sambhu mms21@cornell.edu Spring 1999 *         ************************************************************************** */public class TOTAL_OLD extends Protocol {    // the unique name of the protocol    private final static String PROTOCOL_NAME="TOTAL_OLD";    private Address local_addr=null;    private Vector members=new Vector();  // note: members should never be null    //   (because of synchronized blocks)    /**     * next_seq_id     * the sequence id of the next message we expect to receive     * note: this value is only meaningful when non-negative     */    private long next_seq_id=-1;    /**     * next_seq_id_to_assign     * used only by the sequencer to assign sequence ids to requests     * and resend them to the group     * note: this value is only meaningful when non-negative     */    private long next_seq_id_to_assign=-1;    private final static long INIT_SEQ_ID=10;  // this value is pretty much arbitrary (should be positive though)    /**     * queued_messages     * broadcast messages that we received that we are storing so that we can     * deterministically order the messages based on 
their sequence ids
     */
    private final SavedMessages queued_messages=new SavedMessages();

    /**
     * ack_history
     * used only by the sequencer
     * stores the cumulative acks for each member of the group
     * also stores messages that may be needed for resend requests
     * (i.e. messages that have not been acked by all group members)
     */
    private MessageAcks ack_history=null;

    /**
     * retrans_thread
     * thread that handles sending requests to the sequencer for messages
     * that may not have been received but were expected to arrive
     */
    private final TotalRetransmissionThread retrans_thread=new TotalRetransmissionThread(this);

    // logger for this protocol layer (package-private instance field)
    final Log log=LogFactory.getLog(TOTAL_OLD.class);

    /**
     * returns the unique name of this protocol (the PROTOCOL_NAME constant)
     */
    public String getName() {
        return PROTOCOL_NAME;
    }

    /**
     * starts this layer; the only work done here is starting the
     * retransmission thread
     */
    public void start() throws Exception {
        // Start work
        retrans_thread.start();
    }

    /**
     * stops this layer by asking the retransmission thread to stop
     * sending resend requests
     */
    public void stop() {
        // stop the retransmission thread
        retrans_thread.stopResendRequests();
    }

    /**
     * resets the receive-side state of this layer: the expected sequence id,
     * the queued messages and the retransmission thread state
     * note: next_seq_id_to_assign and ack_history are not touched here
     */
    public void reset() {
        // TODO: find out when this would be called, maybe do more here

        // don't accept any messages until we receive a TOTAL_NEW_VIEW message from the sequencer
        next_seq_id=-1;

        // clear (i.e. delete) any messages that did not get propagated up
        queued_messages.clearMessages();

        // reset the retransmission thread state
        retrans_thread.reset();
    }

    /**
     * @return the next sequence id expected to be received in this view
     *         (only meaningful when non-negative)
     */
    protected long getNextSeqID() {
        return next_seq_id;
    }

    /**
     * Returns the sequence id of the "first" queued message
     * (i.e., the lowest seq id queued).
     * @return the sequence id of the queued message, or -1 if no messages are queued.
*/
    protected long getFirstQueuedSeqID() {
        // simply delegates to the queue of not-yet-delivered broadcast messages
        return queued_messages.getFirstSeq();
    }

    /**
     * handles an Event coming up the Protocol Stack
     * <p/>
     * For Event.MSG the TOTAL_OLD header is removed from the message and the
     * event is dispatched on the header's total_header_type.
     *
     * @param evt the event passed up from the layer below
     */
    public void up(Event evt) {
        Message msg;

        //System.out.println("UP: " + evt);

        Object temp_obj;  // used for type checking before performing casts
        switch(evt.getType()) {

        case Event.SET_LOCAL_ADDRESS:
            // remember our own address (only if the argument really is an Address)
            temp_obj=evt.getArg();
            if(temp_obj instanceof Address) {
                local_addr=(Address)temp_obj;
            }
            else {
                log.error("Error: Total.up() - could not cast local address to an Address object");
            }
            break;

        case Event.MSG:
            // get the message and the header for the TOTAL_OLD layer
            temp_obj=evt.getArg();
            if(temp_obj instanceof Message) {
                msg=(Message)temp_obj;
                // the header is stored under this protocol's name; it is removed here
                temp_obj=msg.removeHeader(getName());
                if(temp_obj instanceof TotalHeader) {
                    TotalHeader hdr=(TotalHeader)temp_obj;

                    // switch on the "command" defined by the header
                    switch(hdr.total_header_type) {

                    case TotalHeader.TOTAL_UNICAST:
                        // don't process this message, just pass it up (TotalHeader header already removed)
                        passUp(evt);
                        return;

                    case TotalHeader.TOTAL_BCAST:
                        // broadcast carrying the sequence id from the header
                        handleBCastMessage(msg, hdr.seq_id);
                        break;

                    case TotalHeader.TOTAL_REQUEST:
                        // if we are the sequencer, respond to this request
                        if(isSequencer()) {
                            handleRequestMessage(msg);
                        }
                        break;

                    case TotalHeader.TOTAL_NEW_VIEW:
                        // store the sequence id that we should expect next
                        next_seq_id=hdr.seq_id;
              // TODO: need to send some sort of ACK or something to the sequencer (maybe)                        break;                    case TotalHeader.TOTAL_CUM_SEQ_ACK:                        // if we are the sequencer, update state                        if(isSequencer()) {                            temp_obj=msg.getSrc();                            if(temp_obj instanceof Address) {                                ack_history.setSeq((Address)temp_obj, hdr.seq_id);                            }                            else {                                log.error("Error: TOTAL_OLD.Up() - could not cast source of message to an Address object (case TotalHeader.TOTAL_CUM_SEQ_ACK)");                            }                        }                        break;                    case TotalHeader.TOTAL_RESEND:                        // if we are the sequencer, respond to this request                        if(isSequencer()) {                            handleResendRequest(msg, hdr.seq_id);                        }                        break;                    default:                        // unrecognized header type - discard message                        log.error("Error: TOTAL_OLD.up() - unrecognized TotalHeader in message - " + hdr.toString());                        return;  // don't let it call passUp()                    } // switch( hdr.total_header_type )                }                else {                    log.error("Error: TOTAL_OLD.up() - could not cast message header to TotalHeader (case Event.MSG)");                }  // if ( temp_obj instanceof TotalHeader )            }            else {                log.error("Error: TOTAL_OLD.up() - could not cast argument of Event to a Message (case Event.MSG)");            }  // if ( temp_obj instanceof Address )            //System.out.println("The message is " + msg);            return;  // don't blindly pass up messages immediately (if at all)            // begin mms21            /*     
   case Event.BECOME_SERVER:            System.out.println( "Become Server event passed up to TOTAL_OLD (debug - mms21)" );            break;            */        case Event.TMP_VIEW:  // TODO: this may be temporary        case Event.VIEW_CHANGE:            System.out.println("View Change event passed up to TOTAL_OLD (debug - mms21)");            View new_view=(View)evt.getArg();            members=new_view.getMembers();            // print the members of this new view

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?