📄 nakreceiverwindow.java
字号:
// $Id: NakReceiverWindow.java,v 1.28 2006/01/14 14:00:42 belaban Exp $package org.jgroups.stack;import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.Message;import org.jgroups.util.List;import org.jgroups.util.TimeScheduler;import java.util.*;/** * Keeps track of messages according to their sequence numbers. Allows * messages to be added out of order, and with gaps between sequence numbers. * Method <code>remove()</code> removes the first message with a sequence * number that is 1 higher than <code>next_to_remove</code> (this variable is * then incremented), or it returns null if no message is present, or if no * message's sequence number is 1 higher. * <p> * When there is a gap upon adding a message, its seqno will be added to the * Retransmitter, which (using a timer) requests retransmissions of missing * messages and keeps on trying until the message has been received, or the * member who sent the message is suspected. * <p> * Started out as a copy of SlidingWindow. Main diff: RetransmitCommand is * different, and retransmission thread is only created upon detection of a * gap. * <p> * Change Nov 24 2000 (bela): for PBCAST, which has its own retransmission * (via gossip), the retransmitter thread can be turned off * <p> * Change April 25 2001 (igeorg):<br> * i. Restructuring: placed all nested class definitions at the top, then * class static/non-static variables, then class private/public methods.<br> * ii. Class and all nested classes are thread safe. Readers/writer lock * added on <tt>NakReceiverWindow</tt> for finer grained locking.<br> * iii. Internal or externally provided retransmission scheduler thread.<br> * iv. Exponential backoff in time for retransmissions.<br> * * @author Bela Ban May 27 1999, May 2004 * @author John Georgiadis May 8 2001 */public class NakReceiverWindow { public interface Listener { void missingMessageReceived(long seqno, Message msg); } /** The big read/write lock */ private final ReadWriteLock lock=new WriterPreferenceReadWriteLock(); //private final ReadWriteLock lock=new NullReadWriteLock(); /** keep track of *next* seqno to remove and highest received */ private long head=0; private long tail=0; /** lowest seqno delivered so far */ private long lowest_seen=0; /** highest deliverable (or delivered) seqno so far */ private long highest_seen=0; /** TreeMap<Long,Message>. Maintains messages keyed by (sorted) sequence numbers */ private final TreeMap received_msgs=new TreeMap(); /** TreeMap<Long,Message>. Delivered (= seen by all members) messages. A remove() method causes a message to be moved from received_msgs to delivered_msgs. Message garbage collection will gradually remove elements in this map */ private final TreeMap delivered_msgs=new TreeMap(); /** * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message * around, and don't need to wait for garbage collection to remove them. */ private boolean discard_delivered_msgs=false; /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept, * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers */ private int max_xmit_buf_size=0; /** if not set, no retransmitter thread will be started. Useful if * protocols do their own retransmission (e.g PBCAST) */ private Retransmitter retransmitter=null; private Listener listener=null; protected static final Log log=LogFactory.getLog(NakReceiverWindow.class); /** * Creates a new instance with the given retransmit command * * @param sender The sender associated with this instance * @param cmd The command used to retransmit a missing message, will * be invoked by the table. If null, the retransmit thread will not be * started * @param start_seqno The first sequence number to be received * @param sched the external scheduler to use for retransmission * requests of missing msgs. If it's not provided or is null, an internal * one is created */ public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long start_seqno, TimeScheduler sched) { head=start_seqno; tail=head; if(cmd != null) retransmitter=sched == null ? new Retransmitter(sender, cmd) : new Retransmitter(sender, cmd, sched); } /** * Creates a new instance with the given retransmit command * * @param sender The sender associated with this instance * @param cmd The command used to retransmit a missing message, will * be invoked by the table. If null, the retransmit thread will not be * started * @param start_seqno The first sequence number to be received */ public NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long start_seqno) { this(sender, cmd, start_seqno, null); } /** * Creates a new instance without a retransmission thread * * @param sender The sender associated with this instance * @param start_seqno The first sequence number to be received */ public NakReceiverWindow(Address sender, long start_seqno) { this(sender, null, start_seqno); } public void setRetransmitTimeouts(long[] timeouts) { if(retransmitter != null) retransmitter.setRetransmitTimeouts(timeouts); } public void setDiscardDeliveredMessages(boolean flag) { this.discard_delivered_msgs=flag; } public int getMaxXmitBufSize() { return max_xmit_buf_size; } public void setMaxXmitBufSize(int max_xmit_buf_size) { this.max_xmit_buf_size=max_xmit_buf_size; } public void setListener(Listener l) { this.listener=l; } /** * Adds a message according to its sequence number (ordered). * <p> * Variables <code>head</code> and <code>tail</code> mark the start and * end of the messages received, but not delivered yet. When a message is * received, if its seqno is smaller than <code>head</code>, it is * discarded (already received). If it is bigger than <code>tail</code>, * we advance <code>tail</code> and add empty elements. If it is between * <code>head</code> and <code>tail</code>, we set the corresponding * missing (or already present) element. If it is equal to * <code>tail</code>, we advance the latter by 1 and add the message * (default case). */ public void add(long seqno, Message msg) { long old_tail; try { lock.writeLock().acquire(); try { old_tail=tail; if(seqno < head) { if(log.isTraceEnabled()) { StringBuffer sb=new StringBuffer("seqno "); sb.append(seqno).append(" is smaller than ").append(head).append("); discarding message"); log.trace(sb.toString()); } return; } // add at end (regular expected msg) if(seqno == tail) { received_msgs.put(new Long(seqno), msg); tail++; if(highest_seen+2 == tail) { highest_seen++; } else { updateHighestSeen(); } // highest_seen=seqno; } // gap detected // i. add placeholders, creating gaps // ii. add real msg // iii. tell retransmitter to retrieve missing msgs else if(seqno > tail) { for(long i=tail; i < seqno; i++) { received_msgs.put(new Long(i), null); // XmitEntry xmit_entry=new XmitEntry(); //xmits.put(new Long(i), xmit_entry); tail++; } received_msgs.put(new Long(seqno), msg); tail=seqno + 1; if(retransmitter != null) { retransmitter.add(old_tail, seqno - 1); } } else if(seqno < tail) { // finally received missing message if(log.isTraceEnabled()) { log.trace(new StringBuffer("added missing msg ").append(msg.getSrc()).append('#').append(seqno)); } if(listener != null) { try {listener.missingMessageReceived(seqno, msg);} catch(Throwable t) {} } Object val=received_msgs.get(new Long(seqno)); if(val == null) { // only set message if not yet received (bela July 23 2003) received_msgs.put(new Long(seqno), msg); if(highest_seen +1 == seqno || seqno == head) updateHighestSeen(); //XmitEntry xmit_entry=(XmitEntry)xmits.get(new Long(seqno)); //if(xmit_entry != null) // xmit_entry.received=System.currentTimeMillis(); //long xmit_diff=xmit_entry == null? -1 : xmit_entry.received - xmit_entry.created; //NAKACK.addXmitResponse(msg.getSrc(), seqno); if(retransmitter != null) retransmitter.remove(seqno); } } updateLowestSeen(); } finally { lock.writeLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring write lock", e); } } /** Start from the current sequence number and set highest_seen until we find a gap (null value in the entry) */ void updateHighestSeen() { SortedMap map=received_msgs.tailMap(new Long(highest_seen)); Map.Entry entry; for(Iterator it=map.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); if(entry.getValue() != null) highest_seen=((Long)entry.getKey()).longValue(); else break; } } public Message remove() { Message retval=null; Long key; boolean bounded_buffer_enabled=max_xmit_buf_size > 0; try { lock.writeLock().acquire(); try { while(received_msgs.size() > 0) { key=(Long)received_msgs.firstKey(); retval=(Message)received_msgs.get(key); if(retval != null) { // message exists and is ready for delivery received_msgs.remove(key); // move from received_msgs to ... if(discard_delivered_msgs == false) { delivered_msgs.put(key, retval); // delivered_msgs } head++; // is removed from retransmitter somewhere else (when missing message is received) return retval; } else { // message has not yet been received (gap in the message sequence stream) if(bounded_buffer_enabled && received_msgs.size() > max_xmit_buf_size) { received_msgs.remove(key); // move from received_msgs to ... head++; retransmitter.remove(key.longValue()); } else { break; } } } return retval; } finally { lock.writeLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring write lock", e); return null; } } /** * Delete all messages <= seqno (they are stable, that is, have been * received at all members). Stop when a number > seqno is encountered * (all messages are ordered on seqnos). */ public void stable(long seqno) { try { lock.writeLock().acquire(); try { // we need to remove all seqnos *including* seqno: because headMap() *excludes* seqno, we // simply increment it, so we have to correct behavior SortedMap m=delivered_msgs.headMap(new Long(seqno +1)); if(m.size() > 0) lowest_seen=Math.max(lowest_seen, ((Long)m.lastKey()).longValue()); m.clear(); // removes entries from delivered_msgs } finally { lock.writeLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring write lock", e); } } /** * Reset the retransmitter and the nak window<br> */ public void reset() { try { lock.writeLock().acquire(); try { if(retransmitter != null) retransmitter.reset(); _reset(); } finally { lock.writeLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring write lock", e); } } /** * Stop the retransmitter and reset the nak window<br> */ public void destroy() { try { lock.writeLock().acquire(); try { if(retransmitter != null) retransmitter.stop(); _reset(); } finally { lock.writeLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring write lock", e); } } /** * @return the highest sequence number of a message consumed by the * application (by <code>remove()</code>) */ public long getHighestDelivered() { try { lock.readLock().acquire(); try { return (Math.max(head - 1, -1)); } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return -1; } } /** * @return the lowest sequence number of a message that has been * delivered or is a candidate for delivery (by the next call to * <code>remove()</code>) */ public long getLowestSeen() { try { lock.readLock().acquire(); try { return (lowest_seen); } finally { lock.readLock().release(); } } catch(InterruptedException e) { log.error("failed acquiring read lock", e); return -1; } } /** * Returns the highest deliverable seqno; e.g., for 1,2,3,5,6 it would * be 3. *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -