⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ackmcastsenderwindow.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: AckMcastSenderWindow.java,v 1.10 2006/01/14 14:00:42 belaban Exp $package org.jgroups.stack;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.Message;import org.jgroups.util.TimeScheduler;import java.io.PrintWriter;import java.io.StringWriter;import java.util.*;/** * Keeps track of ACKs from receivers for each message. When a new message is * sent, it is tagged with a sequence number and the receiver set (set of * members to which the message is sent) and added to a hashtable * (key = sequence number, val = message + receiver set). Each incoming ACK * is noted and when all ACKs for a specific sequence number haven been * received, the corresponding entry is removed from the hashtable. A * retransmission thread periodically re-sends the message point-to-point to * all receivers from which no ACKs have been received yet. A view change or * suspect message causes the corresponding non-existing receivers to be * removed from the hashtable. * <p> * This class may need flow control in order to avoid needless * retransmissions because of timeouts. * * @author Bela Ban June 9 1999 * @author John Georgiadis May 8 2001 * @version $Revision: 1.10 $ */public class AckMcastSenderWindow {    /**     * Called by retransmitter thread whenever a message needs to be re-sent     * to a destination. <code>dest</code> has to be set in the     * <code>dst</code> field of <code>msg</code>, as the latter was sent     * multicast, but now we are sending a unicast message. Message has to be     * copied before sending it (as headers will be appended and therefore     * the message changed!).     */    public interface RetransmitCommand {	/**	 * Retranmit the given msg	 *	 * @param seqno the sequence number associated with the message	 * @param msg the msg to retransmit (it should be a copy!)	 * @param dest the msg destination	 */	void retransmit(long seqno, Message msg, Address dest);    }    /**     * The retransmit task executed by the scheduler in regular intervals     */    private static abstract class Task implements TimeScheduler.Task {	private final Interval intervals;	private boolean  cancelled;	protected Task(long[] intervals) {	    this.intervals = new Interval(intervals);	    this.cancelled = false;	}	public long nextInterval() { return(intervals.next()); }	public void cancel() { cancelled = true; }	public boolean cancelled() { return(cancelled); }    }    /**     * The entry associated with a pending msg     */    private class Entry extends Task {	/** The msg sequence number */	public final long seqno;	/** The msg to retransmit */	public Message msg = null;	/** destination addr -> boolean (true = received, false = not) */	public final Hashtable senders = new Hashtable();	/** How many destinations have received the msg */	public int num_received = 0;	public Entry(long seqno, Message msg, Vector dests, long[] intervals) {	    super(intervals);	    this.seqno = seqno;	    this.msg   = msg;	    for (int i = 0; i < dests.size(); i++)		senders.put(dests.elementAt(i), Boolean.FALSE);	}	boolean allReceived() {	    return(num_received >= senders.size());	}	/** Retransmit this entry */	public void run() { _retransmit(this); }	public String toString() {	    StringBuffer buff = new StringBuffer();        buff.append("num_received = ").append(num_received).append(", received msgs = ").append(senders);	    return(buff.toString());	}    }        private static final long SEC = 1000;    /** Default retransmit intervals (ms) - exponential approx. */    private static final long[] RETRANSMIT_TIMEOUTS = {	2*SEC,	3*SEC,	5*SEC,	8*SEC};    /** Default retransmit thread suspend timeout (ms) */    private static final long SUSPEND_TIMEOUT = 2000;    protected static final Log log=LogFactory.getLog(AckMcastSenderWindow.class);    // Msg tables related    /** Table of pending msgs: seqno -> Entry */    private final Hashtable  msgs = new Hashtable();    /** List of recently suspected members. Used to cease retransmission to suspected members */    private final LinkedList suspects=new LinkedList();    /** Max number in suspects list */    private static final int max_suspects=20;    /**     * List of acknowledged msgs since the last call to     * <code>getStableMessages()</code>     */    private final Vector stable_msgs = new Vector();    /** Whether a call to <code>waitUntilAcksReceived()</code> is still active */    private boolean waiting = false;    // Retransmission thread related    /** Whether retransmitter is externally provided or owned by this object */    private boolean retransmitter_owned;    /** The retransmission scheduler */    private TimeScheduler retransmitter = null;    /** Retransmission intervals */    private long[] retransmit_intervals;    /** The callback object for retransmission */    private RetransmitCommand cmd = null;    /**     * Convert exception stack trace to string     */    private static String _toString(Throwable ex) {	StringWriter sw = new StringWriter();	PrintWriter  pw = new PrintWriter(sw);	ex.printStackTrace(pw);	return(sw.toString());    }    /**     * @param entry the record associated with the msg to retransmit. It     * contains the list of receivers that haven't yet ack reception     */    private void _retransmit(Entry entry) {	Address sender;	boolean received;	synchronized(entry) {	    for(Enumeration e = entry.senders.keys(); e.hasMoreElements();) {		sender   = (Address)e.nextElement();		received = ((Boolean)entry.senders.get(sender)).booleanValue();		if (!received) {		    if(suspects.contains(sender)) {			    if(log.isWarnEnabled()) log.warn("removing " + sender +				       " from retransmit list as it is in the suspect list");			remove(sender);			continue;		    }			if(log.isInfoEnabled()) log.info("--> retransmitting msg #" +				   entry.seqno + " to " + sender);		    cmd.retransmit(entry.seqno, entry.msg.copy(), sender);		}	    }	}    }    /**     * Setup this object's state     *     * @param cmd the callback object for retranmissions     * @param retransmit_intervals the interval between two consecutive     * retransmission attempts     * @param sched the external scheduler to use to schedule retransmissions     * @param sched_owned if true, the scheduler is owned by this object and     * can be started/stopped/destroyed. If false, the scheduler is shared     * among multiple objects and start()/stop() should not be called from     * within this object     *     * @throws IllegalArgumentException if <code>cmd</code> is null     */    private void init(RetransmitCommand cmd, long[] retransmit_intervals,		      TimeScheduler sched, boolean sched_owned) {	if (cmd == null) {	    if(log.isErrorEnabled()) log.error("command is null. Cannot retransmit " + "messages !");	    throw new IllegalArgumentException("cmd");	}	retransmitter_owned = sched_owned;	retransmitter = sched;	this.retransmit_intervals = retransmit_intervals;	this.cmd = cmd;	start();    }    /**     * Create and <b>start</b> the retransmitter     *     * @param cmd the callback object for retranmissions     * @param retransmit_intervals the interval between two consecutive     * retransmission attempts     * @param sched the external scheduler to use to schedule retransmissions     *     * @throws IllegalArgumentException if <code>cmd</code> is null     */    public AckMcastSenderWindow(RetransmitCommand cmd,				long[] retransmit_intervals, TimeScheduler sched) {	init(cmd, retransmit_intervals, sched, false);    }    /**     * Create and <b>start</b> the retransmitter     *     * @param cmd the callback object for retranmissions     * @param sched the external scheduler to use to schedule retransmissions     *     * @throws IllegalArgumentException if <code>cmd</code> is null     */    public AckMcastSenderWindow(RetransmitCommand cmd, TimeScheduler sched) {	init(cmd, RETRANSMIT_TIMEOUTS, sched, false);    }    /**     * Create and <b>start</b> the retransmitter     *     * @param cmd the callback object for retranmissions     * @param retransmit_intervals the interval between two consecutive     * retransmission attempts     *     * @throws IllegalArgumentException if <code>cmd</code> is null     */    public AckMcastSenderWindow(RetransmitCommand cmd, long[] retransmit_intervals) {	init(cmd, retransmit_intervals, new TimeScheduler(SUSPEND_TIMEOUT), true);    }    /**     * Create and <b>start</b> the retransmitter     *     * @param cmd the callback object for retranmissions     *     * @throws IllegalArgumentException if <code>cmd</code> is null     */    public AckMcastSenderWindow(RetransmitCommand cmd) {	this(cmd, RETRANSMIT_TIMEOUTS);    }    /**     * Adds a new message to the hash table.     *     * @param seqno The sequence number associated with the message     * @param msg The message (should be a copy!)     * @param receivers The set of addresses to which the message was sent     * and from which consequently an ACK is expected     */    public void add(long seqno, Message msg, Vector receivers) {	Entry e;	if (waiting) return;	if (receivers.size() == 0) return;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -