⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nakreceiverwindow.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: NakReceiverWindow.java,v 1.28 2006/01/14 14:00:42 belaban Exp $package org.jgroups.stack;import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.Message;import org.jgroups.util.List;import org.jgroups.util.TimeScheduler;import java.util.*;/** * Keeps track of messages according to their sequence numbers. Allows * messages to be added out of order, and with gaps between sequence numbers. * Method <code>remove()</code> removes the first message with a sequence * number that is 1 higher than <code>next_to_remove</code> (this variable is * then incremented), or it returns null if no message is present, or if no * message's sequence number is 1 higher. * <p> * When there is a gap upon adding a message, its seqno will be added to the * Retransmitter, which (using a timer) requests retransmissions of missing * messages and keeps on trying until the message has been received, or the * member who sent the message is suspected. * <p> * Started out as a copy of SlidingWindow. Main diff: RetransmitCommand is * different, and retransmission thread is only created upon detection of a * gap. * <p> * Change Nov 24 2000 (bela): for PBCAST, which has its own retransmission * (via gossip), the retransmitter thread can be turned off * <p> * Change April 25 2001 (igeorg):<br> * i. Restructuring: placed all nested class definitions at the top, then * class static/non-static variables, then class private/public methods.<br> * ii. Class and all nested classes are thread safe. Readers/writer lock * added on <tt>NakReceiverWindow</tt> for finer grained locking.<br> * iii. Internal or externally provided retransmission scheduler thread.<br> * iv. Exponential backoff in time for retransmissions.<br> * * @author Bela Ban May 27 1999, May 2004 * @author John Georgiadis May 8 2001 */public class NakReceiverWindow {    public interface Listener {        void missingMessageReceived(long seqno, Message msg);    }    /** The big read/write lock */    private final ReadWriteLock lock=new WriterPreferenceReadWriteLock();    //private final ReadWriteLock lock=new NullReadWriteLock();    /** keep track of *next* seqno to remove and highest received */    private long   head=0;    private long   tail=0;    /** lowest seqno delivered so far */    private long   lowest_seen=0;    /** highest deliverable (or delivered) seqno so far */    private long   highest_seen=0;    /** TreeMap<Long,Message>. Maintains messages keyed by (sorted) sequence numbers */    private final TreeMap received_msgs=new TreeMap();    /** TreeMap<Long,Message>. Delivered (= seen by all members) messages. A remove() method causes a message to be     moved from received_msgs to delivered_msgs. Message garbage collection will gradually remove elements in this map */    private final TreeMap delivered_msgs=new TreeMap();    /**     * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered     * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where     * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never     * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message     * around, and don't need to wait for garbage collection to remove them.     */    private boolean discard_delivered_msgs=false;    /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,     * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers     */    private int max_xmit_buf_size=0;    /** if not set, no retransmitter thread will be started. Useful if     * protocols do their own retransmission (e.g PBCAST) */    private Retransmitter retransmitter=null;    private Listener listener=null;    protected static final Log log=LogFactory.getLog(NakReceiverWindow.class);    /**     * Creates a new instance with the given retransmit command     *     * @param sender The sender associated with this instance     * @param cmd The command used to retransmit a missing message, will     * be invoked by the table. If null, the retransmit thread will not be     * started     * @param start_seqno The first sequence number to be received     * @param sched the external scheduler to use for retransmission     * requests of missing msgs. If it's not provided or is null, an internal     * one is created     */    public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd,                             long start_seqno, TimeScheduler sched) {        head=start_seqno;        tail=head;        if(cmd != null)            retransmitter=sched == null ?                    new Retransmitter(sender, cmd) :                    new Retransmitter(sender, cmd, sched);    }    /**     * Creates a new instance with the given retransmit command     *     * @param sender The sender associated with this instance     * @param cmd The command used to retransmit a missing message, will     * be invoked by the table. If null, the retransmit thread will not be     * started     * @param start_seqno The first sequence number to be received     */    public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long start_seqno) {        this(sender, cmd, start_seqno, null);    }    /**     * Creates a new instance without a retransmission thread     *     * @param sender The sender associated with this instance     * @param start_seqno The first sequence number to be received     */    public NakReceiverWindow(Address sender, long start_seqno) {        this(sender, null, start_seqno);    }    public void setRetransmitTimeouts(long[] timeouts) {        if(retransmitter != null)            retransmitter.setRetransmitTimeouts(timeouts);    }    public void setDiscardDeliveredMessages(boolean flag) {        this.discard_delivered_msgs=flag;    }    public int getMaxXmitBufSize() {        return max_xmit_buf_size;    }    public void setMaxXmitBufSize(int max_xmit_buf_size) {        this.max_xmit_buf_size=max_xmit_buf_size;    }    public void setListener(Listener l) {        this.listener=l;    }    /**     * Adds a message according to its sequence number (ordered).     * <p>     * Variables <code>head</code> and <code>tail</code> mark the start and     * end of the messages received, but not delivered yet. When a message is     * received, if its seqno is smaller than <code>head</code>, it is     * discarded (already received). If it is bigger than <code>tail</code>,     * we advance <code>tail</code> and add empty elements. If it is between     * <code>head</code> and <code>tail</code>, we set the corresponding     * missing (or already present) element. If it is equal to     * <code>tail</code>, we advance the latter by 1 and add the message     * (default case).     */    public void add(long seqno, Message msg) {        long old_tail;        try {            lock.writeLock().acquire();            try {                old_tail=tail;                if(seqno < head) {                    if(log.isTraceEnabled()) {                        StringBuffer sb=new StringBuffer("seqno ");                        sb.append(seqno).append(" is smaller than ").append(head).append("); discarding message");                        log.trace(sb.toString());                    }                    return;                }                // add at end (regular expected msg)                if(seqno == tail) {                    received_msgs.put(new Long(seqno), msg);                    tail++;                    if(highest_seen+2 == tail) {                        highest_seen++;                    }                    else {                       updateHighestSeen();                    }                    // highest_seen=seqno;                }                // gap detected                // i. add placeholders, creating gaps                // ii. add real msg                // iii. tell retransmitter to retrieve missing msgs                else if(seqno > tail) {                    for(long i=tail; i < seqno; i++) {                        received_msgs.put(new Long(i), null);                        // XmitEntry xmit_entry=new XmitEntry();                        //xmits.put(new Long(i), xmit_entry);                        tail++;                    }                    received_msgs.put(new Long(seqno), msg);                    tail=seqno + 1;                    if(retransmitter != null) {                        retransmitter.add(old_tail, seqno - 1);                    }                }                else if(seqno < tail) { // finally received missing message                    if(log.isTraceEnabled()) {                        log.trace(new StringBuffer("added missing msg ").append(msg.getSrc()).append('#').append(seqno));                    }                    if(listener != null) {                        try {listener.missingMessageReceived(seqno, msg);} catch(Throwable t) {}                    }                    Object val=received_msgs.get(new Long(seqno));                    if(val == null) {                        // only set message if not yet received (bela July 23 2003)                        received_msgs.put(new Long(seqno), msg);                        if(highest_seen +1 == seqno || seqno == head)                            updateHighestSeen();                        //XmitEntry xmit_entry=(XmitEntry)xmits.get(new Long(seqno));                        //if(xmit_entry != null)                        //  xmit_entry.received=System.currentTimeMillis();                        //long xmit_diff=xmit_entry == null? -1 : xmit_entry.received - xmit_entry.created;                        //NAKACK.addXmitResponse(msg.getSrc(), seqno);                        if(retransmitter != null) retransmitter.remove(seqno);                    }                }                updateLowestSeen();            }            finally {                lock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring write lock", e);        }    }    /** Start from the current sequence number and set highest_seen until we find a gap (null value in the entry) */    void updateHighestSeen() {        SortedMap map=received_msgs.tailMap(new Long(highest_seen));        Map.Entry entry;        for(Iterator it=map.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            if(entry.getValue() != null)                highest_seen=((Long)entry.getKey()).longValue();            else                break;        }    }    public Message remove() {        Message retval=null;        Long    key;        boolean bounded_buffer_enabled=max_xmit_buf_size > 0;        try {            lock.writeLock().acquire();            try {                while(received_msgs.size() > 0) {                    key=(Long)received_msgs.firstKey();                    retval=(Message)received_msgs.get(key);                    if(retval != null) { // message exists and is ready for delivery                        received_msgs.remove(key);       // move from received_msgs to ...                        if(discard_delivered_msgs == false) {                            delivered_msgs.put(key, retval); // delivered_msgs                        }                        head++;  // is removed from retransmitter somewhere else (when missing message is received)                        return retval;                    }                    else { // message has not yet been received (gap in the message sequence stream)                        if(bounded_buffer_enabled && received_msgs.size() > max_xmit_buf_size) {                            received_msgs.remove(key);       // move from received_msgs to ...                            head++;                            retransmitter.remove(key.longValue());                        }                        else {                            break;                        }                    }                }                return retval;            }            finally {                lock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring write lock", e);            return null;        }    }    /**     * Delete all messages <= seqno (they are stable, that is, have been     * received at all members). Stop when a number > seqno is encountered     * (all messages are ordered on seqnos).     */    public void stable(long seqno) {        try {            lock.writeLock().acquire();            try {                // we need to remove all seqnos *including* seqno: because headMap() *excludes* seqno, we                // simply increment it, so we have to correct behavior                SortedMap m=delivered_msgs.headMap(new Long(seqno +1));                if(m.size() > 0)                    lowest_seen=Math.max(lowest_seen, ((Long)m.lastKey()).longValue());                m.clear(); // removes entries from delivered_msgs            }            finally {                lock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring write lock", e);        }    }    /**     * Reset the retransmitter and the nak window<br>     */    public void reset() {        try {            lock.writeLock().acquire();            try {                if(retransmitter != null)                    retransmitter.reset();                _reset();            }            finally {                lock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring write lock", e);        }    }    /**     * Stop the retransmitter and reset the nak window<br>     */    public void destroy() {        try {            lock.writeLock().acquire();            try {                if(retransmitter != null)                    retransmitter.stop();                _reset();            }            finally {                lock.writeLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring write lock", e);        }    }    /**     * @return the highest sequence number of a message consumed by the     * application (by <code>remove()</code>)     */    public long getHighestDelivered() {        try {            lock.readLock().acquire();            try {                return (Math.max(head - 1, -1));            }            finally {                lock.readLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring read lock", e);            return -1;        }    }    /**     * @return the lowest sequence number of a message that has been     * delivered or is a candidate for delivery (by the next call to     * <code>remove()</code>)     */    public long getLowestSeen() {        try {            lock.readLock().acquire();            try {                return (lowest_seen);            }            finally {                lock.readLock().release();            }        }        catch(InterruptedException e) {            log.error("failed acquiring read lock", e);            return -1;        }    }    /**     * Returns the highest deliverable seqno; e.g., for 1,2,3,5,6 it would     * be 3.     *

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -