📄 retransmitter.java
字号:
// $Id: Retransmitter.java,v 1.10 2005/11/03 11:42:59 belaban Exp $package org.jgroups.stack;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.util.*;/** * Maintains a pool of sequence numbers of messages that need to be retransmitted. Messages * are aged and retransmission requests sent according to age (linear backoff used). If a * TimeScheduler instance is given to the constructor, it will be used, otherwise Reransmitter * will create its own. The retransmit timeouts have to be set first thing after creating an instance. * The <code>add()</code> method adds a range of sequence numbers of messages to be retransmitted. The * <code>remove()</code> method removes a sequence number again, cancelling retransmission requests for it. * Whenever a message needs to be retransmitted, the <code>RetransmitCommand.retransmit()</code> method is called. * It can be used e.g. by an ack-based scheme (e.g. AckSenderWindow) to retransmit a message to the receiver, or * by a nak-based scheme to send a retransmission request to the sender of the missing message. * * @author John Giorgiadis * @author Bela Ban * @version $Revision: 1.10 $ */public class Retransmitter { private static final long SEC=1000; /** Default retransmit intervals (ms) - exponential approx. */ private static long[] RETRANSMIT_TIMEOUTS={2 * SEC, 3 * SEC, 5 * SEC, 8 * SEC}; /** Default retransmit thread suspend timeout (ms) */ private static final long SUSPEND_TIMEOUT=2000; private Address sender=null; private final LinkedList msgs=new LinkedList(); // List<Entry> of elements to be retransmitted private RetransmitCommand cmd=null; private boolean retransmitter_owned; private TimeScheduler retransmitter=null; protected static final Log log=LogFactory.getLog(Retransmitter.class); /** Retransmit command (see Gamma et al.) used to retrieve missing messages */ public interface RetransmitCommand { /** * Get the missing messages between sequence numbers * <code>first_seqno</code> and <code>last_seqno</code>. This can either be done by sending a * retransmit message to destination <code>sender</code> (nak-based scheme), or by * retransmitting the missing message(s) to <code>sender</code> (ack-based scheme). * @param first_seqno The sequence number of the first missing message * @param last_seqno The sequence number of the last missing message * @param sender The destination of the member to which the retransmit request will be sent * (nak-based scheme), or to which the message will be retransmitted (ack-based scheme). */ void retransmit(long first_seqno, long last_seqno, Address sender); } /** * Create a new Retransmitter associated with the given sender address * @param sender the address from which retransmissions are expected or to which retransmissions are sent * @param cmd the retransmission callback reference * @param sched retransmissions scheduler */ public Retransmitter(Address sender, RetransmitCommand cmd, TimeScheduler sched) { init(sender, cmd, sched, false); } /** * Create a new Retransmitter associated with the given sender address * @param sender the address from which retransmissions are expected or to which retransmissions are sent * @param cmd the retransmission callback reference */ public Retransmitter(Address sender, RetransmitCommand cmd) { init(sender, cmd, new TimeScheduler(SUSPEND_TIMEOUT), true); } public void setRetransmitTimeouts(long[] timeouts) { if(timeouts != null) RETRANSMIT_TIMEOUTS=timeouts; } /** * Add the given range [first_seqno, last_seqno] in the list of * entries eligible for retransmission. If first_seqno > last_seqno, * then the range [last_seqno, first_seqno] is added instead * <p> * If retransmitter thread is suspended, wake it up */ public void add(long first_seqno, long last_seqno) { Entry e; if(first_seqno > last_seqno) { long tmp=first_seqno; first_seqno=last_seqno; last_seqno=tmp; } synchronized(msgs) { e=new Entry(first_seqno, last_seqno, RETRANSMIT_TIMEOUTS); msgs.add(e); retransmitter.add(e); } } /** * Remove the given sequence number from the list of seqnos eligible * for retransmission. If there are no more seqno intervals in the * respective entry, cancel the entry from the retransmission * scheduler and remove it from the pending entries */ public void remove(long seqno) { Entry e; synchronized(msgs) { for(ListIterator it=msgs.listIterator(); it.hasNext();) { e=(Entry)it.next(); if(seqno < e.low || seqno > e.high) continue; e.remove(seqno); if(e.low > e.high) { e.cancel(); it.remove(); } break; } } } /** * Reset the retransmitter: clear all msgs and cancel all the * respective tasks */ public void reset() { Entry entry; synchronized(msgs) { for(ListIterator it=msgs.listIterator(); it.hasNext();) { entry=(Entry)it.next(); entry.cancel(); } msgs.clear(); } } /** * Stop the rentransmition and clear all pending msgs. * <p> * If this retransmitter has been provided an externally managed * scheduler, then just clear all msgs and the associated tasks, else * stop the scheduler. In this case the method blocks until the * scheduler's thread is dead. Only the owner of the scheduler should * stop it. */ public void stop() { Entry entry; // i. If retransmitter is owned, stop it else cancel all tasks // ii. Clear all pending msgs synchronized(msgs) { if(retransmitter_owned) { try { retransmitter.stop(); } catch(InterruptedException ex) { if(log.isErrorEnabled()) log.error("failed stopping retransmitter", ex); } } else { for(ListIterator it=msgs.listIterator(); it.hasNext();) { entry=(Entry)it.next(); entry.cancel(); } } msgs.clear(); } } public String toString() { synchronized(msgs) { int size=size(); StringBuffer sb=new StringBuffer(); sb.append(size).append(" messages to retransmit: ").append(msgs); return sb.toString(); } } public int size() { int size=0; Entry entry; synchronized(msgs) { for(Iterator it=msgs.iterator(); it.hasNext();) { entry=(Retransmitter.Entry)it.next(); size+=entry.size(); } } return size; } /* ------------------------------- Private Methods -------------------------------------- */ /** * Init this object * * @param sender the address from which retransmissions are expected * @param cmd the retransmission callback reference * @param sched retransmissions scheduler * @param sched_owned whether the scheduler parameter is owned by this * object or is externally provided */ private void init(Address sender, RetransmitCommand cmd, TimeScheduler sched, boolean sched_owned) { this.sender=sender; this.cmd=cmd; retransmitter_owned=sched_owned; retransmitter=sched; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -