📄 connectiontablenio.java
字号:
package com.huawei.comm.smap;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import sun.rmi.runtime.Executor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Vector;
import java.util.HashMap;
import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.util.Map;
public class ConnectionTableNIO
implements Runnable
{
final HashMap conns = new HashMap(); // keys: Addresses (peer address), values: Connection
//Receiver receiver=null;
boolean use_send_queues = true;
InetAddress bind_addr = null;
IpAddress local_addr = null; // bind_addr + port of srv_sock
int srv_port = 7800;
int recv_buf_size = 120000;
int send_buf_size = 60000;
final Vector conn_listeners = new Vector(); // listeners to be notified when a conn is established/torn down
final Object recv_mutex = new Object(); // to serialize simultaneous access to receive() from multiple Connections
Reaper reaper = null; // closes conns that have been idle for more than n secs
long reaper_interval = 60000; // reap unused conns once a minute
long conn_expire_time = 300000; // connections can be idle for 5 minutes before they are reaped
int sock_conn_timeout = 1000; // max time in millis to wait for Socket.connect() to return
ThreadGroup thread_group = null;
protected final Log log = LogFactory.getLog(getClass());
boolean use_reaper = false; // by default we don't reap idle conns
static final int backlog = 20; // 20 conn requests are queued by ServerSocket (addtl will be discarded)
ServerSocket srv_sock = null;
boolean reuse_addr = false;
InetAddress external_addr = null;
int max_port = 0; // maximum port to bind to (if < srv_port, no limit)
Thread acceptor = null;
private ServerSocketChannel m_serverSocketChannel;
private Selector m_acceptSelector;
protected final static Log LOG = LogFactory.getLog(ConnectionTableNIO.class);
private WriteHandler[] m_writeHandlers;
private int m_nextWriteHandler = 0;
private final Object m_lockNextWriteHandler = new Object();
private ReadHandler[] m_readHandlers;
private int m_nextReadHandler = 0;
private final Object m_lockNextReadHandler = new Object();
// int max_port = 0; // maximum port to bind to (if < srv_port, no limit)
//InetAddress external_addr = null;
private Protocol prot;
private volatile boolean serverStopping = false;
/**
* @param srv_port
* @throws Exception
*/
public ConnectionTableNIO(Protocol prot, int srv_port)
throws Exception
{
System.out.println("------------------------");
this.srv_port = srv_port;
this.prot = prot;
}
public void up(Event evt)
{
prot.up(evt);
}
public void down(Event evt)
{
// prot.down(evt);
}
/**
* @param srv_port
* @param reaper_interval
* @param conn_expire_time
* @throws Exception
*/
public ConnectionTableNIO(int srv_port, long reaper_interval,
long conn_expire_time)
throws Exception
{
this.srv_port = srv_port;
this.reaper_interval = reaper_interval;
this.conn_expire_time = conn_expire_time;
}
/**
* Try to obtain correct Connection (or create one if not yet existent)
*/
Connection getConnection(IpAddress dest)
throws Exception
{
Connection conn;
SocketChannel sock_ch;
synchronized (conns)
{
conn = (Connection) conns.get(dest);
if (conn == null)
{
InetSocketAddress destAddress = new InetSocketAddress( ( (IpAddress)
dest).getIpAddress(),
( (IpAddress) dest).getPort());
sock_ch = SocketChannel.open(destAddress);
conn = new Connection(sock_ch, dest);
// conn.sendLocalAddress(local_addr);
// This outbound connection is ready
sock_ch.configureBlocking(false);
try
{
if (LOG.isTraceEnabled())
{
LOG.trace("About to change new connection send buff size from " +
sock_ch.socket().getSendBufferSize() + " bytes");
}
sock_ch.socket().setSendBufferSize(send_buf_size);
if (LOG.isTraceEnabled())
{
LOG.trace("Changed new connection send buff size to " +
sock_ch.socket().getSendBufferSize() + " bytes");
}
}
catch (IllegalArgumentException ex)
{
if (log.isErrorEnabled())
{
log.error("exception setting send buffer size to " +
send_buf_size + " bytes: " + ex);
}
}
try
{
if (LOG.isTraceEnabled())
{
LOG.trace("About to change new connection receive buff size from " +
sock_ch.socket().getReceiveBufferSize() + " bytes");
}
sock_ch.socket().setReceiveBufferSize(recv_buf_size);
if (LOG.isTraceEnabled())
{
LOG.trace("Changed new connection receive buff size to " +
sock_ch.socket().getReceiveBufferSize() + " bytes");
}
}
catch (IllegalArgumentException ex)
{
if (log.isErrorEnabled())
{
log.error("exception setting receive buffer size to " +
send_buf_size + " bytes: " + ex);
}
}
int idx;
synchronized (m_lockNextWriteHandler)
{
idx = m_nextWriteHandler = (m_nextWriteHandler + 1) %
m_writeHandlers.length;
}
conn.setupWriteHandler(m_writeHandlers[idx]);
// Put the new connection to the queue
try
{
synchronized (m_lockNextReadHandler)
{
idx = m_nextReadHandler = (m_nextReadHandler + 1) %
m_readHandlers.length;
}
m_readHandlers[idx].add(conn);
}
catch (InterruptedException e)
{
if (LOG.isWarnEnabled())
{
LOG.warn("Thread (" + Thread.currentThread().getName() +
") was interrupted, closing connection", e);
// What can we do? Remove it from table then.
}
conn.destroy();
throw e;
}
// Add connection to table
addConnection(dest, conn);
// notifyConnectionOpened(dest);
if (LOG.isInfoEnabled())
{
LOG.info("created socket to " + dest);
}
}
return conn;
}
}
void addConnection(IpAddress peer, Connection c)
{
conns.put(peer, c);
if (reaper != null && !reaper.isRunning())
{
reaper.start();
}
}
public final void start()
throws Exception
{
init();
srv_sock = createServerSocket(srv_port, max_port);
if (external_addr != null)
{
local_addr = new IpAddress(external_addr, srv_sock.getLocalPort());
}
else if (bind_addr != null)
{
local_addr = new IpAddress(bind_addr, srv_sock.getLocalPort());
}
else
{
local_addr = new IpAddress(srv_sock.getLocalPort());
}
if (log.isInfoEnabled())
{
log.info("server socket created on " + local_addr);
}
thread_group = new ThreadGroup(new ThreadGroup("JGroups threads"),
"ConnectionTableGroup");
acceptor = new Thread(thread_group, this, "ConnectionTable.AcceptorThread");
acceptor.setDaemon(true);
acceptor.start();
use_reaper = true;
// start the connection reaper - will periodically remove unused connections
// if (use_reaper && reaper == null)
// {
// reaper = new Reaper();
// reaper.start();
//
// }
}
protected void init()
throws Exception
{
m_writeHandlers = WriteHandler.create(3);
m_readHandlers = ReadHandler.create(3, this);
}
/**
* Closes all open sockets, the server socket and all threads waiting for incoming messages
*/
public void stop()
{
serverStopping = true;
// Stop the main selector
m_acceptSelector.wakeup();
// Stop selector threads
for (int i = 0; i < m_readHandlers.length; i++)
{
try
{
m_readHandlers[i].add(new Shutdown());
}
catch (InterruptedException e)
{
LOG.error("Thread (" + Thread.currentThread().getName() +
") was interrupted, failed to shutdown selector", e);
}
}
for (int i = 0; i < m_writeHandlers.length; i++)
{
// m_writeHandlers[i].QUEUE.put(new Shutdown());
m_writeHandlers[i].SELECTOR.wakeup();
}
// then close the connections
synchronized (conns)
{
Iterator it = conns.values().iterator();
while (it.hasNext())
{
Connection conn = (Connection) it.next();
conn.destroy();
}
conns.clear();
}
}
/**
* Acceptor thread. Continuously accept new connections and assign readhandler/writehandler
* to them.
*/
public void run()
{
Connection conn;
while (m_serverSocketChannel.isOpen() && !serverStopping)
{
int num;
try
{
num = m_acceptSelector.select();
LOG.warn("Select operation :" + num);
}
catch (IOException e)
{
if (LOG.isWarnEnabled())
{
LOG.warn("Select operation on listening socket failed", e);
}
continue; // Give up this time
}
if (num > 0)
{
Set readyKeys = m_acceptSelector.selectedKeys();
for (Iterator i = readyKeys.iterator(); i.hasNext(); )
{
SelectionKey key = (SelectionKey) i.next();
i.remove();
// We only deal with new incoming connections
ServerSocketChannel readyChannel = (ServerSocketChannel) key.channel();
SocketChannel client_sock_ch;
try
{
client_sock_ch = readyChannel.accept();
client_sock_ch.configureBlocking(false);
}
catch (IOException e)
{
if (LOG.isWarnEnabled())
{
LOG.warn(
"Attempt to accept new connection from listening socket failed",
e);
// Give up this connection
}
continue;
}
if (LOG.isInfoEnabled())
{
LOG.info("accepted connection, client_sock=" +
client_sock_ch.socket());
}
try
{
if (LOG.isTraceEnabled())
{
LOG.trace("About to change new connection send buff size from " +
client_sock_ch.socket().getSendBufferSize() + " bytes");
}
client_sock_ch.socket().setSendBufferSize(send_buf_size);
if (LOG.isTraceEnabled())
{
LOG.trace("Changed new connection send buff size to " +
client_sock_ch.socket().getSendBufferSize() + " bytes");
}
}
catch (IllegalArgumentException ex)
{
if (log.isErrorEnabled())
{
log.error("exception setting send buffer size to " +
send_buf_size + " bytes: ", ex);
}
}
catch (SocketException e)
{
if (log.isErrorEnabled())
{
log.error("exception setting send buffer size to " +
send_buf_size + " bytes: ", e);
}
}
try
{
if (LOG.isTraceEnabled())
{
LOG.trace(
"About to change new connection receive buff size from " +
client_sock_ch.socket().getReceiveBufferSize() + " bytes");
}
client_sock_ch.socket().setReceiveBufferSize(recv_buf_size);
if (LOG.isTraceEnabled())
{
LOG.trace("Changed new connection receive buff size to " +
client_sock_ch.socket().getReceiveBufferSize() +
" bytes");
}
}
catch (IllegalArgumentException ex)
{
if (log.isErrorEnabled())
{
log.error("exception setting receive buffer size to " +
send_buf_size + " bytes: ", ex);
}
}
catch (SocketException e)
{
if (log.isErrorEnabled())
{
log.error("exception setting receive buffer size to " +
recv_buf_size + " bytes: ", e);
}
}
conn = new Connection(client_sock_ch, null);
addConnection(conn.getPeerAddress(), conn);
int idx;
synchronized (m_lockNextWriteHandler)
{
idx = m_nextWriteHandler = (m_nextWriteHandler + 1) %
m_writeHandlers.length;
}
conn.setupWriteHandler(m_writeHandlers[idx]);
try
{
synchronized (m_lockNextReadHandler)
{
idx = m_nextReadHandler = (m_nextReadHandler + 1) %
m_readHandlers.length;
}
m_readHandlers[idx].add(conn);
}
catch (InterruptedException e)
{
if (LOG.isWarnEnabled())
{
LOG.warn(
"Attempt to configure read handler for accepted connection failed",
e);
// close connection
}
conn.destroy();
}
} // end of iteration
} // end of selected key > 0
} // end of thread
if (m_serverSocketChannel.isOpen())
{
try
{
m_serverSocketChannel.close();
}
catch (Exception e)
{
log.error("exception closing server listening socket", e);
}
}
if (LOG.isTraceEnabled())
{
LOG.trace("acceptor thread terminated");
}
}
/**
* Finds first available port starting at start_port and returns server socket. Sets srv_port
*/
protected ServerSocket createServerSocket(int start_port, int end_port)
throws
Exception
{
this.m_acceptSelector = Selector.open();
m_serverSocketChannel = ServerSocketChannel.open();
m_serverSocketChannel.configureBlocking(false);
while (true)
{
try
{
SocketAddress sockAddr;
if (bind_addr == null)
{
sockAddr = new InetSocketAddress(start_port);
m_serverSocketChannel.socket().bind(sockAddr);
}
else
{
sockAddr = new InetSocketAddress(bind_addr, start_port);
m_serverSocketChannel.socket().bind(sockAddr, backlog);
}
}
catch (BindException bind_ex)
{
if (start_port == end_port)
{
throw (BindException) ( (new BindException(
"No available port to bind to")).initCause(bind_ex));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -