nakack.java.separate

来自「JGRoups源码」· SEPARATE 代码 · 共 485 行

SEPARATE
485
字号
package JavaGroups.JavaStack.Protocols;import java.io.*;import java.util.*;import JavaGroups.*;import JavaGroups.JavaStack.*;class NakAckHeader implements Serializable {    public static final int NAK_MSG        = 1;  // asynchronous msg    public static final int ACK_MSG        = 2;  // synchronous msg    public static final int RETRANSMIT_MSG = 3;  // retransmit msg    public static final int ACK_RSP        = 4;  // ack to ACK_MSG, seqno contains ACKed    int     type=0;    long    seqno=-1;      // either reg. ACK_MSG or first_seqno in retransmissions    long    last_seqno=-1; // used for retransmissions    ViewId  vid=null;        NakAckHeader(int type, long seqno, ViewId vid) {	this.type=type;	this.seqno=seqno;	this.vid=vid;    }    String Type2Str(int t) {	switch(t) {	case NAK_MSG: return "NAK_MSG";	case ACK_MSG: return "ACK_MSG";	case RETRANSMIT_MSG: return "RETRANSMIT_MSG";	case ACK_RSP: return "ACK_RSP";	default: return "<undefined>";	}    }    public String toString() {	return "[NAKACK: " + Type2Str(type) + ", seqno=" + seqno + ", last_seqno=" + last_seqno +	    ", vid=" + vid + "]";    }}/**   Combined NAK and ACK class. The layer contains a NAKer and an ACKer: depending on   which one is set, a message will be sent using a negative acknowldegment scheme   (NAK) or regular acknowledgement (ACK). Event SET_ACK_MODE switches between the 2   modes. The following communication between 2 peers exists (left side is initiator,   right side receiver): <pre>      send_nak   -------------->       asynchronous      send_ack   ------------->>       synchronous   <--------------        ack      retransmit   <--------------       asynchronous   </pre>      When using NAKs, a messages is just tagged with a sequence number (seqno) and   broadcast asynchronously. A receiver requests retransmissions (asynchronous) for   gaps between received messages. When ACKs are used, each message is broadcast to   the current membership synchronously and only when a response from each member has   been received (or a suspect notification) will the call return. If a response has   not been received after a timeout, the message will be retransmitted to the   corresponding member. If the response is not received after n retries, and no   suspect notification has been received, the member will be suspected.      When a message is sent, it will contain a header describing the type of the   message, and containing additional data, such as sequence number etc. When a   message is received, it is fed into either the ACKer or NAKer, depending on the   header's type.  */public class NAKACK extends Protocol {    NAKer                 naker=new NAKer();    ACKer                 acker=new ACKer();    LosslessTransmission  transmitter=naker;             // NAK is default    long                  nak_seqno=0;                   // current message sequence number for NAK msgs    long                  ack_seqno=0;                   // current message sequence number for ACK msgs    Hashtable             received_msgs=new Hashtable(); // ordered by sender -> NakReceiverWindow    Hashtable             sent_msgs=new Hashtable();     // ordered by seqno (sent by me !)    AckMcastSenderWindow  ackwin=null;    ViewId                vid=null;    View                  view=null;        class LosslessTransmission implements NakReceiverWindow.RetransmitCommand {	void Start() {}	void Stop()  {}	void Reset() {	    NakReceiverWindow win;	    sent_msgs.clear();	    	    for(Enumeration e=received_msgs.elements(); e.hasMoreElements();) {		win=(NakReceiverWindow)e.nextElement();		win.Reset();	    }	    received_msgs.clear();	}	void Send(Message msg) {	    ;	}	void Receive(long id, Message msg) {	    ;	}	/** Called by retransmission thread when gap is detected. Sends retr. request	    to originator of msg */	public void Retransmit(long first_seqno, long last_seqno, Object sender) {	    System.out.println("NAKer --> Retransmit([" + first_seqno + ", " + last_seqno + "]) to " + 			       sender);	    	    NakAckHeader hdr=new NakAckHeader(NakAckHeader.RETRANSMIT_MSG, first_seqno, vid);	    Message      retransmit_msg=new Message(sender, null, null);	    hdr.last_seqno=last_seqno;	    retransmit_msg.AddHeader(hdr);	    try {		PassDown(new Event(Event.MSG, retransmit_msg));	    }	    catch(Exception e) {		System.err.println("NAKACK.NAKer.Retransmit(): " + e);	    }	}	// Retransmit from sent-table	void Retransmit(Object dest, long first_seqno, long last_seqno) {	    Message m, retr_msg;	    for(long i=first_seqno; i <= last_seqno; i++) {		m=(Message)sent_msgs.get(new Long(i));  		if(m == null) {  		    System.err.println("NAKACK.LosslessTransmission.Retransmit(): message with " +  				       "seqno=" + i + " not found !");  		    continue;  		}				retr_msg=m.Copy();		retr_msg.SetDest(dest);				try {		    PassDown(new Event(Event.MSG, retr_msg));		}		catch(Exception e) {		    System.err.println("NAKACK.LosslessTransmission.Retransmit(): " + e);		}	    }	}    }            class NAKer extends LosslessTransmission {	void Send(Message msg) {	    Message  copy;	    long     id;	    if(vid == null) {		System.err.println("NAKACK.NAKer.Send(): vid is null !");		return;	    }	    id=nak_seqno++;	    msg.AddHeader(new NakAckHeader(NakAckHeader.NAK_MSG, id, vid));	    	    copy=msg.Copy();  // save a copy for retransmission requests by receivers	    sent_msgs.put(new Long(id), copy);	    System.out.println("NAKer: SEND #" + id);	    PassDown(new Event(Event.MSG, msg));	}	void Receive(long id, Message msg) {	    Object             sender=msg.GetSrc();	    NakReceiverWindow  win=(NakReceiverWindow)received_msgs.get(sender);	    Message            msg_to_deliver;	    if(win == null) {		win=new NakReceiverWindow(sender, this, 0);		received_msgs.put(sender, win);	    }	    System.out.println("NAKer: RECV #" + id + "\n");	    	    win.Add(id, msg);	    while(true) {		msg_to_deliver=win.Remove();		if(msg_to_deliver != null)		    PassUp(new Event(Event.MSG, msg_to_deliver));		else		    break;	    }	}    }            class ACKer extends LosslessTransmission implements AckMcastSenderWindow.RetransmitCommand {	public ACKer() {	    if(ackwin == null)		ackwin=new AckMcastSenderWindow(this);	}	void Start() {	    if(ackwin == null)		ackwin=new AckMcastSenderWindow(this);	    ackwin.Start();	}	void Stop() {	    if(ackwin != null)		ackwin.Stop();	}	void Send(Message msg) {	    long    id;	    if(vid == null) {		System.err.println("NAKACK.ACKer.Send(): vid is null !");		return;	    }	    id=ack_seqno++;	    // Send reliably (with ACKs)	    if(ackwin == null)		ackwin=new AckMcastSenderWindow(this);	   	    ackwin.Add(id, msg, view.GetMembers());  // msg is copied, will not be modified by code below  	    msg.AddHeader(new NakAckHeader(NakAckHeader.ACK_MSG, id, vid));	    System.out.println("ACKer: SEND #" + id);	    PassDown(new Event(Event.MSG, msg));	}	void Receive(long id, Message msg) {	    System.out.println("ACKer: RECV #" + id + "\n");	    Message      ack_msg=new Message(msg.GetSrc(), null, null);	    NakAckHeader hdr=new NakAckHeader(NakAckHeader.ACK_RSP, id, vid);	    ack_msg.AddHeader(hdr);	    System.out.println("==> ACK #" + id);	    PassDown(new Event(Event.MSG, ack_msg));	}	void ReceiveAck(long id, Object sender) {	    if(ackwin == null) {		System.err.println("NAKACK.ACKer.ReceiveAck(): ack window is null !");		return;	    }	    ackwin.Ack(id, sender);	}	/**	   Called by retransmission thread of AckMcastSenderWindow. <code>msg</code> is already	   a copy, so does not need to be copied again.	 */	public void Retransmit(long seqno, Message msg, Object dest) {	    NakAckHeader hdr=new NakAckHeader(NakAckHeader.ACK_MSG, seqno, vid);	    msg.SetDest(dest);	    msg.AddHeader(hdr);	    PassDown(new Event(Event.MSG, msg));	}	    }            public  String  GetName()       {return "NAKACK";}        /**       <b>Callback</b>. Called by superclass when event may be handled.<p>       <b>Do not use <code>PassUp</code> in this method as the event is passed up       by default by the superclass after this method returns !</b>       @return boolean Defaults to true. If false, event will not be passed up the stack.     */    public void Up(Event evt) {	Object        obj;	NakAckHeader  hdr;	Message       msg;	int           rc;  	switch(evt.GetType()) {	case Event.MSG:	    msg=(Message)evt.GetArg();	    obj=msg.PeekHeader();	    if(obj == null || !(obj instanceof NakAckHeader))		break;  // pass up	    	    hdr=(NakAckHeader)msg.RemoveHeader();	    if(vid != null) {		if(hdr.vid == null) {		    System.err.println("NAKACK.Up(): message's view is null ! " +				       "Cannot check against our own view !");		}		else {		    rc=hdr.vid.Compare(vid);		    if(rc == 0) {               // same vid -> OK			;		    }		    else if(rc < 0) {           // message sent in prev. view -> discard !			System.err.println("NAKACK.Up(): message's vid is smaller than " +					   "current vid: message is discarded !");			return;		    }		    else if(rc > 0) {           // message is sent in next view -> store !			System.out.println("NAKACK.Up(): message's vid is bigger than " +					   "current vid: message is queued !");									// +++ Implement: queue message								    }		    else {			System.err.println("NAKACK.Up(): comparison of message's vid with ours " +					   "yielded " + rc);		    }		}	    }	    else		System.err.println("NAKACK.Up(): our vid is not available ! "+				   "Cannot check against message's vid !");	    	    switch(hdr.type) {	    case NakAckHeader.NAK_MSG:		naker.Receive(hdr.seqno, msg);		return;        // transmitter passes message up for us !	    case NakAckHeader.ACK_MSG:		acker.Receive(hdr.seqno, msg);		return;        // transmitter passes message up for us !	    case NakAckHeader.RETRANSMIT_MSG:		transmitter.Retransmit(msg.GetSrc(), hdr.seqno, hdr.last_seqno);		return;	    case NakAckHeader.ACK_RSP:		System.out.println("<== ACK # " + hdr.seqno);		acker.ReceiveAck(hdr.seqno, msg.GetSrc());		return;        // discard, no need to pass up	    default:		System.err.println("NAKACK.Up(): NakAck header type " +				   hdr.type + " not known !");		break;	    }	      	}	PassUp(evt);    }    /**       <b>Callback</b>. Called by superclass when event may be handled.<p>       <b>Do not use <code>PassDown</code> in this method as the event is passed down       by default by the superclass after this method returns !</b>       @return boolean Defaults to true. If false, event will not be passed down the stack.    */    public void Down(Event evt) {	Message msg;	switch(evt.GetType()) {	case Event.MSG:	    msg=(Message)evt.GetArg();	    if(msg.GetDest() != null && !((Address)msg.GetDest()).IsMulticastAddress())		break; // unicast address: not null and not mcast, pass down unchanged	    transmitter.Send(msg);	    return;    // don't pass down the stack, transmitter does this for us !	case Event.VIEW_CHANGE:	    view=(View)evt.GetArg();	    vid=new ViewId((Address)view.GetCreator(), view.GetId());	    nak_seqno=0;  // reset sequence number	    ack_seqno=0;  // reset sequence number	    transmitter.Reset();	    // ++ Implement: if there are any queued messages that were previously 	    //               sent for this vid, add those that match the new vid now !	    break;	case Event.SWITCH_NAK:	    acker.Stop();	    transmitter=naker;	    return;              // don't pass down any further	case Event.SWITCH_ACK:	    transmitter=acker;	    acker.Start();	    return;              // don't pass down any further	}	PassDown(evt);    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?