⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nakack.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
     *     * @param xmit_requester        The sender of the XMIT_REQ, we have to send the requested copy of the message to this address     * @param first_seqno The first sequence number to be retransmitted (<= last_seqno)     * @param last_seqno  The last sequence number to be retransmitted (>= first_seqno)     * @param original_sender The member who originally sent the messsage. Guaranteed to be non-null     */    private void handleXmitReq(Address xmit_requester, long first_seqno, long last_seqno, Address original_sender) {        Message m, tmp;        LinkedList list;        long size=0, marker=first_seqno, len;        NakReceiverWindow win=null;        boolean      amISender; // am I the original sender ?        if(trace) {            StringBuffer sb=new StringBuffer();            sb.append(local_addr).append(": received xmit request from ").append(xmit_requester).append(" for ");            sb.append(original_sender).append(" [").append(first_seqno).append(" - ").append(last_seqno).append("]");            log.trace(sb.toString());        }        if(first_seqno > last_seqno) {            if(log.isErrorEnabled())                log.error("first_seqno (" + first_seqno + ") > last_seqno (" + last_seqno + "): not able to retransmit");            return;        }        if(stats) {            xmit_reqs_received+=last_seqno - first_seqno +1;            updateStats(received, xmit_requester, 1, 0, 0);        }        amISender=local_addr.equals(original_sender);        if(!amISender)            win=(NakReceiverWindow)received_msgs.get(original_sender);        list=new LinkedList();        for(long i=first_seqno; i <= last_seqno; i++) {            if(amISender) {                m=(Message)sent_msgs.get(new Long(i)); // no need to synchronize            }            else {                m=win != null? 
win.get(i) : null;            }            if(m == null) {                if(log.isErrorEnabled()) {                    StringBuffer sb=new StringBuffer();                    sb.append("(requester=").append(xmit_requester).append(", local_addr=").append(this.local_addr);                    sb.append(") message ").append(original_sender).append("::").append(i);                    sb.append(" not found in ").append((amISender? "sent" : "received")).append(" msgs. ");                    if(win != null) {                        sb.append("Received messages from ").append(original_sender).append(": ").append(win.toString());                    }                    else {                        sb.append("\nSent messages: ").append(printSentMsgs());                    }                    log.error(sb);                }                continue;            }            len=m.size();            size+=len;            if(size > max_xmit_size && list.size() > 0) { // changed from >= to > (yaron-r, bug #943709)                // yaronr: added &&listSize()>0 since protocols between FRAG and NAKACK add headers, and message exceeds size.                // size has reached max_xmit_size. 
go ahead and send message (excluding the current message)                if(trace)                    log.trace("xmitting msgs [" + marker + '-' + (i - 1) + "] to " + xmit_requester);                sendXmitRsp(xmit_requester, (LinkedList)list.clone(), marker, i - 1);                marker=i;                list.clear();                // fixed Dec 15 2003 (bela, patch from Joel Dice (dicej)), see explanantion under                // bug report #854887                size=len;            }            if(Global.copy) {                tmp=m.copy();            }            else {                tmp=m;            }            // tmp.setDest(xmit_requester);            // tmp.setSrc(local_addr);            if(tmp.getSrc() == null)                tmp.setSrc(local_addr);            list.add(tmp);        }        if(list.size() > 0) {            if(trace)                log.trace("xmitting msgs [" + marker + '-' + last_seqno + "] to " + xmit_requester);            sendXmitRsp(xmit_requester, (LinkedList)list.clone(), marker, last_seqno);            list.clear();        }    }    private static void updateStats(HashMap map, Address key, int req, int rsp, int missing) {        Entry entry=(Entry)map.get(key);        if(entry == null) {            entry=new Entry();            map.put(key, entry);        }        entry.xmit_reqs+=req;        entry.xmit_rsps+=rsp;        entry.missing_msgs_rcvd+=missing;    }    private void sendXmitRsp(Address dest, LinkedList xmit_list, long first_seqno, long last_seqno) {        Buffer buf;        if(xmit_list == null || xmit_list.size() == 0) {            if(log.isErrorEnabled())                log.error("xmit_list is empty");            return;        }        if(use_mcast_xmit)            dest=null;        if(stats) {            xmit_rsps_sent+=xmit_list.size();            updateStats(sent, dest, 0, 1, 0);        }        try {            buf=Util.msgListToByteBuffer(xmit_list);            Message msg=new Message(dest, null, 
buf.getBuf(), buf.getOffset(), buf.getLength());            msg.putHeader(name, new NakAckHeader(NakAckHeader.XMIT_RSP, first_seqno, last_seqno));            passDown(new Event(Event.MSG, msg));        }        catch(IOException ex) {            log.error("failed marshalling xmit list", ex);        }    }    private void handleXmitRsp(Message msg) {        LinkedList list;        Message m;        if(msg == null) {            if(warn)                log.warn("message is null");            return;        }        try {            list=Util.byteBufferToMessageList(msg.getRawBuffer(), msg.getOffset(), msg.getLength());            if(list != null) {                if(stats) {                    xmit_rsps_received+=list.size();                    updateStats(received, msg.getSrc(), 0, 1, 0);                }                for(Iterator it=list.iterator(); it.hasNext();) {                    m=(Message)it.next();                    up(new Event(Event.MSG, m));                }                list.clear();            }        }        catch(Exception ex) {            if(log.isErrorEnabled()) {                log.error("failed reading list of retransmitted messages", ex);            }        }    }    /**     * Remove old members from NakReceiverWindows and add new members (starting seqno=0). Essentially removes all     * entries from received_msgs that are not in <code>members</code>     */    private void adjustReceivers(boolean remove) {        Address sender;        NakReceiverWindow win;        synchronized(received_msgs) {            if(remove) {                // 1. 
Remove all senders in received_msgs that are not members anymore                for(Iterator it=received_msgs.keySet().iterator(); it.hasNext();) {                    sender=(Address)it.next();                    if(!members.contains(sender)) {                        win=(NakReceiverWindow)received_msgs.get(sender);                        win.reset();                        if(log.isDebugEnabled()) {                            log.debug("removing " + sender + " from received_msgs (not member anymore)");                        }                        it.remove();                    }                }            }            // 2. Add newly joined members to received_msgs (starting seqno=0)           for(int i=0; i < members.size(); i++) {              sender=(Address)members.elementAt(i);              if(!received_msgs.containsKey(sender)) {                 win=createNakReceiverWindow(sender, 0);                 received_msgs.put(sender, win);              }           }        }    }    /**     * Returns a message digest: for each member P the highest seqno received from P is added to the digest.     */    private Digest getDigest() {        Digest digest;        Address sender;        Range range;        digest=new Digest(members.size());        for(int i=0; i < members.size(); i++) {            sender=(Address)members.elementAt(i);            range=getLowestAndHighestSeqno(sender, false);  // get the highest received seqno            if(range == null) {                if(log.isErrorEnabled()) {                    log.error("range is null");                }                continue;            }            digest.add(sender, range.low, range.high);  // add another entry to the digest        }        return digest;    }    /**     * Returns a message digest: for each member P the highest seqno received from P <em>without a gap</em> is added to     * the digest. E.g. if the seqnos received from P are [+3 +4 +5 -6 +7 +8], then 5 will be returned. 
Also, the     * highest seqno <em>seen</em> is added. The max of all highest seqnos seen will be used (in STABLE) to determine     * whether the last seqno from a sender was received (see "Last Message Dropped" topic in DESIGN).     */    private Digest getDigestHighestDeliveredMsgs() {        Digest digest;        Address sender;        Range range;        long high_seqno_seen;        digest=new Digest(members.size());        for(int i=0; i < members.size(); i++) {            sender=(Address)members.elementAt(i);            range=getLowestAndHighestSeqno(sender, true);  // get the highest deliverable seqno            if(range == null) {                if(log.isErrorEnabled()) {                    log.error("range is null");                }                continue;            }            high_seqno_seen=getHighSeqnoSeen(sender);            digest.add(sender, range.low, range.high, high_seqno_seen);  // add another entry to the digest        }        return digest;    }    /**     * Creates a NakReceiverWindow for each sender in the digest according to the sender's seqno. If NRW already exists,     * reset it.     
*/    private void setDigest(Digest d) {        if(d == null || d.senders == null) {            if(log.isErrorEnabled()) {                log.error("digest or digest.senders is null");            }            return;        }        clear();        Map.Entry entry;        Address sender;        org.jgroups.protocols.pbcast.Digest.Entry val;        long initial_seqno;        NakReceiverWindow win;        for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            sender=(Address)entry.getKey();            val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();            if(sender == null || val == null) {                if(warn) {                    log.warn("sender or value is null");                }                continue;            }            initial_seqno=val.high_seqno;            win=createNakReceiverWindow(sender, initial_seqno);            synchronized(received_msgs) {                received_msgs.put(sender, win);            }        }    }    /**     * For all members of the digest, adjust the NakReceiverWindows in the received_msgs hashtable. If the member     * already exists, sets its seqno to be the max of the seqno and the seqno of the member in the digest. If no entry     * exists, create one with the initial seqno set to the seqno of the member in the digest.     
*/
    private void mergeDigest(Digest d) {
        if(d == null || d.senders == null) {
            if(log.isErrorEnabled()) {
                log.error("digest or digest.senders is null");
            }
            return;
        }

        Iterator entries=d.senders.entrySet().iterator();
        while(entries.hasNext()) {
            Map.Entry entry=(Map.Entry)entries.next();
            Address sender=(Address)entry.getKey();
            org.jgroups.protocols.pbcast.Digest.Entry seqnos=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
            if(sender == null || seqnos == null) {
                if(warn) {
                    log.warn("sender or value is null");
                }
                continue;
            }

            final long digest_seqno=seqnos.high_seqno;
            synchronized(received_msgs) {
                NakReceiverWindow existing=(NakReceiverWindow)received_msgs.get(sender);

                // An existing window is replaced only when the digest is ahead of what it has received
                boolean behind=existing != null && existing.getHighestReceived() < digest_seqno;
                if(behind) {
                    existing.reset();
                    received_msgs.remove(sender);
                }

                // Missing or just discarded: install a window starting at the digest's seqno
                if(existing == null || behind) {
                    received_msgs.put(sender, createNakReceiverWindow(sender, digest_seqno));
                }
            }
        }
    }

    private NakReceiverWindow createNakReceiverWindow(Address sender, long initial_seqno) {
        NakReceiverWindow win=new NakReceiverWindow(sender, this, initial_seqno, timer);
        win.setRetransmitTimeouts(retransmit_timeout);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -