nakack.java
来自「JGroups源码」· Java 代码 · 共 1,094 行 · 第 1/3 页
JAVA
1,094 行
// $Id: NAKACK.java,v 1.15 2005/08/11 12:43:47 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.*;import org.jgroups.util.List;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.util.Enumeration;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Negative AcKnowledgement layer (NAKs), paired with positive ACKs. The default is to send a message * using NAKs: the sender sends messages with monotonically increasing seqnos, receiver requests * retransmissions of missing messages (gaps). When a SWITCH_NAK_ACK event is received, the mode * is switched to using NAK_ACKS: the sender still uses monotonically increasing seqnos, but the receiver * acknowledges every message. NAK and NAK_ACK seqnos are the same, when switching the mode, the current * seqno is reused. Both NAK and NAK_ACK messages use the current view ID in which the message is sent to * queue messages destined for an upcoming view, or discard messages sent in a previous view. Both modes * reset their seqnos to 0 when receiving a view change. The NAK_ACK scheme is used for broadcasting * view changes. * <p/> * The third mode is for out-of-band control messages (activated by SWITCH_OUT_OF_BAND): this mode does * neither employ view IDs, nor does it use the same seqnos as NAK and NAK_ACK. It uses its own seqnos, * unrelated to the ones used by NAK and NAK_ACK, and never resets them. In combination with the sender's * address, this makes every out-of-band message unique. Out-of-band messages are used for example for * broadcasting FLUSH messages.<p> * Once a mode is set, it remains in effect until exactly 1 message has been sent, afterwards the default * mode NAK is used again. 
* <p/> * The following communication between 2 peers exists (left side is initiator, * right side receiver): <pre> * <p/> * <p/> * send_out_of_band * --------------> synchronous (1) * <------------- * ack * <p/> * <p/> * send_nak * --------------> asynchronous (2) * <p/> * <p/> * send_nak_ack * --------------> synchronous (3) * <-------------- * ack * <p/> * <p/> * retransmit * <-------------- asynchronous (4) * <p/> * <p/> * </pre> * <p/> * When a message is sent, it will contain a header describing the type of the * message, and containing additional data, such as sequence number etc. When a * message is received, it is fed into either the OutOfBander or NAKer, depending on the * header's type.<p> * Note that in the synchronous modes, ACKs are sent for each request. If a reliable unicast protocol layer * exists somewhere underneath this layer, then even the ACKs are transmitted reliably, thus increasing * the number of messages exchanged. However, since it is envisaged that ACK/OUT_OF_BAND are not used * frequently, this problem is currently not addressed. * * @author Bela Ban */public class NAKACK extends Protocol { long[] retransmit_timeout={2000, 3000, 5000, 8000}; // time(s) to wait before requesting xmit NAKer naker=null; OutOfBander out_of_bander=null; ViewId vid=null; View view=null; boolean is_server=false; Address local_addr=null; final List queued_msgs=new List(); // msgs for next view (vid > current vid) Vector members=null; // for OutOfBander: this is the destination set to // send messages to boolean send_next_msg_out_of_band=false; boolean send_next_msg_acking=false; long rebroadcast_timeout=0; // until all outstanding ACKs recvd (rebcasting) TimeScheduler timer=null; static final String WRAPPED_MSG_KEY="NAKACK.WRAPPED_HDR"; /** * Do some initial tasks */ public void init() throws Exception { timer=stack != null? 
stack.timer : null; if(timer == null) if(log.isErrorEnabled()) log.error("timer is null"); naker=new NAKer(); out_of_bander=new OutOfBander(); } public void stop() { out_of_bander.stop(); naker.stop(); } public String getName() { return "NAKACK"; } public Vector providedUpServices() { Vector retval=new Vector(3); retval.addElement(new Integer(Event.GET_MSGS_RECEIVED)); retval.addElement(new Integer(Event.GET_MSG_DIGEST)); retval.addElement(new Integer(Event.GET_MSGS)); return retval; } public Vector providedDownServices() { Vector retval=new Vector(1); retval.addElement(new Integer(Event.GET_MSGS_RECEIVED)); return retval; } /** * <b>Callback</b>. Called by superclass when event may be handled.<p> * <b>Do not use <code>passUp()</code> in this method as the event is passed up * by default by the superclass after this method returns !</b> */ public void up(Event evt) { NakAckHeader hdr; Message msg, msg_copy; int rc; switch(evt.getType()) { case Event.SUSPECT: if(log.isInfoEnabled()) log.info("received SUSPECT event (suspected member=" + evt.getArg() + ')'); naker.suspect((Address)evt.getArg()); out_of_bander.suspect((Address)evt.getArg()); break; case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg naker.stable((long[])evt.getArg()); return; // don't pass up further (Bela Aug 7 2001) case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; case Event.GET_MSGS_RECEIVED: // returns the highest seqnos delivered to the appl. (used by STABLE) long[] highest=naker.getHighestSeqnosDelivered(); passDown(new Event(Event.GET_MSGS_RECEIVED_OK, highest)); return; // don't pass up further (bela Aug 7 2001) case Event.MSG: synchronized(this) { msg=(Message)evt.getArg(); // check to see if this is a wrapped msg. 
If yes, send an ACK hdr=(NakAckHeader)msg.removeHeader(WRAPPED_MSG_KEY); // see whether it is a wrapped message if(hdr != null && hdr.type == NakAckHeader.WRAPPED_MSG) { // send back an ACK to hdr.sender Message ack_msg=new Message(hdr.sender, null, null); NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null); if(hdr.sender == null) if(warn) log.warn("WRAPPED: header's 'sender' field is null; " + "cannot send ACK !"); ack_msg.putHeader(getName(), h); passDown(new Event(Event.MSG, ack_msg)); } hdr=(NakAckHeader)msg.removeHeader(getName()); if(hdr == null) break; // pass up switch(hdr.type) { case NakAckHeader.NAK_ACK_MSG: case NakAckHeader.NAK_MSG: if(hdr.type == NakAckHeader.NAK_ACK_MSG) { // first thing: send ACK back to sender Message ack_msg=new Message(msg.getSrc(), null, null); NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null); ack_msg.putHeader(getName(), h); passDown(new Event(Event.MSG, ack_msg)); } // while still a client, we just pass up all messages, without checking for message // view IDs or seqnos: other layers further up will discard messages not destined // for us (e.g. based on view IDs). // Also: store msg in queue, when view change is received, replay messages with the same // vid as the new view if(!is_server) { msg_copy=msg.copy(); // msg without header msg_copy.putHeader(getName(), hdr); // put header back on as we removed it above queued_msgs.add(msg_copy); // need a copy since passUp() will modify msg passUp(new Event(Event.MSG, msg)); return; } // check for VIDs: is the message's VID the same as ours ? 
if(vid != null && hdr.vid != null) { // only check if our vid and message's vid available Address my_addr=vid.getCoordAddress(), other_addr=hdr.vid.getCoordAddress(); if(my_addr == null || other_addr == null) { if(warn) log.warn("my vid or message's vid does not contain " + "a coordinator; discarding message !"); return; } if(!my_addr.equals(other_addr)) { if(warn) log.warn("creator of own vid (" + my_addr + ")is different from " + "creator of message's vid (" + other_addr + "); discarding message !"); return; } rc=hdr.vid.compareTo(vid); if(rc > 0) { // message is sent in next view -> store ! if(log.isInfoEnabled()) log.info("message's vid (" + hdr.vid + '#' + hdr.seqno + ") is bigger than current vid: (" + vid + ") message is queued !"); msg.putHeader(getName(), hdr); // put header back on as we removed it above queued_msgs.add(msg); return; } if(rc < 0) { // message sent in prev. view -> discard ! if(warn) log.warn("message's vid (" + hdr.vid + ") is smaller than " + "current vid (" + vid + "): message <" + msg.getSrc() + ":#" + hdr.seqno + "> is discarded ! Hdr is " + hdr); return; } // If we made it down here, the vids are the same --> OK } msg.putHeader(getName(), hdr); // stored in received_msgs, re-sent later that's why hdr is added ! naker.receive(hdr.seqno, msg, null); return; // naker passes message up for us ! case NakAckHeader.RETRANSMIT_MSG: naker.retransmit(msg.getSrc(), hdr.seqno, hdr.last_seqno); return; case NakAckHeader.NAK_ACK_RSP: naker.receiveAck(hdr.seqno, msg.getSrc()); return; // discard, no need to pass up case NakAckHeader.OUT_OF_BAND_MSG: out_of_bander.receive(hdr.seqno, msg, hdr.stable_msgs); return; // naker passes message up for us ! case NakAckHeader.OUT_OF_BAND_RSP: out_of_bander.receiveAck(hdr.seqno, msg.getSrc()); return; default: if(log.isErrorEnabled()) log.error("NakAck header type " + hdr.type + " not known !"); break; } } //end synchronized } passUp(evt); } /** * <b>Callback</b>. 
Called by superclass when event may be handled.<p> * <b>Do not use <code>passDown</code> in this method as the event is passed down * by default by the superclass after this method returns !</b> */ public void down(Event evt) { Message msg; if(trace) log.trace("queued_msgs has " + queued_msgs.size() + " messages " + "\n\nnaker:\n" + naker.dumpContents() + "\n\nout_of_bander: " + out_of_bander.dumpContents() + "\n-----------------------------\n"); switch(evt.getType()) { case Event.MSG: msg=(Message)evt.getArg(); // unicast address: not null and not mcast, pass down unchanged if(vid == null || (msg.getDest() != null && !msg.getDest().isMulticastAddress())) break; if(send_next_msg_out_of_band) { out_of_bander.send(msg); send_next_msg_out_of_band=false; } else if(send_next_msg_acking) { naker.setAcks(true); // require acks when sending a msg naker.send(msg); naker.setAcks(false); // don't require acks when sending a msg send_next_msg_acking=false; } else naker.send(msg); return; // don't pass down the stack, naker does this for us ! case Event.GET_MSG_DIGEST: long[] highest_seqnos=(long[])evt.getArg(); Digest digest=naker.computeMessageDigest(highest_seqnos); passUp(new Event(Event.GET_MSG_DIGEST_OK, digest)); return; case Event.GET_MSGS: List lower_seqnos=naker.getMessagesInRange((long[][])evt.getArg()); passUp(new Event(Event.GET_MSGS_OK, lower_seqnos)); return; case Event.REBROADCAST_MSGS: rebroadcastMsgs((Vector)evt.getArg()); break; case Event.TMP_VIEW: Vector mbrs=((View)evt.getArg()).getMembers(); members=mbrs != null? 
(Vector)mbrs.clone() : new Vector(11); break; case Event.VIEW_CHANGE: synchronized(this) { view=((View)((View)evt.getArg()).clone()); vid=view.getVid(); members=(Vector)view.getMembers().clone(); naker.reset(); out_of_bander.reset(); is_server=true; // check vids from now on // deliver messages received previously for this view if(queued_msgs.size() > 0) deliverQueuedMessages(); } break; case Event.BECOME_SERVER: is_server=true; break; case Event.SWITCH_NAK: naker.setAcks(false); // don't require acks when sending a msg return; // don't pass down any further case Event.SWITCH_NAK_ACK: send_next_msg_acking=true; return; // don't pass down any further case Event.SWITCH_OUT_OF_BAND: send_next_msg_out_of_band=true; return;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?