link.java

来自「JGRoups源码」· Java 代码 · 共 655 行 · 第 1/2 页

JAVA
655
字号
// $Id: Link.java,v 1.7 2005/05/30 16:14:33 belaban Exp $package org.jgroups.blocks;import org.jgroups.util.TimedWriter;import org.jgroups.util.Util;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.*;import java.net.InetAddress;import java.net.ServerSocket;import java.net.Socket;/** * Implements a physical link between 2 parties (point-to-point connection). For incoming traffic, * a server socket is created (bound to a given local address and port). The receiver thread does the * following: it accepts a new connection from the server socket and (on the same thread) reads messages * until the connection breaks. Then it goes back to accept(). This is done in 2 nested while-loops. * The outgoing connection is established when started. If this fails, the link is marked as not established. * This means that there is not outgoing socket.<br> * A heartbeat will be exchanged between the 2 peers periodically as long as the connection is established * (outgoing socket is okay). When the connection breaks, the heartbeat will stop and a connection establisher * thread will be started. It periodically tries to re-establish connection to the peer. When this happens * it will stop and the heartbeat thread will resume.<br> * For details see Link.txt * @author  Bela Ban, June 2000 */public class Link implements Runnable {    String                 local_addr=null, remote_addr=null;    InetAddress            local=null, remote=null;    int                    local_port=0, remote_port=0;    ServerSocket           srv_sock=null;    Socket                 outgoing=null;          // traffic to peer    Socket                 incoming=null;          // traffic from peer    DataOutputStream       outstream=null;    DataInputStream        instream=null;    boolean                established=false;  // (incoming and outgoing) connections to peer are up and running    boolean                stop=false;    boolean                trace=false;    Thread                 receiver_thread=null;    final long             receiver_thread_join_timeout=2000;    Receiver               receiver=null;    static final int       HB_PACKET=-99;    Heartbeat              hb=null;    long                   timeout=10000;  // if no heartbeat was received for timeout msecs, assume peer is dead    long                   hb_interval=3000;        // send a heartbeat every n msecs    final Object           outgoing_mutex=new Object();  // sync on creation and closing of outgoing socket    TimedWriter            writer=null;    Log                    log=LogFactory.getLog(getClass());    public interface Receiver {	void receive(byte[] msg);	void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port);	void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port);	void missedHeartbeat(InetAddress local, int local_port, InetAddress remote, int remote_port, int num_hbs);	void receivedHeartbeatAgain(InetAddress local, int local_port, InetAddress remote, int remote_port);    }        public Link(String local_addr, int local_port, String remote_addr, int remote_port) {	this.local_addr=local_addr;   this.local_port=local_port;	this.remote_addr=remote_addr; this.remote_port=remote_port;	hb=new Heartbeat(timeout, hb_interval);    }    public Link(String local_addr, int local_port, String remote_addr, int remote_port, Receiver r) {	this(local_addr, local_port, remote_addr, remote_port);	setReceiver(r);    }    public Link(String local_addr, int local_port, String remote_addr, int remote_port, 		long timeout, long hb_interval, Receiver r) {	this.local_addr=local_addr;   this.local_port=local_port;	this.remote_addr=remote_addr; this.remote_port=remote_port;	this.timeout=timeout; this.hb_interval=hb_interval;	hb=new Heartbeat(timeout, hb_interval);	setReceiver(r);    }    public void          setTrace(boolean t)     {trace=t;}    public void          setReceiver(Receiver r) {receiver=r;}    public boolean       established()           {return established;}    public InetAddress   getLocalAddress()       {return local;}    public InetAddress   getRemoteAddress()      {return remote;}    public int           getLocalPort()          {return local_port;}    public int           getRemotePort()         {return remote_port;}    public void start() throws Exception {	local=InetAddress.getByName(local_addr);	remote=InetAddress.getByName(remote_addr);	srv_sock=new ServerSocket(local_port, 1, local);	createOutgoingConnection(hb_interval);   // connection to peer established, sets established=true	startReceiverThread();                   // start reading from incoming socket	hb.start();                              // starts heartbeat (conn establisher is not yet started)    }    public void stop() {	stopReceiverThread();	hb.stop();	try {srv_sock.close();}	catch(Exception e) {}	established=false;    }    /** Tries to send buffer across out socket. Tries to establish connection if not yet connected. */    public boolean send(byte[] buf) {	if(buf == null || buf.length == 0) {	    if(trace) System.err.println("Link.send(): buffer is null or does not contain any data !");	    return false;	}	if(!established) { // will be set by ConnectionEstablisher when connection has been set up	    if(trace) log.error("Link.send(): connection not established, discarding message");	    return false;	}	try {	    outstream.writeInt(buf.length);  // synchronized anyway	    outstream.write(buf);            // synchronized anyway, we don't need to sync on outstream	    return true;	}	catch(Exception ex) {  // either IOException or EOFException (subclass if IOException)	    if(trace) log.error("Link.send1(): sending failed; retrying");	    return retry(buf);	}    }    boolean retry(byte[] buf) {	closeOutgoingConnection();	        // there something wrong, close connection	if(!createOutgoingConnection()) {       // ... and re-open. if this fails,	    closeOutgoingConnection();          // just abort and return failure to caller	    return false;	}	else {	    try {		outstream.writeInt(buf.length);		outstream.write(buf);		return true;	    }	    catch(Exception e) {		if(trace) System.out.println("Link.send2(): failed, closing connection");		closeOutgoingConnection();		return false;	    }	}    }    /** Receiver thread main loop. Accept a connection and then read on it until the connection	breaks. Only then is the next connection handled. The reason is that there is only supposed	to be 1 connection to this server socket at the same time.     */    public void run() {	int              num_bytes;	byte[]           buf;	InetAddress      peer=null;	int              peer_port=0;	while(!stop) {	    try {		if(trace) System.out.println("-- WAITING for ACCEPT");		incoming=srv_sock.accept();		instream=new DataInputStream(incoming.getInputStream());		peer=incoming.getInetAddress();		peer_port=incoming.getPort();		if(trace) System.out.println("-- ACCEPT: incoming is " + printSocket(incoming));				/** This piece of code would only accept connections from the peer address defined above. */		if(remote.equals(incoming.getInetAddress())) {		    if(trace)			System.out.println("Link.run(): accepted connection from " + peer + ':' + peer_port);		}		else {		    if(trace) 			log.error("Link.run(): rejected connection request from " +					   peer + ':' + peer_port + ". Address not specified as peer in link !");		    closeIncomingConnection();  // only close incoming connection		    continue;		}				// now try to create outgoing connection		if(!established) {		    createOutgoingConnection();		}		while(!stop) {		    try {			num_bytes=instream.readInt();			if(num_bytes == HB_PACKET) {			    hb.receivedHeartbeat();			    continue;			}			buf=new byte[num_bytes];			instream.readFully(buf, 0, buf.length);			hb.receivedMessage();  // equivalent to heartbeat response (HB_PACKET)			if(receiver != null)			    receiver.receive(buf);		    }		    catch(Exception ex) {  // IOException, EOFException, SocketException			closeIncomingConnection(); // close incoming when read() fails			break;		    }		}	    }	    catch(IOException io_ex) {		receiver_thread=null;		break;	    }	    catch(Exception e) {	    }	}    }    public String toString() {	StringBuffer ret=new StringBuffer();	ret.append("Link <" + local_addr + ':' + local_port + " --> " +		   remote_addr + ':' + remote_port + '>');	ret.append(established? " (established)" : " (not established)");	return ret.toString();    }    public boolean equals(Object other) {	Link o;	if(other == null)	    return false;	if(!(other instanceof Link))	    return false;	o=(Link)other;	if(local_addr.equals(o.local_addr) && remote_addr.equals(o.remote_addr) &&	   local_port == o.local_port && remote_port == o.remote_port)	    return true;	else	    return false;    }    public int hashCode() {	return local_addr.hashCode() + remote_addr.hashCode() + local_port + remote_port;    }    void startReceiverThread() {	stopReceiverThread();	receiver_thread=new Thread(this, "Link.ReceiverThreadThread");        receiver_thread.setDaemon(true);	receiver_thread.start();    }    void stopReceiverThread() {	if(receiver_thread != null && receiver_thread.isAlive()) {	    stop=true;	    closeIncomingConnection();	    try {receiver_thread.join(receiver_thread_join_timeout);} catch(Exception e) {}	    stop=false;	}	receiver_thread=null;    }    /** Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>	stop the connection establisher ! The reason is that this method is going to be called by the	connection establisher as well, therefore it would kill itself ! */    boolean createOutgoingConnection() {	synchronized(outgoing_mutex) {  // serialize access with ConnectionEstablisher	    if(established) {		return true;	    }	    try {		// create a socket to remote:remote_port, bind to local address (choose any local port);		outgoing=new Socket(remote, remote_port, local, 0); // 0 means choose any local port		outgoing.setSoLinger(true, 1);  // 1 second  // +++ ? needed ? it is off by default !		outstream=new DataOutputStream(outgoing.getOutputStream());		if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);		established=true;		if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing));		return true;	    }	    catch(Exception e) {		established=false;		return false;	    }	}    }        /** 	Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>	stop the connection establisher ! The reason is that this method is going to be called by the	connection establisher as well, therefore it would kill itself !    */    boolean createOutgoingConnection(long timeout) {	synchronized(outgoing_mutex) {  // serialize access with ConnectionEstablisher

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?