unicast.java
来自「JGRoups源码」· Java 代码 · 共 600 行 · 第 1/2 页
JAVA
600 行
// $Id: UNICAST.java,v 1.63 2006/09/11 13:12:19 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.AckReceiverWindow;import org.jgroups.stack.AckSenderWindow;import org.jgroups.stack.Protocol;import org.jgroups.util.BoundedList;import org.jgroups.util.Streamable;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.io.*;import java.util.*;/** * Reliable unicast layer. Uses acknowledgement scheme similar to TCP to provide lossless transmission * of unicast messages (for reliable multicast see NAKACK layer). When a message is sent to a peer for * the first time, we add the pair <peer_addr, Entry> to the hashtable (peer address is the key). All * messages sent to that peer will be added to hashtable.peer_addr.sent_msgs. When we receive a * message from a peer for the first time, another entry will be created and added to the hashtable * (unless already existing). Msgs will then be added to hashtable.peer_addr.received_msgs.<p> This * layer is used to reliably transmit point-to-point messages, that is, either messages sent to a * single receiver (vs. messages multicast to a group) or for example replies to a multicast message. The * sender uses an <code>AckSenderWindow</code> which retransmits messages for which it hasn't received * an ACK, the receiver uses <code>AckReceiverWindow</code> which keeps track of the lowest seqno * received so far, and keeps messages in order.<p> * Messages in both AckSenderWindows and AckReceiverWindows will be removed. A message will be removed from * AckSenderWindow when an ACK has been received for it and messages will be removed from AckReceiverWindow * whenever a message is received: the new message is added and then we try to remove as many messages as * possible (until we stop at a gap, or there are no more messages). * @author Bela Ban */public class UNICAST extends Protocol implements AckSenderWindow.RetransmitCommand { private final Vector members=new Vector(11); private final HashMap connections=new HashMap(11); // Object (sender or receiver) -- Entries private long[] timeout={400,800,1600,3200}; // for AckSenderWindow: max time to wait for missing acks private Address local_addr=null; private TimeScheduler timer=null; // used for retransmissions (passed to AckSenderWindow) // if UNICAST is used without GMS, don't consult the membership on retransmit() if use_gms=false // default is true private boolean use_gms=true; private boolean started=false; /** A list of members who left, used to determine when to prevent sending messages to left mbrs */ private final BoundedList previous_members=new BoundedList(50); private final static String name="UNICAST"; private static final long DEFAULT_FIRST_SEQNO=1; private long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0; private long num_acks_sent=0, num_acks_received=0, num_xmit_requests_received=0; /** All protocol names have to be unique ! */ public String getName() {return name;} public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";} public String getMembers() {return members != null? members.toString() : "[]";} public String printConnections() { StringBuffer sb=new StringBuffer(); Map.Entry entry; for(Iterator it=connections.entrySet().iterator(); it.hasNext();) { entry=(Map.Entry)it.next(); sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); } return sb.toString(); } public long getNumMessagesSent() { return num_msgs_sent; } public long getNumMessagesReceived() { return num_msgs_received; } public long getNumBytesSent() { return num_bytes_sent; } public long getNumBytesReceived() { return num_bytes_received; } public long getNumAcksSent() { return num_acks_sent; } public long getNumAcksReceived() { return num_acks_received; } public long getNumberOfRetransmitRequestsReceived() { return num_xmit_requests_received; } /** The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) */ public int getNumberOfUnackedMessages() { int num=0; Entry entry; synchronized(connections) { for(Iterator it=connections.values().iterator(); it.hasNext();) { entry=(Entry)it.next(); if(entry.sent_msgs != null) num+=entry.sent_msgs.size(); } } return num; } public void resetStats() { num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=num_acks_sent=num_acks_received=0; num_xmit_requests_received=0; } public Map dumpStats() { Map m=new HashMap(); m.put("num_msgs_sent", new Long(num_msgs_sent)); m.put("num_msgs_received", new Long(num_msgs_received)); m.put("num_bytes_sent", new Long(num_bytes_sent)); m.put("num_bytes_received", new Long(num_bytes_received)); m.put("num_acks_sent", new Long(num_acks_sent)); m.put("num_acks_received", new Long(num_acks_received)); m.put("num_xmit_requests_received", new Long(num_xmit_requests_received)); return m; } public boolean setProperties(Properties props) { String str; long[] tmp; super.setProperties(props); str=props.getProperty("timeout"); if(str != null) { tmp=Util.parseCommaDelimitedLongs(str); if(tmp != null && tmp.length > 0) timeout=tmp; props.remove("timeout"); } str=props.getProperty("window_size"); if(str != null) { props.remove("window_size"); log.warn("window_size is deprecated and will be ignored"); } str=props.getProperty("min_threshold"); if(str != null) { props.remove("min_threshold"); log.warn("min_threshold is deprecated and will be ignored"); } str=props.getProperty("use_gms"); if(str != null) { use_gms=Boolean.valueOf(str).booleanValue(); props.remove("use_gms"); } if(props.size() > 0) { log.error("these properties are not recognized: " + props); return false; } return true; } public void start() throws Exception { timer=stack != null ? stack.timer : null; if(timer == null) throw new Exception("timer is null"); started=true; } public void stop() { started=false; removeAllConnections(); } public void up(Event evt) { Message msg; Address dst, src; UnicastHeader hdr; switch(evt.getType()) { case Event.MSG: msg=(Message)evt.getArg(); dst=msg.getDest(); if(dst == null || dst.isMulticastAddress()) // only handle unicast messages break; // pass up // changed from removeHeader(): we cannot remove the header because if we do loopback=true at the // transport level, we will not have the header on retransmit ! (bela Aug 22 2006) hdr=(UnicastHeader)msg.getHeader(name); if(hdr == null) break; src=msg.getSrc(); switch(hdr.type) { case UnicastHeader.DATA: // received regular message if(handleDataReceived(src, hdr.seqno, msg)) sendAck(src, hdr.seqno); // only send an ACK if added to the received_msgs table (bela Aug 2006) return; // we pass the deliverable message up in handleDataReceived() case UnicastHeader.ACK: // received ACK for previously sent message handleAckReceived(src, hdr.seqno); break; default: log.error("UnicastHeader type " + hdr.type + " not known !"); break; } return; case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; } passUp(evt); // Pass up to the layer above us } public void down(Event evt) { switch (evt.getType()) { case Event.MSG: // Add UnicastHeader, add to AckSenderWindow and pass down Message msg=(Message) evt.getArg(); Object dst=msg.getDest(); /* only handle unicast messages */ if (dst == null || ((Address) dst).isMulticastAddress()) { break; } if(previous_members.contains(dst)) { if(trace) log.trace("discarding message to " + dst + " as this member left the group," + " previous_members=" + previous_members); return; } if(!started) { if(warn) log.warn("discarded message as start() has not yet been called, message: " + msg); return; } Entry entry; synchronized(connections) { entry=(Entry)connections.get(dst); if(entry == null) { entry=new Entry(); connections.put(dst, entry); if(trace) log.trace(local_addr + ": created new connection for dst " + dst); } } Message tmp; synchronized(entry) { // threads will only sync if they access the same entry long seqno=-2; try { seqno=entry.sent_msgs_seqno; UnicastHeader hdr=new UnicastHeader(UnicastHeader.DATA, seqno); if(entry.sent_msgs == null) { // first msg to peer 'dst' entry.sent_msgs=new AckSenderWindow(this, timeout, timer, this.local_addr); // use the protocol stack's timer } msg.putHeader(name, hdr); if(trace) log.trace(new StringBuffer().append(local_addr).append(" --> DATA(").append(dst).append(": #"). append(seqno)); tmp=Global.copy? msg.copy() : msg; entry.sent_msgs.add(seqno, tmp); // add *including* UnicastHeader, adds to retransmitter entry.sent_msgs_seqno++; } catch(Throwable t) { entry.sent_msgs.ack(seqno); // remove seqno again, so it is not transmitted if(t instanceof Error) throw (Error)t; if(t instanceof RuntimeException) throw (RuntimeException)t; else { throw new RuntimeException("failure adding msg " + msg + " to the retransmit table", t); } } } // moved passing down of message out of the synchronized block: similar to NAKACK, we do *not* need // to send unicast messages in order of sequence numbers because they will be sorted into the correct // order at the receiver anyway. Of course, most of the time, the order will be correct (FIFO), so // the cost of reordering is minimal. This is part of http://jira.jboss.com/jira/browse/JGRP-303 try { passDown(new Event(Event.MSG, tmp)); num_msgs_sent++; num_bytes_sent+=msg.getLength();
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?