total_old.java
来自「JGRoups源码」· Java 代码 · 共 1,369 行 · 第 1/4 页
JAVA
1,369 行
System.out.println("New view members (printed in TOTAL_OLD):"); int view_size=members.size(); for(int i=0; i < view_size; i++) { System.out.println(" " + members.elementAt(i).toString()); } // reset the state for total ordering for this new view reset(); // if we are the sequencer in this new view, send a new // TOTAL_NEW_VIEW message to the group if(isSequencer()) { // we are the sequencer in this new view log.error("TOTAL_OLD.up() - I am the sequencer of this new view"); // we need to keep track of acknowledgements messages ack_history=new MessageAcks(members); // start assigning messages with this sequence id next_seq_id_to_assign=INIT_SEQ_ID; // send a message to the group with the initial sequence id to expect Message new_view_msg=new Message(null, local_addr, null); new_view_msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_NEW_VIEW, next_seq_id_to_assign)); passDown(new Event(Event.MSG, new_view_msg)); } break; // end mms21 default: break; } // switch( evt.getType() ) passUp(evt); // Pass up to the layer above us } /** * passes up (calling passUp()) any stored messages eligible according to * the total ordering property */ private synchronized int passUpMessages() { if(next_seq_id < 0) { // don't know what to pass up so don't pass up anything return 0; } long lowest_seq_stored=queued_messages.getFirstSeq(); if(lowest_seq_stored < 0) { // there are no messages stored return 0; } if(lowest_seq_stored < next_seq_id) { // it is bad to have messages stored that have a lower sequence id than what // we are expecting log.error("Error: TOTAL_OLD.passUpMessages() - next expected sequence id (" + next_seq_id + ") is greater than the sequence id of a stored message (" + lowest_seq_stored + ')'); return 0; } else if(next_seq_id == lowest_seq_stored) { // we can pass this first message up the Protocol Stack Message msg=queued_messages.getFirstMessage(); if(msg == null) { log.error("Error: TOTAL_OLD.passUpMessages() - unexpected null Message retrieved from stored messages"); return 0; } passUp(new Event(Event.MSG, msg)); // increment the next expected sequence id next_seq_id++; return (1 + passUpMessages()); } else { /* don't drop messages, it should be requesting resends // all messages stored have sequence ids greater than expected if ( queued_messages.getSize() > 10 ) { { log.error( "WARNING: TOTAL_OLD.passUpMessages() - more than 10 messages saved" ); log.error( "Dropping sequence id: " + next_seq_id ); } next_seq_id++; return passUpMessages(); } */ return 0; } } private final long last_request_time=-1; /** * stores the message in the list of messages. also passes up any messages * if it can (i.e. if it satisfies total ordering). * if the sequence for the next expected message is unknown, the message is * discarded without being stored */ private synchronized void handleBCastMessage(Message msg, long seq) { /* store the message anyway, hopefully we'll get a TOTAL_NEW_VIEW message later if ( next_seq < 0 ) { // don't know what sequence id to expect log.error( "TOTAL_OLD.handleBCastMessage() - received broadcast message but don't know what sequence id to expect" ); return; } */ if(seq < next_seq_id) { // we're expecting a message with a greater sequence id // hopefully, we've already seen this message so just ignore it return; } // save this message in the list of received broadcast messages queued_messages.insertMessage(msg, seq); // try to pass up any messages int num_passed=passUpMessages();// TODO: this if is temporary (debug) if(num_passed > 1) log.error("TOTAL_OLD.handleBCastMessage() - " + num_passed + " message(s) passed up the Protocol Stack"); /* this is handles by the retransmission thread now // see if we may need to issue any resend requests if ( queued_messages.getSize() > 1 ) { // TODO: magical constant N? Address sequencer = getSequencer(); //Object sequencer = msg.makeReply().getSrc(); // test (debug) if ( sequencer == null ) { // couldn't get the sequencer of the group log.error( "TOTAL_OLD.handleBCastMessage() - couldn't determine sequencer to send a TOTAL_RESEND request" ); return; } if ( local_addr == null ) { // don't know local address, can't set source of message log.error( "TOTAL_OLD.handleBCastMessage() - do not know local address so cannot send resend request for message " + seq ); return; } long time_now = System.currentTimeMillis(); if ( (last_request_time >= 0) && ((time_now - last_request_time) < 1000) ) { return; } else { last_request_time = time_now; } // request a resend request for all missing sequence ids // from the next one expected up to the "earliest" queued one // TODO: (works a little different now) long first_queued_seq = queued_messages.getFirstSeq(); long max_resend_seq = ((next_seq_id + 10) > first_queued_seq) ? first_queued_seq : (next_seq_id + 10); for( long resend_seq=next_seq_id; resend_seq<=max_resend_seq ; resend_seq++ ) { Message resend_msg = new Message( sequencer, local_addr, null ); resend_msg.putHeader(getName(), new TotalHeader( TotalHeader.TOTAL_RESEND, resend_seq ) ); passDown( new Event( Event.MSG, resend_msg ) ); log.error( "TOTAL_OLD.handleBCastMessage() - resend requested for message " + resend_seq ); } } */ } /** * respond to a request message by broadcasting a copy of the message to the group * with the next sequence id assigned to it * if we do not know what the next sequence id is to assign, discard the message */ private synchronized void handleRequestMessage(Message msg) { if(next_seq_id_to_assign < 0) { // we cannot assign a valid sequence id log.error("Error: TOTAL_OLD.handleRequestMessage() - cannot handle request... do not know what sequence id to assign"); return; } // make the message a broadcast message to the group msg.setDest(null); // set the source of the message to be me msg.setSrc(local_addr); // add the sequence id to the message msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_BCAST, next_seq_id_to_assign)); // store a copy of this message is the history Message msg_copy=msg.copy(); ack_history.addMessage(msg_copy, next_seq_id_to_assign); // begin debug Object header=msg_copy.getHeader(getName()); if(!(header instanceof TotalHeader)) { log.error("Error: TOTAL_OLD.handleRequestMessage() - BAD: stored message that did not contain a TotalHeader - " + next_seq_id_to_assign); } // end debug // increment the next sequence id to use next_seq_id_to_assign++; // pass this new Message (wrapped in an Event) down the Protocol Stack passDown(new Event(Event.MSG, msg)); } /** * respond to a request to resend a message with the specified sequence id */ private synchronized void handleResendRequest(Message msg, long seq) { log.error("TOTAL_OLD.handleRequestMessage() - received resend request for message " + seq); /* just rebroadcast for now because i can't get the source - this is bad (TODO: fix this) Object requester = msg.makeReply().getSrc(); // Address? of requester - test (debug) /* Object temp_obj = msg.getSrc(); if ( temp_obj instanceof Address ) { Address requester = (Address) temp_obj; } else { log.error( "Error: TOTAL_OLD.handleResendRequest() - could not cast source of message to an Address" ); return; } * / if ( requester == null ) { // don't know who to send this back to log.error( "TOTAL_OLD.handleResendRequest() - do not know who requested this resend request for sequence " + seq ); return; } */ Address requester=null;// log.error( "TOTAL_OLD: got here - 1" ); Message resend_msg=ack_history.getMessage(seq);// log.error( "TOTAL_OLD: got here - 2" ); if(resend_msg == null) { // couldn't find this message in the history log.error("TOTAL_OLD.handleResendRequest() - could not find the message " + seq + " in the history to resend"); return; } resend_msg.setDest(requester); // note: do not need to add a TotalHeader because it should already be a // TOTAL_BCAST message // begin debug Object header=resend_msg.getHeader(getName()); if(header instanceof TotalHeader) { //log.error( "TOTAL_OLD: resend msg GOOD (header is TotalHeader) - " + seq ); } else { log.error("TOTAL_OLD: resend msg BAD (header is NOT a TotalHeader) - " + seq); } // end debug passDown(new Event(Event.MSG, resend_msg)); log.error("TOTAL_OLD.handleResendRequest() - responded to resend request for message " + seq); } /** * handles an Event coming down the Protocol Stack */ public void down(Event evt) { Message msg; //System.out.println("DOWN: " + evt); switch(evt.getType()) { case Event.VIEW_CHANGE: // this will probably never happen log.error("NOTE: VIEW_CHANGE Event going down through " + PROTOCOL_NAME); Vector new_members=((View)evt.getArg()).getMembers(); synchronized(members) { members.removeAllElements(); if(new_members != null && new_members.size() > 0) for(int i=0; i < new_members.size(); i++) members.addElement(new_members.elementAt(i)); } break; case Event.MSG: Object temp_obj=evt.getArg(); if(temp_obj instanceof Message) { msg=(Message)temp_obj; // note: a TotalHeader is added to every message (Event.MSG) // that is sent // check if this is a broadcast message if(msg.getDest() == null) { // yes, this is a broadcast message // send out a request for a message to be broadcast // (the sequencer will handle this) Address sequencer=getSequencer(); if(sequencer != null) { // we only need to send the request to the sequencer (who will broadcast it) msg.setDest(sequencer); } else { // couldn't find sequencer of the group // for now, just send it to the original destination // (don't need to do anything here) } //msg.putHeader(getName(), TotalHeader.getRequestHeader() ); msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_REQUEST, -1)); } else { // this is a point to point unicast message so just send it to its original destination msg.putHeader(getName(), new TotalHeader(TotalHeader.TOTAL_UNICAST, -1)); // sequence id in header is irrelevant } } else { log.error("Error: TOTAL_OLD.down() - could not cast argument of Event to a Message (case Event.MSG)"); } // if ( temp_obj instanceof Message ) break; default: break; } // switch( evt.getType() ) passDown(evt); // Pass on to the layer below us } /** * returns true if we are currently the sequencer of the group; * returns false otherwise * note: returns false if our local address is unknown, or the list of members is * empty */ private boolean isSequencer() { if(local_addr == null) { // don't know my own local address log.error("TOTAL_OLD.isSequencer() - local address unknown!"); return false; } synchronized(members) { if(members.size() == 0) { // there are no members listed for the group (not even myself) log.error("TOTAL_OLD.isSequencer() - no members!"); return false;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?