link.java

来自「JGRoups源码」· Java 代码 · 共 655 行 · 第 1/2 页

JAVA
655
字号
	    if(established) {		return true;	    }	    try {		if(writer == null) writer=new TimedWriter();		// create a socket to remote:remote_port, bind to local address (choose any local port);		// outgoing=new Socket(remote, remote_port, local, 0); // 0 means choose any local port		outgoing=writer.createSocket(local, remote, remote_port, timeout);		outgoing.setSoLinger(true, 1);  // 1 second  // +++ ? needed ? it is off by default !		outstream=new DataOutputStream(outgoing.getOutputStream());		if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);		established=true;		if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing));		return true;	    }	    catch(Exception e) {		established=false;		return false;	    }	}    }    /** Closes the outgoing connection */    void closeOutgoingConnection() {	synchronized(outgoing_mutex) {	    if(!established) {		return;	    }	    if(outstream != null) {				if(trace) System.out.println("-- CLOSE: outgoing is " + printSocket(outgoing));				try {		    outstream.close(); // flush data before socket is closed		} 		catch(Exception e) {}		outstream=null;	    }	    if(outgoing != null) {		try {		    outgoing.close();		} 		catch(Exception e) {}		outgoing=null;	    }	    established=false;	    if(receiver != null) receiver.linkDown(local, local_port, remote, remote_port);	}    }        /** When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),         then it closes the outgoing *and* incoming socket. The latter needs to be done,        so that we can return to accept() and await a new client connection request. */    synchronized void closeIncomingConnection() {	if(instream != null) {	    	    if(trace) System.out.println("-- CLOSE: incoming is " + printSocket(incoming));	    	    try {instream.close();} catch(Exception e) {}	    instream=null;	}	if(incoming != null) {	    try {incoming.close();} catch(Exception e) {}	    incoming=null;	}    }    /** Close outgoing and incoming sockets. */    synchronized void closeConnections() {	// 1. Closes the outgoing connection. Then the connection establisher is started. The heartbeat	//    thread cannot be stopped in here, because this method is called by it !	    	closeOutgoingConnection();	// 2. When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()), 	//    then it closes the outgoing *and* incoming socket. The latter needs to be done,	//    so that we can return to accept() and await a new client connection request.	closeIncomingConnection();    }    String printSocket(Socket s) {	if(s == null) return "<null>";	StringBuffer ret=new StringBuffer();	ret.append(s.getLocalAddress().getHostName());	ret.append(':');	ret.append(s.getLocalPort());	ret.append(" --> ");	ret.append(s.getInetAddress().getHostName());	ret.append(':');	ret.append(s.getPort());	return ret.toString();    }        /**       Sends heartbeats across the link as long as we are connected (established=true). Uses a TimedWriter       for both sending and responding to heartbeats. The reason is that a write() might hang if the       peer has not closed its end, but the connection hangs (e.g. network partition, peer was stop-a'ed,       ctrl-z of peer or peer's NIC was unplumbed) and the writer buffer is filled to capacity. This way,       we don't hang sending timeouts.     */    class Heartbeat implements Runnable {	Thread       thread=null;	long         hb_timeout=10000;         // time to wait for heartbeats from peer, if not received -> boom !	long         interval=3000; // {send a heartbeat | try to create connection} every 3 secs	boolean      stop_hb=false;	long         last_hb=System.currentTimeMillis();	boolean      missed_hb=false;	final TimedWriter  timed_writer=new TimedWriter();	public Heartbeat(long timeout, long hb_interval) {	    this.hb_timeout=timeout;	    this.interval=hb_interval;	}	public synchronized void start() {	    stop();	    stop_hb=false;	    missed_hb=false;	    last_hb=System.currentTimeMillis();	    thread=new Thread(this, "HeartbeatThread");            thread.setDaemon(true);	    thread.start();	}	public synchronized void interrupt() {	    thread.interrupt();	}	public synchronized void stop() {	    if(thread != null && thread.isAlive()) {		stop_hb=true;		missed_hb=false;		thread.interrupt();		try {thread.join(hb_timeout+1000);} catch(Exception e) {}		thread=null;	    }	}			/**	   When we receive a message from the peer, this means the peer is alive. Therefore we	   update the time of the last heartbeat.	 */	public void receivedMessage() {	    last_hb=System.currentTimeMillis();	    if(missed_hb) {		if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);		missed_hb=false;	    }	}	/** Callback, called by the Link whenever it encounters a heartbeat (HB_PACKET) */	public void receivedHeartbeat() {	    last_hb=System.currentTimeMillis();	    if(missed_hb) {		if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);		missed_hb=false;	    }	}		/**	   Sends heartbeats when connection is established. Tries to establish connection when not established.	   Switches between 'established' and 'not established' roles.	 */	public void run() {	    long  diff=0, curr_time=0, num_missed_hbs=0;	    	    if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " started");	    while(!stop_hb) {		if(established) {  // send heartbeats		    // 1. Send heartbeat (use timed write)		    if(outstream != null) {			try {			    timed_writer.write(outstream, HB_PACKET, 1500);			    Thread.sleep(interval);			}			catch(Exception io_ex) {       // IOException and TimedWriter.Timeout			    closeOutgoingConnection(); // sets established to false			    continue;			}		    }		    else {			established=false;			continue;		    }			     		    // 2. If time of last HB received > timeout --> close connection		    curr_time=System.currentTimeMillis();		    diff=curr_time - last_hb;		    if(curr_time - last_hb > interval) {			num_missed_hbs=(curr_time - last_hb) / interval;			if(receiver != null)			    receiver.missedHeartbeat(local, local_port, remote, remote_port, (int)num_missed_hbs);			missed_hb=true;		    }		    if(diff >= hb_timeout) {			if(trace) System.out.println("###### Link.Heartbeat.run(): no heartbeat receveived for " + 						     diff + " msecs. Closing connections. #####");			closeConnections(); // close both incoming *and* outgoing connections		    }		}		else {    // try to establish connection		    synchronized(outgoing_mutex) { // serialize access with createOutgoingConnection()			if(established) {			    continue;			}						try {			    outgoing=timed_writer.createSocket(local, remote, remote_port, interval);			    outstream=new DataOutputStream(outgoing.getOutputStream());			    if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);			    established=true;			    if(trace) System.out.println("-- CREATE (CE): " + printSocket(outgoing));			    continue;			}			catch(InterruptedException interrupted_ex) {			    continue;			}			catch(Exception ex) {         // IOException, TimedWriter.Timeout			    Util.sleep(interval);  // returns when done or interrupted			}		    }		}	    }	    if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " stopped");	    thread=null;	}    }        private static class MyReceiver implements Link.Receiver {		public void receive(byte[] msg) {	    System.out.println("<-- " + new String(msg));	}		public void linkDown(InetAddress l, int lp, InetAddress r, int rp) {	    System.out.println("** linkDown(): " + r + ':' + rp);	}		public void linkUp(InetAddress l, int lp, InetAddress r, int rp) {	    System.out.println("** linkUp(): " + r + ':' + rp);	}		public void missedHeartbeat(InetAddress l, int lp, InetAddress r, int rp, int num) {	    System.out.println("** missedHeartbeat(): " + r + ':' + rp);	}		public void receivedHeartbeatAgain(InetAddress l, int lp, InetAddress r, int rp) {	    System.out.println("** receivedHeartbeatAgain(): " + r + ':' + rp);	}    }    public static void main(String[] args) {	String   local, remote;	int      local_port, remote_port;	if(args.length != 4) {	    System.err.println("\nLink <local host> <local port> <remote host> <remote port>\n");	    return;	}	local=args[0];	remote=args[2];	local_port=Integer.parseInt(args[1]);	remote_port=Integer.parseInt(args[3]);	Link l=new Link(local, local_port, remote, remote_port, new MyReceiver());	try {	    l.start();	    System.out.println(l);	    	    BufferedReader in= new BufferedReader(new InputStreamReader(System.in));	    	    while(true) {		System.out.print("> "); System.out.flush();		String line=in.readLine();		l.send(line.getBytes());	    }	}	catch(Exception e) {	    System.err.println(e);	}    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?