total_old.java

来自「JGRoups源码」· Java 代码 · 共 1,369 行 · 第 1/4 页

JAVA
1,369
字号
            }
            Object temp_obj=members.elementAt(0);
            if(temp_obj instanceof Address) {
                Address seq_addr=(Address)temp_obj;
                return local_addr.equals(seq_addr);
            }
            else {
                log.error("Error: TOTAL_OLD.isSequencer() - could not cast element of \"members\" to an Address");
                return false;
            } // if ( temp_obj instanceof Address )
        }
    }


    /**
     * Returns the address of the local machine.
     *
     * @return the local address, or null if it is not known yet
     */
    protected Address getLocalAddr() {
        return local_addr;
    }


    /**
     * Returns the address of the current sequencer of the group, which is
     * the first element of the member list.
     *
     * @return the sequencer's address, or null if the member list is empty
     *         or its first element is not an Address (both cases are logged)
     */
    protected Address getSequencer() {
        synchronized(members) {
            if(members.size() == 0) {
                log.error("TOTAL_OLD.getSequencer() - no members");
                return null;
            }

            Object first_member=members.elementAt(0);
            if(first_member instanceof Address) {
                return (Address)first_member;
            }

            log.error("Error: TOTAL_OLD.getSequencer() - could not cast first element of \"members\" to an Address");
            return null;
        }
    }


    /**
     * class TotalHeader
     * <p/>
     * The header that is prepended to every message passed down through the TOTAL_OLD layer
     * and removed (and processed) from every message passed up through the TOTAL_OLD layer
     */
    public static class TotalHeader extends Header {
        // Total message types
        public final static int TOTAL_UNICAST=0;   // a point to point unicast message that should not be processed by TOTAL_OLD
        public final static int TOTAL_BCAST=1;   // message broadcast by the sequencer
        public final static int TOTAL_REQUEST=2;   // request for a message to be broadcast
        public final static int TOTAL_NEW_VIEW=3;   // reset with a view change, sequence number also reset
        public final static int TOTAL_NEW_VIEW_ACK=4;   // acknowledgement of new view and sequence id
        public final static int TOTAL_CUM_SEQ_ACK=5;   // cumulatively acknowledge the reception of messages up to a sequence id
        public final static int TOTAL_SEQ_ACK=6;   // acknowledge the reception of a message with a certain sequence id  (probably won't be used)
        public final static int TOTAL_RESEND=7;   // request the message with a certain sequence id

        /** One of the TOTAL_* constants above, or -1 if an unknown type was given to the constructor. */
        public int total_header_type;

        // static so that a logger is not looked up again for every header instance
        static final Log log=LogFactory.getLog(TotalHeader.class);

        // TODO: finish commenting meaning of seq_id for different header types
        /**
         * For TOTAL_BCAST messages, seq_id is used to determine the order of messages
         * in the view. The seq_id is expected to increment by one for each new message
         * sent in the current view. This sequence id is reset with each new view.
         * The GMS layer should make sure that messages sent in one view are not
         * received in another view.
         * For TOTAL_REQUEST messages, seq_id is not used.
         * For TOTAL_NEW_VIEW, seq_id is the sequence id that the sequencer of this
         * view will use for the first message broadcast to the group
         * (i.e. the expected sequence id is "reset" to this value).
         * For TOTAL_NEW_VIEW_ACK, .. (TODO: not documented yet)
         * For TOTAL_CUM_SEQ_ACK messages, the seq_id is the cumulative sequence id
         * that the sender has received.
         * For TOTAL_SEQ_ACK messages, seq_id is the sequence id that is being acknowledged.
         * For TOTAL_RESEND, seq_id is the sequence id to be sent again.
         */
        public long seq_id;  // see use above (varies between types of headers)


        /** No-argument constructor, used for externalization. */
        public TotalHeader() {
        }


        /**
         * Creates a header of the given type carrying the given sequence id.
         *
         * @param type one of the TOTAL_* constants; any other value is logged
         *             as an error and stored as -1
         * @param seq  the sequence id (its meaning depends on the type, see seq_id)
         */
        public TotalHeader(int type, long seq) {
            switch(type) {
            case TOTAL_UNICAST:
            case TOTAL_BCAST:
            case TOTAL_REQUEST:
            case TOTAL_NEW_VIEW:
            case TOTAL_NEW_VIEW_ACK:
            case TOTAL_CUM_SEQ_ACK:
            case TOTAL_SEQ_ACK:
            case TOTAL_RESEND:
                // the given type is a known one
                total_header_type=type;
                break;
            default:
                // this type is unknown
                log.error("Error: TotalHeader.TotalHeader() - unknown TotalHeader type given: " + type);
                total_header_type=-1;
                break;
            }
            seq_id=seq;
        }


        /**
         * Returns a readable form of this header: the symbolic name of its
         * type (or the raw number for an unknown type) and its sequence id.
         */
        public String toString() {
            String type;
            switch(total_header_type) {
            case TOTAL_UNICAST:
                type="TOTAL_UNICAST";
                break;
            case TOTAL_BCAST:
                type="TOTAL_BCAST";
                break;
            case TOTAL_REQUEST:
                type="TOTAL_REQUEST";
                break;
            case TOTAL_NEW_VIEW:
                type="NEW_VIEW";
                break;
            case TOTAL_NEW_VIEW_ACK:
                type="NEW_VIEW_ACK";
                break;
            case TOTAL_CUM_SEQ_ACK:
                type="TOTAL_CUM_SEQ_ACK";
                break;
            case TOTAL_SEQ_ACK:
                type="TOTAL_SEQ_ACK";
                break;
            case TOTAL_RESEND:
                type="TOTAL_RESEND";
                break;
            default:
                type="UNKNOWN TYPE (" + total_header_type + ')';
                break;
            }
            return "[ TOTAL_OLD: type=" + type + ", seq=" + seq_id + " ]";
        }


        /** Writes the header type (int) followed by the sequence id (long). */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(total_header_type);
            out.writeLong(seq_id);
        }


        /** Reads the header type (int) followed by the sequence id (long), mirroring writeExternal(). */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            total_header_type=in.readInt();
            seq_id=in.readLong();
        }
    } // class TotalHeader
} // class TOTAL_OLD


/**
 * **************************************************************************
 * class TotalRetransmissionThread
 * <p/>
 * thread that handles retransmission for the TOTAL_OLD protocol: it wakes up
 * periodically and, if messages are queued behind a missing sequence id,
 * asks the sequencer to resend the missing messages
 * **************************************************************************
 */
class TotalRetransmissionThread extends Thread {
    // state variables to determine when and what to request
    private long last_retrans_request_time;  // last time (in milliseconds) that we sent a resend request
    private long last_requested_seq;         // upper bound (exclusive) of the last batch of sequence ids we requested

    // retransmission constants
    final private static long polling_delay=1000;  // how long (in milliseconds) to sleep before rechecking for resend
    final private static long resend_timeout=2000; // amount of time (in milliseconds) to wait on a resend request before resending another request
    final private static int max_request=10;      // maximum number of resend request to send out in one iteration

    // reference to the parent TOTAL_OLD protocol instance (null only if the
    // constructor was given null, in which case the thread never does any work)
    private final TOTAL_OLD prot_ptr;

    // flag to specify if the thread should continue running; volatile because
    // it is cleared by stopResendRequests() from a thread other than this one
    private volatile boolean is_running;

    final Log log=LogFactory.getLog(TotalRetransmissionThread.class);


    /**
     * constructor
     * <p/>
     * creates and initializes a retransmission thread for the
     * specified instance of a TOTAL_OLD protocol
     *
     * @param parent_prot the protocol on whose behalf resend requests are made;
     *                    if null, a fatal error is logged and run() returns
     *                    immediately without making any requests
     */
    TotalRetransmissionThread(TOTAL_OLD parent_prot) {
        super(Util.getGlobalThreadGroup(), "retransmission thread");

        prot_ptr=parent_prot;
        if(parent_prot == null) {
            // parent protocol not specified
            log.fatal("given parent protocol reference is null\n  (FATAL ERROR - TOTAL_OLD protocol will not function properly)");
        }

        // initialize the state variables
        reset();

        // let the thread make resend requests, but only when there is a parent
        // protocol to make them through (otherwise run() must not do any work)
        is_running=(parent_prot != null);
    }


    /**
     * resets the state of the thread as if it was just started
     * the thread will assume that there were no resend requests made
     */
    public void reset() {
        // we have not made any resend requests for any messages
        last_retrans_request_time=-1;
        last_requested_seq=-1;
    }


    /**
     * send a resend request to the given sequencer (from the given local_addr)
     * for the given sequence id
     *
     * @param sequencer  destination of the request
     * @param local_addr source address placed in the request (may be null)
     * @param seq_id     sequence id of the message to be resent
     */
    private void sendResendRequest(Address sequencer, Address local_addr, long seq_id) {
        Message resend_msg=new Message(sequencer, local_addr, null);

        // The header is keyed by the protocol's name. An unqualified getName()
        // must not be used here: inside this class it resolves to
        // Thread.getName(), i.e. "retransmission thread".
        // NOTE(review): presumably the receiving TOTAL_OLD layer looks the
        // header up under its own protocol name - confirm against its up().
        resend_msg.putHeader(prot_ptr.getName(), new TOTAL_OLD.TotalHeader(TOTAL_OLD.TotalHeader.TOTAL_RESEND, seq_id));
        prot_ptr.passDown(new Event(Event.MSG, resend_msg));

        // debug
        // NOTE(review): this is logged at error level although it reads like
        // a debug trace - consider lowering the level.
        log.error("TotalRetransmissionThread.resend() - resend requested for message " + seq_id);
    }


    /**
     * checks if a resend request should be made to the sequencer. if a request needs
     * to be made, it makes the appropriate requests with the parameters specified
     * by the constants in this class
     * <p/>
     * A request is due when messages are queued behind a missing sequence id and
     * either the next expected id is beyond what was last requested, the previous
     * request has timed out, or no request has been made since the last reset().
     */
    private void checkForResend() {
        long first_seq_id=prot_ptr.getFirstQueuedSeqID(); // sequence id of first queued message
        if(first_seq_id < 0) {
            // there are no messages in the queue, so nothing to request
            return;
        }

        long next_seq_id=prot_ptr.getNextSeqID();     // next sequence id expected from the group
        if(next_seq_id >= first_seq_id) {  // TODO: handle case to resend TOTAL_NEW_VIEW message
            // nothing was received out of order
            return;
        }

        // there are messages that we received out of order;
        // see if it is time to send a request
        long time_now=System.currentTimeMillis();
        boolean request_due=(next_seq_id > last_requested_seq) ||
                (time_now > (last_retrans_request_time + resend_timeout)) ||
                (last_retrans_request_time < 0);
        if(!request_due) {
            return;
        }

        // send a resend request to the sequencer
        Address sequencer=prot_ptr.getSequencer();
        if(sequencer == null) {
            log.error("Error: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - could not determine sequencer to send a TOTAL_RESEND request");
            return;
        }

        Address local_addr=prot_ptr.getLocalAddr();
        if(local_addr == null) {
            log.warn("Warning: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - local address not specified in TOTAL_RESEND request... attempting to send requests anyway");
        }

        // request at most max_request ids, and never an id that is already queued
        long last_resend_seq_id=Math.min(next_seq_id + max_request, first_seq_id);   // exclusive
        for(long resend_seq=next_seq_id; resend_seq < last_resend_seq_id; resend_seq++) {
            sendResendRequest(sequencer, local_addr, resend_seq);
        }

        // update state for this set of resend requests
        last_retrans_request_time=time_now;
        last_requested_seq=last_resend_seq_id;
    }


    /**
     * overridden from Thread
     * method that executes when the thread is started: checks for needed
     * resend requests every polling_delay milliseconds until stopped
     */
    public void run() {
        while(is_running) {
            // resend any requests if necessary
            checkForResend();

            // wait before checking again
            try {
                sleep(polling_delay);
            }
            catch(InterruptedException ignored) {
                // deliberately ignored: an interrupt only cuts the sleep short,
                // the loop condition decides whether the thread keeps running
            }
        }
    }


    /**
     * stops the thread from making any further resend requests
     * note: the thread may not die immediately (it finishes its current
     * check/sleep before it notices the flag)
     */
    public void stopResendRequests() {
        is_running=false;
    }
} // class TotalRetransmissionThread

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?