📄 nioendpoint.java
字号:
/**
 * Cipher suite names parsed from the comma-separated cipher string.
 * Starts out empty and is empty again after {@code setCiphers(null)}.
 */
protected String[] ciphersarr = new String[0];

/**
 * Returns the cipher list exactly as it was last passed to
 * {@link #setCiphers(String)}.
 *
 * @return the raw, unparsed cipher string (may be {@code null})
 */
public String getCiphers() {
    return ciphers;
}

/**
 * Stores the raw cipher string and splits it on commas into
 * {@code ciphersarr}. Whitespace around each name is removed and blank
 * entries are dropped, so {@code "A, B,"} produces {@code {"A", "B"}}.
 *
 * @param s comma-separated cipher suite names, or {@code null} to clear
 *          the parsed list
 */
public void setCiphers(String s) {
    ciphers = s;
    if (s == null) {
        ciphersarr = new String[0];
        return;
    }
    StringTokenizer t = new StringTokenizer(s, ",");
    String[] parsed = new String[t.countTokens()];
    int count = 0;
    while (t.hasMoreTokens()) {
        String name = t.nextToken().trim();
        if (name.length() > 0) {
            parsed[count++] = name;
        }
    }
    // Publish a fully populated array in one assignment so readers never
    // observe partially filled (null) slots.
    String[] result = new String[count];
    System.arraycopy(parsed, 0, result, 0, count);
    ciphersarr = result;
}
/**
 * Flag recording whether SSL is enabled for this endpoint; off by default.
 */
protected boolean SSLEnabled = false;

/**
 * @return the current value of the SSL-enabled flag
 */
public boolean isSSLEnabled() {
    return this.SSLEnabled;
}

/**
 * @param SSLEnabled the new value of the SSL-enabled flag
 */
public void setSSLEnabled(boolean SSLEnabled) {
    this.SSLEnabled = SSLEnabled;
}
/**
 * The "secure" flag of this endpoint; {@code false} by default.
 */
protected boolean secure = false;

/**
 * @return the current value of the secure flag
 */
public boolean getSecure() {
    return this.secure;
}

/**
 * @param b the new value of the secure flag
 */
public void setSecure(boolean b) {
    this.secure = b;
}
/**
 * Replaces the selector pool used by this endpoint.
 *
 * @param selectorPool the selector pool to use from now on
 */
public void setSelectorPool(NioSelectorPool selectorPool)
{
    this.selectorPool = selectorPool;
}
/**
 * Replaces the socket properties used by this endpoint.
 *
 * @param socketProperties the socket properties to use from now on
 */
public void setSocketProperties(SocketProperties socketProperties)
{
    this.socketProperties = socketProperties;
}
/**
 * SSL context held by this endpoint; {@code null} until one is assigned.
 */
protected SSLContext sslContext = null;

/**
 * @return the SSL context currently held, or {@code null} if none is set
 */
public SSLContext getSSLContext() {
    return this.sslContext;
}

/**
 * @param c the SSL context to hold (may be {@code null})
 */
public void setSSLContext(SSLContext c) {
    this.sslContext = c;
}
// --------------------------------------------------------- Public Methods
/**
 * Adds up the keep-alive counts reported by every poller.
 *
 * @return the sum over all pollers, or 0 when there are no pollers
 */
public int getKeepAliveCount() {
    if (pollers == null) {
        return 0;
    }
    int total = 0;
    for (Poller poller : pollers) {
        total += poller.getKeepAliveCount();
    }
    return total;
}
/**
 * Reports the value of the {@code curThreads} counter.
 *
 * @return the current thread count
 */
public int getCurrentThreadCount()
{
    return this.curThreads;
}
/**
 * Reports the value of the {@code curThreadsBusy} counter.
 *
 * @return the number of threads currently counted as busy
 */
public int getCurrentThreadsBusy()
{
    return this.curThreadsBusy;
}
/**
 * Tells whether the endpoint is running.
 *
 * @return {@code true} while the running flag is set, {@code false} otherwise
 */
public boolean isRunning()
{
    return this.running;
}
/**
 * Tells whether the endpoint is paused.
 *
 * @return {@code true} while the paused flag is set, {@code false} otherwise
 */
public boolean isPaused()
{
    return this.paused;
}
// ----------------------------------------------- Public Lifecycle Methods
/**
 * Initialize the endpoint: open and bind the server socket, apply the
 * thread-count defaults and, when SSL is enabled, build the
 * {@code SSLContext}. Does nothing if the endpoint is already initialized.
 *
 * @throws Exception if the server socket cannot be opened or bound, or if
 *                   the key material cannot be loaded
 */
public void init()
    throws Exception {

    if (initialized) {
        return;
    }

    serverSock = ServerSocketChannel.open();
    InetSocketAddress addr = (address != null)
            ? new InetSocketAddress(address, port)
            : new InetSocketAddress(port);
    serverSock.socket().bind(addr, 100); //todo, set backlog value
    serverSock.configureBlocking(true); //mimic APR behavior

    // Initialize thread count defaults for acceptor, poller and sendfile
    if (acceptorThreadCount <= 0) {
        // A non-positive count would start no acceptor at all, so fall
        // back to a single acceptor (same rule as for the pollers below).
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }

    // Initialize SSL if needed
    if (isSSLEnabled()) {
        char[] passphrase = getKeystorePass().toCharArray();

        // Key store; the stream is closed even when loading fails.
        KeyStore ks = KeyStore.getInstance(getKeystoreType());
        FileInputStream ksStream = new FileInputStream(getKeystoreFile());
        try {
            ks.load(ksStream, passphrase);
        } finally {
            ksStream.close();
        }

        // Trust store; it is read from the same keystore file and type.
        KeyStore ts = KeyStore.getInstance(getKeystoreType());
        FileInputStream tsStream = new FileInputStream(getKeystoreFile());
        try {
            ts.load(tsStream, passphrase);
        } finally {
            tsStream.close();
        }

        KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
        kmf.init(ks, passphrase);

        TrustManagerFactory tmf = TrustManagerFactory.getInstance(getAlgorithm());
        tmf.init(ts);

        sslContext = SSLContext.getInstance(getSslProtocol());
        sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    }

    initialized = true;
}
/**
 * Start the NIO endpoint, creating the worker collection, the poller
 * threads and the acceptor threads. Initializes the endpoint first if that
 * has not been done yet; does nothing further if it is already running.
 *
 * @throws Exception if initialization or poller setup fails
 */
public void start()
    throws Exception {
    // Initialize socket if not done before
    if (!initialized) {
        init();
    }
    if (!running) {
        running = true;
        paused = false;

        // Create worker collection
        if (executor == null) {
            workers = new WorkerStack(maxThreads);
            //executor = new ThreadPoolExecutor(getMinSpareThreads(),getMaxThreads(),5000,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        }

        // Start poller threads first: an acceptor hands every accepted
        // socket to a poller, so the pollers must exist before the first
        // connection can be accepted.
        pollers = new Poller[pollerThreadCount];
        for (int i = 0; i < pollerThreadCount; i++) {
            pollers[i] = new Poller();
            pollers[i].init();
            Thread pollerThread = new Thread(pollers[i], getName() + "-Poller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        // Start acceptor threads
        for (int i = 0; i < acceptorThreadCount; i++) {
            Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
            acceptorThread.setPriority(threadPriority);
            acceptorThread.setDaemon(daemon);
            acceptorThread.start();
        }
    }
}
/**
 * Pause the endpoint. Has an effect only while the endpoint is running and
 * not already paused: the paused flag is set and the accept call is
 * unlocked.
 */
public void pause() {
    if (!running || paused) {
        return;
    }
    paused = true;
    unlockAccept();
}
/**
 * Resume the endpoint by clearing the paused flag. Does nothing when the
 * endpoint is not running.
 */
public void resume() {
    if (!running) {
        return;
    }
    paused = false;
}
/**
 * Stop the endpoint. When it is running, the running flag is cleared, the
 * accept call is unlocked and every poller is destroyed. The event, key and
 * channel caches are cleared in all cases.
 */
public void stop() {
    if (running) {
        running = false;
        unlockAccept();
        for (Poller poller : pollers) {
            poller.destroy();
        }
        pollers = null;
    }

    // Cached objects are dropped whether or not the endpoint was running.
    eventCache.clear();
    keyCache.clear();
    nioChannels.clear();
}
/**
 * Stop the endpoint if it is still running, close the server socket and
 * reset the endpoint to its uninitialized state. Safe to call when the
 * endpoint was never initialized or has already been destroyed.
 *
 * @throws Exception if closing the server socket fails; the endpoint state
 *                   is reset even in that case
 */
public void destroy() throws Exception {
    if (running) {
        stop();
    }
    try {
        // Close server socket (absent if init() never ran or after a
        // previous destroy()).
        if (serverSock != null) {
            serverSock.socket().close();
            serverSock.close();
        }
    } finally {
        serverSock = null;
        sslContext = null;
        initialized = false;
        nioChannels.clear();
    }
}
// ------------------------------------------------------ Protected Methods
/**
 * Hands out the current sequence number and advances the counter by one.
 *
 * @return the sequence value before it was incremented
 */
protected int getSequence() {
    final int current = sequence++;
    return current;
}
/**
 * @return the transmit buffer size taken from the socket properties
 */
public int getWriteBufSize() {
    final int txBufSize = socketProperties.getTxBufSize();
    return txBufSize;
}
/**
 * @return the receive buffer size taken from the socket properties
 */
public int getReadBufSize() {
    final int rxBufSize = socketProperties.getRxBufSize();
    return rxBufSize;
}
/**
 * @return the selector pool currently used by this endpoint
 */
public NioSelectorPool getSelectorPool()
{
    return this.selectorPool;
}
/**
 * @return the socket properties currently used by this endpoint
 */
public SocketProperties getSocketProperties()
{
    return this.socketProperties;
}
/**
 * Unlock the server socket accept using a bogus connection. A throwaway
 * client socket is connected to the configured address (or to 127.0.0.1
 * when no address is set) and closed again straight away. Failures are
 * logged at debug level only.
 */
protected void unlockAccept() {
    java.net.Socket s = null;
    try {
        // Need to create a connection to unlock the accept();
        if (address == null) {
            s = new java.net.Socket("127.0.0.1", port);
        } else {
            s = new java.net.Socket(address, port);
        }
        // setting soLinger to a small value will help shutdown the
        // connection quicker; applied on both paths so the loopback case
        // behaves the same as the explicit-address case
        s.setSoLinger(true, 0);
    } catch (Exception e) {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
        }
    } finally {
        if (s != null) {
            try {
                s.close();
            } catch (Exception ignored) {
                // Best effort: the socket only existed to wake up accept()
            }
        }
    }
}
/**
 * Prepare a freshly accepted connection and hand it to a poller. The socket
 * is switched to non-blocking mode and configured from the socket
 * properties. It is then wrapped in a channel, either recycled from the
 * {@code nioChannels} cache or newly created (an SSL channel when an SSL
 * context is present), and that channel is registered with a poller.
 *
 * @param socket the accepted socket channel
 * @return {@code true} if the channel was registered; {@code false} if
 *         anything failed, which tells the caller to close the socket
 */
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.poll();
        if (channel == null) {
            // Nothing to recycle: build a new channel, with SSL if configured
            if (sslContext != null) {
                SSLEngine engine = createSSLEngine();
                // Buffers must be able to hold a full SSL application record
                int appbufsize = engine.getSession().getApplicationBufferSize();
                NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize, getReadBufSize()),
                                                                   Math.max(appbufsize, getWriteBufSize()),
                                                                   socketProperties.getDirectBuffer());
                channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
            } else {
                NioBufferHandler bufhandler = new NioBufferHandler(getReadBufSize(),
                                                                   getWriteBufSize(),
                                                                   socketProperties.getDirectBuffer());
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            // Recycle a cached channel for the new socket
            channel.setIOChannel(socket);
            if (channel instanceof SecureNioChannel) {
                SSLEngine engine = createSSLEngine();
                ((SecureNioChannel) channel).reset(engine);
            } else {
                channel.reset();
            }
        }
        getPoller0().register(channel);
    } catch (Throwable t) {
        try {
            log.error("", t);
        } catch (Throwable ignored) {
            // Logging itself failed; nothing more can be done here
        }
        // Tell to close the socket
        return false;
    }
    return true;
}
/**
 * Builds a server-mode SSL engine from the endpoint's SSL context. The
 * need-client-auth setting comes from {@code getClientAuth()}. The
 * configured cipher suites and protocols are applied only when their
 * arrays are non-empty.
 *
 * @return the newly configured engine
 */
protected SSLEngine createSSLEngine() {
    final SSLEngine sslEngine = sslContext.createSSLEngine();
    sslEngine.setNeedClientAuth(getClientAuth());
    sslEngine.setUseClientMode(false);
    if (ciphersarr.length > 0) {
        sslEngine.setEnabledCipherSuites(ciphersarr);
    }
    if (sslEnabledProtocolsarr.length > 0) {
        sslEngine.setEnabledProtocols(sslEnabledProtocolsarr);
    }
    return sslEngine;
}
/**
 * Obtain a worker without blocking. An idle worker from the stack is
 * preferred. Otherwise a new worker is started when {@code maxThreads} is
 * negative, or when it is positive and {@code curThreads} is still below
 * it. The busy counter is incremented whenever a worker is returned.
 *
 * @return a worker, or <code>null</code> if none is idle and no new one
 *         may be created
 */
protected Worker createWorkerThread() {
    synchronized (workers) {
        // Reuse an idle worker if one is parked on the stack
        if (workers.size() > 0) {
            curThreadsBusy++;
            return workers.pop();
        }

        boolean belowLimit = (maxThreads > 0) && (curThreads < maxThreads);
        boolean unlimited = maxThreads < 0;
        if (!belowLimit && !unlimited) {
            return null;
        }

        curThreadsBusy++;
        return newWorkerThread();
    }
}
/**
 * Instantiate a fresh {@code Worker}, start it, and hand it back to the
 * caller.
 *
 * @return the worker that was just started
 */
protected Worker newWorkerThread() {
    final Worker started = new Worker();
    started.start();
    return started;
}
/**
* Return a new worker thread, and block while to worker is available.
*/
protected Worker getWorkerThread() {
// Allocate a new worker thread
Worker workerThread = createWorkerThread();
while (workerThread == null) {
try {
synchronized (workers) {
workerThread = createWorkerThread();
if ( workerThread == null ) workers.wait();
}
} catch (InterruptedException e) {
// Ignore
}
if ( workerThread == null ) workerThread = createWorkerThread();
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -