📄 aprendpoint.java
字号:
*/
public void pause() {
if (running && !paused) {
paused = true;
unlockAccept();
}
}
/**
* Resume the endpoint, which will make it start accepting new sockets
* again.
*/
public void resume() {
if (running) {
paused = false;
}
}
/**
* Stop the endpoint. This will cause all processing threads to stop.
*/
public void stop() {
if (running) {
running = false;
unlockAccept();
for (int i = 0; i < pollers.length; i++) {
pollers[i].destroy();
}
pollers = null;
for (int i = 0; i < cometPollers.length; i++) {
cometPollers[i].destroy();
}
cometPollers = null;
if (useSendfile) {
for (int i = 0; i < sendfiles.length; i++) {
sendfiles[i].destroy();
}
sendfiles = null;
}
}
}
/**
* Deallocate APR memory pools, and close server socket.
*/
public void destroy() throws Exception {
if (running) {
stop();
}
Pool.destroy(serverSockPool);
serverSockPool = 0;
// Close server socket
Socket.close(serverSock);
serverSock = 0;
sslContext = 0;
// Close all APR memory pools and resources
Pool.destroy(rootPool);
rootPool = 0;
initialized = false;
}
// ------------------------------------------------------ Protected Methods
/**
* Get a sequence number used for thread naming.
*/
protected int getSequence() {
return sequence++;
}
/**
* Unlock the server socket accept using a bugus connection.
*/
protected void unlockAccept() {
java.net.Socket s = null;
try {
// Need to create a connection to unlock the accept();
if (address == null) {
s = new java.net.Socket("127.0.0.1", port);
} else {
s = new java.net.Socket(address, port);
// setting soLinger to a small value will help shutdown the
// connection quicker
s.setSoLinger(true, 0);
}
} catch(Exception e) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
}
} finally {
if (s != null) {
try {
s.close();
} catch (Exception e) {
// Ignore
}
}
}
}
/**
* Process the specified connection.
*/
protected boolean setSocketOptions(long socket) {
// Process the connection
int step = 1;
try {
// 1: Set socket options: timeout, linger, etc
if (soLinger >= 0)
Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger);
if (tcpNoDelay)
Socket.optSet(socket, Socket.APR_TCP_NODELAY, (tcpNoDelay ? 1 : 0));
if (soTimeout > 0)
Socket.timeoutSet(socket, soTimeout * 1000);
// 2: SSL handshake
step = 2;
if (sslContext != 0) {
SSLSocket.attach(sslContext, socket);
if (SSLSocket.handshake(socket) != 0) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError());
}
return false;
}
}
} catch (Throwable t) {
if (log.isDebugEnabled()) {
if (step == 2) {
log.debug(sm.getString("endpoint.err.handshake"), t);
} else {
log.debug(sm.getString("endpoint.err.unexpected"), t);
}
}
// Tell to close the socket
return false;
}
return true;
}
/**
* Create (or allocate) and return an available processor for use in
* processing a specific HTTP request, if possible. If the maximum
* allowed processors have already been created and are in use, return
* <code>null</code> instead.
*/
protected Worker createWorkerThread() {
synchronized (workers) {
if (workers.size() > 0) {
curThreadsBusy++;
return (workers.pop());
}
if ((maxThreads > 0) && (curThreads < maxThreads)) {
curThreadsBusy++;
return (newWorkerThread());
} else {
if (maxThreads < 0) {
curThreadsBusy++;
return (newWorkerThread());
} else {
return (null);
}
}
}
}
/**
* Create and return a new processor suitable for processing HTTP
* requests and returning the corresponding responses.
*/
protected Worker newWorkerThread() {
Worker workerThread = new Worker();
workerThread.start();
return (workerThread);
}
/**
* Return a new worker thread, and block while to worker is available.
*/
protected Worker getWorkerThread() {
// Allocate a new worker thread
Worker workerThread = createWorkerThread();
while (workerThread == null) {
try {
synchronized (workers) {
workers.wait();
}
} catch (InterruptedException e) {
// Ignore
}
workerThread = createWorkerThread();
}
return workerThread;
}
/**
* Recycle the specified Processor so that it can be used again.
*
* @param workerThread The processor to be recycled
*/
protected void recycleWorkerThread(Worker workerThread) {
synchronized (workers) {
workers.push(workerThread);
curThreadsBusy--;
workers.notify();
}
}
/**
* Allocate a new poller of the specified size.
*/
protected long allocatePoller(int size, long pool, int timeout) {
try {
return Poll.create(size, pool, 0, timeout * 1000);
} catch (Error e) {
if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
log.info(sm.getString("endpoint.poll.limitedpollsize", "" + size));
return 0;
} else {
log.error(sm.getString("endpoint.poll.initfail"), e);
return -1;
}
}
}
/**
* Process given socket.
*/
protected boolean processSocketWithOptions(long socket) {
try {
if (executor == null) {
getWorkerThread().assignWithOptions(socket);
} else {
executor.execute(new SocketWithOptionsProcessor(socket));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
/**
* Process given socket.
*/
protected boolean processSocket(long socket) {
try {
if (executor == null) {
getWorkerThread().assign(socket);
} else {
executor.execute(new SocketProcessor(socket));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
/**
* Process given socket for an event.
*/
protected boolean processSocket(long socket, SocketStatus status) {
try {
if (executor == null) {
getWorkerThread().assign(socket, status);
} else {
executor.execute(new SocketEventProcessor(socket, status));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
// --------------------------------------------------- Acceptor Inner Class
/**
* Server socket acceptor thread.
*/
protected class Acceptor implements Runnable {
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
public void run() {
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
try {
// Accept the next incoming connection from the server socket
long socket = Socket.accept(serverSock);
// Hand this socket off to an appropriate processor
if (!processSocketWithOptions(socket)) {
// Close socket and pool right away
Socket.destroy(socket);
}
} catch (Throwable t) {
log.error(sm.getString("endpoint.accept.fail"), t);
}
// The processor will recycle itself when it finishes
}
}
}
// ----------------------------------------------------- Poller Inner Class
/**
* Poller class.
*/
public class Poller implements Runnable {
protected long serverPollset = 0;
protected long pool = 0;
protected long[] desc;
protected long[] addS;
protected int addCount = 0;
protected boolean comet = true;
protected int keepAliveCount = 0;
public int getKeepAliveCount() { return keepAliveCount; }
public Poller(boolean comet) {
this.comet = comet;
}
/**
* Create the poller. With some versions of APR, the maximum poller size will
* be 62 (recompiling APR is necessary to remove this limitation).
*/
protected void init() {
pool = Pool.create(serverSockPool);
int size = pollerSize / pollerThreadCount;
int timeout = keepAliveTimeout;
if (timeout < 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -