// File: nioendpoint.java
// (Code-viewer page chrome: "Font size" control — not part of the source.)
/**
 * Re-initializes this attachment so it can be used with the given poller
 * and channel.
 * <p>
 * The last-access timestamp is set to the current time, the timeout goes
 * back to {@code -1} and the comet, error and current-access flags are
 * cleared. The interest ops and the mutex are left untouched.
 *
 * @param poller  the poller to associate with this attachment (may be null)
 * @param channel the channel to associate with this attachment (may be null)
 */
public void reset(Poller poller, NioChannel channel) {
    // Bind to the new owner.
    this.poller = poller;
    this.channel = channel;

    // Clear per-connection flags and settings.
    this.error = false;
    this.comet = false;
    this.currentAccess = false;
    this.timeout = -1;

    // Start the idle clock from now.
    this.lastAccess = System.currentTimeMillis();
}
/**
 * Resets this attachment without binding it to any poller or channel;
 * equivalent to {@code reset(null, null)}.
 */
public void reset() {
    final Poller noPoller = null;
    final NioChannel noChannel = null;
    reset(noPoller, noChannel);
}
/** @return the poller currently associated with this attachment */
public Poller getPoller() {
    return this.poller;
}

/** @param poller the poller to associate with this attachment */
public void setPoller(Poller poller) {
    this.poller = poller;
}

/** @return the last recorded access time, in milliseconds */
public long getLastAccess() {
    return this.lastAccess;
}

/** Records an access at the current system time. */
public void access() {
    access(System.currentTimeMillis());
}

/** @param access the access time to record, in milliseconds */
public void access(long access) {
    this.lastAccess = access;
}

/** @param comet the new value of the comet flag */
public void setComet(boolean comet) {
    this.comet = comet;
}

/** @return the current value of the comet flag */
public boolean getComet() {
    return this.comet;
}

/** @return the current value of the current-access flag */
public boolean getCurrentAccess() {
    return this.currentAccess;
}

/** @param access the new value of the current-access flag */
public void setCurrentAccess(boolean access) {
    this.currentAccess = access;
}

/** @return the lock object owned by this attachment */
public Object getMutex() {
    return this.mutex;
}

/** @param timeout the timeout value to store */
public void setTimeout(long timeout) {
    this.timeout = timeout;
}

/** @return the stored timeout value ({@code -1} after a reset) */
public long getTimeout() {
    return this.timeout;
}

/** @return the current value of the error flag */
public boolean getError() {
    return this.error;
}

/** @param error the new value of the error flag */
public void setError(boolean error) {
    this.error = error;
}

/** @return the channel currently associated with this attachment */
public NioChannel getChannel() {
    return this.channel;
}

/** @param channel the channel to associate with this attachment */
public void setChannel(NioChannel channel) {
    this.channel = channel;
}
/** Poller this attachment is bound to; null when unbound. */
protected Poller poller = null;
/** Interest-ops value stored via {@link #interestOps(int)}; 0 initially. */
protected int interestOps = 0;
/** Lock object handed out by getMutex(). */
protected Object mutex = new Object();
/** Last recorded access time in milliseconds; -1 until first set. */
protected long lastAccess = -1;
/** Current-access flag. */
protected boolean currentAccess = false;
/** Comet flag. */
protected boolean comet = false;
/** Timeout value; -1 by default. */
protected long timeout = -1;
/** Error flag. */
protected boolean error = false;
/** Channel this attachment is bound to; null when unbound. */
protected NioChannel channel = null;

/** @return the currently stored interest-ops value */
public int interestOps() {
    return this.interestOps;
}

/**
 * Stores a new interest-ops value.
 *
 * @param ops the value to store
 * @return the value that was passed in
 */
public int interestOps(int ops) {
    this.interestOps = ops;
    return ops;
}
}
// ----------------------------------------------------- Worker Inner Class
/**
* Server processor class.
*/
protected class Worker implements Runnable {
protected Thread thread = null;
protected boolean available = false;
protected Object socket = null;
protected SocketStatus status = null;
/**
* Process an incoming TCP/IP connection on the specified socket. Any
* exception that occurs during processing must be logged and swallowed.
* <b>NOTE</b>: This method is called from our Connector's thread. We
* must assign it to our own thread so that multiple simultaneous
* requests can be handled.
*
* @param socket TCP socket to process
*/
protected synchronized void assign(Object socket) {
// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
status = null;
available = true;
notifyAll();
}
protected synchronized void assign(Object socket, SocketStatus status) {
// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
this.status = status;
available = true;
notifyAll();
}
/**
* Await a newly assigned Socket from our Connector, or <code>null</code>
* if we are supposed to shut down.
*/
protected synchronized Object await() {
// Wait for the Connector to provide a new Socket
while (!available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Notify the Connector that we have received this Socket
Object socket = this.socket;
available = false;
notifyAll();
return (socket);
}
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
public void run() {
// Process requests until we receive a shutdown signal
while (running) {
try {
// Wait for the next socket to be assigned
Object channel = await();
if (channel == null)
continue;
if ( channel instanceof SocketChannel) {
SocketChannel sc = (SocketChannel)channel;
if ( !setSocketOptions(sc) ) {
try {
sc.socket().close();
sc.close();
}catch ( IOException ix ) {
if ( log.isDebugEnabled() ) log.debug("",ix);
}
} else {
//now we have it registered, remove it from the cache
}
} else {
NioChannel socket = (NioChannel)channel;
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
int handshake = -1;
try {
handshake = socket.handshake(key.isReadable(), key.isWritable());
}catch ( IOException x ) {
handshake = -1;
if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
}catch ( CancelledKeyException ckx ) {
handshake = -1;
}
if ( handshake == 0 ) {
// Process the request from this socket
if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
// Close socket and pool
try {
KeyAttachment att = (KeyAttachment)socket.getAttachment(true);
try {socket.close();}catch (Exception ignore){}
if ( socket.isOpen() ) socket.close(true);
key.cancel();
key.attach(null);
nioChannels.offer(socket);
if ( att!=null ) keyCache.offer(att);
}catch ( Exception x ) {
log.error("",x);
}
} else if ((status == null) && (handler.process(socket) == Handler.SocketState.CLOSED)) {
// Close socket and pool
try {
KeyAttachment att = (KeyAttachment)socket.getAttachment(true);
try {socket.close();}catch (Exception ignore){}
if ( socket.isOpen() ) socket.close(true);
key.cancel();
key.attach(null);
nioChannels.offer(socket);
if ( att!=null ) keyCache.offer(att);
}catch ( Exception x ) {
log.error("",x);
}
}
} else if (handshake == -1 ) {
socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT);
try {socket.close(true);}catch (IOException ignore){}
nioChannels.offer(socket);
} else {
final SelectionKey fk = key;
final int intops = handshake;
final KeyAttachment ka = (KeyAttachment)fk.attachment();
ka.getPoller().add(socket,intops);
}
}
} finally {
//dereference socket to let GC do its job
socket = null;
// Finish up this request
recycleWorkerThread(this);
}
}
}
/**
* Start the background processing thread.
*/
public void start() {
thread = new Thread(this);
thread.setName(getName() + "-" + (++curThreads));
thread.setDaemon(true);
thread.start();
}
}
// ------------------------------------------------ Application Buffer Handler
/**
 * {@code ApplicationBufferHandler} that owns one read buffer and one write
 * buffer, both allocated once in the constructor.
 */
public class NioBufferHandler implements ApplicationBufferHandler {

    /** Buffer returned by {@link #getReadBuffer()}. */
    protected ByteBuffer readbuf = null;
    /** Buffer returned by {@link #getWriteBuffer()}. */
    protected ByteBuffer writebuf = null;

    /**
     * Allocates the two buffers.
     *
     * @param readsize  capacity of the read buffer, in bytes
     * @param writesize capacity of the write buffer, in bytes
     * @param direct    true to allocate direct buffers, false for heap buffers
     */
    public NioBufferHandler(int readsize, int writesize, boolean direct) {
        readbuf = direct ? ByteBuffer.allocateDirect(readsize) : ByteBuffer.allocate(readsize);
        writebuf = direct ? ByteBuffer.allocateDirect(writesize) : ByteBuffer.allocate(writesize);
    }

    /**
     * Returns the given buffer unchanged; this implementation never grows
     * a buffer.
     *
     * @param buffer    the buffer a caller asked to expand
     * @param remaining ignored by this implementation
     * @return {@code buffer} itself
     */
    public ByteBuffer expand(ByteBuffer buffer, int remaining) {
        return buffer;
    }

    /** @return the read buffer */
    public ByteBuffer getReadBuffer() {
        return readbuf;
    }

    /** @return the write buffer */
    public ByteBuffer getWriteBuffer() {
        return writebuf;
    }
}
// ------------------------------------------------ Handler Inner Interface
/**
 * Bare-bones callback interface used for socket processing. Per-thread data
 * is to be stored in the ThreadWithAttributes extra folders, or alternately
 * in thread-local fields.
 */
public interface Handler {

    /**
     * Result of handling a socket. The processors in this file close the
     * socket when {@link #CLOSED} is returned.
     */
    public enum SocketState {
        OPEN,
        CLOSED,
        LONG
    }

    /**
     * Processes the given socket.
     *
     * @param socket the channel to process
     * @return the resulting socket state
     */
    public SocketState process(NioChannel socket);

    /**
     * Delivers a status event for the given socket.
     *
     * @param socket the channel the event applies to
     * @param status the status being reported
     * @return the resulting socket state
     */
    public SocketState event(NioChannel socket, SocketStatus status);
}
// ------------------------------------------------- WorkerStack Inner Class
/**
 * Fixed-capacity, array-backed LIFO stack of {@link Worker} instances.
 * This class does no locking of its own.
 */
public class WorkerStack {

    /** Backing array; slots {@code [0, end)} hold the stacked workers. */
    protected Worker[] workers = null;
    /** Number of workers currently on the stack (index of the next free slot). */
    protected int end = 0;

    /**
     * Creates an empty stack.
     *
     * @param size the maximum number of workers the stack can hold
     */
    public WorkerStack(int size) {
        workers = new Worker[size];
    }

    /**
     * Puts a worker on top of the stack.
     *
     * @param worker the worker to push
     * @throws ArrayIndexOutOfBoundsException if the stack is already full;
     *         the stack is left unchanged in that case
     */
    public void push(Worker worker) {
        // Store first, then advance: if the store fails on a full stack,
        // 'end' must not have been incremented.
        workers[end] = worker;
        end++;
    }

    /**
     * Removes and returns the worker on top of the stack.
     *
     * @return the most recently pushed worker, or null if the stack is empty
     */
    public Worker pop() {
        if (end > 0) {
            Worker top = workers[--end];
            // Do not keep a reference to a worker that is no longer stacked.
            workers[end] = null;
            return top;
        }
        return null;
    }

    /**
     * Returns the worker on top of the stack without removing it.
     *
     * @return the most recently pushed worker, or null if the stack is empty
     */
    public Worker peek() {
        return (end > 0) ? workers[end - 1] : null;
    }

    /**
     * Is the stack empty?
     *
     * @return true if no workers are stacked
     */
    public boolean isEmpty() {
        return (end == 0);
    }

    /**
     * How many workers are on the stack?
     *
     * @return the number of stacked workers
     */
    public int size() {
        return (end);
    }
}
// ---------------------------------------------- SocketOptionsProcessor Inner Class
/**
 * Counterpart of the Worker's raw-socket path, packaged as a
 * {@code Runnable} so it can be run by an external Executor thread pool.
 */
protected class SocketOptionsProcessor implements Runnable {

    /** The channel whose socket options are to be applied. */
    protected SocketChannel sc = null;

    /**
     * @param socket the channel to configure when this task runs
     */
    public SocketOptionsProcessor(SocketChannel socket) {
        this.sc = socket;
    }

    /**
     * Applies the socket options; if that fails, closes the underlying
     * socket and the channel.
     */
    public void run() {
        boolean configured = setSocketOptions(sc);
        if (configured) {
            return;
        }
        try {
            sc.socket().close();
            sc.close();
        } catch (IOException ix) {
            if (log.isDebugEnabled()) {
                log.debug("", ix);
            }
        }
    }
}
// ---------------------------------------------- SocketProcessor Inner Class
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor implements Runnable {
protected NioChannel socket = null;
public SocketProcessor(NioChannel socket) {
this.socket = socket;
}
public void run() {
// Process the request from this socket
if (handler.process(socket) == Handler.SocketState.CLOSED) {
// Close socket and pool
try {
try {socket.close();}catch (Exception ignore){}
if ( socket.isOpen() ) socket.close(true);
} catch ( Exception x ) {
log.error("",x);
}
socket = null;
}
}
}
// --------------------------------------- SocketEventProcessor Inner Class
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketEventProcessor implements Runnable {
protected NioChannel socket = null;
protected SocketStatus status = null;
public SocketEventProcessor(NioChannel socket, SocketStatus status) {
this.socket = socket;
this.status = status;
}
public void run() {
// Process the request from this socket
if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
// Close socket and pool
try {
try {socket.close();}catch (Exception ignore){}
if ( socket.isOpen() ) socket.close(true);
} catch ( Exception x ) {
log.error("",x);
}
socket = null;
}
}
}
}
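// Keyboard shortcuts (code-viewer page chrome, not part of the source):
//   Copy code:           Ctrl + C
//   Search code:         Ctrl + F
//   Full-screen mode:    F11
//   Toggle theme:        Ctrl + Shift + D
//   Show shortcuts:      ?
//   Increase font size:  Ctrl + =
//   Decrease font size:  Ctrl + -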