📄 connectiontablenio.java
字号:
* Creates the requested number of WriteHandler instances and starts a daemon thread for each one to send data.
*
* @param workerThreads is the number of threads to create.
*/
private static WriteHandler[] create(int workerThreads)
{
    final WriteHandler[] pool = new WriteHandler[workerThreads];
    int slot = 0;
    while (slot < workerThreads)
    {
        // Each handler is driven by its own daemon thread, so the pool
        // never keeps the JVM alive on its own.
        pool[slot] = new WriteHandler();
        Thread worker = new Thread(pool[slot], "nioWriteHandlerThread");
        worker.setDaemon(true);
        worker.start();
        slot++;
    }
    return pool;
}
/**
 * Creates the per-channel write handler for a socket channel.
 * The new handler is given this write handler's SELECTOR and header buffer;
 * the channel itself is not registered with the selector here (the
 * SelectorWriteHandler registers lazily when its first request is added).
 *
 * @param channel the socket channel whose outgoing writes are to be handled
 * @return a new SelectorWriteHandler wrapping the channel
 */
private SelectorWriteHandler add(SocketChannel channel)
{
    SelectorWriteHandler channelHandler =
        new SelectorWriteHandler(channel, SELECTOR, m_headerBuffer);
    return channelHandler;
}
/**
 * Packages the arguments into a WriteRequest and hands it to QUEUE for the
 * handler thread to pick up.
 *
 * @param channel      the destination socket channel
 * @param buffer       the payload to send
 * @param notification the callback to inform about the outcome (checked for null before use)
 * @param hdlr         the per-channel handler that will perform the write
 * @throws InterruptedException if inserting into the queue is interrupted
 */
private void write(SocketChannel channel, ByteBuffer buffer,
                   FutureResult notification, SelectorWriteHandler hdlr)
    throws InterruptedException
{
    WriteRequest request = new WriteRequest(channel, buffer, notification, hdlr);
    QUEUE.insert(request);
}
/**
 * Cancels the selection key held by the given per-channel handler
 * (delegates to SelectorWriteHandler.cancel()).
 *
 * @param entry the per-channel write handler to cancel
 */
private void close(SelectorWriteHandler entry)
{
entry.cancel();
}
/**
 * Fails every write request queued on the entry and then closes the entry.
 * Each queued request's callback receives the error (when one is given);
 * the requests are removed one by one via entry.next().
 * The entry must hold at least one queued request, because the current
 * request is removed unconditionally on every pass.
 *
 * @param entry the per-channel handler whose channel failed
 * @param error the failure to report to the callbacks; null means only drain the queue
 */
private void handleChannelError(SelectorWriteHandler entry, Throwable error)
{
    boolean moreQueued = true;
    while (moreQueued)
    {
        if (null != error)
        {
            entry.notifyError(error);
        }
        // next() drops the request just handled and reports whether another is waiting.
        moreQueued = entry.next();
    }
    close(entry);
}
/**
 * Performs one write attempt on every channel the selector reported as ready.
 * When a request's buffer has been written completely, its callback is
 * given the byte count and the handler advances to its next request.
 * m_pendingChannels is decremented whenever a channel has nothing left to
 * write or fails with an IOException.
 *
 * @param selector the selector whose selected-key set is to be processed
 */
private void processWrite(Selector selector)
{
    Set readyKeys = selector.selectedKeys();
    // Work on a snapshot; the selected-key set itself is emptied at the end.
    Object[] snapshot = readyKeys.toArray();
    for (int index = 0; index < snapshot.length; index++)
    {
        SelectionKey readyKey = (SelectionKey) snapshot[index];
        SelectorWriteHandler handler = (SelectorWriteHandler) readyKey.attachment();
        boolean channelFinished = false;
        try
        {
            if (handler.write() != 0)
            {
                // Buffer only partially written; wait for the next readiness event.
                continue;
            }
            // Whole buffer is out: report the byte count, then move to the
            // next queued request (next() clears write interest when none is left).
            handler.notifyObject(new Integer(handler.getBytesWritten()));
            channelFinished = !handler.next();
        }
        catch (IOException e)
        {
            // The connection has most likely been closed.
            channelFinished = true;
            handleChannelError(handler, e);
        }
        finally
        {
            if (channelFinished)
            {
                m_pendingChannels--;
            }
        }
    }
    readyKeys.clear();
}
/**
 * Main loop of the write-handler thread; runs while SELECTOR is open.
 * <p>
 * Each pass first drains QUEUE with poll(0), registering every write
 * request with its per-channel handler and doing a selectNow() after each
 * one so ready channels are serviced promptly. Afterwards the thread
 * blocks: on QUEUE.take() when no channel has pending writes, otherwise on
 * SELECTOR.select().
 * <p>
 * A queued Shutdown object closes the selector and ends the thread. A
 * failing selectNow() also ends the thread so that it cannot spin.
 */
public void run()
{
    while (SELECTOR.isOpen())
    {
        Object o;
        // Drain whatever is already queued; once the queue is empty we fall
        // through to the blocking code below.
        try
        {
            while (null != (o = QUEUE.poll(0)))
            {
                if (o instanceof Shutdown)
                { // Stop the thread
                    closeSelectorForShutdown();
                    return;
                }
                acceptWriteRequest((WriteRequest) o);
                try
                {
                    // process any connections ready to be written to.
                    if (SELECTOR.selectNow() > 0)
                    {
                        processWrite(SELECTOR);
                    }
                }
                catch (IOException e)
                { // need to understand what causes this error so we can handle it properly
                    if (LOG.isErrorEnabled())
                    {
                        LOG.error("SelectNow operation on write selector failed, didn't expect this to occur, please report this",
                                  e);
                    }
                    return; // if select fails, give up so we don't go into a busy loop.
                }
            }
        }
        catch (InterruptedException ex)
        {
            // Record the interruption instead of dropping it silently. The
            // interrupt flag is deliberately not re-asserted: doing so would
            // presumably make the next blocking queue call fail immediately
            // and turn this loop into a spin.
            if (LOG.isErrorEnabled())
            {
                LOG.error("Thread (" + Thread.currentThread().getName() + ") was interrupted", ex);
            }
        }
        // if there isn't any pending work to do, block on queue to get next request.
        if (m_pendingChannels == 0)
        {
            try
            {
                o = QUEUE.take();
            }
            catch (InterruptedException ex1)
            {
                if (LOG.isErrorEnabled())
                {
                    LOG.error("Thread (" + Thread.currentThread().getName() + ") was interrupted", ex1);
                }
                continue;
            }
            if (o instanceof Shutdown)
            { // Stop the thread
                closeSelectorForShutdown();
                return;
            }
            acceptWriteRequest((WriteRequest) o);
        }
        // otherwise do a blocking wait select operation.
        else
        {
            try
            {
                if (SELECTOR.select() > 0)
                {
                    processWrite(SELECTOR);
                }
            }
            catch (IOException e)
            { // need to understand what causes this error
              // NOTE(review): unlike the selectNow() path this keeps looping; a
              // persistently failing select() would spin here - confirm intent.
                if (LOG.isErrorEnabled())
                {
                    LOG.error("Failure while writing to socket", e);
                }
            }
        }
    }
}

// Closes the write selector in response to a Shutdown request; a failure to close is only logged.
private void closeSelectorForShutdown()
{
    try
    {
        SELECTOR.close();
    }
    catch (IOException e)
    {
        if (LOG.isInfoEnabled())
        {
            LOG.info("Write selector close operation failed", e);
        }
    }
}

// Queues the request on its per-channel handler and counts the channel as pending if that newly enabled it.
private void acceptWriteRequest(WriteRequest request)
{
    if (request.getHandler().add(request))
    {
        m_pendingChannels++;
    }
}
}
/**
 * Per-channel write state. One instance exists for each SocketChannel mapped
 * to a Selector. It keeps the queue of pending WriteRequests for the channel,
 * tracks progress of the request currently being written, and switches the
 * channel's OP_WRITE interest on and off as requests arrive and complete.
 * <p>
 * The class does no synchronization of its own.
 */
public static class SelectorWriteHandler
{
    private final LinkedList m_writeRequests = new LinkedList(); // pending WriteRequest objects, oldest first
    private boolean m_headerSent = false;   // true once the length header of the current request was written
    private SocketChannel m_channel;
    private SelectionKey m_key;              // null until the channel has been registered (see enable())
    private Selector m_selector;
    private int m_bytesWritten = 0;          // payload bytes written so far for the current request
    private boolean m_enabled = false;       // true while OP_WRITE interest is set on m_key
    private ByteBuffer m_headerBuffer;       // scratch buffer for the length header, supplied by the creator

    /**
     * @param channel      the channel this handler writes to
     * @param selector     the selector the channel is registered with on first use
     * @param headerBuffer buffer used to build the 4-byte length header of each request
     */
    SelectorWriteHandler(SocketChannel channel, Selector selector,
                         ByteBuffer headerBuffer)
    {
        m_channel = channel;
        m_selector = selector;
        m_headerBuffer = headerBuffer;
    }

    /**
     * Registers the channel with the selector, attaching this handler to the key.
     *
     * @throws ClosedChannelException if the channel is already closed
     */
    private void register(Selector selector, SocketChannel channel)
        throws ClosedChannelException
    {
        // register the channel but don't enable OP_WRITE until we have a write request.
        m_key = channel.register(selector, 0, this);
    }

    /**
     * Ensures the channel is registered and interested in OP_WRITE.
     *
     * @return true only if write interest was switched on by this call; false if it
     *         was already on, or if the channel is closed or its key was cancelled
     */
    private boolean enable()
    {
        if (m_key == null)
        { // register the socket on first access,
          // we are the only thread using this variable, so no sync needed.
            try
            {
                register(m_selector, m_channel);
            }
            catch (ClosedChannelException e)
            {
                return false;
            }
        }
        if (m_enabled)
        {
            return false;
        }
        try
        {
            m_key.interestOps(SelectionKey.OP_WRITE);
        }
        catch (CancelledKeyException e)
        { // channel must have been closed
            return false;
        }
        m_enabled = true;
        return true;
    }

    /** Clears the key's interest set if write interest is currently on. */
    private void disable()
    {
        if (m_enabled)
        {
            try
            {
                m_key.interestOps(0); // zero: not interested in any notification for this channel.
            }
            catch (CancelledKeyException eat)
            { // we probably don't need to throw this exception (if they try to write
              // again, we will then throw an exception).
            }
            m_enabled = false;
        }
    }

    /**
     * Cancels the selection key, if the channel was ever registered.
     * m_key stays null until the first successful enable(), so cancelling a
     * handler that never registered is a no-op rather than a NullPointerException.
     */
    private void cancel()
    {
        if (m_key != null)
        {
            m_key.cancel();
        }
    }

    /**
     * Appends a request to this channel's queue and makes sure write interest is on.
     *
     * @return the result of enable(): true if this call switched write interest on
     */
    boolean add(WriteRequest entry)
    {
        m_writeRequests.add(entry);
        // NOTE(review): when enable() fails (closed channel / cancelled key) the request
        // stays queued and its callback is not notified here - confirm callers cope.
        return enable();
    }

    /**
     * @return the request at the head of the queue
     * @throws java.util.NoSuchElementException if no request is queued
     */
    WriteRequest getCurrentRequest()
    {
        return (WriteRequest) m_writeRequests.getFirst();
    }

    SocketChannel getChannel()
    {
        return m_channel;
    }

    /** @return the payload buffer of the current request */
    ByteBuffer getBuffer()
    {
        return getCurrentRequest().getBuffer();
    }

    /** @return the callback of the current request; may be null */
    FutureResult getCallback()
    {
        return getCurrentRequest().getCallback();
    }

    /** @return payload bytes written so far for the current request */
    int getBytesWritten()
    {
        return m_bytesWritten;
    }

    /** Reports a failure to the current request's callback, if it has one. */
    void notifyError(Throwable error)
    {
        FutureResult callback = getCallback();
        if (callback != null)
        {
            callback.setException(error);
        }
    }

    /** Delivers a result to the current request's callback, if it has one. */
    void notifyObject(Object result)
    {
        FutureResult callback = getCallback();
        if (callback != null)
        {
            callback.set(result);
        }
    }

    /**
     * Drops the current request, resets the per-request progress and moves to
     * the next queued request. Write interest is switched off when the queue
     * becomes empty.
     *
     * @return true if another request is waiting, false if the queue is now empty
     * @throws java.util.NoSuchElementException if no request is queued
     */
    boolean next()
    {
        m_headerSent = false;
        m_bytesWritten = 0;
        m_writeRequests.removeFirst(); // remove current entry
        boolean hasMore = !m_writeRequests.isEmpty();
        if (!hasMore)
        { // disable select for this channel if no more entries
            disable();
        }
        return hasMore;
    }

    /**
     * Performs one write attempt for the current request. On the first attempt
     * a 4-byte header holding the payload's remaining length is written first;
     * then a single write of the payload buffer is issued.
     *
     * @return the number of payload bytes still unwritten; 0 means the request is complete
     * @throws IOException if writing to the channel fails
     */
    int write()
        throws IOException
    {
        if (!m_headerSent)
        {
            m_headerSent = true;
            m_headerBuffer.clear();
            m_headerBuffer.putInt(getBuffer().remaining());
            m_headerBuffer.flip();
            do
            {
                getChannel().write(m_headerBuffer);
            } // we should be able to handle writing the header in one action but just in case, just do a busy loop
            while (m_headerBuffer.remaining() > 0);
        }
        m_bytesWritten += (getChannel().write(getBuffer()));
        return getBuffer().remaining();
    }
}
/**
 * Immutable description of one pending write: the target channel, the data,
 * the callback to inform, and the per-channel handler that will carry it out.
 */
public static class WriteRequest
{
    private final SocketChannel m_channel;
    private final ByteBuffer m_buffer;
    private final FutureResult m_callback;
    private final SelectorWriteHandler m_hdlr;

    /**
     * @param channel  the channel to write to
     * @param buffer   the data to write
     * @param callback receives the outcome of the write; users check it for null before use
     * @param hdlr     the handler responsible for the channel
     */
    WriteRequest(SocketChannel channel, ByteBuffer buffer,
                 FutureResult callback, SelectorWriteHandler hdlr)
    {
        m_hdlr = hdlr;
        m_callback = callback;
        m_buffer = buffer;
        m_channel = channel;
    }

    /** @return the channel this request is addressed to */
    SocketChannel getChannel()
    {
        return m_channel;
    }

    /** @return the data to be written */
    ByteBuffer getBuffer()
    {
        return m_buffer;
    }

    /** @return the callback given at construction */
    FutureResult getCallback()
    {
        return m_callback;
    }

    /** @return the per-channel handler that processes this request */
    SelectorWriteHandler getHandler()
    {
        return m_hdlr;
    }
}
/**
 * Background task that removes connections from conns once they have been
 * idle for longer than conn_expire_time. Between scans the thread sleeps for
 * reaper_interval, so it no longer spins while repeatedly taking the conns
 * monitor. The task ends when conns is empty, when stop() is called, or when
 * its thread is interrupted.
 */
class Reaper
    implements Runnable
{
    // The thread currently acting as reaper, or null when none is running.
    Thread t = null;

    Reaper()
    {
    }

    /** Starts a reaper thread unless a live one already exists. */
    public void start()
    {
        if (t != null && !t.isAlive())
        {
            t = null;
        }
        if (t == null)
        {
            //RKU 7.4.2003, put in threadgroup
            t = new Thread(thread_group, this, "ConnectionTable.ReaperThread");
            // t.setDaemon(true); // will allow us to terminate if all remaining threads are daemons
            t.start();
        }
    }

    /**
     * Asks the reaper to stop. The thread is interrupted so that it wakes from
     * its sleep promptly; when called from the reaper thread itself no
     * interrupt is issued, so work still in progress on that thread is not disturbed.
     */
    public void stop()
    {
        Thread running = t;
        t = null;
        if (running != null && running != Thread.currentThread())
        {
            running.interrupt();
        }
    }

    public boolean isRunning()
    {
        return t != null;
    }

    // True while the calling thread is still the designated reaper thread.
    // Reads t once so a concurrent stop() cannot cause a NullPointerException.
    private boolean isCurrentReaperThread()
    {
        Thread owner = t;
        return owner != null && owner.equals(Thread.currentThread());
    }

    /**
     * Scans conns every reaper_interval and destroys and removes each
     * connection whose last_access is older than conn_expire_time.
     */
    public void run()
    {
        Connection value;
        Map.Entry entry;
        long curr_time;
        if (log.isInfoEnabled())
        {
            log.info("connection reaper thread was started. Number of connections=" +
                     conns.size() + ", reaper_interval=" + reaper_interval +
                     ", conn_expire_time=" +
                     conn_expire_time);
        }
        while (conns.size() > 0 && isCurrentReaperThread())
        {
            // Pause between scans; without this the loop busy-spins on the conns monitor.
            if (reaper_interval > 0)
            {
                try
                {
                    Thread.sleep(reaper_interval);
                }
                catch (InterruptedException e)
                {
                    // Interrupted (e.g. by stop()): keep the flag set and terminate.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            // stop() or a restart may have happened while sleeping.
            if (!isCurrentReaperThread())
            {
                break;
            }
            synchronized (conns)
            {
                curr_time = System.currentTimeMillis();
                for (Iterator it = conns.entrySet().iterator(); it.hasNext(); )
                {
                    entry = (Map.Entry) it.next();
                    value = (Connection) entry.getValue();
                    if (value.last_access + conn_expire_time < curr_time)
                    {
                        if (log.isInfoEnabled())
                        {
                            log.info("connection " + value +
                                     " has been idle for too long (conn_expire_time=" +
                                     conn_expire_time +
                                     "), will be removed");
                        }
                        value.destroy();
                        it.remove();
                    }
                }
            }
        }
        if (log.isInfoEnabled())
        {
            log.info("reaper terminated");
        }
        // Only clear the reference if it still points at this thread; a
        // stop()/start() pair may already have installed a successor.
        if (t == Thread.currentThread())
        {
            t = null;
        }
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -