nakack.java.messageprotocol

来自「JGRoups源码」· MESSAGEPROTOCOL 代码 · 共 423 行

MESSAGEPROTOCOL
423
字号
package JavaGroups.JavaStack.Protocols;import java.io.*;import java.util.*;import JavaGroups.*;import JavaGroups.JavaStack.*;class NakAckHeader implements Serializable {    public static final int NAK_MSG        = 1;  // asynchronous msg    public static final int ACK_MSG        = 2;  // synchronous msg    public static final int RETRANSMIT_MSG = 3;  // retransmit msg    int     type=0;    long    seqno=-1;    ViewId  vid=null;        NakAckHeader(int type, long seqno, ViewId vid) {	this.type=type;	this.seqno=seqno;	this.vid=vid;    }    String Type2Str(int t) {	switch(t) {	case NAK_MSG: return "NAK_MSG";	case ACK_MSG: return "ACK_MSG";	case RETRANSMIT_MSG: return "RETRANSMIT_MSG";	default: return "<undefined>";	}    }    public String toString() {	return "[NAKACK: " + Type2Str(type) + ", seqno=" + seqno + ", vid=" + vid + "]";    }}/**   Combined NAK and ACK class. The layer contains a NAKer and an ACKer: depending on which   one is set, a message will be sent using a negative acknowldegment scheme (NAK) or regular   acknowledgement (ACK). Event SET_ACK_MODE switches between the 2 modes. The following   communication between 2 peers exists (left side is initiator, right side receiver):   <pre>      send_nak   -------------->       asynchronous      send_ack   ------------->>       synchronous   <--------------        ack      retransmit   <--------------       asynchronous   </pre>      When using NAKs, a messages is just tagged with a sequence number (seqno) and   broadcast asynchronously. A receiver requests retransmissions (asynchronous) for gaps   between received messages. When ACKs are used, each message is broadcast to the current   membership synchronously and only when a response from each member has been received   (or a suspect notification) will the call return. If a response has not been received after   a timeout, the message will be retransmitted to the corresponding member. If the response   is not received after n retries, and no suspect notification has been received, the member   will be suspected.      When a message is sent, it will contain a header describing the type of the message, and containing   additional data, such as sequence number etc. When a message is received, it is fed into either   the ACKer or NAKer, depending on the header's type.*/public class NAKACK extends MessageProtocol {    LosslessTransmission  naker=new NAKer();             //     LosslessTransmission  acker=new ACKer();             //     LosslessTransmission  transmitter=naker;             // NAK is default    long                  seqno=0;                       // current message sequence number    Hashtable             received_msgs=new Hashtable(); // ordered by sender -> NakReceiverWindow    Hashtable             sent_msgs=new Hashtable();     // ordered by seqno (sent by me !)    ViewId                vid=null;    View                  view=null;    //Object                myaddr=null;        class LosslessTransmission implements NakReceiverWindow.RetransmitCommand {	void Reset() {	    NakReceiverWindow win;	    sent_msgs.clear();	    	    for(Enumeration e=received_msgs.elements(); e.hasMoreElements();) {		win=(NakReceiverWindow)e.nextElement();		win.Reset();	    }	    received_msgs.clear();	}	void Send(long id, Message msg) {	}	void Receive(long id, Message msg) {	    Object             sender=msg.GetSrc();	    NakReceiverWindow  win=(NakReceiverWindow)received_msgs.get(sender);	    Message            msg_to_deliver;	    if(win == null) {		win=new NakReceiverWindow(sender, this, 0);		received_msgs.put(sender, win);	    }	    System.out.println("RECV #" + id);	    	    win.Add(id, msg);	    while(true) {		msg_to_deliver=win.Remove();		if(msg_to_deliver != null)		    PassUp(new Event(Event.MSG, msg_to_deliver));		else		    break;	    }	}	/** Called by retransmission thread when gap is detected. Sends retr. request	    to originator of msg */	public void Retransmit(long seqno, Object sender, int num_tries) {	    System.out.println("--> Retransmit(" + seqno + ") to " + sender);	    	    NakAckHeader hdr=new NakAckHeader(NakAckHeader.RETRANSMIT_MSG, seqno, vid);	    Message      retransmit_msg=new Message(sender, null, null);	    retransmit_msg.AddHeader(hdr);	    try {		SendMessage(retransmit_msg, GroupRequest.GET_NONE, 0); // non-blocking request	    }	    catch(Exception e) {		System.err.println("NAKACK.NAKer.Retransmit(): " + e);	    }	}	// Retransmit from sent-table	void Retransmit(Object dest, long id) {	    Message m=(Message)sent_msgs.get(new Long(id)), retr_msg;	    if(m == null) {		System.err.println("NAKACK.LosslessTransmission.Retransmit(): message with " +				   "seqno=" + id + " not found !");		return;	    }	    	    retr_msg=m.Copy();	    retr_msg.SetDest(dest);	    	    try {		SendMessage(retr_msg, GroupRequest.GET_NONE, 0);  // non-blocking	    }	    catch(Exception e) {		System.err.println("NAKACK.LosslessTransmission.Retransmit(): " +e);	    }	}	void Stable(long id) {}    }            class NAKer extends LosslessTransmission {	void Send(long id, Message msg) {	    Message copy;	    if(vid == null) {		System.err.println("NAKACK.NAKer.Send(): vid is null !");		return;	    }	    msg.AddHeader(new NakAckHeader(NakAckHeader.NAK_MSG, id, vid));	    	    copy=msg.Copy();	    sent_msgs.put(new Long(id), copy);	    System.out.println("SEND #" + id + ", msg is " + msg);	    //  	    if(id % 7 == 0) {	    //transmitter.Retransmit(id, myaddr, 0);	    //return;  	    //}	    CastMessage(view.GetMembers(), msg, GroupRequest.GET_NONE, 0); // don't wait for rsps	}    }            class ACKer extends LosslessTransmission {	// contains sent_table	void Send(long id, Message msg) {	    if(vid == null) {		System.err.println("NAKACK.ACKer.Send(): vid is null !");		return;	    }	    // +++ Send reliably (with ACKs)	    //  	    msg.AddHeader(new NakAckHeader(NakAckHeader.ACK_MSG, id, vid));	    // sent_msgs.put(new Long(id), msg);//  	    System.out.println("NAKer: sending message #" + id + " --> " + msg);//  	    PassDown(new Event(Event.MSG, msg));	}	    }            private long    GetNextSeqno()  {return seqno++;}    public  String  GetName()       {return "NAKACK";}    /**       <b>Callback</b>. Called when a request for this protocol layer is received.     */    public Object Handle(Message req) {	Object         obj;	NakAckHeader   hdr;	int            rc;	obj=req.PeekHeader();	if(obj == null || !(obj instanceof NakAckHeader)) {	    System.err.println("NAKACK.Handle(): message does not contain a NakAckHeader !");	    return null;	}	    	hdr=(NakAckHeader)req.RemoveHeader();	if(vid != null) {	    if(hdr.vid == null) {		System.err.println("NAKACK.Handle(): message's view is null ! " +				   "Cannot check against our own view !");	    }	    else {		rc=hdr.vid.Compare(vid);		if(rc == 0) {               // same vid -> OK		    ;		}		else if(rc < 0) {           // message sent in prev. view -> discard !		    System.err.println("NAKACK.Handle(): message's vid is smaller than " +				       "current vid: message is discarded !");		    return null;		}		else if(rc > 0) {		    System.out.println("NAKACK.Handle(): message's vid is bigger than " +				       "current vid: message is queued !");		    		    // +++ Implement: queue message		}		else {		    System.err.println("NAKACK.Handle(): comparison of message's vid with ours " +				       "yielded " + rc);		}	    }	}	else	    System.err.println("NAKACK.Handle(): our vid is not available ! "+			      "Cannot check against message's vid !");		    	switch(hdr.type) {	case NakAckHeader.NAK_MSG:	case NakAckHeader.ACK_MSG:	    transmitter.Receive(hdr.seqno, req);	    break;	case NakAckHeader.RETRANSMIT_MSG:	    transmitter.Retransmit(req.GetSrc(), hdr.seqno);	    break;	default:	    System.err.println("NAKACK.HandleUpEvent(): NakAck header type " +			       hdr.type + " not known !");	    break;	}		return null;    }        /**       <b>Callback</b>. Called by superclass when event may be handled.<p>       <b>Do not use <code>PassUp</code> in this method as the event is passed up       by default by the superclass after this method returns !</b>       @return boolean Defaults to true. If false, event will not be passed up the stack.     */    public boolean HandleUpEvent(Event evt) {//    	switch(evt.GetType()) {//    	case Event.SET_LOCAL_ADDRESS://    	    myaddr=evt.GetArg();//    	    break;//    	}	return true;    }    /**       <b>Callback</b>. Called by superclass when event may be handled.<p>       <b>Do not use <code>PassDown</code> in this method as the event is passed down       by default by the superclass after this method returns !</b>       @return boolean Defaults to true. If false, event will not be passed down the stack.    */    public boolean HandleDownEvent(Event evt) {	Message msg;	switch(evt.GetType()) {	case Event.MSG:	    msg=(Message)evt.GetArg();	    if(msg.GetDest() != null && !((Address)msg.GetDest()).IsMulticastAddress())		return true; // unicast address: not null and not mcast, pass down unchanged	    transmitter.Send(GetNextSeqno(), msg);	    return false;    // don't pass down the stack, transmitter does this for us !	case Event.VIEW_CHANGE:	    view=(View)evt.GetArg();	    vid=new ViewId((Address)view.GetCreator(), view.GetId());	    seqno=0;  // reset sequence number	    transmitter.Reset();	    // ++ Implement: if there are any queued messages that were previously 	    //               sent for this vid, add those that match the new vid now !	    break;	case Event.SWITCH_NAK:	    transmitter=naker;	    break;	case Event.SWITCH_ACK:	    transmitter=acker;	    break;	}	return true;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?