📄 nakack.java
字号:
// $Id: NAKACK.java,v 1.81 2006/10/27 07:02:00 belaban Exp $package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.stack.NakReceiverWindow;import org.jgroups.stack.Protocol;import org.jgroups.stack.Retransmitter;import org.jgroups.util.*;import java.io.IOException;import java.util.*;/** * Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno). * Receivers deliver messages ordered according to seqno and request retransmission of missing messages. Retransmitted * messages are bundled into bigger ones, e.g. when getting an xmit request for messages 1-10, instead of sending 10 * unicast messages, we bundle all 10 messages into 1 and send it. However, since this protocol typically sits below * FRAG, we cannot count on FRAG to fragement/defragment the (possibly) large message into smaller ones. Therefore we * only bundle messages up to max_xmit_size bytes to prevent too large messages. For example, if the bundled message * size was a total of 34000 bytes, and max_xmit_size=16000, we'd send 3 messages: 2 16K and a 2K message. <em>Note that * max_xmit_size should be the same value as FRAG.frag_size (or smaller).</em><br/> Retransmit requests are always sent * to the sender. If the sender dies, and not everyone has received its messages, they will be lost. In the future, this * may be changed to have receivers store all messages, so that retransmit requests can be answered by any member. * Trivial to implement, but not done yet. For most apps, the default retransmit properties are sufficient, if not use * vsync. * * @author Bela Ban */public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand, NakReceiverWindow.Listener { private long[] retransmit_timeout={600, 1200, 2400, 4800}; // time(s) to wait before requesting retransmission private boolean is_server=false; private Address local_addr=null; private final Vector members=new Vector(11); private View view; private long seqno=-1; // current message sequence number (starts with 0) private long max_xmit_size=8192; // max size of a retransmit message (otherwise send multiple) private int gc_lag=20; // number of msgs garbage collection lags behind /** * Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a * message, the sender only retransmits once. */ private boolean use_mcast_xmit=true; /** * Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be * set to false */ private boolean xmit_from_random_member=false; /** * Messages that have been received in order are sent up the stack (= delivered to the application). Delivered * messages are removed from NakReceiverWindow.received_msgs and moved to NakReceiverWindow.delivered_msgs, where * they are later garbage collected (by STABLE). Since we do retransmits only from sent messages, never * received or delivered messages, we can turn the moving to delivered_msgs off, so we don't keep the message * around, and don't need to wait for garbage collection to remove them. */ private boolean discard_delivered_msgs=false; /** If value is > 0, the retransmit buffer is bounded: only the max_xmit_buf_size latest messages are kept, * older ones are discarded when the buffer size is exceeded. A value <= 0 means unbounded buffers */ private int max_xmit_buf_size=0; /** * Hashtable<Address,NakReceiverWindow>. Stores received messages (keyed by sender). Note that this is no long term * storage; messages are just stored until they can be delivered (ie., until the correct FIFO order is established) */ private final HashMap received_msgs=new HashMap(11); /** TreeMap<Long,Message>. Map of messages sent by me (keyed and sorted on sequence number) */ private final TreeMap sent_msgs=new TreeMap(); private boolean leaving=false; private boolean started=false; private TimeScheduler timer=null; private static final String name="NAKACK"; private long xmit_reqs_received; private long xmit_reqs_sent; private long xmit_rsps_received; private long xmit_rsps_sent; private long missing_msgs_received; /** Captures stats on XMIT_REQS, XMIT_RSPS per sender */ private HashMap sent=new HashMap(); /** Captures stats on XMIT_REQS, XMIT_RSPS per receiver */ private HashMap received=new HashMap(); private int stats_list_size=20; /** BoundedList<XmitRequest>. Keeps track of the last stats_list_size XMIT requests */ private BoundedList receive_history; /** BoundedList<MissingMessage>. Keeps track of the last stats_list_size missing messages received */ private BoundedList send_history; public NAKACK() { } public String getName() { return name; } public long getXmitRequestsReceived() {return xmit_reqs_received;} public long getXmitRequestsSent() {return xmit_reqs_sent;} public long getXmitResponsesReceived() {return xmit_rsps_received;} public long getXmitResponsesSent() {return xmit_rsps_sent;} public long getMissingMessagesReceived() {return missing_msgs_received;} public int getPendingRetransmissionRequests() { int num=0; NakReceiverWindow win; synchronized(received_msgs) { for(Iterator it=received_msgs.values().iterator(); it.hasNext();) { win=(NakReceiverWindow)it.next(); num+=win.size(); } } return num; } public int getSentTableSize() { int size; synchronized(sent_msgs) { size=sent_msgs.size(); } return size; } public int getReceivedTableSize() { int ret=0; NakReceiverWindow win; Set s=new LinkedHashSet(received_msgs.values()); for(Iterator it=s.iterator(); it.hasNext();) { win=(NakReceiverWindow)it.next(); ret+=win.size(); } return ret; } public void resetStats() { xmit_reqs_received=xmit_reqs_sent=xmit_rsps_received=xmit_rsps_sent=missing_msgs_received=0; sent.clear(); received.clear(); if(receive_history !=null) receive_history.removeAll(); if(send_history != null) send_history.removeAll(); } public void init() throws Exception { if(stats) { send_history=new BoundedList(stats_list_size); receive_history=new BoundedList(stats_list_size); } } public int getGcLag() { return gc_lag; } public void setGcLag(int gc_lag) { this.gc_lag=gc_lag; } public boolean isUseMcastXmit() { return use_mcast_xmit; } public void setUseMcastXmit(boolean use_mcast_xmit) { this.use_mcast_xmit=use_mcast_xmit; } public boolean isXmitFromRandomMember() { return xmit_from_random_member; } public void setXmitFromRandomMember(boolean xmit_from_random_member) { this.xmit_from_random_member=xmit_from_random_member; } public boolean isDiscardDeliveredMsgs() { return discard_delivered_msgs; } public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs) { this.discard_delivered_msgs=discard_delivered_msgs; } public int getMaxXmitBufSize() { return max_xmit_buf_size; } public void setMaxXmitBufSize(int max_xmit_buf_size) { this.max_xmit_buf_size=max_xmit_buf_size; } public long getMaxXmitSize() { return max_xmit_size; } public void setMaxXmitSize(long max_xmit_size) { this.max_xmit_size=max_xmit_size; } public boolean setProperties(Properties props) { String str; long[] tmp; super.setProperties(props); str=props.getProperty("retransmit_timeout"); if(str != null) { tmp=Util.parseCommaDelimitedLongs(str); props.remove("retransmit_timeout"); if(tmp != null && tmp.length > 0) { retransmit_timeout=tmp; } } str=props.getProperty("gc_lag"); if(str != null) { gc_lag=Integer.parseInt(str); if(gc_lag < 0) { log.error("NAKACK.setProperties(): gc_lag cannot be negative, setting it to 0"); } props.remove("gc_lag"); } str=props.getProperty("max_xmit_size"); if(str != null) { max_xmit_size=Long.parseLong(str); props.remove("max_xmit_size"); } str=props.getProperty("use_mcast_xmit"); if(str != null) { use_mcast_xmit=Boolean.valueOf(str).booleanValue(); props.remove("use_mcast_xmit"); } str=props.getProperty("discard_delivered_msgs"); if(str != null) { discard_delivered_msgs=Boolean.valueOf(str).booleanValue(); props.remove("discard_delivered_msgs"); } str=props.getProperty("xmit_from_random_member"); if(str != null) { xmit_from_random_member=Boolean.valueOf(str).booleanValue(); props.remove("xmit_from_random_member"); } str=props.getProperty("max_xmit_buf_size"); if(str != null) { max_xmit_buf_size=Integer.parseInt(str); props.remove("max_xmit_buf_size"); } str=props.getProperty("stats_list_size"); if(str != null) { stats_list_size=Integer.parseInt(str); props.remove("stats_list_size"); } if(xmit_from_random_member) { if(discard_delivered_msgs) { discard_delivered_msgs=false; log.warn("xmit_from_random_member set to true: changed discard_delivered_msgs to false"); } } if(props.size() > 0) { log.error("NAKACK.setProperties(): these properties are not recognized: " + props); return false; } return true; } public Map dumpStats() { Map retval=super.dumpStats(); if(retval == null) retval=new HashMap(); retval.put("xmit_reqs_received", new Long(xmit_reqs_received)); retval.put("xmit_reqs_sent", new Long(xmit_reqs_sent)); retval.put("xmit_rsps_received", new Long(xmit_rsps_received)); retval.put("xmit_rsps_sent", new Long(xmit_rsps_sent)); retval.put("missing_msgs_received", new Long(missing_msgs_received)); retval.put("sent_msgs", printSentMsgs()); StringBuffer sb=new StringBuffer(); Map.Entry entry; Address addr; Object w; synchronized(received_msgs) { for(Iterator it=received_msgs.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); addr=(Address)entry.getKey(); w=entry.getValue(); sb.append(addr).append(": ").append(w.toString()).append('\n'); } } retval.put("received_msgs", sb.toString()); return retval; } public String printStats() { Map.Entry entry; Object key, val; StringBuffer sb=new StringBuffer(); sb.append("sent:\n"); for(Iterator it=sent.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); key=entry.getKey(); if(key == null) key="<mcast dest>"; val=entry.getValue(); sb.append(key).append(": ").append(val).append("\n"); } sb.append("\nreceived:\n"); for(Iterator it=received.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); key=entry.getKey(); val=entry.getValue(); sb.append(key).append(": ").append(val).append("\n"); } sb.append("\nXMIT_REQS sent:\n"); XmitRequest tmp; for(Enumeration en=send_history.elements(); en.hasMoreElements();) { tmp=(XmitRequest)en.nextElement(); sb.append(tmp).append("\n"); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -