⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nakack.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        sb.append("\nMissing messages received\n");        MissingMessage missing;        for(Enumeration en=receive_history.elements(); en.hasMoreElements();) {            missing=(MissingMessage)en.nextElement();            sb.append(missing).append("\n");        }        return sb.toString();    }    public Vector providedUpServices() {        Vector retval=new Vector(5);        retval.addElement(new Integer(Event.GET_DIGEST));        retval.addElement(new Integer(Event.GET_DIGEST_STABLE));        retval.addElement(new Integer(Event.GET_DIGEST_STATE));        retval.addElement(new Integer(Event.SET_DIGEST));        retval.addElement(new Integer(Event.MERGE_DIGEST));        return retval;    }    public Vector providedDownServices() {        Vector retval=new Vector(2);        retval.addElement(new Integer(Event.GET_DIGEST));        retval.addElement(new Integer(Event.GET_DIGEST_STABLE));        return retval;    }    public void start() throws Exception {        timer=stack != null ? stack.timer : null;        if(timer == null)            throw new Exception("timer is null");        started=true;    }    public void stop() {        started=false;        reset();  // clears sent_msgs and destroys all NakReceiverWindows    }    /**     * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>passDown()</code> in this     * method as the event is passed down by default by the superclass after this method returns !</b>     */    public void down(Event evt) {        Digest  digest;        Vector  mbrs;        switch(evt.getType()) {        case Event.MSG:            Message msg=(Message)evt.getArg();            Address dest=msg.getDest();            if(dest != null && !dest.isMulticastAddress()) {                break; // unicast address: not null and not mcast, pass down unchanged            }            send(evt, msg);            return;    // don't pass down the stack        case Event.STABLE:  // generated by STABLE layer. Delete stable messages passed in arg            stable((Digest)evt.getArg());            return;  // do not pass down further (Bela Aug 7 2001)        case Event.GET_DIGEST:            digest=getDigest();            passUp(new Event(Event.GET_DIGEST_OK, digest != null ? digest.copy() : null));            return;        case Event.GET_DIGEST_STABLE:            digest=getDigestHighestDeliveredMsgs();            passUp(new Event(Event.GET_DIGEST_STABLE_OK, digest != null ? digest.copy() : null));            return;        case Event.GET_DIGEST_STATE:            digest=getDigest();            passUp(new Event(Event.GET_DIGEST_STATE_OK, digest != null ? digest.copy() : null));            return;        case Event.SET_DIGEST:            setDigest((Digest)evt.getArg());            return;        case Event.MERGE_DIGEST:            mergeDigest((Digest)evt.getArg());            return;        case Event.CONFIG:            passDown(evt);            if(log.isDebugEnabled()) {                log.debug("received CONFIG event: " + evt.getArg());            }            handleConfigEvent((HashMap)evt.getArg());            return;        case Event.TMP_VIEW:            View tmp_view=(View)evt.getArg();            mbrs=tmp_view.getMembers();            members.clear();            members.addAll(mbrs);            adjustReceivers(false);            break;        case Event.VIEW_CHANGE:            tmp_view=(View)evt.getArg();            mbrs=tmp_view.getMembers();            members.clear();            members.addAll(mbrs);            adjustReceivers(true);            is_server=true;  // check vids from now on            Set tmp=new LinkedHashSet(members);            tmp.add(null); // for null destination (= mcast)            sent.keySet().retainAll(tmp);            received.keySet().retainAll(tmp);            view=tmp_view;            break;        case Event.BECOME_SERVER:            is_server=true;            break;        case Event.DISCONNECT:            leaving=true;            reset();            break;        }        passDown(evt);    }    /**     * <b>Callback</b>. Called by superclass when event may be handled.<p> <b>Do not use <code>PassUp</code> in this     * method as the event is passed up by default by the superclass after this method returns !</b>     */    public void up(Event evt) {        NakAckHeader hdr;        Message msg;        Digest digest;        switch(evt.getType()) {        case Event.MSG:            msg=(Message)evt.getArg();            hdr=(NakAckHeader)msg.getHeader(name);            if(hdr == null)                break;  // pass up (e.g. unicast msg)            // discard messages while not yet server (i.e., until JOIN has returned)            if(!is_server) {                if(trace)                    log.trace("message was discarded (not yet server)");                return;            }            // Changed by bela Jan 29 2003: we must not remove the header, otherwise            // further xmit requests will fail !            //hdr=(NakAckHeader)msg.removeHeader(getName());            switch(hdr.type) {            case NakAckHeader.MSG:                handleMessage(msg, hdr);                return;        // transmitter passes message up for us !            case NakAckHeader.XMIT_REQ:                if(hdr.range == null) {                    if(log.isErrorEnabled()) {                        log.error("XMIT_REQ: range of xmit msg is null; discarding request from " + msg.getSrc());                    }                    return;                }                handleXmitReq(msg.getSrc(), hdr.range.low, hdr.range.high, hdr.sender);                return;            case NakAckHeader.XMIT_RSP:                if(trace)                    log.trace("received missing messages " + hdr.range);                handleXmitRsp(msg);                return;            default:                if(log.isErrorEnabled()) {                    log.error("NakAck header type " + hdr.type + " not known !");                }                return;            }        case Event.STABLE:  // generated by STABLE layer. Delete stable messages passed in arg            stable((Digest)evt.getArg());            return;  // do not pass up further (Bela Aug 7 2001)        case Event.GET_DIGEST:            digest=getDigestHighestDeliveredMsgs();            passDown(new Event(Event.GET_DIGEST_OK, digest));            return;        case Event.GET_DIGEST_STABLE:            digest=getDigestHighestDeliveredMsgs();            passDown(new Event(Event.GET_DIGEST_STABLE_OK, digest));            return;        case Event.SET_LOCAL_ADDRESS:            local_addr=(Address)evt.getArg();            break;        case Event.CONFIG:            passUp(evt);            if(log.isDebugEnabled()) {                log.debug("received CONFIG event: " + evt.getArg());            }            handleConfigEvent((HashMap)evt.getArg());            return;        }        passUp(evt);    }    /* --------------------------------- Private Methods --------------------------------------- */    /**     * Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't     * store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a     * message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details.     * Made seqno increment and adding to sent_msgs atomic, e.g. seqno won't get incremented if adding to     * sent_msgs fails e.g. due to an OOM (see http://jira.jboss.com/jira/browse/JGRP-179). bela Jan 13 2006     */    private void send(Event evt, Message msg) {        if(msg == null)            throw new NullPointerException("msg is null; event is " + evt);        if(!started) {            if(warn)                log.warn("[" + local_addr + "] discarded message as start() has not been called, message: " + msg);            return;        }        long msg_id;        synchronized(sent_msgs) {            try { // incrementing seqno and adding the msg to sent_msgs needs to be atomic                msg_id=seqno +1;                msg.putHeader(name, new NakAckHeader(NakAckHeader.MSG, msg_id));                if(Global.copy) {                    sent_msgs.put(new Long(msg_id), msg.copy());                }                else {                    sent_msgs.put(new Long(msg_id), msg);                }                seqno=msg_id;            }            catch(Throwable t) {                if(t instanceof Error)                    throw (Error)t;                if(t instanceof RuntimeException)                    throw (RuntimeException)t;                else {                    throw new RuntimeException("failure adding msg " + msg + " to the retransmit table", t);                }            }        }        try { // moved passDown() out of synchronized clause (bela Sept 7 2006) http://jira.jboss.com/jira/browse/JGRP-300            if(trace)                log.trace("sending " + local_addr + "#" + msg_id);            passDown(evt); // if this fails, since msg is in sent_msgs, it can be retransmitted        }        catch(Throwable t) { // eat the exception, don't pass it up the stack            if(warn) {                log.warn("failure passing message down", t);            }        }    }    /**     * Finds the corresponding NakReceiverWindow and adds the message to it (according to seqno). Then removes as many     * messages as possible from the NRW and passes them up the stack. Discards messages from non-members.     */    private void handleMessage(Message msg, NakAckHeader hdr) {        NakReceiverWindow win;        Message msg_to_deliver;        Address sender=msg.getSrc();        if(sender == null) {            if(log.isErrorEnabled())                log.error("sender of message is null");            return;        }        if(trace) {            StringBuffer sb=new StringBuffer('[');            sb.append(local_addr).append(": received ").append(sender).append('#').append(hdr.seqno);            log.trace(sb.toString());        }        // msg is potentially re-sent later as result of XMIT_REQ reception; that's why hdr is added !        // Changed by bela Jan 29 2003: we currently don't resend from received msgs, just from sent_msgs !        // msg.putHeader(getName(), hdr);        synchronized(received_msgs) {            win=(NakReceiverWindow)received_msgs.get(sender);        }        if(win == null) {  // discard message if there is no entry for sender            if(leaving)                return;            if(warn) {                StringBuffer sb=new StringBuffer('[');                sb.append(local_addr).append("] discarded message from non-member ")                        .append(sender).append(", my view is " ).append(this.view);                log.warn(sb);            }            return;        }        win.add(hdr.seqno, msg);  // add in order, then remove and pass up as many msgs as possible        // Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198);        // this is all the more important once we have a threadless stack (http://jira.jboss.com/jira/browse/JGRP-181),        // where lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time        // We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in        // delivery of P1, Q1, Q2, P2: FIFO (implemented by NAKACK) says messages need to be delivered only in the        // order in which they were sent by the sender        synchronized(win) {            while((msg_to_deliver=win.remove()) != null) {                // Changed by bela Jan 29 2003: not needed (see above)                //msg_to_deliver.removeHeader(getName());                passUp(new Event(Event.MSG, msg_to_deliver));            }        }    }    /**     * Retransmit from sent-table, called when XMIT_REQ is received. Bundles all messages to be xmitted into one large     * message and sends them back with an XMIT_RSP header. Note that since we cannot count on a fragmentation layer     * below us, we have to make sure the message doesn't exceed max_xmit_size bytes. If this is the case, we split the     * message into multiple, smaller-chunked messages. But in most cases this still yields fewer messages than if each     * requested message was retransmitted separately.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -