connectiontablenio.java

来自「JGRoups源码」· Java 代码 · 共 1,494 行 · 第 1/4 页

JAVA
1,494
字号
// $Id: ConnectionTableNIO.java,v 1.24 2006/09/18 18:00:37 bstansberry Exp $package org.jgroups.blocks;import EDU.oswego.cs.dl.util.concurrent.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.stack.IpAddress;import org.jgroups.util.Util;import java.io.IOException;import java.net.*;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.channels.spi.SelectorProvider;import java.util.Iterator;import java.util.LinkedList;import java.util.Set;/** * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this * connection.  For incoming messages, one server socket is created at startup. For each new incoming * client connecting, a new thread from a thread pool is allocated and listens for incoming messages * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed * after some time. * <p/> * Incoming messages from any of the sockets can be received by setting the message listener. * * We currently require use_incoming_packet_handler=true (release 2.4 will support use_incoming_packet_handler=false * due to threadless stack support). * * @author Bela Ban, Scott Marlow, Alex Fu */public class ConnectionTableNIO extends BasicConnectionTable implements Runnable {   private ServerSocketChannel m_serverSocketChannel;   private Selector m_acceptSelector;   protected final static Log LOG = LogFactory.getLog(ConnectionTableNIO.class);   private WriteHandler[] m_writeHandlers;   private int m_nextWriteHandler = 0;   private final Object m_lockNextWriteHandler = new Object();   private ReadHandler[] m_readHandlers;   private int m_nextReadHandler = 0;   private final Object m_lockNextReadHandler = new Object();   // thread pool for processing read requests   private Executor m_requestProcessors;   private volatile boolean serverStopping=false;   private final LinkedList m_backGroundThreads = new LinkedList();  // Collection of all created threads   private int m_reader_threads = 8;   private int m_writer_threads = 8;   private int m_processor_threads = 10;                    // PooledExecutor.createThreads()   private int m_processor_minThreads = 10;                 // PooledExecutor.setMinimumPoolSize()   private int m_processor_maxThreads = 10;                 // PooledExecutor.setMaxThreads()   private int m_processor_queueSize=100;                   // Number of queued requests that can be pending waiting   // for a background thread to run the request.   private int m_processor_keepAliveTime = -1;              // PooledExecutor.setKeepAliveTime( milliseconds);    // A negative value means to wait forever   /**    * @param srv_port    * @throws Exception    */   public ConnectionTableNIO(int srv_port) throws Exception {      this.srv_port=srv_port;      start();   }   /**    * @param srv_port    * @param reaper_interval    * @param conn_expire_time    * @throws Exception    */   public ConnectionTableNIO(int srv_port, long reaper_interval,                             long conn_expire_time) throws Exception {      this.srv_port=srv_port;      this.reaper_interval=reaper_interval;      this.conn_expire_time=conn_expire_time;      start();   }   /**    * @param r    * @param bind_addr    * @param external_addr    * @param srv_port    * @param max_port    * @throws Exception    */   public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port   )      throws Exception   {      setReceiver(r);      this.external_addr=external_addr;      this.bind_addr=bind_addr;      this.srv_port=srv_port;      this.max_port=max_port;      use_reaper=true;      start();   }    public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,                              boolean doStart    )            throws Exception    {        setReceiver(r);        this.external_addr=external_addr;        this.bind_addr=bind_addr;        this.srv_port=srv_port;        this.max_port=max_port;        use_reaper=true;        if(doStart)            start();    }   /**    * @param r    * @param bind_addr    * @param external_addr    * @param srv_port    * @param max_port    * @param reaper_interval    * @param conn_expire_time    * @throws Exception    */   public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,                             long reaper_interval, long conn_expire_time                             ) throws Exception   {      setReceiver(r);      this.bind_addr=bind_addr;      this.external_addr=external_addr;      this.srv_port=srv_port;      this.max_port=max_port;      this.reaper_interval=reaper_interval;      this.conn_expire_time=conn_expire_time;      use_reaper=true;      start();   }    public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port,                              long reaper_interval, long conn_expire_time, boolean doStart    ) throws Exception    {        setReceiver(r);        this.bind_addr=bind_addr;        this.external_addr=external_addr;        this.srv_port=srv_port;        this.max_port=max_port;        this.reaper_interval=reaper_interval;        this.conn_expire_time=conn_expire_time;        use_reaper=true;        if(doStart)            start();    }    public int getReaderThreads() { return m_reader_threads; }    public void setReaderThreads(int m_reader_threads) {        this.m_reader_threads=m_reader_threads;    }    public int getWriterThreads() { return m_writer_threads; }    public void setWriterThreads(int m_writer_threads) {        this.m_writer_threads=m_writer_threads;    }    public int getProcessorThreads() { return m_processor_threads; }    public void setProcessorThreads(int m_processor_threads) {        this.m_processor_threads=m_processor_threads;    }    public int getProcessorMinThreads() { return m_processor_minThreads;}    public void setProcessorMinThreads(int m_processor_minThreads) {        this.m_processor_minThreads=m_processor_minThreads;    }    public int getProcessorMaxThreads() { return m_processor_maxThreads;}    public void setProcessorMaxThreads(int m_processor_maxThreads) {        this.m_processor_maxThreads=m_processor_maxThreads;    }    public int getProcessorQueueSize() { return m_processor_queueSize; }    public void setProcessorQueueSize(int m_processor_queueSize) {        this.m_processor_queueSize=m_processor_queueSize;    }    public int getProcessorKeepAliveTime() { return m_processor_keepAliveTime; }    public void setProcessorKeepAliveTime(int m_processor_keepAliveTime) {        this.m_processor_keepAliveTime=m_processor_keepAliveTime;    }    /**    * Try to obtain correct Connection (or create one if not yet existent)    */   ConnectionTable.Connection getConnection(Address dest) throws Exception   {      Connection conn;      SocketChannel sock_ch;      synchronized (conns)      {         conn = (Connection) conns.get(dest);         if (conn == null)         {            InetSocketAddress destAddress = new InetSocketAddress(((IpAddress) dest).getIpAddress(),               ((IpAddress) dest).getPort());            sock_ch = SocketChannel.open(destAddress);             sock_ch.socket().setTcpNoDelay(tcp_nodelay);            conn = new Connection(sock_ch, dest);            conn.sendLocalAddress(local_addr);            // This outbound connection is ready            sock_ch.configureBlocking(false);            try            {               if (LOG.isTraceEnabled())                  LOG.trace("About to change new connection send buff size from " + sock_ch.socket().getSendBufferSize() + " bytes");               sock_ch.socket().setSendBufferSize(send_buf_size);               if (LOG.isTraceEnabled())                  LOG.trace("Changed new connection send buff size to " + sock_ch.socket().getSendBufferSize() + " bytes");            }            catch (IllegalArgumentException ex)            {               if (log.isErrorEnabled()) log.error("exception setting send buffer size to " +                  send_buf_size + " bytes: " + ex);            }            try            {               if (LOG.isTraceEnabled())                  LOG.trace("About to change new connection receive buff size from " + sock_ch.socket().getReceiveBufferSize() + " bytes");               sock_ch.socket().setReceiveBufferSize(recv_buf_size);               if (LOG.isTraceEnabled())                  LOG.trace("Changed new connection receive buff size to " + sock_ch.socket().getReceiveBufferSize() + " bytes");            }            catch (IllegalArgumentException ex)            {               if (log.isErrorEnabled()) log.error("exception setting receive buffer size to " +                  send_buf_size + " bytes: " + ex);            }            int idx;            synchronized (m_lockNextWriteHandler)            {               idx = m_nextWriteHandler = (m_nextWriteHandler + 1) % m_writeHandlers.length;            }            conn.setupWriteHandler(m_writeHandlers[idx]);            // Put the new connection to the queue            try            {               synchronized (m_lockNextReadHandler)               {                  idx = m_nextReadHandler = (m_nextReadHandler + 1) % m_readHandlers.length;               }               m_readHandlers[idx].add(conn);            } catch (InterruptedException e)            {               if (LOG.isWarnEnabled())                  LOG.warn("Thread (" +Thread.currentThread().getName() + ") was interrupted, closing connection", e);               // What can we do? Remove it from table then.               conn.destroy();               throw e;            }            // Add connection to table            addConnection(dest, conn);            notifyConnectionOpened(dest);            if (LOG.isInfoEnabled()) LOG.info("created socket to " + dest);         }         return conn;      }   }   public final void start() throws Exception {       super.start();       //Roland Kurmann 4/7/2003, build new thread group       thread_group = new ThreadGroup(Util.getGlobalThreadGroup(), "ConnectionTableThreads");       init();       srv_sock=createServerSocket(srv_port, max_port);       if (external_addr!=null)           local_addr=new IpAddress(external_addr, srv_sock.getLocalPort());       else if (bind_addr != null)           local_addr=new IpAddress(bind_addr, srv_sock.getLocalPort());       else           local_addr=new IpAddress(srv_sock.getLocalPort());       if(log.isInfoEnabled()) log.info("server socket created on " + local_addr);       //Roland Kurmann 4/7/2003, put in thread_group       acceptor=new Thread(thread_group, this, "ConnectionTable.AcceptorThread");       acceptor.setDaemon(true);       acceptor.start();       m_backGroundThreads.add(acceptor);       // start the connection reaper - will periodically remove unused connections       if(use_reaper && reaper == null) {           reaper=new Reaper();           reaper.start();       }   }   protected void init()      throws Exception   {      // use directExector if max thread pool size is less than or equal to zero.      if(getProcessorMaxThreads() <= 0) {         m_requestProcessors = new DirectExecutor();      }      else      {         // Create worker thread pool for processing incoming buffers         PooledExecutor requestProcessors = new PooledExecutor(new BoundedBuffer(getProcessorQueueSize()), getProcessorMaxThreads());          requestProcessors.setThreadFactory(new ThreadFactory() {              public Thread newThread(Runnable runnable) {                  Thread new_thread=new Thread(thread_group, runnable);                  new_thread.setDaemon(true);                  new_thread.setName("ConnectionTableNIO.Thread");                  m_backGroundThreads.add(new_thread);                  return new_thread;              }          });         requestProcessors.setMinimumPoolSize(getProcessorMinThreads());         requestProcessors.setKeepAliveTime(getProcessorKeepAliveTime());         requestProcessors.waitWhenBlocked();         requestProcessors.createThreads(getProcessorThreads());         m_requestProcessors = requestProcessors;      }      m_writeHandlers = WriteHandler.create(getWriterThreads(), thread_group, m_backGroundThreads);      m_readHandlers = ReadHandler.create(getReaderThreads(), this, thread_group, m_backGroundThreads);   }   /**    * Closes all open sockets, the server socket and all threads waiting for incoming messages    */   public void stop()   {       super.stop();      serverStopping = true;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?