total_old.java
来自「JGRoups源码」· Java 代码 · 共 1,369 行 · 第 1/4 页
JAVA
1,369 行
} // closes a branch of a method that starts above this excerpt (isSequencer(), judging by the log text below)
            Object temp_obj=members.elementAt(0);
            if(temp_obj instanceof Address) {
                Address seq_addr=(Address)temp_obj;
                return local_addr.equals(seq_addr);
            }
            else {
                log.error("Error: TOTAL_OLD.isSequencer() - could not cast element of \"members\" to an Address");
                return false;
            } // if ( temp_obj instanceof Address )
        }
    }


    /**
     * Returns the Address of the local machine.
     *
     * @return the local address, or null if it is not known yet
     */
    protected Address getLocalAddr() {
        return local_addr;
    }


    /**
     * Returns the address of the current sequencer of the group, which is the
     * first element of the member list.
     *
     * @return the sequencer's address, or null if the list of members is empty
     *         or its first element is not an Address
     */
    protected Address getSequencer() {
        synchronized(members) {
            if(members.size() == 0) {
                log.error("TOTAL_OLD.getSequencer() - no members");
                return null;
            }
            else {
                Object temp_obj=members.elementAt(0);
                if(temp_obj instanceof Address) {
                    return (Address)temp_obj;
                }
                else {
                    log.error("Error: TOTAL_OLD.getSequencer() - could not cast first element of \"members\" to an Address");
                    return null;
                }
            }
        }
    }


    /**
     * class TotalHeader
     * <p/>
     * The header that is prepended to every message passed down through the TOTAL_OLD layer
     * and removed (and processed) from every message passed up through the TOTAL_OLD layer
     */
    public static class TotalHeader extends Header {
        // Total message types
        public final static int TOTAL_UNICAST=0;      // a point to point unicast message that should not be processed by TOTAL_OLD
        public final static int TOTAL_BCAST=1;        // message broadcast by the sequencer
        public final static int TOTAL_REQUEST=2;      // request for a message to be broadcast
        public final static int TOTAL_NEW_VIEW=3;     // reset with a view change, sequence number also reset
        public final static int TOTAL_NEW_VIEW_ACK=4; // acknowledgement of new view and sequence id
        public final static int TOTAL_CUM_SEQ_ACK=5;  // cumulatively acknowledge the reception of messages up to a sequence id
        public final static int TOTAL_SEQ_ACK=6;      // acknowledge the reception of a message with a certain sequence id (probably won't be used)
        public final static int TOTAL_RESEND=7;       // request the message with a certain sequence id

        /** One of the TOTAL_* constants above, or -1 if an unknown type was passed to the constructor. */
        public int total_header_type;

        // shared by all headers: a header is created for every message, so the
        // logger is looked up once per class instead of once per instance
        static final Log log=LogFactory.getLog(TotalHeader.class);

        // TODO: finish commenting meaning of seq_id for different header types
        /**
         * For TOTAL_BCAST messages, seq_id is used to determine the order of messages
         * in the view.  The seq_id is expected to increment by one for each new message
         * sent in the current view.  this sequence id is reset with each new view.
         * the GMS layer should make sure that messages sent in one view are not
         * received in another view.
         * For TOTAL_REQUEST messages, seq_id is not used.
         * For TOTAL_NEW_VIEW, seq_id is the sequence id that the sequencer of this
         * view will use for the first message broadcast to the group
         * (i.e. the expected sequence id is "reset" to this value).
         * For TOTAL_NEW_VIEW_ACK, ..
         * For TOTAL_CUM_SEQ_ACK messages, the seq_id is the cumulative sequence id
         * that the sender has received.
         * For TOTAL_SEQ_ACK messages, seq_id is the sequence id that is being acknowledged.
         * For TOTAL_RESEND, seq_id is the sequence id to be sent again.
         */
        public long seq_id;  // see use above (varies between types of headers)


        /** No-arg constructor, used for externalization. */
        public TotalHeader() {
        }

        /**
         * Creates a header of the given type carrying the given sequence id.
         *
         * @param type one of the TOTAL_* constants; any other value is logged as an
         *             error and stored as -1
         * @param seq  the sequence id (its meaning depends on the type, see {@link #seq_id})
         */
        public TotalHeader(int type, long seq) {
            switch(type) {
                case TOTAL_UNICAST:
                case TOTAL_BCAST:
                case TOTAL_REQUEST:
                case TOTAL_NEW_VIEW:
                case TOTAL_NEW_VIEW_ACK:
                case TOTAL_CUM_SEQ_ACK:
                case TOTAL_SEQ_ACK:
                case TOTAL_RESEND:
                    // the given type is a known one
                    total_header_type=type;
                    break;

                default:
                    // this type is unknown
                    log.error("Error: TotalHeader.TotalHeader() - unknown TotalHeader type given: " + type);
                    total_header_type=-1;
                    break;
            }

            seq_id=seq;
        }

        //static TotalHeader getRequestHeader() {
        //return new TotalHeader( TOTAL_REQUEST, -1 );  // sequence id is irrelevant
        //}

        /**
         * @return a string of the form "[ TOTAL_OLD: type=..., seq=... ]"
         */
        public String toString() {
            String type="";
            switch(total_header_type) {
                case TOTAL_UNICAST:
                    type="TOTAL_UNICAST";
                    break;

                case TOTAL_BCAST:
                    type="TOTAL_BCAST";
                    break;

                case TOTAL_REQUEST:
                    type="TOTAL_REQUEST";
                    break;

                case TOTAL_NEW_VIEW:
                    type="NEW_VIEW";
                    break;

                case TOTAL_NEW_VIEW_ACK:
                    type="NEW_VIEW_ACK";
                    break;

                case TOTAL_CUM_SEQ_ACK:
                    type="TOTAL_CUM_SEQ_ACK";
                    break;

                case TOTAL_SEQ_ACK:
                    type="TOTAL_SEQ_ACK";
                    break;

                case TOTAL_RESEND:
                    type="TOTAL_RESEND";
                    break;

                default:
                    type="UNKNOWN TYPE (" + total_header_type + ')';
                    break;
            }

            return "[ TOTAL_OLD: type=" + type + ", seq=" + seq_id + " ]";
        }

        /** Writes the header type (int) followed by the sequence id (long). */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(total_header_type);
            out.writeLong(seq_id);
        }


        /** Reads the fields in the order written by {@link #writeExternal(ObjectOutput)}. */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            total_header_type=in.readInt();
            seq_id=in.readLong();
        }

    } // class TotalHeader

} // class TOTAL_OLD


/**
 * **************************************************************************
 * class TotalRetransmissionThread
 * <p/>
 * thread that handles retransmission for the TOTAL_OLD protocol
 * **************************************************************************
 */
class TotalRetransmissionThread extends Thread {
    // state variables to determine when and what to request
    private long last_retrans_request_time;  // last time (in milliseconds) that we sent a resend request
    private long last_requested_seq;         // latest sequence id that we have requested

    // retransmission constants
    final private static long polling_delay=1000;  // how long (in milliseconds) to sleep before rechecking for resend
    final private static long resend_timeout=2000; // amount of time (in milliseconds) to wait on a resend request before resending another request
    final private static int max_request=10;       // maximum number of resend request to send out in one iteration

    // reference to the parent TOTAL_OLD protocol instance (null only if the
    // constructor was given null, in which case the thread never does any work)
    private final TOTAL_OLD prot_ptr;

    // flag to specify if the thread should continue running;
    // volatile because stopResendRequests() is called from a different thread
    // than the one executing run()
    private volatile boolean is_running;

    final Log log=LogFactory.getLog(TotalRetransmissionThread.class);


    /**
     * constructor
     * <p/>
     * creates and initializes a retransmission thread for the
     * specified instance of a TOTAL_OLD protocol
     *
     * @param parent_prot the protocol instance to poll and to send resend requests
     *                    through; if null, a fatal error is logged and run()
     *                    returns immediately without doing any work
     */
    TotalRetransmissionThread(TOTAL_OLD parent_prot) {
        super(Util.getGlobalThreadGroup(), "retransmission thread");

        prot_ptr=parent_prot;
        if(parent_prot == null) {
            // parent thread not specified
            log.fatal("given parent protocol reference is null\n (FATAL ERROR - TOTAL_OLD protocol will not function properly)");
        }

        // initialize the state variables
        reset();

        // let the thread make resend requests, but only when there is a protocol
        // to talk to; otherwise prevent the run method from doing any work
        is_running=(parent_prot != null);
    }

    /**
     * resets the state of the thread as if it was just started
     * the thread will assume that there were no resend requests made
     */
    public void reset() {
        // we have not made any resend requests for any messages
        last_retrans_request_time=-1;
        last_requested_seq=-1;
    }


    /**
     * send a resend request to the given sequencer (from the given local_addr)
     * for the given sequence id
     *
     * @param sequencer  destination of the request
     * @param local_addr source address put into the request (may be null)
     * @param seq_id     sequence id of the message to be sent again
     */
    private void sendResendRequest(Address sequencer, Address local_addr, long seq_id) {
        Message resend_msg=new Message(sequencer, local_addr, null);
        // NOTE(review): getName() resolves to Thread.getName() here, i.e. the header
        // is stored under "retransmission thread". Presumably the protocol's own
        // name (prot_ptr.getName()) was intended - confirm against the code that
        // reads the header before changing it.
        resend_msg.putHeader(getName(), new TOTAL_OLD.TotalHeader(TOTAL_OLD.TotalHeader.TOTAL_RESEND, seq_id));
        prot_ptr.passDown(new Event(Event.MSG, resend_msg));

        // debug
        log.error("TotalRetransmissionThread.resend() - resend requested for message " + seq_id);
    }


    /**
     * checks if a resend request should be made to the sequencer. if a request needs
     * to be made, it makes the appropriate requests with the parameters specified
     * by the constants in this class
     * <p/>
     * A request is made when the first queued sequence id is ahead of the next
     * expected one (a gap exists) and either the next expected id has not been
     * requested yet, the previous request timed out, or no request was made so far.
     * At most max_request ids are requested per call.
     */
    private void checkForResend() {
        long first_seq_id=prot_ptr.getFirstQueuedSeqID(); // sequence id of first queued message
        /*
        // begin debug
        System.out.println( "DEBUG (TotalRetransmissionThread) - first_seq_id = " + first_seq_id );
        // end debug
        */
        if(first_seq_id >= 0) { // there is at least one message in the queue

            long next_seq_id=prot_ptr.getNextSeqID(); // next sequence id expected from the group
            if((next_seq_id < first_seq_id)) { // TODO: handle case to resend TOTAL_NEW_VIEW message
                // there are messages that we received out of order
                //log.error( "DEBUG (TotalRetransmissionThread) - there are messages queued" ); // debug

                // see if it is time to send a request
                long time_now=System.currentTimeMillis();
                if((next_seq_id > last_requested_seq) ||
                        (time_now > (last_retrans_request_time + resend_timeout)) ||
                        (last_retrans_request_time < 0)) {
                    // send a resend request to the sequencer
                    //log.error( "DEBUG (TotalRetransmissionThread) - sending resend requests" ); // debug
                    Address sequencer=prot_ptr.getSequencer();
                    if(sequencer == null) {
                        log.error("Error: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - could not determine sequencer to send a TOTAL_RESEND request");
                        return;
                    }

                    Address local_addr=prot_ptr.getLocalAddr();
                    if(local_addr == null) {
                        log.warn("Warning: (TOTAL_OLD) TotalRetransmissionThread.checkForResend() - local address not specified in TOTAL_RESEND request... attempting to send requests anyway");
                    }

                    long temp_long=(next_seq_id + max_request); // potential max seq id to request (exclusive)
                    long last_resend_seq_id=(temp_long > first_seq_id) ? first_seq_id : temp_long;
                    for(long resend_seq=next_seq_id; resend_seq < last_resend_seq_id; resend_seq++) {
                        sendResendRequest(sequencer, local_addr, resend_seq);
                    }

                    // update state for this set of resend requests
                    last_retrans_request_time=time_now;
                    // last_resend_seq_id is exclusive, so the latest id actually requested
                    // is one below it; storing the exclusive bound would make the
                    // "next_seq_id > last_requested_seq" test above skip the first id
                    // that has not been requested yet until the timeout expires
                    last_requested_seq=last_resend_seq_id - 1;
                }
            } // if ( (next_seq_id < first_seq_id) )
        } // if ( first_seq_id >= 0 )
        // else there are no messages to request
    }


    /**
     * overloaded from Thread
     * method that executes when the thread is started
     * <p/>
     * Polls checkForResend() every polling_delay milliseconds until
     * stopResendRequests() is called.
     */
    public void run() {
        while(is_running) {
            // resend any requests if necessary
            //log.error( "DEBUG (TotalRetransmissionThread) - heartbeat" ); // debug
            checkForResend();

            // wait before check again
            try {
                sleep(polling_delay);
            }
            catch(InterruptedException ignored) {
                // do nothing if interrupted: the loop condition decides whether to
                // continue. (Re-setting the interrupt flag here without leaving the
                // loop would make every following sleep() return immediately.)
            }
        }
    }


    /**
     * stops the thread from making any further resend requests
     * note: the thread may not die immediately
     */
    public void stopResendRequests() {
        is_running=false;
    }
} // class TotalRetransmissionThread
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?