// BasicConnectionTable.java
// Source: "JGroups source code" listing · Java code · 783 lines in total · page 1 of 2
package org.jgroups.blocks;

import org.jgroups.Address;
import org.jgroups.Version;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.net.Socket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.ServerSocket;
import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.EOFException;
import java.util.*;

/**
 * Shared class for TCP connection tables.
 * @author Scott Marlow
 */
public abstract class BasicConnectionTable {
    final HashMap       conns=new HashMap();       // keys: Addresses (peer address), values: Connection
    Receiver            receiver=null;               // callback invoked by receive(); set via setReceiver()
    boolean             use_send_queues=true;        // exposed via get/setUseSendQueues(); presumably enables per-connection send queues - TODO confirm
    InetAddress         bind_addr=null;              // combined with srv_port to build local_addr in getLocalAddress()
    Address             local_addr=null;             // bind_addr + port of srv_sock
    int                 srv_port=7800;               // port used by getLocalAddress() when it builds local_addr
    int                 recv_buf_size=120000;        // exposed via get/setReceiveBufferSize(); presumably the socket receive buffer size - TODO confirm
    int                 send_buf_size=60000;         // exposed via get/setSendBufferSize(); presumably the socket send buffer size - TODO confirm
    final Vector        conn_listeners=new Vector(); // listeners to be notified when a conn is established/torn down
    final Object        recv_mutex=new Object();     // to serialize simultaneous access to receive() from multiple Connections
    Reaper              reaper=null;                 // closes conns that have been idle for more than n secs
    long                reaper_interval=60000;       // reap unused conns once a minute
    long                conn_expire_time=300000;     // connections can be idle for 5 minutes before they are reaped
    int                 sock_conn_timeout=1000;      // max time in millis to wait for Socket.connect() to return
    ThreadGroup         thread_group=null;           // thread group passed to the Connection receiver threads (may be null)
    protected final Log log= LogFactory.getLog(getClass()); // logger named after the concrete (sub)class
    final byte[]        cookie={'b', 'e', 'l', 'a'}; // 4-byte constant; its use is not visible here - presumably a connection handshake marker, TODO confirm
    boolean             use_reaper=false;            // by default we don't reap idle conns
    static final int    backlog=20;                  // 20 conn requests are queued by ServerSocket (addtl will be discarded)
    ServerSocket        srv_sock=null;               // server socket whose accept() is called by the acceptor thread
    boolean             reuse_addr=false;            // not used in the visible code - presumably SO_REUSEADDR, TODO confirm
    boolean             tcp_nodelay=false;           // exposed via get/setTcpNodelay(); presumably TCP_NODELAY for new sockets - TODO confirm
    int                 linger=-1;                   // exposed via get/setLinger(); presumably SO_LINGER, with -1 meaning "not set" - TODO confirm

   /**
    * The address which will be broadcast to the group (the externally visible address which this host should
    * be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
    */
    InetAddress		    external_addr=null;
    int                 max_port=0;                   // maximum port to bind to (if < srv_port, no limit)
    Thread              acceptor=null;               // continuously calls srv_sock.accept()
    boolean             running=false;               // set by start()/stop(); send() discards messages while this is false

    // Not used in the visible part of the file; presumably the maximum time in millis to wait when
    // joining a thread - TODO confirm
    final static long   MAX_JOIN_TIMEOUT=10000;


    /**
     * Sets the callback that {@link #receive(Address, byte[], int, int)} forwards incoming data to.
     *
     * @param r the receiver; while it is null, receive() only logs an error
     */
    public final void setReceiver(Receiver r) {
        receiver=r;
    }

   /**
    * Registers a listener to be told about connections being opened and closed.
    * A null listener, or one that is already registered, is ignored.
    *
    * @param l the listener to register
    */
   public void addConnectionListener(ConnectionListener l) {
       if(l == null || conn_listeners.contains(l))
           return;
       conn_listeners.addElement(l);
   }

   /**
    * Unregisters a previously added connection listener. A null argument is ignored.
    *
    * @param l the listener to unregister
    */
   public void removeConnectionListener(ConnectionListener l) {
       if(l == null)
           return;
       conn_listeners.removeElement(l);
   }

   /**
    * Returns the local address, creating it from <code>bind_addr</code> and <code>srv_port</code> on
    * first use if it has not been set yet.
    *
    * @return the local address, or null while neither local_addr nor bind_addr is set
    */
   public Address getLocalAddress() {
       if(local_addr == null && bind_addr != null)
           local_addr=new IpAddress(bind_addr, srv_port);
       return local_addr;
   }

   /** Returns the configured send buffer size (<code>send_buf_size</code>). */
   public int getSendBufferSize() {
       return send_buf_size;
   }

   /**
    * Stores a new send buffer size. The value is stored as given, without validation.
    *
    * @param send_buf_size the new value of <code>send_buf_size</code>
    */
   public void setSendBufferSize(int send_buf_size) {
       this.send_buf_size=send_buf_size;
   }

   /** Returns the configured receive buffer size (<code>recv_buf_size</code>). */
   public int getReceiveBufferSize() {
       return recv_buf_size;
   }

   /**
    * Stores a new receive buffer size. The value is stored as given, without validation.
    *
    * @param recv_buf_size the new value of <code>recv_buf_size</code>
    */
   public void setReceiveBufferSize(int recv_buf_size) {
       this.recv_buf_size=recv_buf_size;
   }

   /** Returns the max time in milliseconds to wait for Socket.connect() to return. */
   public int getSocketConnectionTimeout() {
       return sock_conn_timeout;
   }

   /**
    * Stores the max time to wait for Socket.connect() to return. The value is stored without validation.
    *
    * @param sock_conn_timeout the timeout in milliseconds
    */
   public void setSocketConnectionTimeout(int sock_conn_timeout) {
       this.sock_conn_timeout=sock_conn_timeout;
   }

   /**
    * Returns the number of connections currently held in the connection table.
    * <p>
    * The size is read while holding the <code>conns</code> lock, like the other readers of the table
    * (remove(), toString(), retainAll()), so the value reflects a consistent state of the map.
    *
    * @return the number of entries in <code>conns</code>
    */
   public int getNumConnections() {
       synchronized(conns) {
           return conns.size();
       }
   }

    /** Returns the current value of the <code>tcp_nodelay</code> flag. */
    public boolean getTcpNodelay() {
        return tcp_nodelay;
    }

    /**
     * Stores a new value for the <code>tcp_nodelay</code> flag.
     *
     * @param tcp_nodelay the new flag value
     */
    public void setTcpNodelay(boolean tcp_nodelay) {
        this.tcp_nodelay=tcp_nodelay;
    }

    /** Returns the current <code>linger</code> setting (initially -1). */
    public int getLinger() {
        return linger;
    }

    /**
     * Stores a new <code>linger</code> setting. The value is stored as given, without validation.
     *
     * @param linger the new value
     */
    public void setLinger(int linger) {
        this.linger=linger;
    }

    /** Returns the current value of the <code>use_send_queues</code> flag. */
    public boolean getUseSendQueues() {return use_send_queues;}

   /**
    * Stores a new value for the <code>use_send_queues</code> flag.
    *
    * @param flag the new flag value
    */
   public void setUseSendQueues(boolean flag) {this.use_send_queues=flag;}

   /**
    * Marks the connection table as running, so that send() no longer discards messages.
    *
    * @throws Exception declared in the signature; this implementation itself does not throw
    */
   public void start() throws Exception {
       running=true;
   }

   /** Marks the connection table as stopped; from then on send() discards messages with a warning. */
   public void stop() {
       running=false;
   }

   /**
    * Removes the connection for <code>addr</code> from the connection table and destroys it. This is
    * typically triggered when a member is suspected.
    * <p>
    * The table entry is removed while holding the <code>conns</code> lock; the connection itself is
    * destroyed after the lock has been released. A failure while destroying the connection is logged
    * and not propagated to the caller.
    *
    * @param addr the peer address whose connection should be dropped; nothing is destroyed if no
    *             connection is registered for it
    */
   public void remove(Address addr) {
       Connection conn;

       synchronized(conns) {
           conn=(Connection)conns.remove(addr);
       }

       if(conn != null) {
           try {
               conn.destroy();  // won't do anything if already destroyed
           }
           catch(Exception e) {
               // best effort: the entry is already gone from the table, so only report the failure
               if(log.isWarnEnabled())
                   log.warn("failed destroying connection to " + addr, e);
           }
       }
       if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
   }

   /**
    * Calls the receiver callback. We serialize access to this method because it may be called concurrently
    * by several Connection handler threads. Therefore the receiver doesn't need to synchronize.
    * <p>
    * If no receiver has been set, an error is logged and the data is dropped.
    *
    * @param sender the address the data was received from
    * @param data   buffer holding the received bytes
    * @param offset offset into <code>data</code>, passed through to the receiver
    * @param length number of bytes, passed through to the receiver
    */
   public void receive(Address sender, byte[] data, int offset, int length) {
       // Read the field once: a concurrent setReceiver(null) between the null check and the call
       // would otherwise cause a NullPointerException
       Receiver target=receiver;
       if(target == null) {
           if(log.isErrorEnabled()) log.error("receiver is null (not set) !");
           return;
       }
       synchronized(recv_mutex) {
           target.receive(sender, data, offset, length);
       }
   }

   /**
    * Renders the connection table as text: a header line with the number of connections, followed
    * by one "key: peer: connection" line per entry and a trailing empty line.
    * <p>
    * The table is copied while holding the <code>conns</code> lock and formatted from that copy.
    *
    * @return a multi-line description of the current connections
    */
   public String toString() {
       final Map snapshot;
       synchronized(conns) {
           snapshot=new HashMap(conns);
       }

       StringBuffer sb=new StringBuffer();
       sb.append("connections (").append(snapshot.size()).append("):\n");

       Iterator entries=snapshot.entrySet().iterator();
       while(entries.hasNext()) {
           Map.Entry e=(Map.Entry)entries.next();
           Address peer=(Address)e.getKey();
           Connection c=(Connection)e.getValue();
           sb.append("key: ").append(peer).append(": ").append(c).append('\n');
       }
       return sb.append('\n').toString();
   }

   /**
    * Tells every registered connection listener that a connection to <code>peer</code> was opened.
    * <p>
    * The callbacks run over a snapshot of the listener list. A listener that is removed (by another
    * thread or from within a callback) while the notification is in progress can therefore neither
    * cause another listener to be skipped nor make the loop run past the end of the list; it may
    * still receive this one notification.
    *
    * @param peer the address of the peer; nothing happens if it is null
    */
   void notifyConnectionOpened(Address peer) {
       if(peer == null) return;
       Object[] listeners=conn_listeners.toArray();
       for(int i=0; i < listeners.length; i++)
           ((ConnectionListener)listeners[i]).connectionOpened(peer);
   }

   /**
    * Tells every registered connection listener that the connection to <code>peer</code> was closed.
    * <p>
    * The callbacks run over a snapshot of the listener list. A listener that is removed (by another
    * thread or from within a callback) while the notification is in progress can therefore neither
    * cause another listener to be skipped nor make the loop run past the end of the list; it may
    * still receive this one notification.
    *
    * @param peer the address of the peer; nothing happens if it is null
    */
   void notifyConnectionClosed(Address peer) {
       if(peer == null) return;
       Object[] listeners=conn_listeners.toArray();
       for(int i=0; i < listeners.length; i++)
           ((ConnectionListener)listeners[i]).connectionClosed(peer);
   }

   /**
    * Registers connection <code>c</code> under <code>peer</code> (replacing any previous entry for that
    * key) and starts the reaper if one is configured and not yet running.
    * <p>
    * NOTE(review): this method does not synchronize on <code>conns</code> itself; callers presumably
    * hold that lock already - confirm against the subclasses.
    *
    * @param peer the address of the peer, used as key
    * @param c    the connection to that peer
    */
   void addConnection(Address peer, Connection c) {
       conns.put(peer, c);
       if(reaper == null || reaper.isRunning())
           return;
       reaper.start();
   }

   /**
    * Sends <code>length</code> bytes of <code>data</code>, starting at <code>offset</code>, to
    * <code>dest</code>.
    * <p>
    * The message is dropped (with a log entry) if the destination or the data is null, or if the table
    * is not running. It is dropped silently if getConnection() returns null. If sending over the
    * connection fails, the failure is logged at trace level and the connection is removed from the
    * table; that failure is not reported to the caller.
    *
    * @param dest   the address of the peer to send to
    * @param data   buffer holding the bytes to send
    * @param offset offset into <code>data</code>
    * @param length number of bytes to send
    * @throws Exception if getConnection() fails; the original failure is attached as the cause
    */
   public void send(Address dest, byte[] data, int offset, int length) throws Exception {
       // Preconditions: each of these drops the message
       if(dest == null) {
           if(log.isErrorEnabled())
               log.error("destination is null");
           return;
       }
       if(data == null) {
           log.warn("data is null; discarding packet");
           return;
       }
       if(!running) {
           if(log.isWarnEnabled())
               log.warn("connection table is not running, discarding message to " + dest);
           return;
       }

       // Step 1: look up the connection to the destination
       Connection target;
       try {
           target=getConnection(dest);
       }
       catch(Throwable failure) {
           throw new Exception("connection to " + dest + " could not be established", failure);
       }
       if(target == null)
           return;

       // Step 2: hand the message to that connection; a broken connection is dropped from the table
       try {
           target.send(data, offset, length);
       }
       catch(Throwable failure) {
           if(log.isTraceEnabled())
               log.trace("sending msg to " + dest + " failed (" + failure.getClass().getName() + "); removing from connection table", failure);
           remove(dest);
       }
   }

   /**
    * Returns the connection to be used for sending to <code>dest</code>. Implemented by subclasses;
    * according to the comment in send(), a connection is created if none exists yet.
    * <p>
    * send() silently drops the message when this returns null, and wraps anything thrown here in an
    * Exception.
    *
    * @param dest the address of the peer
    * @return the connection to <code>dest</code>, or null
    * @throws Exception if the connection cannot be obtained
    */
   abstract Connection getConnection(Address dest) throws Exception;

   // Superseded earlier version of retainAll(), kept for reference. It only dropped the table
   // entries whose key is not in c; it did not destroy the connections it dropped:
   //public void retainAll(Collection c) {
     //  conns.keySet().retainAll(c);
   //}


      /**
       * Removes all connections from ConnectionTable which are not in current_mbrs and destroys them,
       * so that their resources are cleaned up.
       * <p>
       * The orphaned entries are detached from the table in a single pass while holding the
       * <code>conns</code> lock, and are destroyed after the lock has been released. Exactly the
       * connections that were detached are destroyed. A failure while destroying one connection is
       * logged and does not prevent the remaining ones from being destroyed.
       *
       * @param current_mbrs the addresses whose connections are kept; if null, nothing is done
       */
      public void retainAll(Collection current_mbrs) {
          if(current_mbrs == null) return;

          // 1. Detach every connection whose key is not a current member
          List orphans=new ArrayList();
          synchronized(conns) {
              for(Iterator it=conns.entrySet().iterator(); it.hasNext();) {
                  Map.Entry entry=(Map.Entry)it.next();
                  if(!current_mbrs.contains(entry.getKey())) {
                      orphans.add(entry.getValue());
                      it.remove();
                  }
              }
          }

          // 2. Destroy the detached connections outside of the lock
          for(Iterator it=orphans.iterator(); it.hasNext();) {
              Connection conn=(Connection)it.next();
              if(conn == null)
                  continue;
              if(log.isTraceEnabled())
                  log.trace("Destroy this orphaned connection: " + conn);
              try {
                  conn.destroy();
              }
              catch(Exception e) {
                  // keep going: one broken connection must not leak all the others
                  if(log.isWarnEnabled())
                      log.warn("failed destroying orphaned connection " + conn, e);
              }
          }
      }



    /**
     * Used for message reception. The callback is invoked from BasicConnectionTable.receive(), which
     * holds <code>recv_mutex</code> during the call, so invocations made through it are serialized.
     */
    public interface Receiver {
       /**
        * Called with data received from a peer.
        *
        * @param sender the address the data was received from
        * @param data   buffer holding the received bytes
        * @param offset offset into <code>data</code>
        * @param length number of bytes
        */
       void receive(Address sender, byte[] data, int offset, int length);
   }

   /**
    * Used to be notified about connection establishment and teardown. Listeners are registered with
    * addConnectionListener() and invoked by notifyConnectionOpened() / notifyConnectionClosed().
    */
   public interface ConnectionListener {
       /** Called when a connection was opened. @param peer_addr the address of the peer (never null when called via notifyConnectionOpened()) */
       void connectionOpened(Address peer_addr);
       /** Called when a connection was closed. @param peer_addr the address of the peer (never null when called via notifyConnectionClosed()) */
       void connectionClosed(Address peer_addr);
   }

   class Connection implements Runnable {
       Socket           sock=null;                // socket to/from peer (result of srv_sock.accept() or new Socket())
       String           sock_addr=null;           // used for Thread.getName()
       DataOutputStream out=null;                 // for sending messages
       DataInputStream  in=null;                  // for receiving messages
       Thread           receiverThread=null;      // thread for receiving messages
       Address          peer_addr=null;           // address of the 'other end' of the connection
       final Object     send_mutex=new Object();  // serialize sends
       long             last_access=System.currentTimeMillis(); // last time a message was sent or received

       /** Queue<byte[]> of data to be sent to the peer of this connection */
       Queue            send_queue=new Queue();
       // Sender of this connection; it is stopped in destroy(). Presumably it drains send_queue - TODO confirm
       Sender           sender=new ConnectionTable.Connection.Sender();
       // set to true by init() and to false by destroy()
       boolean          is_running=false;


       /**
        * Returns a "local-ip:local-port - remote-ip:remote-port" description of the socket. The string
        * is built on first use and cached in <code>sock_addr</code>.
        *
        * @return the cached description, or null if none is cached and there is no socket
        */
       private String getSockAddress() {
           if(sock_addr == null && sock != null) {
               sock_addr=sock.getLocalAddress().getHostAddress() + ':' + sock.getLocalPort()
                       + " - " + sock.getInetAddress().getHostAddress() + ':' + sock.getPort();
           }
           return sock_addr;
       }




       Connection(Socket s, Address peer_addr) {
           sock=s;
           this.peer_addr=peer_addr;
           try {
               // out=new DataOutputStream(sock.getOutputStream());
               // in=new DataInputStream(sock.getInputStream());

               // The change to buffered input and output stream yielded a 400% performance gain !
               // bela Sept 7 2006
               out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
               in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
           }
           catch(Exception ex) {
               if(log.isErrorEnabled()) log.error("exception is " + ex);
           }
       }


       /**
        * Tells whether this connection currently holds a receiver thread reference: init() sets the
        * reference and destroy() clears it.
        */
       boolean established() {
           return receiverThread != null;
       }


       /**
        * Sets the address of the 'other end' of this connection.
        *
        * @param peer_addr the peer's address
        */
       void setPeerAddress(Address peer_addr) {
           this.peer_addr=peer_addr;
       }

       /** Returns the address of the 'other end' of this connection (may be null). */
       Address getPeerAddress() {return peer_addr;}

       /** Records the current time as the last time this connection was used. */
       void updateLastAccessed() {
           last_access=System.currentTimeMillis();
       }

       /**
        * Marks the connection as running and, unless a live receiver thread already exists, creates
        * and starts a daemon thread (in <code>thread_group</code>) that runs this connection.
        */
       void init() {
           is_running=true;
           if(receiverThread != null && receiverThread.isAlive())
               return; // a receiver thread is already at work

           // Roland Kurmann 4/7/2003, put in thread_group
           String thread_name="ConnectionTable.Connection.Receiver [" + getSockAddress() + "]";
           receiverThread=new Thread(thread_group, this, thread_name);
           receiverThread.setDaemon(true);
           receiverThread.start();
           if(log.isTraceEnabled())
               log.trace("ConnectionTable.Connection.Receiver started");
       }


       void destroy() {
           is_running=false;
           closeSocket(); // should terminate handler as well
           sender.stop();
           Thread tmp=receiverThread;
           receiverThread=null;
           if(tmp != null) {
               try {
                   tmp.interrupt();

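// [Listing truncated here: page 1 of 2 ends inside destroy(). What followed on the page was the
//  code viewer's keyboard shortcut help: copy code Ctrl + C, search code Ctrl + F, full screen F11,
//  increase font size Ctrl + =, decrease font size Ctrl + -, show shortcuts ?]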