link.java
来自「JGRoups源码」· Java 代码 · 共 655 行 · 第 1/2 页
JAVA
655 行
// Remainder of the outgoing-connection setup method; its header and the opening of the
// synchronized block lie before this excerpt.
            if(established) {
                return true;
            }
            try {
                if(writer == null)
                    writer=new TimedWriter();

                // create a socket to remote:remote_port, bind to local address (choose any local port);
                // outgoing=new Socket(remote, remote_port, local, 0); // 0 means choose any local port
                outgoing=writer.createSocket(local, remote, remote_port, timeout);
                outgoing.setSoLinger(true, 1); // 1 second // +++ ? needed ? it is off by default !
                outstream=new DataOutputStream(outgoing.getOutputStream());
                if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);
                established=true;
                if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing));
                return true;
            }
            catch(Exception e) {
                established=false;
                return false;
            }
        }
    }


    /**
     * Closes the outgoing connection. Does nothing when the link is not established.
     * Otherwise closes the output stream and the socket (errors while closing are ignored),
     * marks the link as not established and notifies the receiver via linkDown().
     */
    void closeOutgoingConnection() {
        synchronized(outgoing_mutex) {
            if(!established) {
                return;
            }
            if(outstream != null) {
                if(trace) System.out.println("-- CLOSE: outgoing is " + printSocket(outgoing));
                try {
                    outstream.close(); // flush data before socket is closed
                }
                catch(Exception e) {
                    // best effort: the stream is discarded either way
                }
                outstream=null;
            }
            if(outgoing != null) {
                try {
                    outgoing.close();
                }
                catch(Exception e) {
                    // best effort: the socket is discarded either way
                }
                outgoing=null;
            }
            established=false;
            if(receiver != null) receiver.linkDown(local, local_port, remote, remote_port);
        }
    }


    /**
     * When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),
     * then it closes the outgoing *and* incoming socket. The latter needs to be done,
     * so that we can return to accept() and await a new client connection request.
     */
    synchronized void closeIncomingConnection() {
        if(instream != null) {
            if(trace) System.out.println("-- CLOSE: incoming is " + printSocket(incoming));
            try {
                instream.close();
            }
            catch(Exception e) {
                // best effort: the stream is discarded either way
            }
            instream=null;
        }
        if(incoming != null) {
            try {
                incoming.close();
            }
            catch(Exception e) {
                // best effort: the socket is discarded either way
            }
            incoming=null;
        }
    }


    /** Close outgoing and incoming sockets. */
    synchronized void closeConnections() {

        // 1. Closes the outgoing connection. Then the connection establisher is started. The heartbeat
        //    thread cannot be stopped in here, because this method is called by it !
        closeOutgoingConnection();

        // 2. When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),
        //    then it closes the outgoing *and* incoming socket. The latter needs to be done,
        //    so that we can return to accept() and await a new client connection request.
        closeIncomingConnection();
    }


    /**
     * Formats a socket as "localhost:localport --> remotehost:remoteport".
     *
     * @param s the socket to describe, may be null
     * @return the textual description, or "&lt;null&gt;" when s is null
     */
    String printSocket(Socket s) {
        if(s == null) return "<null>";
        StringBuffer ret=new StringBuffer();
        ret.append(s.getLocalAddress().getHostName());
        ret.append(':');
        ret.append(s.getLocalPort());
        ret.append(" --> ");
        ret.append(s.getInetAddress().getHostName());
        ret.append(':');
        ret.append(s.getPort());
        return ret.toString();
    }


    /**
     * Sends heartbeats across the link as long as we are connected (established=true). Uses a TimedWriter
     * for both sending and responding to heartbeats. The reason is that a write() might hang if the
     * peer has not closed its end, but the connection hangs (e.g. network partition, peer was stop-a'ed,
     * ctrl-z of peer or peer's NIC was unplumbed) and the writer buffer is filled to capacity. This way,
     * we don't hang sending timeouts.
     */
    class Heartbeat implements Runnable {
        Thread thread=null;
        long hb_timeout=10000; // time to wait for heartbeats from peer, if not received -> boom !
        long interval=3000;    // {send a heartbeat | try to create connection} every 3 secs

        // The three fields below are read by the heartbeat thread in run() and written from
        // other methods of this class (start(), stop(), receivedMessage(), receivedHeartbeat()).
        // They are volatile so that updates become visible to run() and so that the 64-bit
        // last_hb is always read and written atomically.
        volatile boolean stop_hb=false;
        volatile long last_hb=System.currentTimeMillis();
        volatile boolean missed_hb=false;

        final TimedWriter timed_writer=new TimedWriter();


        /**
         * @param timeout     time (ms) without any sign of life from the peer after which
         *                    both connections are closed
         * @param hb_interval time (ms) between heartbeats / connection attempts
         */
        public Heartbeat(long timeout, long hb_interval) {
            this.hb_timeout=timeout;
            this.interval=hb_interval;
        }


        /** Stops a running heartbeat thread (if any), resets the state and starts a new daemon thread. */
        public synchronized void start() {
            stop();
            stop_hb=false;
            missed_hb=false;
            last_hb=System.currentTimeMillis();
            thread=new Thread(this, "HeartbeatThread");
            thread.setDaemon(true);
            thread.start();
        }


        /** Interrupts the heartbeat thread. Does nothing if no thread is currently set. */
        public synchronized void interrupt() {
            // run() clears 'thread' without holding this monitor, so work on a snapshot
            Thread t=thread;
            if(t != null)
                t.interrupt();
        }


        /**
         * Asks the heartbeat thread to terminate, interrupts it and waits up to
         * hb_timeout+1000 ms for it to finish. Does nothing if no thread is alive.
         */
        public synchronized void stop() {
            // run() clears 'thread' without holding this monitor, so work on a snapshot
            Thread t=thread;
            if(t != null && t.isAlive()) {
                stop_hb=true;
                missed_hb=false;
                t.interrupt();
                try {
                    t.join(hb_timeout+1000);
                }
                catch(Exception e) {
                    // interrupted while waiting: give up waiting, the thread is flagged to stop
                }
                thread=null;
            }
        }


        /**
         * When we receive a message from the peer, this means the peer is alive. Therefore we
         * update the time of the last heartbeat.
         */
        public void receivedMessage() {
            markPeerAlive();
        }


        /** Callback, called by the Link whenever it encounters a heartbeat (HB_PACKET) */
        public void receivedHeartbeat() {
            markPeerAlive();
        }


        /**
         * Records the current time as the last sign of life from the peer and, if heartbeats
         * had been reported as missed, notifies the receiver that they are arriving again.
         */
        private void markPeerAlive() {
            last_hb=System.currentTimeMillis();
            if(missed_hb) {
                if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);
                missed_hb=false;
            }
        }


        /**
         * Sends heartbeats when connection is established. Tries to establish connection when not established.
         * Switches between 'established' and 'not established' roles.
         */
        public void run() {
            long diff=0, curr_time=0, num_missed_hbs=0;

            if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " started");
            while(!stop_hb) {

                if(established) { // send heartbeats

                    // 1. Send heartbeat (use timed write)
                    if(outstream != null) {
                        try {
                            timed_writer.write(outstream, HB_PACKET, 1500);
                            Thread.sleep(interval);
                        }
                        catch(Exception io_ex) { // IOException and TimedWriter.Timeout
                            // NOTE: an interrupt during sleep() also ends up here
                            closeOutgoingConnection(); // sets established to false
                            continue;
                        }
                    }
                    else {
                        established=false;
                        continue;
                    }

                    // 2. If time of last HB received > timeout --> close connection
                    curr_time=System.currentTimeMillis();
                    diff=curr_time - last_hb; // single snapshot of last_hb for all checks below

                    if(diff > interval) {
                        num_missed_hbs=diff / interval;
                        if(receiver != null)
                            receiver.missedHeartbeat(local, local_port, remote, remote_port, (int)num_missed_hbs);
                        missed_hb=true;
                    }

                    if(diff >= hb_timeout) {
                        if(trace) System.out.println("###### Link.Heartbeat.run(): no heartbeat receveived for " +
                                                     diff + " msecs. Closing connections. #####");
                        closeConnections(); // close both incoming *and* outgoing connections
                    }
                }
                else { // try to establish connection
                    synchronized(outgoing_mutex) { // serialize access with createOutgoingConnection()
                        if(established) {
                            continue;
                        }
                        try {
                            outgoing=timed_writer.createSocket(local, remote, remote_port, interval);
                            outstream=new DataOutputStream(outgoing.getOutputStream());
                            if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);
                            established=true;
                            if(trace) System.out.println("-- CREATE (CE): " + printSocket(outgoing));
                            continue;
                        }
                        catch(InterruptedException interrupted_ex) {
                            continue;
                        }
                        catch(Exception ex) { // IOException, TimedWriter.Timeout
                            Util.sleep(interval); // returns when done or interrupted
                        }
                    }
                }
            }
            if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " stopped");
            thread=null;
        }
    }


    /** Receiver used by main(): prints every callback to standard output. */
    private static class MyReceiver implements Link.Receiver {

        public void receive(byte[] msg) {
            System.out.println("<-- " + new String(msg));
        }

        public void linkDown(InetAddress l, int lp, InetAddress r, int rp) {
            System.out.println("** linkDown(): " + r + ':' + rp);
        }

        public void linkUp(InetAddress l, int lp, InetAddress r, int rp) {
            System.out.println("** linkUp(): " + r + ':' + rp);
        }

        public void missedHeartbeat(InetAddress l, int lp, InetAddress r, int rp, int num) {
            System.out.println("** missedHeartbeat(): " + r + ':' + rp);
        }

        public void receivedHeartbeatAgain(InetAddress l, int lp, InetAddress r, int rp) {
            System.out.println("** receivedHeartbeatAgain(): " + r + ':' + rp);
        }
    }


    /**
     * Interactive test driver: starts a Link between the given endpoints and sends every
     * line typed on standard input until end of input is reached.
     *
     * @param args local host, local port, remote host, remote port
     */
    public static void main(String[] args) {
        String local, remote;
        int local_port, remote_port;

        if(args.length != 4) {
            System.err.println("\nLink <local host> <local port> <remote host> <remote port>\n");
            return;
        }
        local=args[0];
        remote=args[2];
        local_port=Integer.parseInt(args[1]);
        remote_port=Integer.parseInt(args[3]);

        Link l=new Link(local, local_port, remote, remote_port, new MyReceiver());

        try {
            l.start();
            System.out.println(l);

            BufferedReader in=new BufferedReader(new InputStreamReader(System.in));
            while(true) {
                System.out.print("> ");
                System.out.flush();
                String line=in.readLine();
                if(line == null) {
                    // end of input (e.g. ctrl-d or closed pipe): stop instead of failing on null
                    break;
                }
                l.send(line.getBytes());
            }
        }
        catch(Exception e) {
            System.err.println(e);
        }
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?