nakack.java.messageprotocol
来自「JGroups源码」· MESSAGEPROTOCOL 代码 · 共 423 行
MESSAGEPROTOCOL
423 行
package JavaGroups.JavaStack.Protocols;

import java.io.*;
import java.util.*;
import JavaGroups.*;
import JavaGroups.JavaStack.*;


/**
   Header added by the NAKACK layer to the messages it sends. It carries the
   kind of message (<code>NAK_MSG</code>, <code>ACK_MSG</code> or
   <code>RETRANSMIT_MSG</code>), the sequence number the message refers to and
   the id of the view in which the header was created.
 */
class NakAckHeader implements Serializable {
    public static final int NAK_MSG        = 1;  // asynchronous msg
    public static final int ACK_MSG        = 2;  // synchronous msg
    public static final int RETRANSMIT_MSG = 3;  // retransmit msg

    int     type=0;
    long    seqno=-1;
    ViewId  vid=null;

    /**
       @param type  One of NAK_MSG, ACK_MSG or RETRANSMIT_MSG
       @param seqno Sequence number of the message (or of the message whose
                    retransmission is requested)
       @param vid   View id the sender had when creating the header (may be null)
     */
    NakAckHeader(int type, long seqno, ViewId vid) {
        this.type=type;
        this.seqno=seqno;
        this.vid=vid;
    }

    /** Returns the symbolic name of a header type, or "&lt;undefined&gt;" if unknown. */
    String Type2Str(int t) {
        switch(t) {
        case NAK_MSG:        return "NAK_MSG";
        case ACK_MSG:        return "ACK_MSG";
        case RETRANSMIT_MSG: return "RETRANSMIT_MSG";
        default:             return "<undefined>";
        }
    }

    public String toString() {
        return "[NAKACK: " + Type2Str(type) + ", seqno=" + seqno + ", vid=" + vid + "]";
    }
}


/**
   Combined NAK and ACK class. The layer contains a NAKer and an ACKer: depending on which one
   is set, a message will be sent using a negative acknowledgment scheme (NAK) or regular
   acknowledgement (ACK). Events SWITCH_NAK and SWITCH_ACK switch between the 2 modes. The
   following communication between 2 peers exists (left side is initiator, right side receiver):
   <pre>

   send_nak   --------------&gt;       asynchronous

   send_ack   -------------&gt;&gt;       synchronous
              &lt;--------------  ack

   retransmit &lt;--------------       asynchronous

   </pre>

   When using NAKs, a message is just tagged with a sequence number (seqno) and broadcast
   asynchronously. A receiver requests retransmissions (asynchronous) for gaps between received
   messages.

   When ACKs are used, each message is broadcast to the current membership synchronously
   and only when a response from each member has been received (or a suspect notification) will
   the call return. If a response has not been received after a timeout, the message will be
   retransmitted to the corresponding member. If the response is not received after n retries,
   and no suspect notification has been received, the member will be suspected.
   (NOTE: the ACK scheme described in this paragraph is not implemented in this file yet;
   <code>ACKer.Send()</code> is currently a stub.)

   When a message is sent, it will contain a header describing the type of the message, and
   containing additional data, such as sequence number etc. When a message is received, it is
   fed into either the ACKer or NAKer, depending on the header's type.
 */
public class NAKACK extends MessageProtocol {
    LosslessTransmission  naker=new NAKer();
    LosslessTransmission  acker=new ACKer();
    LosslessTransmission  transmitter=naker;              // NAK is default
    long                  seqno=0;                        // current message sequence number
    Hashtable             received_msgs=new Hashtable();  // ordered by sender -> NakReceiverWindow
    Hashtable             sent_msgs=new Hashtable();      // ordered by seqno (sent by me !)
    ViewId                vid=null;
    View                  view=null;
    //Object                myaddr=null;



    /**
       Common part of both transmission schemes: keeps one NakReceiverWindow per sender
       for incoming messages, delivers messages handed out by the window, and handles
       both directions of retransmission (asking a sender for a missing message, and
       answering such a request from the table of sent messages). <code>Send()</code>
       and <code>Stable()</code> do nothing here; subclasses override <code>Send()</code>.
     */
    class LosslessTransmission implements NakReceiverWindow.RetransmitCommand {

        /** Clears the table of sent messages, resets every receiver window and
            then drops all receiver windows. */
        void Reset() {
            NakReceiverWindow win;

            sent_msgs.clear();
            for(Enumeration e=received_msgs.elements(); e.hasMoreElements();) {
                win=(NakReceiverWindow)e.nextElement();
                win.Reset();
            }
            received_msgs.clear();
        }

        /** No-op in the base class; overridden by NAKer and ACKer. */
        void Send(long id, Message msg) {
        }

        /**
           Adds a received message to the receiver window of its sender (creating the
           window on first contact) and passes up every message the window hands out.
           @param id  Sequence number taken from the message's NakAckHeader
           @param msg The received message (header already removed by the caller)
         */
        void Receive(long id, Message msg) {
            Object             sender=msg.GetSrc();
            NakReceiverWindow  win=(NakReceiverWindow)received_msgs.get(sender);
            Message            msg_to_deliver;

            if(win == null) {
                // third argument is presumably the first expected seqno - TODO confirm
                // against NakReceiverWindow
                win=new NakReceiverWindow(sender, this, 0);
                received_msgs.put(sender, win);
            }

            System.out.println("RECV #" + id);

            win.Add(id, msg);

            // drain the window: Remove() returning null ends delivery for now
            while((msg_to_deliver=win.Remove()) != null)
                PassUp(new Event(Event.MSG, msg_to_deliver));
        }

        /** Called by retransmission thread when gap is detected. Sends retr. request
            to originator of msg.
            @param seqno     Sequence number of the missing message
            @param sender    Originator of the missing message; used as destination
                             of the retransmit request
            @param num_tries Not used by this implementation
         */
        public void Retransmit(long seqno, Object sender, int num_tries) {

            System.out.println("--> Retransmit(" + seqno + ") to " + sender);

            NakAckHeader hdr=new NakAckHeader(NakAckHeader.RETRANSMIT_MSG, seqno, vid);
            Message      retransmit_msg=new Message(sender, null, null);

            retransmit_msg.AddHeader(hdr);
            try {
                SendMessage(retransmit_msg, GroupRequest.GET_NONE, 0);  // non-blocking request
            }
            catch(Exception e) {
                // best effort: a failed request is only reported
                System.err.println("NAKACK.LosslessTransmission.Retransmit(): " + e);
            }
        }

        /**
           Retransmit from sent-table: answers a retransmit request by sending a copy
           of the stored message <code>id</code> to <code>dest</code>. If the message is
           not in the sent-table, an error is printed and nothing is sent.
           @param dest Requester of the retransmission
           @param id   Sequence number of the requested message
         */
        void Retransmit(Object dest, long id) {
            Message m=(Message)sent_msgs.get(new Long(id)), retr_msg;

            if(m == null) {
                System.err.println("NAKACK.LosslessTransmission.Retransmit(): message with " +
                                   "seqno=" + id + " not found !");
                return;
            }

            // copy, so that the stored message keeps its original destination
            retr_msg=m.Copy();
            retr_msg.SetDest(dest);

            try {
                SendMessage(retr_msg, GroupRequest.GET_NONE, 0);        // non-blocking
            }
            catch(Exception e) {
                System.err.println("NAKACK.LosslessTransmission.Retransmit(): " +e);
            }
        }

        /** No-op; sent messages are currently only removed by Reset(). */
        void Stable(long id) {}

    }



    /**
       Negative acknowledgment scheme: tags the message with a NAK_MSG header,
       remembers a copy for later retransmission and casts it to all members of the
       current view without waiting for responses.
     */
    class NAKer extends LosslessTransmission {

        /**
           @param id  Sequence number assigned to the message
           @param msg Message to be sent; a NakAckHeader is added to it
         */
        void Send(long id, Message msg) {
            Message copy;

            if(vid == null) {
                System.err.println("NAKACK.NAKer.Send(): vid is null !");
                return;
            }
            msg.AddHeader(new NakAckHeader(NakAckHeader.NAK_MSG, id, vid));

            // the stored copy already contains the header, so it can be resent as is
            copy=msg.Copy();
            sent_msgs.put(new Long(id), copy);

            System.out.println("SEND #" + id + ", msg is " + msg);


//          if(id % 7 == 0) {
                //transmitter.Retransmit(id, myaddr, 0);
                //return;
            //}


            CastMessage(view.GetMembers(), msg, GroupRequest.GET_NONE, 0); // don't wait for rsps
        }

    }



    /**
       Positive acknowledgment scheme. Not implemented yet: <code>Send()</code> only
       checks that a view id is available and otherwise does nothing, i.e. a message
       sent while this transmitter is active is not sent at all.
     */
    class ACKer extends LosslessTransmission {

        // contains sent_table

        void Send(long id, Message msg) {
            if(vid == null) {
                System.err.println("NAKACK.ACKer.Send(): vid is null !");
                return;
            }

            // +++ Send reliably (with ACKs)

//          msg.AddHeader(new NakAckHeader(NakAckHeader.ACK_MSG, id, vid));
//          sent_msgs.put(new Long(id), msg);
//          System.out.println("NAKer: sending message #" + id + " --> " + msg);
//          PassDown(new Event(Event.MSG, msg));
        }
    }



    /** Returns the current sequence number and then increments it. */
    private long GetNextSeqno() {return seqno++;}



    public String GetName() {return "NAKACK";}



    /**
       <b>Callback</b>. Called when a request for this protocol layer is received.
       Removes the NakAckHeader, compares the header's view id with our own and then
       dispatches on the header type: NAK_MSG / ACK_MSG are handed to the current
       transmitter's <code>Receive()</code>, RETRANSMIT_MSG to its
       <code>Retransmit()</code>.
       @param req The received message; its top header must be a NakAckHeader
       @return Always null
     */
    public Object Handle(Message req) {
        Object        obj;
        NakAckHeader  hdr;
        int           rc;

        obj=req.PeekHeader();
        if(!(obj instanceof NakAckHeader)) {    // also covers obj == null
            System.err.println("NAKACK.Handle(): message does not contain a NakAckHeader !");
            return null;
        }

        hdr=(NakAckHeader)req.RemoveHeader();

        if(vid != null) {
            if(hdr.vid == null) {
                System.err.println("NAKACK.Handle(): message's view is null ! " +
                                   "Cannot check against our own view !");
            }
            else {
                rc=hdr.vid.Compare(vid);
                if(rc < 0) {   // message sent in prev. view -> discard !
                    System.err.println("NAKACK.Handle(): message's vid is smaller than " +
                                       "current vid: message is discarded !");
                    return null;
                }
                else if(rc > 0) {
                    System.out.println("NAKACK.Handle(): message's vid is bigger than " +
                                       "current vid: message is queued !");
                    // +++ Implement: queue message
                    // NOTE: until queueing is implemented, the message is NOT queued
                    // but falls through and is processed like a message of the current view
                }
                // rc == 0: same vid -> OK
            }
        }
        else
            System.err.println("NAKACK.Handle(): our vid is not available ! "+
                               "Cannot check against message's vid !");


        switch(hdr.type) {

        case NakAckHeader.NAK_MSG:
        case NakAckHeader.ACK_MSG:
            transmitter.Receive(hdr.seqno, req);
            break;

        case NakAckHeader.RETRANSMIT_MSG:
            // the requester is the source of the retransmit request
            transmitter.Retransmit(req.GetSrc(), hdr.seqno);
            break;

        default:
            System.err.println("NAKACK.Handle(): NakAck header type " +
                               hdr.type + " not known !");
            break;
        }

        return null;
    }



    /**
       <b>Callback</b>. Called by superclass when event may be handled.<p>
       <b>Do not use <code>PassUp</code> in this method as the event is passed up
       by default by the superclass after this method returns !</b>
       @return boolean Defaults to true. If false, event will not be passed up the stack.
     */
    public boolean HandleUpEvent(Event evt) {
//      switch(evt.GetType()) {

//      case Event.SET_LOCAL_ADDRESS:
//          myaddr=evt.GetArg();
//          break;
//      }

        return true;
    }



    /**
       <b>Callback</b>. Called by superclass when event may be handled.<p>
       <b>Do not use <code>PassDown</code> in this method as the event is passed down
       by default by the superclass after this method returns !</b><p>
       Multicast messages (destination null or a multicast address) are given a new
       sequence number and sent by the current transmitter; unicast messages pass
       unchanged. A view change installs the new view and view id, resets the sequence
       number and resets the transmitter's tables. SWITCH_NAK / SWITCH_ACK select the
       transmitter.
       @return boolean Defaults to true. If false, event will not be passed down the stack.
     */
    public boolean HandleDownEvent(Event evt) {
        Message msg;

        switch(evt.GetType()) {

        case Event.MSG:
            msg=(Message)evt.GetArg();
            if(msg.GetDest() != null && !((Address)msg.GetDest()).IsMulticastAddress())
                return true; // unicast address: not null and not mcast, pass down unchanged

            transmitter.Send(GetNextSeqno(), msg);
            return false;    // don't pass down the stack, transmitter does this for us !

        case Event.VIEW_CHANGE:
            view=(View)evt.GetArg();
            vid=new ViewId((Address)view.GetCreator(), view.GetId());
            seqno=0;              // reset sequence number
            transmitter.Reset();

            // ++ Implement: if there are any queued messages that were previously
            //    sent for this vid, add those that match the new vid now !
            break;

        case Event.SWITCH_NAK:
            transmitter=naker;
            break;

        case Event.SWITCH_ACK:
            transmitter=acker;
            break;
        }

        return true;
    }


}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?