⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nakack.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        win.setDiscardDeliveredMessages(discard_delivered_msgs);        win.setMaxXmitBufSize(this.max_xmit_buf_size);        if(stats)            win.setListener(this);        return win;    }    /**     * Returns the lowest seqno still in cache (so it can be retransmitted) and the highest seqno received so far.     *     * @param sender       The address for which the highest and lowest seqnos are to be retrieved     * @param stop_at_gaps If true, the highest seqno *deliverable* will be returned. If false, the highest seqno     *                     *received* will be returned. E.g. for [+3 +4 +5 -6 +7 +8], the highest_seqno_received is 8,     *                     whereas the higheset_seqno_seen (deliverable) is 5.     */    private Range getLowestAndHighestSeqno(Address sender, boolean stop_at_gaps) {        Range r=null;        NakReceiverWindow win;        if(sender == null) {            if(log.isErrorEnabled()) {                log.error("sender is null");            }            return r;        }        synchronized(received_msgs) {            win=(NakReceiverWindow)received_msgs.get(sender);        }        if(win == null) {            if(log.isErrorEnabled()) {                log.error("sender " + sender + " not found in received_msgs");            }            return r;        }        if(stop_at_gaps) {            r=new Range(win.getLowestSeen(), win.getHighestSeen());       // deliverable messages (no gaps)        }        else {            r=new Range(win.getLowestSeen(), win.getHighestReceived() + 1); // received messages        }        return r;    }    /**     * Returns the highest seqno seen from sender. E.g. if we received 1, 2, 4, 5 from P, then 5 will be returned     * (doesn't take gaps into account). If we are the sender, we will return the highest seqno <em>sent</em> rather     * then <em>received</em>     */    private long getHighSeqnoSeen(Address sender) {        NakReceiverWindow win;        long ret=0;        if(sender == null) {            if(log.isErrorEnabled()) {                log.error("sender is null");            }            return ret;        }        if(sender.equals(local_addr)) {            return seqno - 1;        }        synchronized(received_msgs) {            win=(NakReceiverWindow)received_msgs.get(sender);        }        if(win == null) {            if(log.isErrorEnabled()) {                log.error("sender " + sender + " not found in received_msgs");            }            return ret;        }        ret=win.getHighestReceived();        return ret;    }    /**     * Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest     * which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update received_msgs:     * for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the     * NakReceiverWindow corresponding to P which are <= seqno at digest[P].     */    private void stable(Digest d) {        NakReceiverWindow recv_win;        long my_highest_rcvd;        // highest seqno received in my digest for a sender P        long stability_highest_rcvd; // highest seqno received in the stability vector for a sender P        if(members == null || local_addr == null || d == null) {            if(warn)                log.warn("members, local_addr or digest are null !");            return;        }        if(trace) {            log.trace("received stable digest " + d);        }        Map.Entry entry;        Address sender;        org.jgroups.protocols.pbcast.Digest.Entry val;        long high_seqno_delivered, high_seqno_received;        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            sender=(Address)entry.getKey();            if(sender == null)                continue;            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();            high_seqno_delivered=val.high_seqno;            high_seqno_received=val.high_seqno_seen;            // check whether the last seqno received for a sender P in the stability vector is > last seqno            // received for P in my digest. if yes, request retransmission (see "Last Message Dropped" topic            // in DESIGN)            synchronized(received_msgs) {                recv_win=(NakReceiverWindow)received_msgs.get(sender);            }            if(recv_win != null) {                my_highest_rcvd=recv_win.getHighestReceived();                stability_highest_rcvd=high_seqno_received;                if(stability_highest_rcvd >= 0 && stability_highest_rcvd > my_highest_rcvd) {                    if(trace) {                        log.trace("my_highest_rcvd (" + my_highest_rcvd + ") < stability_highest_rcvd (" +                                stability_highest_rcvd + "): requesting retransmission of " +                                sender + '#' + stability_highest_rcvd);                    }                    retransmit(stability_highest_rcvd, stability_highest_rcvd, sender);                }            }            high_seqno_delivered-=gc_lag;            if(high_seqno_delivered < 0) {                continue;            }            if(trace)                log.trace("deleting msgs <= " + high_seqno_delivered + " from " + sender);            // garbage collect from sent_msgs if sender was myself            if(sender.equals(local_addr)) {                synchronized(sent_msgs) {                    // gets us a subset from [lowest seqno - seqno]                    SortedMap stable_keys=sent_msgs.headMap(new Long(high_seqno_delivered));                    if(stable_keys != null) {                        stable_keys.clear(); // this will modify sent_msgs directly                    }                }            }            // delete *delivered* msgs that are stable            // recv_win=(NakReceiverWindow)received_msgs.get(sender);            if(recv_win != null)                recv_win.stable(high_seqno_delivered);  // delete all messages with seqnos <= seqno        }    }    /* ---------------------- Interface Retransmitter.RetransmitCommand ---------------------- */    /**     * Implementation of Retransmitter.RetransmitCommand. Called by retransmission thread when gap is detected.     */    public void retransmit(long first_seqno, long last_seqno, Address sender) {        NakAckHeader hdr;        Message retransmit_msg;        Address dest=sender; // to whom do we send the XMIT request ?        if(xmit_from_random_member && !local_addr.equals(sender)) {            Address random_member=(Address)Util.pickRandomElement(members);            if(random_member != null && !local_addr.equals(random_member)) {                dest=random_member;                if(trace)                    log.trace("picked random member " + dest + " to send XMIT request to");            }        }        hdr=new NakAckHeader(NakAckHeader.XMIT_REQ, first_seqno, last_seqno, sender);        retransmit_msg=new Message(dest, null, null);        if(trace)            log.trace(local_addr + ": sending XMIT_REQ ([" + first_seqno + ", " + last_seqno + "]) to " + dest);        retransmit_msg.putHeader(name, hdr);        passDown(new Event(Event.MSG, retransmit_msg));        if(stats) {            xmit_reqs_sent+=last_seqno - first_seqno +1;            updateStats(sent, dest, 1, 0, 0);            for(long i=first_seqno; i <= last_seqno; i++) {                XmitRequest req=new XmitRequest(sender, i, dest);                send_history.add(req);            }        }    }    /* ------------------- End of Interface Retransmitter.RetransmitCommand -------------------- */    /* ----------------------- Interface NakReceiverWindow.Listener ---------------------- */    public void missingMessageReceived(long seqno, Message msg) {        if(stats) {            missing_msgs_received++;            updateStats(received, msg.getSrc(), 0, 0, 1);            MissingMessage missing=new MissingMessage(msg.getSrc(), seqno);            receive_history.add(missing);        }    }    /* ------------------- End of Interface NakReceiverWindow.Listener ------------------- */    private void clear() {        NakReceiverWindow win;        // changed April 21 2004 (bela): SourceForge bug# 938584. We cannot delete our own messages sent between        // a join() and a getState(). Otherwise retransmission requests from members who missed those msgs might        // fail. Not to worry though: those msgs will be cleared by STABLE (message garbage collection)        // sent_msgs.clear();        synchronized(received_msgs) {            for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {                win=(NakReceiverWindow)it.next();                win.reset();            }            received_msgs.clear();        }    }    private void reset() {        NakReceiverWindow win;        synchronized(sent_msgs) {            sent_msgs.clear();            seqno=-1;        }        synchronized(received_msgs) {            for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {                win=(NakReceiverWindow)it.next();                win.destroy();            }            received_msgs.clear();        }    }   public String printMessages() {        StringBuffer ret=new StringBuffer();        Map.Entry entry;        Address addr;        Object w;       ret.append("\nsent_msgs: ").append(printSentMsgs());        ret.append("\nreceived_msgs:\n");        synchronized(received_msgs) {            for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {                entry=(Map.Entry)it.next();                addr=(Address)entry.getKey();                w=entry.getValue();                ret.append(addr).append(": ").append(w.toString()).append('\n');            }        }        return ret.toString();    }    public String printSentMsgs() {        StringBuffer sb=new StringBuffer();        Long min_seqno, max_seqno;        synchronized(sent_msgs) {            min_seqno=sent_msgs.size() > 0 ? (Long)sent_msgs.firstKey() : new Long(0);            max_seqno=sent_msgs.size() > 0 ? (Long)sent_msgs.lastKey() : new Long(0);        }        sb.append('[').append(min_seqno).append(" - ").append(max_seqno).append("] (").append(sent_msgs.size()).append(")");        return sb.toString();    }    private void handleConfigEvent(HashMap map) {        if(map == null) {            return;        }        if(map.containsKey("frag_size")) {            max_xmit_size=((Integer)map.get("frag_size")).intValue();            if(log.isInfoEnabled()) {                log.info("max_xmit_size=" + max_xmit_size);            }        }    }    static class Entry {        long xmit_reqs, xmit_rsps, missing_msgs_rcvd;        public String toString() {            StringBuffer sb=new StringBuffer();            sb.append(xmit_reqs).append(" xmit_reqs").append(", ").append(xmit_rsps).append(" xmit_rsps");            sb.append(", ").append(missing_msgs_rcvd).append(" missing msgs");            return sb.toString();        }    }    static class XmitRequest {        Address original_sender; // original sender of message        long    seq, timestamp=System.currentTimeMillis();        Address xmit_dest;       // destination to which XMIT_REQ is sent, usually the original sender        XmitRequest(Address original_sender, long seqno, Address xmit_dest) {            this.original_sender=original_sender;            this.xmit_dest=xmit_dest;            this.seq=seqno;        }        public String toString() {            StringBuffer sb=new StringBuffer();            sb.append(new Date(timestamp)).append(": ").append(original_sender).append(" #").append(seq);            sb.append(" (XMIT_REQ sent to ").append(xmit_dest).append(")");            return sb.toString();        }    }    static class MissingMessage {        Address original_sender;        long    seq, timestamp=System.currentTimeMillis();        MissingMessage(Address original_sender, long seqno) {            this.original_sender=original_sender;            this.seq=seqno;        }        public String toString() {            StringBuffer sb=new StringBuffer();            sb.append(new Date(timestamp)).append(": ").append(original_sender).append(" #").append(seq);            return sb.toString();        }    }    /* ----------------------------- End of Private Methods ------------------------------------ */}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -