nakack.java

来自「JGRoups源码」· Java 代码 · 共 1,094 行 · 第 1/3 页

JAVA
1,094
字号
// $Id: NAKACK.java,v 1.15 2005/08/11 12:43:47 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.*;import org.jgroups.util.List;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.util.Enumeration;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Negative AcKnowledgement layer (NAKs), paired with positive ACKs. The default is to send a message * using NAKs: the sender sends messages with monotonically increasing seqnos, receiver requests * retransmissions of missing messages (gaps). When a SWITCH_NAK_ACK event is received, the mode * is switched to using NAK_ACKS: the sender still uses monotonically increasing seqnos, but the receiver * acknowledges every message. NAK and NAK_ACK seqnos are the same, when switching the mode, the current * seqno is reused. Both NAK and NAK_ACK messages use the current view ID in which the message is sent to * queue messages destined for an upcoming view, or discard messages sent in a previous view. Both modes * reset their seqnos to 0 when receiving a view change. The NAK_ACK scheme is used for broadcasting * view changes. * <p/> * The third mode is for out-of-band control messages (activated by SWITCH_OUT_OF_BAND): this mode does * neither employ view IDs, nor does it use the same seqnos as NAK and NAK_ACK. It uses its own seqnos, * unrelated to the ones used by NAK and NAK_ACK, and never resets them. In combination with the sender's * address, this makes every out-of-band message unique. Out-of-band messages are used for example for * broadcasting FLUSH messages.<p> * Once a mode is set, it remains in effect until exactly 1 message has been sent, afterwards the default * mode NAK is used again. * <p/> * The following communication between 2 peers exists (left side is initiator, * right side receiver): <pre> * <p/> * <p/> * send_out_of_band * -------------->       synchronous     (1) * <------------- * ack * <p/> * <p/> * send_nak * -------------->       asynchronous    (2) * <p/> * <p/> * send_nak_ack * -------------->       synchronous     (3) * <-------------- * ack * <p/> * <p/> * retransmit * <--------------       asynchronous    (4) * <p/> * <p/> * </pre> * <p/> * When a message is sent, it will contain a header describing the type of the * message, and containing additional data, such as sequence number etc. When a * message is received, it is fed into either the OutOfBander or NAKer, depending on the * header's type.<p> * Note that in the synchronous modes, ACKs are sent for each request. If a reliable unicast protocol layer * exists somewhere underneath this layer, then even the ACKs are transmitted reliably, thus increasing * the number of messages exchanged. However, since it is envisaged that ACK/OUT_OF_BAND are not used * frequently, this problem is currently not addressed. *  * @author Bela Ban */public class NAKACK extends Protocol {    long[] retransmit_timeout={2000, 3000, 5000, 8000};  // time(s) to wait before requesting xmit    NAKer naker=null;    OutOfBander out_of_bander=null;    ViewId vid=null;    View view=null;    boolean is_server=false;    Address local_addr=null;    final List queued_msgs=new List();      // msgs for next view (vid > current vid)    Vector members=null;                // for OutOfBander: this is the destination set to    // send messages to    boolean send_next_msg_out_of_band=false;    boolean send_next_msg_acking=false;    long rebroadcast_timeout=0;       // until all outstanding ACKs recvd (rebcasting)    TimeScheduler timer=null;    static final String WRAPPED_MSG_KEY="NAKACK.WRAPPED_HDR";    /**     * Do some initial tasks     */    public void init() throws Exception {        timer=stack != null? stack.timer : null;        if(timer == null)            if(log.isErrorEnabled()) log.error("timer is null");        naker=new NAKer();        out_of_bander=new OutOfBander();    }    public void stop() {        out_of_bander.stop();        naker.stop();    }    public String getName() {        return "NAKACK";    }    public Vector providedUpServices() {        Vector retval=new Vector(3);        retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));        retval.addElement(new Integer(Event.GET_MSG_DIGEST));        retval.addElement(new Integer(Event.GET_MSGS));        return retval;    }    public Vector providedDownServices() {        Vector retval=new Vector(1);        retval.addElement(new Integer(Event.GET_MSGS_RECEIVED));        return retval;    }    /**     * <b>Callback</b>. Called by superclass when event may be handled.<p>     * <b>Do not use <code>passUp()</code> in this method as the event is passed up     * by default by the superclass after this method returns !</b>     */    public void up(Event evt) {        NakAckHeader hdr;        Message msg, msg_copy;        int rc;        switch(evt.getType()) {            case Event.SUSPECT:                    if(log.isInfoEnabled()) log.info("received SUSPECT event (suspected member=" + evt.getArg() + ')');                naker.suspect((Address)evt.getArg());                out_of_bander.suspect((Address)evt.getArg());                break;            case Event.STABLE:  // generated by STABLE layer. Delete stable messages passed in arg                naker.stable((long[])evt.getArg());                return; // don't pass up further (Bela Aug 7 2001)            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address)evt.getArg();                break;            case Event.GET_MSGS_RECEIVED:  // returns the highest seqnos delivered to the appl. (used by STABLE)                long[] highest=naker.getHighestSeqnosDelivered();                passDown(new Event(Event.GET_MSGS_RECEIVED_OK, highest));                return; // don't pass up further (bela Aug 7 2001)            case Event.MSG:                synchronized(this) {                    msg=(Message)evt.getArg();                    // check to see if this is a wrapped msg. If yes, send an ACK                    hdr=(NakAckHeader)msg.removeHeader(WRAPPED_MSG_KEY); // see whether it is a wrapped message                    if(hdr != null && hdr.type == NakAckHeader.WRAPPED_MSG) { // send back an ACK to hdr.sender                        Message ack_msg=new Message(hdr.sender, null, null);                        NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);                        if(hdr.sender == null)                            if(warn) log.warn("WRAPPED: header's 'sender' field is null; " +                                    "cannot send ACK !");                        ack_msg.putHeader(getName(), h);                        passDown(new Event(Event.MSG, ack_msg));                    }                    hdr=(NakAckHeader)msg.removeHeader(getName());                    if(hdr == null)                        break;  // pass up                    switch(hdr.type) {                        case NakAckHeader.NAK_ACK_MSG:                        case NakAckHeader.NAK_MSG:                            if(hdr.type == NakAckHeader.NAK_ACK_MSG) {  // first thing: send ACK back to sender                                Message ack_msg=new Message(msg.getSrc(), null, null);                                NakAckHeader h=new NakAckHeader(NakAckHeader.NAK_ACK_RSP, hdr.seqno, null);                                ack_msg.putHeader(getName(), h);                                passDown(new Event(Event.MSG, ack_msg));                            }                            // while still a client, we just pass up all messages, without checking for message                            // view IDs or seqnos: other layers further up will discard messages not destined                            // for us (e.g. based on view IDs).                            // Also: store msg in queue, when view change is received, replay messages with the same                            // vid as the new view                            if(!is_server) {                                msg_copy=msg.copy();                // msg without header                                msg_copy.putHeader(getName(), hdr); // put header back on as we removed it above                                queued_msgs.add(msg_copy);          // need a copy since passUp() will modify msg                                passUp(new Event(Event.MSG, msg));                                return;                            }                            // check for VIDs: is the message's VID the same as ours ?                            if(vid != null && hdr.vid != null) { // only check if our vid and message's vid available                                Address my_addr=vid.getCoordAddress(), other_addr=hdr.vid.getCoordAddress();                                if(my_addr == null || other_addr == null) {                                    if(warn) log.warn("my vid or message's vid does not contain " +                                            "a coordinator; discarding message !");                                    return;                                }                                if(!my_addr.equals(other_addr)) {                                    if(warn) log.warn("creator of own vid (" + my_addr + ")is different from " +                                            "creator of message's vid (" + other_addr + "); discarding message !");                                    return;                                }                                rc=hdr.vid.compareTo(vid);                                if(rc > 0) {           // message is sent in next view -> store !                                        if(log.isInfoEnabled()) log.info("message's vid (" + hdr.vid + '#' + hdr.seqno +                                                ") is bigger than current vid: (" + vid + ") message is queued !");                                    msg.putHeader(getName(), hdr);  // put header back on as we removed it above                                    queued_msgs.add(msg);                                    return;                                }                                if(rc < 0) {      // message sent in prev. view -> discard !                                        if(warn) log.warn("message's vid (" + hdr.vid + ") is smaller than " +                                                "current vid (" + vid + "): message <" + msg.getSrc() + ":#" +                                                hdr.seqno + "> is discarded ! Hdr is " + hdr);                                    return;                                }                                // If we made it down here, the vids are the same --> OK                            }                            msg.putHeader(getName(), hdr);   // stored in received_msgs, re-sent later that's why hdr is added !                            naker.receive(hdr.seqno, msg, null);                            return;        // naker passes message up for us !                        case NakAckHeader.RETRANSMIT_MSG:                            naker.retransmit(msg.getSrc(), hdr.seqno, hdr.last_seqno);                            return;                        case NakAckHeader.NAK_ACK_RSP:                            naker.receiveAck(hdr.seqno, msg.getSrc());                            return;        // discard, no need to pass up                        case NakAckHeader.OUT_OF_BAND_MSG:                            out_of_bander.receive(hdr.seqno, msg, hdr.stable_msgs);                            return;        // naker passes message up for us !                        case NakAckHeader.OUT_OF_BAND_RSP:                            out_of_bander.receiveAck(hdr.seqno, msg.getSrc());                            return;                        default:                            if(log.isErrorEnabled()) log.error("NakAck header type " + hdr.type + " not known !");                            break;                    }                } //end synchronized        }        passUp(evt);    }    /**     * <b>Callback</b>. Called by superclass when event may be handled.<p>     * <b>Do not use <code>passDown</code> in this method as the event is passed down     * by default by the superclass after this method returns !</b>     */    public void down(Event evt) {        Message msg;        if(trace)            log.trace("queued_msgs has " + queued_msgs.size() + " messages " +                    "\n\nnaker:\n" + naker.dumpContents() + "\n\nout_of_bander: " +                    out_of_bander.dumpContents() + "\n-----------------------------\n");        switch(evt.getType()) {            case Event.MSG:                msg=(Message)evt.getArg();            // unicast address: not null and not mcast, pass down unchanged            if(vid == null || (msg.getDest() != null && !msg.getDest().isMulticastAddress()))                break;                if(send_next_msg_out_of_band) {                    out_of_bander.send(msg);                    send_next_msg_out_of_band=false;                }                else if(send_next_msg_acking) {                    naker.setAcks(true);  // require acks when sending a msg                    naker.send(msg);                    naker.setAcks(false);  // don't require acks when sending a msg                    send_next_msg_acking=false;                }                else                    naker.send(msg);                return;    // don't pass down the stack, naker does this for us !            case Event.GET_MSG_DIGEST:                long[] highest_seqnos=(long[])evt.getArg();                Digest digest=naker.computeMessageDigest(highest_seqnos);                passUp(new Event(Event.GET_MSG_DIGEST_OK, digest));                return;            case Event.GET_MSGS:                List lower_seqnos=naker.getMessagesInRange((long[][])evt.getArg());                passUp(new Event(Event.GET_MSGS_OK, lower_seqnos));                return;            case Event.REBROADCAST_MSGS:                rebroadcastMsgs((Vector)evt.getArg());                break;            case Event.TMP_VIEW:                Vector mbrs=((View)evt.getArg()).getMembers();                members=mbrs != null? (Vector)mbrs.clone() : new Vector(11);                break;            case Event.VIEW_CHANGE:                synchronized(this) {                    view=((View)((View)evt.getArg()).clone());                    vid=view.getVid();                    members=(Vector)view.getMembers().clone();                    naker.reset();                    out_of_bander.reset();                    is_server=true;  // check vids from now on                    // deliver messages received previously for this view                    if(queued_msgs.size() > 0)                        deliverQueuedMessages();                }                break;            case Event.BECOME_SERVER:                is_server=true;                break;            case Event.SWITCH_NAK:                naker.setAcks(false); // don't require acks when sending a msg                return;                     // don't pass down any further            case Event.SWITCH_NAK_ACK:                send_next_msg_acking=true;                return;                     // don't pass down any further            case Event.SWITCH_OUT_OF_BAND:                send_next_msg_out_of_band=true;                return;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?