📄 ackmcastsenderwindow.java
字号:
// $Id: AckMcastSenderWindow.java,v 1.10 2006/01/14 14:00:42 belaban Exp $package org.jgroups.stack;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.Message;import org.jgroups.util.TimeScheduler;import java.io.PrintWriter;import java.io.StringWriter;import java.util.*;/** * Keeps track of ACKs from receivers for each message. When a new message is * sent, it is tagged with a sequence number and the receiver set (set of * members to which the message is sent) and added to a hashtable * (key = sequence number, val = message + receiver set). Each incoming ACK * is noted and when all ACKs for a specific sequence number haven been * received, the corresponding entry is removed from the hashtable. A * retransmission thread periodically re-sends the message point-to-point to * all receivers from which no ACKs have been received yet. A view change or * suspect message causes the corresponding non-existing receivers to be * removed from the hashtable. * <p> * This class may need flow control in order to avoid needless * retransmissions because of timeouts. * * @author Bela Ban June 9 1999 * @author John Georgiadis May 8 2001 * @version $Revision: 1.10 $ */public class AckMcastSenderWindow { /** * Called by retransmitter thread whenever a message needs to be re-sent * to a destination. <code>dest</code> has to be set in the * <code>dst</code> field of <code>msg</code>, as the latter was sent * multicast, but now we are sending a unicast message. Message has to be * copied before sending it (as headers will be appended and therefore * the message changed!). */ public interface RetransmitCommand { /** * Retranmit the given msg * * @param seqno the sequence number associated with the message * @param msg the msg to retransmit (it should be a copy!) * @param dest the msg destination */ void retransmit(long seqno, Message msg, Address dest); } /** * The retransmit task executed by the scheduler in regular intervals */ private static abstract class Task implements TimeScheduler.Task { private final Interval intervals; private boolean cancelled; protected Task(long[] intervals) { this.intervals = new Interval(intervals); this.cancelled = false; } public long nextInterval() { return(intervals.next()); } public void cancel() { cancelled = true; } public boolean cancelled() { return(cancelled); } } /** * The entry associated with a pending msg */ private class Entry extends Task { /** The msg sequence number */ public final long seqno; /** The msg to retransmit */ public Message msg = null; /** destination addr -> boolean (true = received, false = not) */ public final Hashtable senders = new Hashtable(); /** How many destinations have received the msg */ public int num_received = 0; public Entry(long seqno, Message msg, Vector dests, long[] intervals) { super(intervals); this.seqno = seqno; this.msg = msg; for (int i = 0; i < dests.size(); i++) senders.put(dests.elementAt(i), Boolean.FALSE); } boolean allReceived() { return(num_received >= senders.size()); } /** Retransmit this entry */ public void run() { _retransmit(this); } public String toString() { StringBuffer buff = new StringBuffer(); buff.append("num_received = ").append(num_received).append(", received msgs = ").append(senders); return(buff.toString()); } } private static final long SEC = 1000; /** Default retransmit intervals (ms) - exponential approx. */ private static final long[] RETRANSMIT_TIMEOUTS = { 2*SEC, 3*SEC, 5*SEC, 8*SEC}; /** Default retransmit thread suspend timeout (ms) */ private static final long SUSPEND_TIMEOUT = 2000; protected static final Log log=LogFactory.getLog(AckMcastSenderWindow.class); // Msg tables related /** Table of pending msgs: seqno -> Entry */ private final Hashtable msgs = new Hashtable(); /** List of recently suspected members. Used to cease retransmission to suspected members */ private final LinkedList suspects=new LinkedList(); /** Max number in suspects list */ private static final int max_suspects=20; /** * List of acknowledged msgs since the last call to * <code>getStableMessages()</code> */ private final Vector stable_msgs = new Vector(); /** Whether a call to <code>waitUntilAcksReceived()</code> is still active */ private boolean waiting = false; // Retransmission thread related /** Whether retransmitter is externally provided or owned by this object */ private boolean retransmitter_owned; /** The retransmission scheduler */ private TimeScheduler retransmitter = null; /** Retransmission intervals */ private long[] retransmit_intervals; /** The callback object for retransmission */ private RetransmitCommand cmd = null; /** * Convert exception stack trace to string */ private static String _toString(Throwable ex) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); ex.printStackTrace(pw); return(sw.toString()); } /** * @param entry the record associated with the msg to retransmit. It * contains the list of receivers that haven't yet ack reception */ private void _retransmit(Entry entry) { Address sender; boolean received; synchronized(entry) { for(Enumeration e = entry.senders.keys(); e.hasMoreElements();) { sender = (Address)e.nextElement(); received = ((Boolean)entry.senders.get(sender)).booleanValue(); if (!received) { if(suspects.contains(sender)) { if(log.isWarnEnabled()) log.warn("removing " + sender + " from retransmit list as it is in the suspect list"); remove(sender); continue; } if(log.isInfoEnabled()) log.info("--> retransmitting msg #" + entry.seqno + " to " + sender); cmd.retransmit(entry.seqno, entry.msg.copy(), sender); } } } } /** * Setup this object's state * * @param cmd the callback object for retranmissions * @param retransmit_intervals the interval between two consecutive * retransmission attempts * @param sched the external scheduler to use to schedule retransmissions * @param sched_owned if true, the scheduler is owned by this object and * can be started/stopped/destroyed. If false, the scheduler is shared * among multiple objects and start()/stop() should not be called from * within this object * * @throws IllegalArgumentException if <code>cmd</code> is null */ private void init(RetransmitCommand cmd, long[] retransmit_intervals, TimeScheduler sched, boolean sched_owned) { if (cmd == null) { if(log.isErrorEnabled()) log.error("command is null. Cannot retransmit " + "messages !"); throw new IllegalArgumentException("cmd"); } retransmitter_owned = sched_owned; retransmitter = sched; this.retransmit_intervals = retransmit_intervals; this.cmd = cmd; start(); } /** * Create and <b>start</b> the retransmitter * * @param cmd the callback object for retranmissions * @param retransmit_intervals the interval between two consecutive * retransmission attempts * @param sched the external scheduler to use to schedule retransmissions * * @throws IllegalArgumentException if <code>cmd</code> is null */ public AckMcastSenderWindow(RetransmitCommand cmd, long[] retransmit_intervals, TimeScheduler sched) { init(cmd, retransmit_intervals, sched, false); } /** * Create and <b>start</b> the retransmitter * * @param cmd the callback object for retranmissions * @param sched the external scheduler to use to schedule retransmissions * * @throws IllegalArgumentException if <code>cmd</code> is null */ public AckMcastSenderWindow(RetransmitCommand cmd, TimeScheduler sched) { init(cmd, RETRANSMIT_TIMEOUTS, sched, false); } /** * Create and <b>start</b> the retransmitter * * @param cmd the callback object for retranmissions * @param retransmit_intervals the interval between two consecutive * retransmission attempts * * @throws IllegalArgumentException if <code>cmd</code> is null */ public AckMcastSenderWindow(RetransmitCommand cmd, long[] retransmit_intervals) { init(cmd, retransmit_intervals, new TimeScheduler(SUSPEND_TIMEOUT), true); } /** * Create and <b>start</b> the retransmitter * * @param cmd the callback object for retranmissions * * @throws IllegalArgumentException if <code>cmd</code> is null */ public AckMcastSenderWindow(RetransmitCommand cmd) { this(cmd, RETRANSMIT_TIMEOUTS); } /** * Adds a new message to the hash table. * * @param seqno The sequence number associated with the message * @param msg The message (should be a copy!) * @param receivers The set of addresses to which the message was sent * and from which consequently an ACK is expected */ public void add(long seqno, Message msg, Vector receivers) { Entry e; if (waiting) return; if (receivers.size() == 0) return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -