⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nakack.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
// $Id: NAKACK.java,v 1.81 2006/10/27 07:02:00 belaban Exp $package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.stack.NakReceiverWindow;import org.jgroups.stack.Protocol;import org.jgroups.stack.Retransmitter;import org.jgroups.util.*;import java.io.IOException;import java.util.*;/** * Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno). * Receivers deliver messages ordered according to seqno and request retransmission of missing messages. Retransmitted * messages are bundled into bigger ones, e.g. when getting an xmit request for messages 1-10, instead of sending 10 * unicast messages, we bundle all 10 messages into 1 and send it. However, since this protocol typically sits below * FRAG, we cannot count on FRAG to fragement/defragment the (possibly) large message into smaller ones. Therefore we * only bundle messages up to max_xmit_size bytes to prevent too large messages. For example, if the bundled message * size was a total of 34000 bytes, and max_xmit_size=16000, we'd send 3 messages: 2 16K and a 2K message. <em>Note that * max_xmit_size should be the same value as FRAG.frag_size (or smaller).</em><br/> Retransmit requests are always sent * to the sender. If the sender dies, and not everyone has received its messages, they will be lost. In the future, this * may be changed to have receivers store all messages, so that retransmit requests can be answered by any member. * Trivial to implement, but not done yet. For most apps, the default retransmit properties are sufficient, if not use * vsync. * * @author Bela Ban */public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand, NakReceiverWindow.Listener {    private long[]        retransmit_timeout={600, 1200, 2400, 4800}; // time(s) to wait before requesting retransmission    private boolean       is_server=false;    private Address       local_addr=null;    private final Vector  members=new Vector(11);    private View          view;    private long          seqno=-1;                                  // current message sequence number (starts with 0)    private long          max_xmit_size=8192;                        // max size of a retransmit message (otherwise send multiple)    private int           gc_lag=20;                                 // number of msgs garbage collection lags behind    /**     * Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a     * message, the sender only retransmits once.     */    private boolean use_mcast_xmit=true;    /**     * Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be     * set to false     */    private boolean xmit_from_random_member=false;    /**     * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered     * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where     * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never     * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message     * around, and don't need to wait for garbage collection to remove them.     */    private boolean discard_delivered_msgs=false;    /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept,     * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers     */    private int max_xmit_buf_size=0;    /**     * Hashtable<Address,NakReceiverWindow>. Stores received messages (keyed by sender). Note that this is no long term     * storage; messages are just stored until they can be delivered (ie., until the correct FIFO order is established)     */    private final HashMap received_msgs=new HashMap(11);    /** TreeMap<Long,Message>. Map of messages sent by me (keyed and sorted on sequence number) */    private final TreeMap sent_msgs=new TreeMap();    private boolean leaving=false;    private boolean started=false;    private TimeScheduler timer=null;    private static final String name="NAKACK";    private long xmit_reqs_received;    private long xmit_reqs_sent;    private long xmit_rsps_received;    private long xmit_rsps_sent;    private long missing_msgs_received;    /** Captures stats on XMIT_REQS, XMIT_RSPS per sender */    private HashMap sent=new HashMap();    /** Captures stats on XMIT_REQS, XMIT_RSPS per receiver */    private HashMap received=new HashMap();    private int stats_list_size=20;    /** BoundedList<XmitRequest>. Keeps track of the last stats_list_size XMIT requests */    private BoundedList receive_history;    /** BoundedList<MissingMessage>. Keeps track of the last stats_list_size missing messages received */    private BoundedList send_history;    public NAKACK() {    }    public String getName() {        return name;    }    public long getXmitRequestsReceived() {return xmit_reqs_received;}    public long getXmitRequestsSent() {return xmit_reqs_sent;}    public long getXmitResponsesReceived() {return xmit_rsps_received;}    public long getXmitResponsesSent() {return xmit_rsps_sent;}    public long getMissingMessagesReceived() {return missing_msgs_received;}    public int getPendingRetransmissionRequests() {        int num=0;        NakReceiverWindow win;        synchronized(received_msgs) {            for(Iterator it=received_msgs.values().iterator(); it.hasNext();) {                win=(NakReceiverWindow)it.next();                num+=win.size();            }        }        return num;    }    public int getSentTableSize() {        int size;        synchronized(sent_msgs) {            size=sent_msgs.size();        }        return size;    }    public int getReceivedTableSize() {        int ret=0;        NakReceiverWindow win;        Set s=new LinkedHashSet(received_msgs.values());        for(Iterator it=s.iterator(); it.hasNext();) {            win=(NakReceiverWindow)it.next();            ret+=win.size();        }        return ret;    }    public void resetStats() {        xmit_reqs_received=xmit_reqs_sent=xmit_rsps_received=xmit_rsps_sent=missing_msgs_received=0;        sent.clear();        received.clear();        if(receive_history !=null)            receive_history.removeAll();        if(send_history != null)            send_history.removeAll();    }    public void init() throws Exception {        if(stats) {            send_history=new BoundedList(stats_list_size);            receive_history=new BoundedList(stats_list_size);        }    }    public int getGcLag() {        return gc_lag;    }    public void setGcLag(int gc_lag) {        this.gc_lag=gc_lag;    }    public boolean isUseMcastXmit() {        return use_mcast_xmit;    }    public void setUseMcastXmit(boolean use_mcast_xmit) {        this.use_mcast_xmit=use_mcast_xmit;    }    public boolean isXmitFromRandomMember() {        return xmit_from_random_member;    }    public void setXmitFromRandomMember(boolean xmit_from_random_member) {        this.xmit_from_random_member=xmit_from_random_member;    }    public boolean isDiscardDeliveredMsgs() {        return discard_delivered_msgs;    }    public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) {        this.discard_delivered_msgs=discard_delivered_msgs;    }    public int getMaxXmitBufSize() {        return max_xmit_buf_size;    }    public void setMaxXmitBufSize(int max_xmit_buf_size) {        this.max_xmit_buf_size=max_xmit_buf_size;    }    public long getMaxXmitSize() {        return max_xmit_size;    }    public void setMaxXmitSize(long max_xmit_size) {        this.max_xmit_size=max_xmit_size;    }    public boolean setProperties(Properties props) {        String str;        long[] tmp;        super.setProperties(props);        str=props.getProperty("retransmit_timeout");        if(str != null) {            tmp=Util.parseCommaDelimitedLongs(str);            props.remove("retransmit_timeout");            if(tmp != null && tmp.length > 0) {                retransmit_timeout=tmp;            }        }        str=props.getProperty("gc_lag");        if(str != null) {            gc_lag=Integer.parseInt(str);            if(gc_lag < 0) {                log.error("NAKACK.setProperties(): gc_lag cannot be negative, setting it to 0");            }            props.remove("gc_lag");        }        str=props.getProperty("max_xmit_size");        if(str != null) {            max_xmit_size=Long.parseLong(str);            props.remove("max_xmit_size");        }        str=props.getProperty("use_mcast_xmit");        if(str != null) {            use_mcast_xmit=Boolean.valueOf(str).booleanValue();            props.remove("use_mcast_xmit");        }        str=props.getProperty("discard_delivered_msgs");        if(str != null) {            discard_delivered_msgs=Boolean.valueOf(str).booleanValue();            props.remove("discard_delivered_msgs");        }        str=props.getProperty("xmit_from_random_member");        if(str != null) {            xmit_from_random_member=Boolean.valueOf(str).booleanValue();            props.remove("xmit_from_random_member");        }        str=props.getProperty("max_xmit_buf_size");        if(str != null) {            max_xmit_buf_size=Integer.parseInt(str);            props.remove("max_xmit_buf_size");        }        str=props.getProperty("stats_list_size");        if(str != null) {            stats_list_size=Integer.parseInt(str);            props.remove("stats_list_size");        }        if(xmit_from_random_member) {            if(discard_delivered_msgs) {                discard_delivered_msgs=false;                log.warn("xmit_from_random_member set to true: changed discard_delivered_msgs to false");            }        }        if(props.size() > 0) {            log.error("NAKACK.setProperties(): these properties are not recognized: " + props);            return false;        }        return true;    }    public Map dumpStats() {        Map retval=super.dumpStats();        if(retval == null)            retval=new HashMap();        retval.put("xmit_reqs_received", new Long(xmit_reqs_received));        retval.put("xmit_reqs_sent", new Long(xmit_reqs_sent));        retval.put("xmit_rsps_received", new Long(xmit_rsps_received));        retval.put("xmit_rsps_sent", new Long(xmit_rsps_sent));        retval.put("missing_msgs_received", new Long(missing_msgs_received));        retval.put("sent_msgs", printSentMsgs());        StringBuffer sb=new StringBuffer();        Map.Entry entry;        Address addr;        Object w;        synchronized(received_msgs) {            for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) {                entry=(Map.Entry)it.next();                addr=(Address)entry.getKey();                w=entry.getValue();                sb.append(addr).append(": ").append(w.toString()).append('\n');            }        }        retval.put("received_msgs", sb.toString());        return retval;            }    public String printStats() {        Map.Entry entry;        Object key, val;        StringBuffer sb=new StringBuffer();        sb.append("sent:\n");        for(Iterator it=sent.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            key=entry.getKey();            if(key == null) key="<mcast dest>";            val=entry.getValue();            sb.append(key).append(": ").append(val).append("\n");        }        sb.append("\nreceived:\n");        for(Iterator it=received.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            key=entry.getKey();            val=entry.getValue();            sb.append(key).append(": ").append(val).append("\n");        }        sb.append("\nXMIT_REQS sent:\n");        XmitRequest tmp;        for(Enumeration en=send_history.elements(); en.hasMoreElements();) {            tmp=(XmitRequest)en.nextElement();            sb.append(tmp).append("\n");        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -