⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 retransmitter.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: Retransmitter.java,v 1.10 2005/11/03 11:42:59 belaban Exp $package org.jgroups.stack;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.util.*;/** * Maintains a pool of sequence numbers of messages that need to be retransmitted. Messages * are aged and retransmission requests sent according to age (linear backoff used). If a * TimeScheduler instance is given to the constructor, it will be used, otherwise Reransmitter * will create its own. The retransmit timeouts have to be set first thing after creating an instance. * The <code>add()</code> method adds a range of sequence numbers of messages to be retransmitted. The * <code>remove()</code> method removes a sequence number again, cancelling retransmission requests for it. * Whenever a message needs to be retransmitted, the <code>RetransmitCommand.retransmit()</code> method is called. * It can be used e.g. by an ack-based scheme (e.g. AckSenderWindow) to retransmit a message to the receiver, or * by a nak-based scheme to send a retransmission request to the sender of the missing message. * * @author John Giorgiadis * @author Bela Ban * @version $Revision: 1.10 $ */public class Retransmitter {    private static final long SEC=1000;    /** Default retransmit intervals (ms) - exponential approx. */    private static long[] RETRANSMIT_TIMEOUTS={2 * SEC, 3 * SEC, 5 * SEC, 8 * SEC};    /** Default retransmit thread suspend timeout (ms) */    private static final long SUSPEND_TIMEOUT=2000;    private Address              sender=null;    private final LinkedList     msgs=new LinkedList();  // List<Entry> of elements to be retransmitted    private RetransmitCommand    cmd=null;    private boolean              retransmitter_owned;    private TimeScheduler        retransmitter=null;    protected static final Log   log=LogFactory.getLog(Retransmitter.class);    /** Retransmit command (see Gamma et al.) used to retrieve missing messages */    public interface RetransmitCommand {        /**         * Get the missing messages between sequence numbers         * <code>first_seqno</code> and <code>last_seqno</code>. This can either be done by sending a         * retransmit message to destination <code>sender</code> (nak-based scheme), or by         * retransmitting the missing message(s) to <code>sender</code> (ack-based scheme).         * @param first_seqno The sequence number of the first missing message         * @param last_seqno  The sequence number of the last missing message         * @param sender The destination of the member to which the retransmit request will be sent         *               (nak-based scheme), or to which the message will be retransmitted (ack-based scheme).         */        void retransmit(long first_seqno, long last_seqno, Address sender);    }    /**     * Create a new Retransmitter associated with the given sender address     * @param sender the address from which retransmissions are expected or to which retransmissions are sent     * @param cmd the retransmission callback reference     * @param sched retransmissions scheduler     */    public Retransmitter(Address sender, RetransmitCommand cmd, TimeScheduler sched) {        init(sender, cmd, sched, false);    }    /**     * Create a new Retransmitter associated with the given sender address     * @param sender the address from which retransmissions are expected or to which retransmissions are sent     * @param cmd the retransmission callback reference     */    public Retransmitter(Address sender, RetransmitCommand cmd) {        init(sender, cmd, new TimeScheduler(SUSPEND_TIMEOUT), true);    }    public void setRetransmitTimeouts(long[] timeouts) {        if(timeouts != null)            RETRANSMIT_TIMEOUTS=timeouts;    }    /**     * Add the given range [first_seqno, last_seqno] in the list of     * entries eligible for retransmission. If first_seqno > last_seqno,     * then the range [last_seqno, first_seqno] is added instead     * <p>     * If retransmitter thread is suspended, wake it up     */    public void add(long first_seqno, long last_seqno) {        Entry e;        if(first_seqno > last_seqno) {            long tmp=first_seqno;            first_seqno=last_seqno;            last_seqno=tmp;        }        synchronized(msgs) {            e=new Entry(first_seqno, last_seqno, RETRANSMIT_TIMEOUTS);            msgs.add(e);            retransmitter.add(e);        }    }    /**     * Remove the given sequence number from the list of seqnos eligible     * for retransmission. If there are no more seqno intervals in the     * respective entry, cancel the entry from the retransmission     * scheduler and remove it from the pending entries     */    public void remove(long seqno) {        Entry e;        synchronized(msgs) {            for(ListIterator it=msgs.listIterator(); it.hasNext();) {                e=(Entry)it.next();                if(seqno < e.low || seqno > e.high) continue;                e.remove(seqno);                if(e.low > e.high) {                    e.cancel();                    it.remove();                }                break;            }        }    }    /**     * Reset the retransmitter: clear all msgs and cancel all the     * respective tasks     */    public void reset() {        Entry entry;        synchronized(msgs) {            for(ListIterator it=msgs.listIterator(); it.hasNext();) {                entry=(Entry)it.next();                entry.cancel();            }            msgs.clear();        }    }    /**     * Stop the rentransmition and clear all pending msgs.     * <p>     * If this retransmitter has been provided  an externally managed     * scheduler, then just clear all msgs and the associated tasks, else     * stop the scheduler. In this case the method blocks until the     * scheduler's thread is dead. Only the owner of the scheduler should     * stop it.     */    public void stop() {        Entry entry;        // i. If retransmitter is owned, stop it else cancel all tasks        // ii. Clear all pending msgs        synchronized(msgs) {            if(retransmitter_owned) {                try {                    retransmitter.stop();                }                catch(InterruptedException ex) {                    if(log.isErrorEnabled()) log.error("failed stopping retransmitter", ex);                }            }            else {                for(ListIterator it=msgs.listIterator(); it.hasNext();) {                    entry=(Entry)it.next();                    entry.cancel();                }            }            msgs.clear();        }    }    public String toString() {        synchronized(msgs) {            int size=size();            StringBuffer sb=new StringBuffer();            sb.append(size).append(" messages to retransmit: ").append(msgs);            return sb.toString();        }    }    public int size() {        int size=0;        Entry entry;        synchronized(msgs) {            for(Iterator it=msgs.iterator(); it.hasNext();) {                entry=(Retransmitter.Entry)it.next();                size+=entry.size();            }        }        return size;    }    /* ------------------------------- Private Methods -------------------------------------- */    /**     * Init this object     *     * @param sender the address from which retransmissions are expected     * @param cmd the retransmission callback reference     * @param sched retransmissions scheduler     * @param sched_owned whether the scheduler parameter is owned by this     * object or is externally provided     */    private void init(Address sender, RetransmitCommand cmd, TimeScheduler sched, boolean sched_owned) {        this.sender=sender;        this.cmd=cmd;        retransmitter_owned=sched_owned;        retransmitter=sched;    }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -