total_old.java

来自「JGRoups源码」· Java 代码 · 共 1,369 行 · 第 1/4 页

JAVA
1,369
字号
            System.out.println("New view members (printed in TOTAL_OLD):");            int view_size=members.size();            for(int i=0; i < view_size; i++) {                System.out.println("  " + members.elementAt(i).toString());            }            // reset the state for total ordering for this new view            reset();            // if we are the sequencer in this new view, send a new            //   TOTAL_NEW_VIEW message to the group            if(isSequencer()) {                // we are the sequencer in this new view                log.error("TOTAL_OLD.up() - I am the sequencer of this new view");                // we need to keep track of acknowledgements messages                ack_history=new MessageAcks(members);                // start assigning messages with this sequence id                next_seq_id_to_assign=INIT_SEQ_ID;                // send a message to the group with the initial sequence id to expect                Message new_view_msg=new Message(null, local_addr, null);                new_view_msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_NEW_VIEW, next_seq_id_to_assign));                passDown(new Event(Event.MSG, new_view_msg));            }            break;            // end mms21        default:            break;        } // switch( evt.getType() )        passUp(evt);            // Pass up to the layer above us    }    /**     * passes up (calling passUp()) any stored messages eligible according to     * the total ordering property     */    private synchronized int passUpMessages() {        if(next_seq_id < 0) {            // don't know what to pass up so don't pass up anything            return 0;        }        long lowest_seq_stored=queued_messages.getFirstSeq();        if(lowest_seq_stored < 0) {            // there are no messages stored            return 0;        }        if(lowest_seq_stored < next_seq_id) {            // it is bad to have messages stored that have a lower sequence id than what            //   we are expecting            log.error("Error: TOTAL_OLD.passUpMessages() - next expected sequence id (" + next_seq_id + ") is greater than the sequence id of a stored message (" + lowest_seq_stored + ')');            return 0;        }        else            if(next_seq_id == lowest_seq_stored) {                // we can pass this first message up the Protocol Stack                Message msg=queued_messages.getFirstMessage();                if(msg == null) {                    log.error("Error: TOTAL_OLD.passUpMessages() - unexpected null Message retrieved from stored messages");                    return 0;                }                passUp(new Event(Event.MSG, msg));                // increment the next expected sequence id                next_seq_id++;                return (1 + passUpMessages());            }            else {                /* don't drop messages, it should be requesting resends                // all messages stored have sequence ids greater than expected                if ( queued_messages.getSize() > 10 ) {                 {                    log.error( "WARNING: TOTAL_OLD.passUpMessages() - more than 10 messages saved" );                    log.error( "Dropping sequence id: " + next_seq_id );                }                next_seq_id++;                return passUpMessages();                }                */                return 0;            }    }    private final long last_request_time=-1;    /**     * stores the message in the list of messages. also passes up any messages     * if it can (i.e. if it satisfies total ordering).     * if the sequence for the next expected message is unknown, the message is     * discarded without being stored     */    private synchronized void handleBCastMessage(Message msg, long seq) {        /* store the message anyway, hopefully we'll get a TOTAL_NEW_VIEW message later        if ( next_seq < 0 ) {            // don't know what sequence id to expect             log.error( "TOTAL_OLD.handleBCastMessage() - received broadcast message but don't know what sequence id to expect" );            return;        }        */        if(seq < next_seq_id) {            // we're expecting a message with a greater sequence id            //   hopefully, we've already seen this message so just ignore it            return;        }        // save this message in the list of received broadcast messages        queued_messages.insertMessage(msg, seq);        // try to pass up any messages        int num_passed=passUpMessages();// TODO: this if is temporary (debug)        if(num_passed > 1)            log.error("TOTAL_OLD.handleBCastMessage() - " + num_passed + " message(s) passed up the Protocol Stack");        /* this is handles by the retransmission thread now	// see if we may need to issue any resend requests	if ( queued_messages.getSize() > 1 ) {  // TODO: magical constant N?	    Address sequencer = getSequencer();	    //Object sequencer = msg.makeReply().getSrc();  // test (debug)	    if ( sequencer == null ) {		// couldn't get the sequencer of the group		log.error( "TOTAL_OLD.handleBCastMessage() - couldn't determine sequencer to send a TOTAL_RESEND request" );		return;	    }	    if ( local_addr == null ) {		// don't know local address, can't set source of message		log.error( "TOTAL_OLD.handleBCastMessage() - do not know local address so cannot send resend request for message " + seq );		return;	    }	    long time_now = System.currentTimeMillis();	    if ( (last_request_time >= 0) && ((time_now - last_request_time) < 1000) ) {		return;	    } else {		last_request_time = time_now;	    }	    // request a resend request for all missing sequence ids	    //   from the next one expected up to the "earliest" queued one	    // TODO: (works a little different now)	    long first_queued_seq = queued_messages.getFirstSeq();	    long max_resend_seq = ((next_seq_id + 10) > first_queued_seq) ? first_queued_seq : (next_seq_id + 10);	    for( long resend_seq=next_seq_id; resend_seq<=max_resend_seq ; resend_seq++ ) {		Message resend_msg = new Message( sequencer, local_addr, null );		resend_msg.putHeader(getName(),  new TotalHeader( TotalHeader.TOTAL_RESEND, resend_seq ) );		passDown( new Event( Event.MSG, resend_msg ) );		 log.error( "TOTAL_OLD.handleBCastMessage() - resend requested for message " + resend_seq );	    }	}	*/    }    /**     * respond to a request message by broadcasting a copy of the message to the group     * with the next sequence id assigned to it     * if we do not know what the next sequence id is to assign, discard the message     */    private synchronized void handleRequestMessage(Message msg) {        if(next_seq_id_to_assign < 0) {            // we cannot assign a valid sequence id            log.error("Error: TOTAL_OLD.handleRequestMessage() - cannot handle request... do not know what sequence id to assign");            return;        }        // make the message a broadcast message to the group        msg.setDest(null);        // set the source of the message to be me        msg.setSrc(local_addr);        // add the sequence id to the message        msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_BCAST, next_seq_id_to_assign));        // store a copy of this message is the history        Message msg_copy=msg.copy();        ack_history.addMessage(msg_copy, next_seq_id_to_assign);        // begin debug        Object header=msg_copy.getHeader(getName());        if(!(header instanceof TotalHeader)) {            log.error("Error: TOTAL_OLD.handleRequestMessage() - BAD: stored message that did not contain a TotalHeader - " + next_seq_id_to_assign);        }        // end debug        // increment the next sequence id to use        next_seq_id_to_assign++;        // pass this new Message (wrapped in an Event) down the Protocol Stack        passDown(new Event(Event.MSG, msg));    }    /**     * respond to a request to resend a message with the specified sequence id     */    private synchronized void handleResendRequest(Message msg, long seq) {        log.error("TOTAL_OLD.handleRequestMessage() - received resend request for message " + seq);        /* just rebroadcast for now because i can't get the source - this is bad (TODO: fix this)        Object requester = msg.makeReply().getSrc();  // Address? of requester - test (debug)        /*        Object temp_obj = msg.getSrc();        if ( temp_obj instanceof Address ) {            Address requester = (Address) temp_obj;        } else {            log.error( "Error: TOTAL_OLD.handleResendRequest() - could not cast source of message to an Address" );            return;        }        * /        if ( requester == null ) {            // don't know who to send this back to            log.error( "TOTAL_OLD.handleResendRequest() - do not know who requested this resend request for sequence " + seq );            return;        }        */        Address requester=null;// log.error( "TOTAL_OLD: got here - 1" );        Message resend_msg=ack_history.getMessage(seq);// log.error( "TOTAL_OLD: got here - 2" );        if(resend_msg == null) {            // couldn't find this message in the history            log.error("TOTAL_OLD.handleResendRequest() - could not find the message " + seq + " in the history to resend");            return;        }        resend_msg.setDest(requester);        // note: do not need to add a TotalHeader because it should already be a        //       TOTAL_BCAST message        // begin debug        Object header=resend_msg.getHeader(getName());        if(header instanceof TotalHeader) {            //log.error( "TOTAL_OLD: resend msg GOOD (header is TotalHeader) - " + seq );        }        else {            log.error("TOTAL_OLD: resend msg BAD (header is NOT a TotalHeader) - " + seq);        }        // end debug        passDown(new Event(Event.MSG, resend_msg));        log.error("TOTAL_OLD.handleResendRequest() - responded to resend request for message " + seq);    }    /**     * handles an Event coming down the Protocol Stack     */    public void down(Event evt) {        Message msg;        //System.out.println("DOWN: " + evt);        switch(evt.getType()) {        case Event.VIEW_CHANGE:            // this will probably never happen            log.error("NOTE: VIEW_CHANGE Event going down through " + PROTOCOL_NAME);            Vector new_members=((View)evt.getArg()).getMembers();            synchronized(members) {                members.removeAllElements();                if(new_members != null && new_members.size() > 0)                    for(int i=0; i < new_members.size(); i++)                        members.addElement(new_members.elementAt(i));            }            break;        case Event.MSG:            Object temp_obj=evt.getArg();            if(temp_obj instanceof Message) {                msg=(Message)temp_obj;                // note: a TotalHeader is added to every message (Event.MSG)                //   that is sent                // check if this is a broadcast message                if(msg.getDest() == null) {                    // yes, this is a broadcast message                    // send out a request for a message to be broadcast                    //   (the sequencer will handle this)                    Address sequencer=getSequencer();                    if(sequencer != null) {                        // we only need to send the request to the sequencer (who will broadcast it)                        msg.setDest(sequencer);                    }                    else {                        // couldn't find sequencer of the group                        // for now, just send it to the original destination                        //   (don't need to do anything here)                    }                    //msg.putHeader(getName(),  TotalHeader.getRequestHeader() );                    msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_REQUEST, -1));                }                else {                    // this is a point to point unicast message so just send it to its original destination                    msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_UNICAST, -1));  // sequence id in header is irrelevant                }            }            else {                log.error("Error: TOTAL_OLD.down() - could not cast argument of Event to a Message (case Event.MSG)");            } // if ( temp_obj instanceof Message )            break;        default:            break;        } // switch( evt.getType() )        passDown(evt);          // Pass on to the layer below us    }    /**     * returns true if we are currently the sequencer of the group;     * returns false otherwise     * note: returns false if our local address is unknown, or the list of members is     * empty     */    private boolean isSequencer() {        if(local_addr == null) {            // don't know my own local address            log.error("TOTAL_OLD.isSequencer() - local address unknown!");            return false;        }        synchronized(members) {            if(members.size() == 0) {                // there are no members listed for the group (not even myself)                log.error("TOTAL_OLD.isSequencer() - no members!");                return false;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?