⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp.java.txt

📁 JGRoups源码
💻 TXT
📖 第 1 页 / 共 2 页
字号:
// $Id: TCP.java.txt,v 1.1 2005/06/27 08:48:12 belaban Exp $package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.blocks.ConnectionTable;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.util.BoundedList;import org.jgroups.util.Util;import java.net.InetAddress;import java.net.SocketException;import java.net.UnknownHostException;import java.util.HashMap;import java.util.Properties;import java.util.Vector;/** * TCP based protocol. Creates a server socket, which gives us the local address of this group member. For * each accept() on the server socket, a new thread is created that listens on the socket. * For each outgoing message m, if m.dest is in the ougoing hashtable, the associated socket will be reused * to send message, otherwise a new socket is created and put in the hashtable. * When a socket connection breaks or a member is removed from the group, the corresponding items in the * incoming and outgoing hashtables will be removed as well.<br> * This functionality is in ConnectionTable, which isT used by TCP. TCP sends messages using ct.send() and * registers with the connection table to receive all incoming messages. * @author Bela Ban */public class TCP extends Protocol implements ConnectionTable.Receiver {    private ConnectionTable ct=null;    protected Address       local_addr=null;    private String          group_addr=null;    private InetAddress     bind_addr=null;  // local IP address to bind srv sock to (m-homed systems)    private InetAddress	    external_addr=null; // the IP address which is broadcast to other group members    private int             start_port=7800; // find first available port starting at this port    private int	            end_port=0;      // maximum port to bind to    private final Vector    members=new Vector(11);    private long            reaper_interval=0;  // time in msecs between connection reaps    private long            conn_expire_time=0; // max time a conn can be idle before being reaped    boolean                 loopback=false;     // loops back msgs to self if true    /** If set it will be added to <tt>local_addr</tt>. Used to implement     * for example transport independent addresses */    byte[]                 additional_data=null;    /** List the maintains the currently suspected members. This is used so we don't send too many SUSPECT     * events up the stack (one per message !)     */    final BoundedList      suspected_mbrs=new BoundedList(20);    /** Should we drop unicast messages to suspected members or not */    boolean                skip_suspected_members=true;    int                    recv_buf_size=150000;    int                    send_buf_size=150000;    int                    sock_conn_timeout=2000; // max time in millis for a socket creation in ConnectionTable    static final String    name="TCP";    static final String    IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address";    int num_msgs_sent=0, num_msgs_received=0;    public TCP() {    }    public String toString() {        return "Protocol TCP(local address: " + local_addr + ')';    }    public String getName() {        return "TCP";    }    public long getNumMessagesSent()     {return num_msgs_sent;}    public long getNumMessagesReceived() {return num_msgs_received;}    public int getOpenConnections()      {return ct.getNumConnections();}    public InetAddress getBindAddr() {return bind_addr;}    public void setBindAddr(InetAddress bind_addr) {this.bind_addr=bind_addr;}    public int getStartPort() {return start_port;}    public void setStartPort(int start_port) {this.start_port=start_port;}    public int getEndPort() {return end_port;}    public void setEndPort(int end_port) {this.end_port=end_port;}    public long getReaperInterval() {return reaper_interval;}    public void setReaperInterval(long reaper_interval) {this.reaper_interval=reaper_interval;}    public long getConnExpireTime() {return conn_expire_time;}    public void setConnExpireTime(long conn_expire_time) {this.conn_expire_time=conn_expire_time;}    public boolean isLoopback() {return loopback;}    public void setLoopback(boolean loopback) {this.loopback=loopback;}    public String printConnections()     {return ct.toString();}    public void resetStats() {        super.resetStats();        num_msgs_sent=num_msgs_received=0;    }    protected final Vector getMembers() {        return members;    }    /**     DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous:     messages are received from the network rather than from a layer below.     */    public void startUpHandler() {        ;    }    public void start() throws Exception {        ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port);        // ct.addConnectionListener(this);        ct.setReceiveBufferSize(recv_buf_size);        ct.setSendBufferSize(send_buf_size);        ct.setSocketConnectionTimeout(sock_conn_timeout);        local_addr=ct.getLocalAddress();        if(additional_data != null && local_addr instanceof IpAddress)            ((IpAddress)local_addr).setAdditionalData(additional_data);        passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));    }   /**    * @param reaperInterval    * @param connExpireTime    * @param bindAddress    * @param startPort    * @throws Exception    * @return ConnectionTable    * Sub classes overrides this method to initialize a different version of    * ConnectionTable.    */   protected ConnectionTable getConnectionTable(long reaperInterval, long connExpireTime, InetAddress bindAddress,                                                InetAddress externalAddress, int startPort, int endPort) throws Exception {       ConnectionTable cTable=null;       if(reaperInterval == 0 && connExpireTime == 0) {           cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort);       }       else {           if(reaperInterval == 0) {               reaperInterval=5000;               if(log.isWarnEnabled()) log.warn("reaper_interval was 0, set it to " + reaperInterval);           }           if(connExpireTime == 0) {               connExpireTime=1000 * 60 * 5;               if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + connExpireTime);           }           cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort,                                       reaperInterval, connExpireTime);       }       return cTable;   }    public void stop() {        ct.stop();    }    /**     Sent to destination(s) using the ConnectionTable class.     */    public void down(Event evt) {        Message msg;        Object dest_addr;        if(evt.getType() != Event.MSG) {            handleDownEvent(evt);            return;        }        msg=(Message)evt.getArg();        num_msgs_sent++;        if(group_addr != null) { // added patch sent by Roland Kurmann (bela March 20 2003)            /* Add header (includes channel name) */            msg.putHeader(name, new TcpHeader(group_addr));        }        dest_addr=msg.getDest();        /* Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver). This way,           we still have performance numbers for TCP */        if(observer != null)            observer.passDown(evt);        if(dest_addr == null) {  // broadcast (to all members)            if(group_addr == null) {                if(log.isWarnEnabled()) log.warn("dest address of message is null, and " +                                         "sending to default address fails as group_addr is null, too !" +                                         " Discarding message.");                return;            }            else {                sendMulticastMessage(msg);  // send to current membership            }        }        else {            sendUnicastMessage(msg);        // send to a single member        }    }    /** ConnectionTable.Receiver interface */    public void receive(Message msg) {        TcpHeader hdr=null;        Event     evt=new Event(Event.MSG, msg);        /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.           This allows e.g. PerfObserver to get the time of reception of a message */        if(observer != null)            observer.up(evt, up_queue.size());        if(log.isTraceEnabled()) log.trace("received msg " + msg);        num_msgs_received++;        hdr=(TcpHeader)msg.removeHeader(name);        if(hdr != null) {            /* Discard all messages destined for a channel with a different name */            String ch_name=null;            if(hdr.group_addr != null)                ch_name=hdr.group_addr;            // below lines were commented as patch sent by Roland Kurmann (bela March 20 2003)//             if(group_addr == null) {//                 if(log.isWarnEnabled()) log.warn("TCP.receive()", "group address in header was null, discarded");//              return;//             }            // Discard if message's group name is not the same as our group name unless the            // message is a diagnosis message (special group name DIAG_GROUP)            if(ch_name != null && !group_addr.equals(ch_name) &&                    !ch_name.equals(Util.DIAG_GROUP)) {                if(log.isWarnEnabled()) log.warn("discarded message from different group (" +                                            ch_name + "). Sender was " + msg.getSrc());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -