📄 tcp.java.txt
字号:
// $Id: TCP.java.txt,v 1.1 2005/06/27 08:48:12 belaban Exp $package org.jgroups.protocols;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.blocks.ConnectionTable;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.util.BoundedList;import org.jgroups.util.Util;import java.net.InetAddress;import java.net.SocketException;import java.net.UnknownHostException;import java.util.HashMap;import java.util.Properties;import java.util.Vector;/** * TCP based protocol. Creates a server socket, which gives us the local address of this group member. For * each accept() on the server socket, a new thread is created that listens on the socket. * For each outgoing message m, if m.dest is in the outgoing hashtable, the associated socket will be reused * to send message, otherwise a new socket is created and put in the hashtable. * When a socket connection breaks or a member is removed from the group, the corresponding items in the * incoming and outgoing hashtables will be removed as well.<br> * This functionality is in ConnectionTable, which is used by TCP. TCP sends messages using ct.send() and * registers with the connection table to receive all incoming messages. 
* @author Bela Ban */
public class TCP extends Protocol implements ConnectionTable.Receiver {
    // connection table used for sending and receiving; assigned in start()
    private ConnectionTable ct=null;
    // local address of this member; taken from ct.getLocalAddress() in start()
    protected Address local_addr=null;
    // group (channel) name; when non-null it is put into the TcpHeader of every outgoing message
    private String group_addr=null;
    private InetAddress bind_addr=null; // local IP address to bind srv sock to (m-homed systems)
    private InetAddress external_addr=null; // the IP address which is broadcast to other group members
    private int start_port=7800; // find first available port starting at this port
    private int end_port=0; // maximum port to bind to
    private final Vector members=new Vector(11);
    private long reaper_interval=0; // time in msecs between connection reaps
    private long conn_expire_time=0; // max time a conn can be idle before being reaped
    boolean loopback=false; // loops back msgs to self if true

    /** If set it will be added to <tt>local_addr</tt>. Used to implement
     * for example transport independent addresses */
    byte[] additional_data=null;

    /** List that maintains the currently suspected members. This is used so we don't send too many SUSPECT
     * events up the stack (one per message !)
*/ final BoundedList suspected_mbrs=new BoundedList(20); /** Should we drop unicast messages to suspected members or not */ boolean skip_suspected_members=true; int recv_buf_size=150000; int send_buf_size=150000; int sock_conn_timeout=2000; // max time in millis for a socket creation in ConnectionTable static final String name="TCP"; static final String IGNORE_BIND_ADDRESS_PROPERTY="ignore.bind.address"; int num_msgs_sent=0, num_msgs_received=0; public TCP() { } public String toString() { return "Protocol TCP(local address: " + local_addr + ')'; } public String getName() { return "TCP"; } public long getNumMessagesSent() {return num_msgs_sent;} public long getNumMessagesReceived() {return num_msgs_received;} public int getOpenConnections() {return ct.getNumConnections();} public InetAddress getBindAddr() {return bind_addr;} public void setBindAddr(InetAddress bind_addr) {this.bind_addr=bind_addr;} public int getStartPort() {return start_port;} public void setStartPort(int start_port) {this.start_port=start_port;} public int getEndPort() {return end_port;} public void setEndPort(int end_port) {this.end_port=end_port;} public long getReaperInterval() {return reaper_interval;} public void setReaperInterval(long reaper_interval) {this.reaper_interval=reaper_interval;} public long getConnExpireTime() {return conn_expire_time;} public void setConnExpireTime(long conn_expire_time) {this.conn_expire_time=conn_expire_time;} public boolean isLoopback() {return loopback;} public void setLoopback(boolean loopback) {this.loopback=loopback;} public String printConnections() {return ct.toString();} public void resetStats() { super.resetStats(); num_msgs_sent=num_msgs_received=0; } protected final Vector getMembers() { return members; } /** DON'T REMOVE ! This prevents the up-handler thread to be created, which essentially is superfluous: messages are received from the network rather than from a layer below. 
*/ public void startUpHandler() { ; } public void start() throws Exception { ct=getConnectionTable(reaper_interval,conn_expire_time,bind_addr,external_addr,start_port,end_port); // ct.addConnectionListener(this); ct.setReceiveBufferSize(recv_buf_size); ct.setSendBufferSize(send_buf_size); ct.setSocketConnectionTimeout(sock_conn_timeout); local_addr=ct.getLocalAddress(); if(additional_data != null && local_addr instanceof IpAddress) ((IpAddress)local_addr).setAdditionalData(additional_data); passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); } /** * @param reaperInterval * @param connExpireTime * @param bindAddress * @param startPort * @throws Exception * @return ConnectionTable * Sub classes overrides this method to initialize a different version of * ConnectionTable. */ protected ConnectionTable getConnectionTable(long reaperInterval, long connExpireTime, InetAddress bindAddress, InetAddress externalAddress, int startPort, int endPort) throws Exception { ConnectionTable cTable=null; if(reaperInterval == 0 && connExpireTime == 0) { cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort); } else { if(reaperInterval == 0) { reaperInterval=5000; if(log.isWarnEnabled()) log.warn("reaper_interval was 0, set it to " + reaperInterval); } if(connExpireTime == 0) { connExpireTime=1000 * 60 * 5; if(log.isWarnEnabled()) log.warn("conn_expire_time was 0, set it to " + connExpireTime); } cTable=new ConnectionTable(this, bindAddress, externalAddress, startPort, endPort, reaperInterval, connExpireTime); } return cTable; } public void stop() { ct.stop(); } /** Sent to destination(s) using the ConnectionTable class. 
*/ public void down(Event evt) { Message msg; Object dest_addr; if(evt.getType() != Event.MSG) { handleDownEvent(evt); return; } msg=(Message)evt.getArg(); num_msgs_sent++; if(group_addr != null) { // added patch sent by Roland Kurmann (bela March 20 2003) /* Add header (includes channel name) */ msg.putHeader(name, new TcpHeader(group_addr)); } dest_addr=msg.getDest(); /* Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver). This way, we still have performance numbers for TCP */ if(observer != null) observer.passDown(evt); if(dest_addr == null) { // broadcast (to all members) if(group_addr == null) { if(log.isWarnEnabled()) log.warn("dest address of message is null, and " + "sending to default address fails as group_addr is null, too !" + " Discarding message."); return; } else { sendMulticastMessage(msg); // send to current membership } } else { sendUnicastMessage(msg); // send to a single member } } /** ConnectionTable.Receiver interface */ public void receive(Message msg) { TcpHeader hdr=null; Event evt=new Event(Event.MSG, msg); /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer. This allows e.g. 
PerfObserver to get the time of reception of a message */ if(observer != null) observer.up(evt, up_queue.size()); if(log.isTraceEnabled()) log.trace("received msg " + msg); num_msgs_received++; hdr=(TcpHeader)msg.removeHeader(name); if(hdr != null) { /* Discard all messages destined for a channel with a different name */ String ch_name=null; if(hdr.group_addr != null) ch_name=hdr.group_addr; // below lines were commented as patch sent by Roland Kurmann (bela March 20 2003)// if(group_addr == null) {// if(log.isWarnEnabled()) log.warn("TCP.receive()", "group address in header was null, discarded");// return;// } // Discard if message's group name is not the same as our group name unless the // message is a diagnosis message (special group name DIAG_GROUP) if(ch_name != null && !group_addr.equals(ch_name) && !ch_name.equals(Util.DIAG_GROUP)) { if(log.isWarnEnabled()) log.warn("discarded message from different group (" + ch_name + "). Sender was " + msg.getSrc());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -