total_old.java
来自「JGRoups源码」· Java 代码 · 共 1,369 行 · 第 1/4 页
JAVA
1,369 行
return this_entry; } } else { log.error("Error: (TOTAL_OLD) MessageAcks.getEntry() - could not cast element of \"acks\" to an Entry"); } // if ( temp_obj instanceof Entry ) } // if we get here, we didn't find this Address return null; } } /** * sets the sequence id for the given Address to the given value * note: if the current sequence value for this host is greater than * the given value, the sequence for this member is NOT changed * (i.e. it will only set it to a larger value) * if the given Address is not found in the member list, * nothing is changed */ public void setSeq(Address addr, long seq) { Entry this_entry=getEntry(addr); if((this_entry != null) && (this_entry.seq < seq)) { this_entry.seq=seq; // try to remove any messages that we don't need anymore truncateHistory(); } } /** * returns the sequence id of the "latest" cumulative acknowledgement * for the specified Address * if the Address is not found in the member list, a negative value * is returned * note: the value returned may also be negative if their have been * no acknowledgements from the given address */ public long getSeq(Address addr) { Entry this_entry=getEntry(addr); if(this_entry == null) { return -2; // TODO: change this to something else (e.g. constant) later (maybe) } else { return this_entry.seq; } } /** * returns the message in the history that matches the given sequence id * returns null if no message exists in the history with this sequence id */ public Message getMessage(long seq) { return message_history.peekMessage(seq); } /** * adds the given message (with the specified sequence id) to the * message history * if the given sequence id already exists in the message history, * the message is NOT added */ public void addMessage(Message msg, long seq) { message_history.insertMessage(msg, seq); } /** * returns the minimum cumulative acknowledged sequence id from all the members * (i.e. the greatest sequence id cumulatively acknowledged by all members) */ private long getLowestSeqAck() { synchronized(acks) { long ret_val=-10; // start with a negative value int size=acks.size(); for(int i=0; i < size; i++) { Object temp_obj=acks.elementAt(i); if(temp_obj instanceof Entry) { long this_seq=((Entry)temp_obj).seq; if(this_seq < ret_val) { ret_val=this_seq; } } else { log.error("Error: (TOTAL_OLD) MessageAcks.getLowestSeqAck() - could not cast element of \"acks\" to an Entry (index=" + i + ')'); return -1; } } return ret_val; } } /** * removes messages from the history that have been acknowledged * by all the members of the group */ private synchronized void truncateHistory() { long lowest_ack_seq=getLowestSeqAck(); if(lowest_ack_seq < 0) { // either no members, or someone has not received any messages yet // either way, do nothing return; } // don't want message_history being altered during this operation synchronized(message_history) { long lowest_stored_seq; // keep deleting the oldest stored message for as long as we can while(((lowest_stored_seq=message_history.getFirstSeq()) >= 0) && (lowest_stored_seq > lowest_ack_seq)) { // we can delete the oldest stored message message_history.getFirstMessage(); } } // synchronized( message_history ) }} // class MessageAcks/** * ************************************************************************** * class TOTAL_OLD extends Protocol * <p/> * TODO: (more comments) * Sequencer based total ordering protocol layer * - requires the following layers "below" it in the stack * (or layers with equivalent functionality): * GMS, FD, PING, UDP, ... * * @author Manish Sambhu mms21@cornell.edu Spring 1999 * ************************************************************************** */public class TOTAL_OLD extends Protocol { // the unique name of the protocol private final static String PROTOCOL_NAME="TOTAL_OLD"; private Address local_addr=null; private Vector members=new Vector(); // note: members should never be null // (because of synchronized blocks) /** * next_seq_id * the sequence id of the next message we expect to receive * note: this value is only meaningful when non-negative */ private long next_seq_id=-1; /** * next_seq_id_to_assign * used only by the sequencer to assign sequence ids to requests * and resend them to the group * note: this value is only meaningful when non-negative */ private long next_seq_id_to_assign=-1; private final static long INIT_SEQ_ID=10; // this value is pretty much arbitrary (should be positive though) /** * queued_messages * broadcast messages that we received that we are storing so that we can * deterministically order the messages based on their sequence ids */ private final SavedMessages queued_messages=new SavedMessages(); /** * ack_history * used only by the sequencer * stores the cumulative acks for each member of the group * also stores messages that may be needed for resend requests * (i.e. messages that have not been acked by all group members) */ private MessageAcks ack_history=null; /** * retrans_thread * thread that handles sending requests to the sequencer for messages * that may not have been received but were expected to arrive */ private final TotalRetransmissionThread retrans_thread=new TotalRetransmissionThread(this); final Log log=LogFactory.getLog(TOTAL_OLD.class); /** * returns the unique name of this protocol */ public String getName() { return PROTOCOL_NAME; } public void start() throws Exception { // Start work retrans_thread.start(); } public void stop() { // stop the retransmission thread retrans_thread.stopResendRequests(); } /** * Just remove if you don't need to reset any state */ public void reset() { // TODO: find out when this would be called, maybe do more here // don't accept any messages until we receive a TOTAL_NEW_VIEW message from the sequencer next_seq_id=-1; // clear (i.e. delete) any messages that did not get propagated up queued_messages.clearMessages(); // reset the retransmission thread state retrans_thread.reset(); } /** * @return the next sequence id expected to be received in this view */ protected long getNextSeqID() { return next_seq_id; } /** * Returns the sequence id of the "first" queued message * (i.e., the lowest seq id queued). * @return the sequence id of the queued message, or -1 if no messages are queued. */ protected long getFirstQueuedSeqID() { return queued_messages.getFirstSeq(); } /** * handles an Event coming up the Protocol Stack */ public void up(Event evt) { Message msg; //System.out.println("UP: " + evt); Object temp_obj; // used for type checking before performing casts switch(evt.getType()) { case Event.SET_LOCAL_ADDRESS: temp_obj=evt.getArg(); if(temp_obj instanceof Address) { local_addr=(Address)temp_obj; } else { log.error("Error: Total.up() - could not cast local address to an Address object"); } break; case Event.MSG: // get the message and the header for the TOTAL_OLD layer temp_obj=evt.getArg(); if(temp_obj instanceof Message) { msg=(Message)temp_obj; temp_obj=msg.removeHeader(getName()); if(temp_obj instanceof TotalHeader) { TotalHeader hdr=(TotalHeader)temp_obj; // switch on the "command" defined by the header switch(hdr.total_header_type) { case TotalHeader.TOTAL_UNICAST: // don't process this message, just pass it up (TotalHeader header already removed) passUp(evt); return; case TotalHeader.TOTAL_BCAST: handleBCastMessage(msg, hdr.seq_id); break; case TotalHeader.TOTAL_REQUEST: // if we are the sequencer, respond to this request if(isSequencer()) { handleRequestMessage(msg); } break; case TotalHeader.TOTAL_NEW_VIEW: // store the sequence id that we should expect next next_seq_id=hdr.seq_id; // TODO: need to send some sort of ACK or something to the sequencer (maybe) break; case TotalHeader.TOTAL_CUM_SEQ_ACK: // if we are the sequencer, update state if(isSequencer()) { temp_obj=msg.getSrc(); if(temp_obj instanceof Address) { ack_history.setSeq((Address)temp_obj, hdr.seq_id); } else { log.error("Error: TOTAL_OLD.Up() - could not cast source of message to an Address object (case TotalHeader.TOTAL_CUM_SEQ_ACK)"); } } break; case TotalHeader.TOTAL_RESEND: // if we are the sequencer, respond to this request if(isSequencer()) { handleResendRequest(msg, hdr.seq_id); } break; default: // unrecognized header type - discard message log.error("Error: TOTAL_OLD.up() - unrecognized TotalHeader in message - " + hdr.toString()); return; // don't let it call passUp() } // switch( hdr.total_header_type ) } else { log.error("Error: TOTAL_OLD.up() - could not cast message header to TotalHeader (case Event.MSG)"); } // if ( temp_obj instanceof TotalHeader ) } else { log.error("Error: TOTAL_OLD.up() - could not cast argument of Event to a Message (case Event.MSG)"); } // if ( temp_obj instanceof Address ) //System.out.println("The message is " + msg); return; // don't blindly pass up messages immediately (if at all) // begin mms21 /* case Event.BECOME_SERVER: System.out.println( "Become Server event passed up to TOTAL_OLD (debug - mms21)" ); break; */ case Event.TMP_VIEW: // TODO: this may be temporary case Event.VIEW_CHANGE: System.out.println("View Change event passed up to TOTAL_OLD (debug - mms21)"); View new_view=(View)evt.getArg(); members=new_view.getMembers(); // print the members of this new view
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?