nakack.java.separate
来自「JGRoups源码」· SEPARATE 代码 · 共 485 行
SEPARATE
485 行
package JavaGroups.JavaStack.Protocols;import java.io.*;import java.util.*;import JavaGroups.*;import JavaGroups.JavaStack.*;class NakAckHeader implements Serializable { public static final int NAK_MSG = 1; // asynchronous msg public static final int ACK_MSG = 2; // synchronous msg public static final int RETRANSMIT_MSG = 3; // retransmit msg public static final int ACK_RSP = 4; // ack to ACK_MSG, seqno contains ACKed int type=0; long seqno=-1; // either reg. ACK_MSG or first_seqno in retransmissions long last_seqno=-1; // used for retransmissions ViewId vid=null; NakAckHeader(int type, long seqno, ViewId vid) { this.type=type; this.seqno=seqno; this.vid=vid; } String Type2Str(int t) { switch(t) { case NAK_MSG: return "NAK_MSG"; case ACK_MSG: return "ACK_MSG"; case RETRANSMIT_MSG: return "RETRANSMIT_MSG"; case ACK_RSP: return "ACK_RSP"; default: return "<undefined>"; } } public String toString() { return "[NAKACK: " + Type2Str(type) + ", seqno=" + seqno + ", last_seqno=" + last_seqno + ", vid=" + vid + "]"; }}/** Combined NAK and ACK class. The layer contains a NAKer and an ACKer: depending on which one is set, a message will be sent using a negative acknowldegment scheme (NAK) or regular acknowledgement (ACK). Event SET_ACK_MODE switches between the 2 modes. The following communication between 2 peers exists (left side is initiator, right side receiver): <pre> send_nak --------------> asynchronous send_ack ------------->> synchronous <-------------- ack retransmit <-------------- asynchronous </pre> When using NAKs, a messages is just tagged with a sequence number (seqno) and broadcast asynchronously. A receiver requests retransmissions (asynchronous) for gaps between received messages. When ACKs are used, each message is broadcast to the current membership synchronously and only when a response from each member has been received (or a suspect notification) will the call return. If a response has not been received after a timeout, the message will be retransmitted to the corresponding member. If the response is not received after n retries, and no suspect notification has been received, the member will be suspected. When a message is sent, it will contain a header describing the type of the message, and containing additional data, such as sequence number etc. When a message is received, it is fed into either the ACKer or NAKer, depending on the header's type. */public class NAKACK extends Protocol { NAKer naker=new NAKer(); ACKer acker=new ACKer(); LosslessTransmission transmitter=naker; // NAK is default long nak_seqno=0; // current message sequence number for NAK msgs long ack_seqno=0; // current message sequence number for ACK msgs Hashtable received_msgs=new Hashtable(); // ordered by sender -> NakReceiverWindow Hashtable sent_msgs=new Hashtable(); // ordered by seqno (sent by me !) AckMcastSenderWindow ackwin=null; ViewId vid=null; View view=null; class LosslessTransmission implements NakReceiverWindow.RetransmitCommand { void Start() {} void Stop() {} void Reset() { NakReceiverWindow win; sent_msgs.clear(); for(Enumeration e=received_msgs.elements(); e.hasMoreElements();) { win=(NakReceiverWindow)e.nextElement(); win.Reset(); } received_msgs.clear(); } void Send(Message msg) { ; } void Receive(long id, Message msg) { ; } /** Called by retransmission thread when gap is detected. Sends retr. request to originator of msg */ public void Retransmit(long first_seqno, long last_seqno, Object sender) { System.out.println("NAKer --> Retransmit([" + first_seqno + ", " + last_seqno + "]) to " + sender); NakAckHeader hdr=new NakAckHeader(NakAckHeader.RETRANSMIT_MSG, first_seqno, vid); Message retransmit_msg=new Message(sender, null, null); hdr.last_seqno=last_seqno; retransmit_msg.AddHeader(hdr); try { PassDown(new Event(Event.MSG, retransmit_msg)); } catch(Exception e) { System.err.println("NAKACK.NAKer.Retransmit(): " + e); } } // Retransmit from sent-table void Retransmit(Object dest, long first_seqno, long last_seqno) { Message m, retr_msg; for(long i=first_seqno; i <= last_seqno; i++) { m=(Message)sent_msgs.get(new Long(i)); if(m == null) { System.err.println("NAKACK.LosslessTransmission.Retransmit(): message with " + "seqno=" + i + " not found !"); continue; } retr_msg=m.Copy(); retr_msg.SetDest(dest); try { PassDown(new Event(Event.MSG, retr_msg)); } catch(Exception e) { System.err.println("NAKACK.LosslessTransmission.Retransmit(): " + e); } } } } class NAKer extends LosslessTransmission { void Send(Message msg) { Message copy; long id; if(vid == null) { System.err.println("NAKACK.NAKer.Send(): vid is null !"); return; } id=nak_seqno++; msg.AddHeader(new NakAckHeader(NakAckHeader.NAK_MSG, id, vid)); copy=msg.Copy(); // save a copy for retransmission requests by receivers sent_msgs.put(new Long(id), copy); System.out.println("NAKer: SEND #" + id); PassDown(new Event(Event.MSG, msg)); } void Receive(long id, Message msg) { Object sender=msg.GetSrc(); NakReceiverWindow win=(NakReceiverWindow)received_msgs.get(sender); Message msg_to_deliver; if(win == null) { win=new NakReceiverWindow(sender, this, 0); received_msgs.put(sender, win); } System.out.println("NAKer: RECV #" + id + "\n"); win.Add(id, msg); while(true) { msg_to_deliver=win.Remove(); if(msg_to_deliver != null) PassUp(new Event(Event.MSG, msg_to_deliver)); else break; } } } class ACKer extends LosslessTransmission implements AckMcastSenderWindow.RetransmitCommand { public ACKer() { if(ackwin == null) ackwin=new AckMcastSenderWindow(this); } void Start() { if(ackwin == null) ackwin=new AckMcastSenderWindow(this); ackwin.Start(); } void Stop() { if(ackwin != null) ackwin.Stop(); } void Send(Message msg) { long id; if(vid == null) { System.err.println("NAKACK.ACKer.Send(): vid is null !"); return; } id=ack_seqno++; // Send reliably (with ACKs) if(ackwin == null) ackwin=new AckMcastSenderWindow(this); ackwin.Add(id, msg, view.GetMembers()); // msg is copied, will not be modified by code below msg.AddHeader(new NakAckHeader(NakAckHeader.ACK_MSG, id, vid)); System.out.println("ACKer: SEND #" + id); PassDown(new Event(Event.MSG, msg)); } void Receive(long id, Message msg) { System.out.println("ACKer: RECV #" + id + "\n"); Message ack_msg=new Message(msg.GetSrc(), null, null); NakAckHeader hdr=new NakAckHeader(NakAckHeader.ACK_RSP, id, vid); ack_msg.AddHeader(hdr); System.out.println("==> ACK #" + id); PassDown(new Event(Event.MSG, ack_msg)); } void ReceiveAck(long id, Object sender) { if(ackwin == null) { System.err.println("NAKACK.ACKer.ReceiveAck(): ack window is null !"); return; } ackwin.Ack(id, sender); } /** Called by retransmission thread of AckMcastSenderWindow. <code>msg</code> is already a copy, so does not need to be copied again. */ public void Retransmit(long seqno, Message msg, Object dest) { NakAckHeader hdr=new NakAckHeader(NakAckHeader.ACK_MSG, seqno, vid); msg.SetDest(dest); msg.AddHeader(hdr); PassDown(new Event(Event.MSG, msg)); } } public String GetName() {return "NAKACK";} /** <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassUp</code> in this method as the event is passed up by default by the superclass after this method returns !</b> @return boolean Defaults to true. If false, event will not be passed up the stack. */ public void Up(Event evt) { Object obj; NakAckHeader hdr; Message msg; int rc; switch(evt.GetType()) { case Event.MSG: msg=(Message)evt.GetArg(); obj=msg.PeekHeader(); if(obj == null || !(obj instanceof NakAckHeader)) break; // pass up hdr=(NakAckHeader)msg.RemoveHeader(); if(vid != null) { if(hdr.vid == null) { System.err.println("NAKACK.Up(): message's view is null ! " + "Cannot check against our own view !"); } else { rc=hdr.vid.Compare(vid); if(rc == 0) { // same vid -> OK ; } else if(rc < 0) { // message sent in prev. view -> discard ! System.err.println("NAKACK.Up(): message's vid is smaller than " + "current vid: message is discarded !"); return; } else if(rc > 0) { // message is sent in next view -> store ! System.out.println("NAKACK.Up(): message's vid is bigger than " + "current vid: message is queued !"); // +++ Implement: queue message } else { System.err.println("NAKACK.Up(): comparison of message's vid with ours " + "yielded " + rc); } } } else System.err.println("NAKACK.Up(): our vid is not available ! "+ "Cannot check against message's vid !"); switch(hdr.type) { case NakAckHeader.NAK_MSG: naker.Receive(hdr.seqno, msg); return; // transmitter passes message up for us ! case NakAckHeader.ACK_MSG: acker.Receive(hdr.seqno, msg); return; // transmitter passes message up for us ! case NakAckHeader.RETRANSMIT_MSG: transmitter.Retransmit(msg.GetSrc(), hdr.seqno, hdr.last_seqno); return; case NakAckHeader.ACK_RSP: System.out.println("<== ACK # " + hdr.seqno); acker.ReceiveAck(hdr.seqno, msg.GetSrc()); return; // discard, no need to pass up default: System.err.println("NAKACK.Up(): NakAck header type " + hdr.type + " not known !"); break; } } PassUp(evt); } /** <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassDown</code> in this method as the event is passed down by default by the superclass after this method returns !</b> @return boolean Defaults to true. If false, event will not be passed down the stack. */ public void Down(Event evt) { Message msg; switch(evt.GetType()) { case Event.MSG: msg=(Message)evt.GetArg(); if(msg.GetDest() != null && !((Address)msg.GetDest()).IsMulticastAddress()) break; // unicast address: not null and not mcast, pass down unchanged transmitter.Send(msg); return; // don't pass down the stack, transmitter does this for us ! case Event.VIEW_CHANGE: view=(View)evt.GetArg(); vid=new ViewId((Address)view.GetCreator(), view.GetId()); nak_seqno=0; // reset sequence number ack_seqno=0; // reset sequence number transmitter.Reset(); // ++ Implement: if there are any queued messages that were previously // sent for this vid, add those that match the new vid now ! break; case Event.SWITCH_NAK: acker.Stop(); transmitter=naker; return; // don't pass down any further case Event.SWITCH_ACK: transmitter=acker; acker.Start(); return; // don't pass down any further } PassDown(evt); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?