📄 connectiontablenio.java
字号:
}
start_port++;
continue;
}
catch (SocketException bind_ex)
{
if (start_port == end_port)
{
throw (BindException) ( (new BindException(
"No available port to bind to")).initCause(bind_ex));
}
start_port++;
continue;
}
catch (IOException io_ex)
{
if (LOG.isErrorEnabled())
{
LOG.error("Attempt to bind serversocket failed, port=" + start_port +
", bind addr=" + bind_addr, io_ex);
}
throw io_ex;
}
srv_port = start_port;
break;
}
m_serverSocketChannel.register(this.m_acceptSelector,
SelectionKey.OP_ACCEPT);
return m_serverSocketChannel.socket();
}
protected void runRequest(IpAddress addr, ByteBuffer buf)
throws
InterruptedException
{
// m_requestProcessors.execute(new ExecuteTask(addr, buf));
}
// Represents shutdown
private static class Shutdown
{
}
// ReadHandler has selector to deal with read, it runs in seperated thread
private static class ReadHandler
implements Runnable
{
private final Selector SELECTOR = initHandler();
private final LinkedQueue QUEUE = new LinkedQueue();
private final ConnectionTableNIO connectTable;
ReadHandler(ConnectionTableNIO ct)
{
connectTable = ct;
}
public Selector initHandler()
{
// Open the selector
try
{
return Selector.open();
}
catch (IOException e)
{
if (LOG.isErrorEnabled())
{
LOG.error(e);
}
throw new IllegalStateException(e.getMessage());
}
}
/**
* create instances of ReadHandler threads for receiving data.
*
* @param workerThreads is the number of threads to create.
*/
private static ReadHandler[] create(int workerThreads,
ConnectionTableNIO ct)
{
ReadHandler[] handlers = new ReadHandler[workerThreads];
for (int looper = 0; looper < workerThreads; looper++)
{
handlers[looper] = new ReadHandler(ct);
Thread thread = new Thread(handlers[looper], "nioReadHandlerThread");
thread.setDaemon(true);
thread.start();
}
return handlers;
}
private void add(Object conn)
throws InterruptedException
{
QUEUE.insert(conn);
wakeup();
}
private void wakeup()
{
SELECTOR.wakeup();
}
public void run()
{
while (true)
{ // m_s can be closed by the management thread
int events;
try
{
events = SELECTOR.select();
}
catch (IOException e)
{
if (LOG.isWarnEnabled())
{
LOG.warn("Select operation on socket failed", e);
}
continue; // Give up this time
}
catch (ClosedSelectorException e)
{
if (LOG.isWarnEnabled())
{
LOG.warn("Select operation on socket failed", e);
}
return; // Selector gets closed, thread stops
}
if (events > 0)
{ // there are read-ready channels
Set readyKeys = SELECTOR.selectedKeys();
for (Iterator i = readyKeys.iterator(); i.hasNext(); )
{
SelectionKey key = (SelectionKey) i.next();
i.remove();
// Do partial read and handle call back
Connection conn = (Connection) key.attachment();
try
{
if (conn.getSocketChannel().isOpen())
{
readOnce(conn);
}
else
{ // socket connection is already closed, clean up connection state
conn.closed();
}
}
catch (IOException e)
{
if (LOG.isWarnEnabled())
{
LOG.warn("Read operation on socket failed", e);
// The connection must be bad, cancel the key, close socket, then
// remove it from table!
}
key.cancel();
conn.destroy();
conn.closed();
}
}
}
// Now we look at the connection queue to get any new connections added
Object o;
// try {
try
{
o = QUEUE.poll(0); // get a connection
if (null == o)
{
continue;
}
}
catch (InterruptedException ex)
{
o = null;
ex.printStackTrace();
}
// }
// catch (InterruptedException e) {
// if (LOG.isInfoEnabled()) {
// LOG.info("Thread (" + Thread.currentThread().getName() +
// ") was interrupted while polling queue", e);
// // We must give up
// }
// continue;
// }
if (o instanceof Shutdown)
{ // shutdown command?
try
{
SELECTOR.close();
}
catch (IOException e)
{
if (LOG.isInfoEnabled())
{
LOG.info("Read selector close operation failed", e);
}
}
return; // stop reading
}
Connection conn = (Connection) o; // must be a new connection
SocketChannel sc = conn.getSocketChannel();
try
{
sc.register(SELECTOR, SelectionKey.OP_READ, conn);
}
catch (ClosedChannelException e)
{
if (LOG.isInfoEnabled())
{
LOG.info(
"Socket channel was closed while we were trying to register it to selector",
e);
// Channel becomes bad. The connection must be bad,
// close socket, then remove it from table!
}
conn.destroy();
conn.closed();
}
} // end of the while true loop
}
private void readOnce(Connection conn)
throws IOException
{
ConnectionReadState readState = conn.getReadState();
if (!readState.isHeadFinished())
{ // a brand new message coming or header is not completed
// Begin or continue to read header
int size = readHeader(conn);
if (0 == size)
{ // header is not completed
return;
}
}
// Begin or continue to read body
if (readBody(conn) > 0)
{ // not finish yet
return;
}
IpAddress src_addr = conn.getPeerAddress();
ByteBuffer body_buf = readState.getReadBodyBuffer();
ByteBuffer header_buf = readState.getReadHeadBuffer();
header_buf.position(conn.HEADER_SIZE);
header_buf.flip();
int len = header_buf.capacity() + body_buf.capacity();
byte[] msg_buf = new byte[len];
header_buf.get(msg_buf, 0, conn.HEADER_SIZE);
body_buf.get(msg_buf, conn.HEADER_SIZE, body_buf.capacity());
Message msg = new Message(connectTable.local_addr, src_addr, msg_buf);
Event evt = new Event(Event.UP_REQUEST, msg);
connectTable.up(evt);
// Clear status
readState.bodyFinished();
// Assign worker thread to execute call back
}
private int readHeader(Connection conn)
throws IOException
{
ConnectionReadState readState = conn.getReadState();
ByteBuffer headBuf = readState.getReadHeadBuffer();
SocketChannel sc = conn.getSocketChannel();
while (headBuf.remaining() > 0)
{
int num = sc.read(headBuf);
if ( -1 == num)
{ // EOS
throw new IOException("Peer closed socket");
}
if (0 == num)
{ // no more data
return 0;
}
}
// OK, now we get the whole header, change the status and return message size
return readState.headFinished();
}
private int readBody(Connection conn)
throws IOException
{
ByteBuffer bodyBuf = conn.getReadState().getReadBodyBuffer();
SocketChannel sc = conn.getSocketChannel();
while (bodyBuf.remaining() > 0)
{
int num = sc.read(bodyBuf);
if ( -1 == num)
{ // EOS
throw new IOException(
"Couldn't read from socket as peer closed the socket");
}
if (0 == num)
{ // no more data
return bodyBuf.remaining();
}
}
// OK, we finished reading the whole message! Flip it (not necessary though)
bodyBuf.flip();
return 0;
}
}
private class ExecuteTask
implements Runnable
{
IpAddress m_addr = null;
ByteBuffer m_buf = null;
public ExecuteTask(IpAddress addr, ByteBuffer buf)
{
m_addr = addr;
m_buf = buf;
}
public void run()
{
// receive(m_addr, m_buf.array(), m_buf.arrayOffset(), m_buf.limit());
}
}
private class ConnectionReadState
{
private final Connection m_conn;
// Status for receiving message
private boolean m_headFinished = false;
private ByteBuffer m_readBodyBuf = null;
private final ByteBuffer m_readHeadBuf = ByteBuffer.allocate(Connection.
HEADER_SIZE);
public ConnectionReadState(Connection conn)
{
m_conn = conn;
}
ByteBuffer getReadBodyBuffer()
{
return m_readBodyBuf;
}
ByteBuffer getReadHeadBuffer()
{
return m_readHeadBuf;
}
void bodyFinished()
{
m_headFinished = false;
m_readHeadBuf.clear();
m_readBodyBuf = null;
m_conn.updateLastAccessed();
}
/**
* Status change for finishing reading the message header (data already in buffer)
*
* @return message size
*/
int headFinished()
{
m_headFinished = true;
m_readHeadBuf.flip();
int messageSize = m_readHeadBuf.getInt();
m_readBodyBuf = ByteBuffer.allocate(messageSize - Connection.HEADER_SIZE);
m_conn.updateLastAccessed();
return messageSize;
}
boolean isHeadFinished()
{
return m_headFinished;
}
}
class Connection
{
Socket sock = null; // socket to/from peer (result of srv_sock.accept() or new Socket())
String sock_addr = null; // used for Thread.getName()
DataOutputStream out = null; // for sending messages
DataInputStream in = null; // for receiving messages
Thread receiverThread = null; // thread for receiving messages
IpAddress peer_addr = null; // address of the 'other end' of the connection
final Object send_mutex = new Object(); // serialize sends
long last_access = System.currentTimeMillis(); // last time a message was sent or received
private SocketChannel sock_ch = null;
private WriteHandler m_writeHandler;
private SelectorWriteHandler m_selectorWriteHandler;
private final ConnectionReadState m_readState;
private static final int HEADER_SIZE = 12;
final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
Connection(SocketChannel s, IpAddress peer_addr)
{
sock_ch = s;
this.sock = s.socket();
this.peer_addr = peer_addr;
m_readState = new ConnectionReadState(this);
}
void updateLastAccessed()
{
last_access = System.currentTimeMillis();
}
private ConnectionReadState getReadState()
{
return m_readState;
}
private void setupWriteHandler(WriteHandler hdlr)
{
m_writeHandler = hdlr;
m_selectorWriteHandler = hdlr.add(sock_ch);
}
void destroy()
{
closeSocket();
}
void doSend(byte[] buffie, int offset, int length)
throws Exception
{
FutureResult result = new FutureResult();
m_writeHandler.write(sock_ch, ByteBuffer.wrap(buffie, offset, length),
result, m_selectorWriteHandler);
Exception ex = result.getException();
if (ex != null)
{
if (LOG.isErrorEnabled())
{
LOG.error("failed sending message", ex);
}
if (ex.getCause() instanceof IOException)
{
throw (IOException) ex.getCause();
}
throw ex;
}
result.get();
}
SocketChannel getSocketChannel()
{
return sock_ch;
}
void closeSocket()
{
if (sock_ch != null)
{
try
{
if (sock_ch.isConnected() && sock_ch.isOpen())
{
sock_ch.close();
}
}
catch (Exception e)
{
log.error("error closing socket connection", e);
}
sock_ch = null;
}
}
IpAddress getPeerAddress()
{
return peer_addr;
}
void closed()
{
IpAddress peerAddr = getPeerAddress();
synchronized (conns)
{
conns.remove(peerAddr);
}
// notifyConnectionClosed(peerAddr);
}
}
/**
* Handle writing to non-blocking NIO connection.
*/
private static class WriteHandler
implements Runnable
{
// Create a queue for write requests
private final LinkedQueue QUEUE = new LinkedQueue();
private final Selector SELECTOR = initSelector();
private int m_pendingChannels; // count of the number of channels that have pending writes
// allocate and reuse the header for all buffer write operations
private ByteBuffer m_headerBuffer = ByteBuffer.allocate(Connection.
HEADER_SIZE);
Selector initSelector()
{
try
{
return SelectorProvider.provider().openSelector();
}
catch (IOException e)
{
if (LOG.isErrorEnabled())
{
LOG.error(e);
}
throw new IllegalStateException(e.getMessage());
}
}
/**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -