unicast.java

来自「JGRoups源码」· Java 代码 · 共 600 行 · 第 1/2 页

JAVA
600
字号
// $Id: UNICAST.java,v 1.63 2006/09/11 13:12:19 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.AckReceiverWindow;import org.jgroups.stack.AckSenderWindow;import org.jgroups.stack.Protocol;import org.jgroups.util.BoundedList;import org.jgroups.util.Streamable;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.io.*;import java.util.*;/** * Reliable unicast layer. Uses acknowledgement scheme similar to TCP to provide lossless transmission * of unicast messages (for reliable multicast see NAKACK layer). When a message is sent to a peer for * the first time, we add the pair <peer_addr, Entry> to the hashtable (peer address is the key). All * messages sent to that peer will be added to hashtable.peer_addr.sent_msgs. When we receive a * message from a peer for the first time, another entry will be created and added to the hashtable * (unless already existing). Msgs will then be added to hashtable.peer_addr.received_msgs.<p> This * layer is used to reliably transmit point-to-point messages, that is, either messages sent to a * single receiver (vs. messages multicast to a group) or for example replies to a multicast message. The  * sender uses an <code>AckSenderWindow</code> which retransmits messages for which it hasn't received * an ACK, the receiver uses <code>AckReceiverWindow</code> which keeps track of the lowest seqno * received so far, and keeps messages in order.<p> * Messages in both AckSenderWindows and AckReceiverWindows will be removed. A message will be removed from * AckSenderWindow when an ACK has been received for it and messages will be removed from AckReceiverWindow * whenever a message is received: the new message is added and then we try to remove as many messages as * possible (until we stop at a gap, or there are no more messages). * @author Bela Ban */public class UNICAST extends Protocol implements AckSenderWindow.RetransmitCommand {    private final Vector     members=new Vector(11);    private final HashMap    connections=new HashMap(11);   // Object (sender or receiver) -- Entries    private long[]           timeout={400,800,1600,3200};  // for AckSenderWindow: max time to wait for missing acks    private Address          local_addr=null;    private TimeScheduler    timer=null;                    // used for retransmissions (passed to AckSenderWindow)    // if UNICAST is used without GMS, don't consult the membership on retransmit() if use_gms=false    // default is true    private boolean          use_gms=true;    private boolean          started=false;    /** A list of members who left, used to determine when to prevent sending messages to left mbrs */    private final BoundedList previous_members=new BoundedList(50);    private final static String name="UNICAST";    private static final long DEFAULT_FIRST_SEQNO=1;    private long num_msgs_sent=0, num_msgs_received=0, num_bytes_sent=0, num_bytes_received=0;    private long num_acks_sent=0, num_acks_received=0, num_xmit_requests_received=0;    /** All protocol names have to be unique ! */    public String  getName() {return name;}    public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}    public String getMembers() {return members != null? members.toString() : "[]";}    public String printConnections() {        StringBuffer sb=new StringBuffer();        Map.Entry entry;        for(Iterator it=connections.entrySet().iterator(); it.hasNext();) {            entry=(Map.Entry)it.next();            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");        }        return sb.toString();    }    public long getNumMessagesSent() {        return num_msgs_sent;    }    public long getNumMessagesReceived() {        return num_msgs_received;    }    public long getNumBytesSent() {        return num_bytes_sent;    }    public long getNumBytesReceived() {        return num_bytes_received;    }    public long getNumAcksSent() {        return num_acks_sent;    }    public long getNumAcksReceived() {        return num_acks_received;    }    public long getNumberOfRetransmitRequestsReceived() {        return num_xmit_requests_received;    }    /** The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) */    public int getNumberOfUnackedMessages() {        int num=0;        Entry entry;        synchronized(connections) {            for(Iterator it=connections.values().iterator(); it.hasNext();) {                entry=(Entry)it.next();                if(entry.sent_msgs != null)                num+=entry.sent_msgs.size();            }        }        return num;    }    public void resetStats() {        num_msgs_sent=num_msgs_received=num_bytes_sent=num_bytes_received=num_acks_sent=num_acks_received=0;        num_xmit_requests_received=0;    }    public Map dumpStats() {        Map m=new HashMap();        m.put("num_msgs_sent", new Long(num_msgs_sent));        m.put("num_msgs_received", new Long(num_msgs_received));        m.put("num_bytes_sent", new Long(num_bytes_sent));        m.put("num_bytes_received", new Long(num_bytes_received));        m.put("num_acks_sent", new Long(num_acks_sent));        m.put("num_acks_received", new Long(num_acks_received));        m.put("num_xmit_requests_received", new Long(num_xmit_requests_received));        return m;    }    public boolean setProperties(Properties props) {        String     str;        long[]     tmp;        super.setProperties(props);        str=props.getProperty("timeout");        if(str != null) {        tmp=Util.parseCommaDelimitedLongs(str);        if(tmp != null && tmp.length > 0)        timeout=tmp;            props.remove("timeout");        }        str=props.getProperty("window_size");        if(str != null) {            props.remove("window_size");            log.warn("window_size is deprecated and will be ignored");        }        str=props.getProperty("min_threshold");        if(str != null) {            props.remove("min_threshold");            log.warn("min_threshold is deprecated and will be ignored");        }        str=props.getProperty("use_gms");        if(str != null) {            use_gms=Boolean.valueOf(str).booleanValue();            props.remove("use_gms");        }        if(props.size() > 0) {            log.error("these properties are not recognized: " + props);            return false;        }        return true;    }    public void start() throws Exception {        timer=stack != null ? stack.timer : null;        if(timer == null)            throw new Exception("timer is null");        started=true;    }    public void stop() {        started=false;        removeAllConnections();    }    public void up(Event evt) {        Message        msg;        Address        dst, src;        UnicastHeader  hdr;        switch(evt.getType()) {        case Event.MSG:            msg=(Message)evt.getArg();            dst=msg.getDest();            if(dst == null || dst.isMulticastAddress())  // only handle unicast messages                break;  // pass up            // changed from removeHeader(): we cannot remove the header because if we do loopback=true at the            // transport level, we will not have the header on retransmit ! (bela Aug 22 2006)            hdr=(UnicastHeader)msg.getHeader(name);            if(hdr == null)                break;            src=msg.getSrc();            switch(hdr.type) {            case UnicastHeader.DATA:      // received regular message                if(handleDataReceived(src, hdr.seqno, msg))                    sendAck(src, hdr.seqno); // only send an ACK if added to the received_msgs table (bela Aug 2006)                return; // we pass the deliverable message up in handleDataReceived()            case UnicastHeader.ACK:  // received ACK for previously sent message                handleAckReceived(src, hdr.seqno);                break;            default:                log.error("UnicastHeader type " + hdr.type + " not known !");                break;            }            return;        case Event.SET_LOCAL_ADDRESS:            local_addr=(Address)evt.getArg();            break;        }        passUp(evt);   // Pass up to the layer above us    }    public void down(Event evt) {        switch (evt.getType()) {            case Event.MSG: // Add UnicastHeader, add to AckSenderWindow and pass down                Message msg=(Message) evt.getArg();                Object  dst=msg.getDest();                /* only handle unicast messages */                if (dst == null || ((Address) dst).isMulticastAddress()) {                    break;                }                if(previous_members.contains(dst)) {                    if(trace)                        log.trace("discarding message to " + dst + " as this member left the group," +                                " previous_members=" + previous_members);                    return;                }                if(!started) {                    if(warn)                        log.warn("discarded message as start() has not yet been called, message: " + msg);                    return;                }                Entry entry;                synchronized(connections) {                    entry=(Entry)connections.get(dst);                    if(entry == null) {                        entry=new Entry();                        connections.put(dst, entry);                        if(trace)                            log.trace(local_addr + ": created new connection for dst " + dst);                    }                }                Message tmp;                synchronized(entry) { // threads will only sync if they access the same entry                    long seqno=-2;                    try {                        seqno=entry.sent_msgs_seqno;                        UnicastHeader hdr=new UnicastHeader(UnicastHeader.DATA, seqno);                        if(entry.sent_msgs == null) { // first msg to peer 'dst'                            entry.sent_msgs=new AckSenderWindow(this, timeout, timer, this.local_addr); // use the protocol stack's timer                        }                        msg.putHeader(name, hdr);                        if(trace)                            log.trace(new StringBuffer().append(local_addr).append(" --> DATA(").append(dst).append(": #").                                    append(seqno));                        tmp=Global.copy? msg.copy() : msg;                        entry.sent_msgs.add(seqno, tmp);  // add *including* UnicastHeader, adds to retransmitter                        entry.sent_msgs_seqno++;                    }                    catch(Throwable t) {                        entry.sent_msgs.ack(seqno); // remove seqno again, so it is not transmitted                        if(t instanceof Error)                            throw (Error)t;                        if(t instanceof RuntimeException)                            throw (RuntimeException)t;                        else {                            throw new RuntimeException("failure adding msg " + msg + " to the retransmit table", t);                        }                    }                }                // moved passing down of message out of the synchronized block: similar to NAKACK, we do *not* need                // to send unicast messages in order of sequence numbers because they will be sorted into the correct                // order at the receiver anyway. Of course, most of the time, the order will be correct (FIFO), so                // the cost of reordering is minimal. This is part of http://jira.jboss.com/jira/browse/JGRP-303                try {                    passDown(new Event(Event.MSG, tmp));                    num_msgs_sent++;                    num_bytes_sent+=msg.getLength();

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?