connectiontablenio.java
来自「JGRoups源码」· Java 代码 · 共 1,494 行 · 第 1/4 页
JAVA
1,494 行
// $Id: ConnectionTableNIO.java,v 1.24 2006/09/18 18:00:37 bstansberry Exp $package org.jgroups.blocks;import EDU.oswego.cs.dl.util.concurrent.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Address;import org.jgroups.stack.IpAddress;import org.jgroups.util.Util;import java.io.IOException;import java.net.*;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.channels.spi.SelectorProvider;import java.util.Iterator;import java.util.LinkedList;import java.util.Set;/** * Manages incoming and outgoing TCP connections. For each outgoing message to destination P, if there * is not yet a connection for P, one will be created. Subsequent outgoing messages will use this * connection. For incoming messages, one server socket is created at startup. For each new incoming * client connecting, a new thread from a thread pool is allocated and listens for incoming messages * until the socket is closed by the peer.<br>Sockets/threads with no activity will be killed * after some time. * <p/> * Incoming messages from any of the sockets can be received by setting the message listener. * * We currently require use_incoming_packet_handler=true (release 2.4 will support use_incoming_packet_handler=false * due to threadless stack support). * * @author Bela Ban, Scott Marlow, Alex Fu */public class ConnectionTableNIO extends BasicConnectionTable implements Runnable { private ServerSocketChannel m_serverSocketChannel; private Selector m_acceptSelector; protected final static Log LOG = LogFactory.getLog(ConnectionTableNIO.class); private WriteHandler[] m_writeHandlers; private int m_nextWriteHandler = 0; private final Object m_lockNextWriteHandler = new Object(); private ReadHandler[] m_readHandlers; private int m_nextReadHandler = 0; private final Object m_lockNextReadHandler = new Object(); // thread pool for processing read requests private Executor m_requestProcessors; private volatile boolean serverStopping=false; private final LinkedList m_backGroundThreads = new LinkedList(); // Collection of all created threads private int m_reader_threads = 8; private int m_writer_threads = 8; private int m_processor_threads = 10; // PooledExecutor.createThreads() private int m_processor_minThreads = 10; // PooledExecutor.setMinimumPoolSize() private int m_processor_maxThreads = 10; // PooledExecutor.setMaxThreads() private int m_processor_queueSize=100; // Number of queued requests that can be pending waiting // for a background thread to run the request. private int m_processor_keepAliveTime = -1; // PooledExecutor.setKeepAliveTime( milliseconds); // A negative value means to wait forever /** * @param srv_port * @throws Exception */ public ConnectionTableNIO(int srv_port) throws Exception { this.srv_port=srv_port; start(); } /** * @param srv_port * @param reaper_interval * @param conn_expire_time * @throws Exception */ public ConnectionTableNIO(int srv_port, long reaper_interval, long conn_expire_time) throws Exception { this.srv_port=srv_port; this.reaper_interval=reaper_interval; this.conn_expire_time=conn_expire_time; start(); } /** * @param r * @param bind_addr * @param external_addr * @param srv_port * @param max_port * @throws Exception */ public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port ) throws Exception { setReceiver(r); this.external_addr=external_addr; this.bind_addr=bind_addr; this.srv_port=srv_port; this.max_port=max_port; use_reaper=true; start(); } public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port, boolean doStart ) throws Exception { setReceiver(r); this.external_addr=external_addr; this.bind_addr=bind_addr; this.srv_port=srv_port; this.max_port=max_port; use_reaper=true; if(doStart) start(); } /** * @param r * @param bind_addr * @param external_addr * @param srv_port * @param max_port * @param reaper_interval * @param conn_expire_time * @throws Exception */ public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port, long reaper_interval, long conn_expire_time ) throws Exception { setReceiver(r); this.bind_addr=bind_addr; this.external_addr=external_addr; this.srv_port=srv_port; this.max_port=max_port; this.reaper_interval=reaper_interval; this.conn_expire_time=conn_expire_time; use_reaper=true; start(); } public ConnectionTableNIO(Receiver r, InetAddress bind_addr, InetAddress external_addr, int srv_port, int max_port, long reaper_interval, long conn_expire_time, boolean doStart ) throws Exception { setReceiver(r); this.bind_addr=bind_addr; this.external_addr=external_addr; this.srv_port=srv_port; this.max_port=max_port; this.reaper_interval=reaper_interval; this.conn_expire_time=conn_expire_time; use_reaper=true; if(doStart) start(); } public int getReaderThreads() { return m_reader_threads; } public void setReaderThreads(int m_reader_threads) { this.m_reader_threads=m_reader_threads; } public int getWriterThreads() { return m_writer_threads; } public void setWriterThreads(int m_writer_threads) { this.m_writer_threads=m_writer_threads; } public int getProcessorThreads() { return m_processor_threads; } public void setProcessorThreads(int m_processor_threads) { this.m_processor_threads=m_processor_threads; } public int getProcessorMinThreads() { return m_processor_minThreads;} public void setProcessorMinThreads(int m_processor_minThreads) { this.m_processor_minThreads=m_processor_minThreads; } public int getProcessorMaxThreads() { return m_processor_maxThreads;} public void setProcessorMaxThreads(int m_processor_maxThreads) { this.m_processor_maxThreads=m_processor_maxThreads; } public int getProcessorQueueSize() { return m_processor_queueSize; } public void setProcessorQueueSize(int m_processor_queueSize) { this.m_processor_queueSize=m_processor_queueSize; } public int getProcessorKeepAliveTime() { return m_processor_keepAliveTime; } public void setProcessorKeepAliveTime(int m_processor_keepAliveTime) { this.m_processor_keepAliveTime=m_processor_keepAliveTime; } /** * Try to obtain correct Connection (or create one if not yet existent) */ ConnectionTable.Connection getConnection(Address dest) throws Exception { Connection conn; SocketChannel sock_ch; synchronized (conns) { conn = (Connection) conns.get(dest); if (conn == null) { InetSocketAddress destAddress = new InetSocketAddress(((IpAddress) dest).getIpAddress(), ((IpAddress) dest).getPort()); sock_ch = SocketChannel.open(destAddress); sock_ch.socket().setTcpNoDelay(tcp_nodelay); conn = new Connection(sock_ch, dest); conn.sendLocalAddress(local_addr); // This outbound connection is ready sock_ch.configureBlocking(false); try { if (LOG.isTraceEnabled()) LOG.trace("About to change new connection send buff size from " + sock_ch.socket().getSendBufferSize() + " bytes"); sock_ch.socket().setSendBufferSize(send_buf_size); if (LOG.isTraceEnabled()) LOG.trace("Changed new connection send buff size to " + sock_ch.socket().getSendBufferSize() + " bytes"); } catch (IllegalArgumentException ex) { if (log.isErrorEnabled()) log.error("exception setting send buffer size to " + send_buf_size + " bytes: " + ex); } try { if (LOG.isTraceEnabled()) LOG.trace("About to change new connection receive buff size from " + sock_ch.socket().getReceiveBufferSize() + " bytes"); sock_ch.socket().setReceiveBufferSize(recv_buf_size); if (LOG.isTraceEnabled()) LOG.trace("Changed new connection receive buff size to " + sock_ch.socket().getReceiveBufferSize() + " bytes"); } catch (IllegalArgumentException ex) { if (log.isErrorEnabled()) log.error("exception setting receive buffer size to " + send_buf_size + " bytes: " + ex); } int idx; synchronized (m_lockNextWriteHandler) { idx = m_nextWriteHandler = (m_nextWriteHandler + 1) % m_writeHandlers.length; } conn.setupWriteHandler(m_writeHandlers[idx]); // Put the new connection to the queue try { synchronized (m_lockNextReadHandler) { idx = m_nextReadHandler = (m_nextReadHandler + 1) % m_readHandlers.length; } m_readHandlers[idx].add(conn); } catch (InterruptedException e) { if (LOG.isWarnEnabled()) LOG.warn("Thread (" +Thread.currentThread().getName() + ") was interrupted, closing connection", e); // What can we do? Remove it from table then. conn.destroy(); throw e; } // Add connection to table addConnection(dest, conn); notifyConnectionOpened(dest); if (LOG.isInfoEnabled()) LOG.info("created socket to " + dest); } return conn; } } public final void start() throws Exception { super.start(); //Roland Kurmann 4/7/2003, build new thread group thread_group = new ThreadGroup(Util.getGlobalThreadGroup(), "ConnectionTableThreads"); init(); srv_sock=createServerSocket(srv_port, max_port); if (external_addr!=null) local_addr=new IpAddress(external_addr, srv_sock.getLocalPort()); else if (bind_addr != null) local_addr=new IpAddress(bind_addr, srv_sock.getLocalPort()); else local_addr=new IpAddress(srv_sock.getLocalPort()); if(log.isInfoEnabled()) log.info("server socket created on " + local_addr); //Roland Kurmann 4/7/2003, put in thread_group acceptor=new Thread(thread_group, this, "ConnectionTable.AcceptorThread"); acceptor.setDaemon(true); acceptor.start(); m_backGroundThreads.add(acceptor); // start the connection reaper - will periodically remove unused connections if(use_reaper && reaper == null) { reaper=new Reaper(); reaper.start(); } } protected void init() throws Exception { // use directExector if max thread pool size is less than or equal to zero. if(getProcessorMaxThreads() <= 0) { m_requestProcessors = new DirectExecutor(); } else { // Create worker thread pool for processing incoming buffers PooledExecutor requestProcessors = new PooledExecutor(new BoundedBuffer(getProcessorQueueSize()), getProcessorMaxThreads()); requestProcessors.setThreadFactory(new ThreadFactory() { public Thread newThread(Runnable runnable) { Thread new_thread=new Thread(thread_group, runnable); new_thread.setDaemon(true); new_thread.setName("ConnectionTableNIO.Thread"); m_backGroundThreads.add(new_thread); return new_thread; } }); requestProcessors.setMinimumPoolSize(getProcessorMinThreads()); requestProcessors.setKeepAliveTime(getProcessorKeepAliveTime()); requestProcessors.waitWhenBlocked(); requestProcessors.createThreads(getProcessorThreads()); m_requestProcessors = requestProcessors; } m_writeHandlers = WriteHandler.create(getWriterThreads(), thread_group, m_backGroundThreads); m_readHandlers = ReadHandler.create(getReaderThreads(), this, thread_group, m_backGroundThreads); } /** * Closes all open sockets, the server socket and all threads waiting for incoming messages */ public void stop() { super.stop(); serverStopping = true;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?