📄 aprendpoint.java
字号:
timeout = soTimeout;
}
if (comet) {
// FIXME: Find an appropriate timeout value, for now, "longer than usual"
// semms appropriate
timeout = soTimeout * 50;
}
serverPollset = allocatePoller(size, pool, timeout);
if (serverPollset == 0 && size > 1024) {
size = 1024;
serverPollset = allocatePoller(size, pool, timeout);
}
if (serverPollset == 0) {
size = 62;
serverPollset = allocatePoller(size, pool, timeout);
}
desc = new long[size * 2];
keepAliveCount = 0;
addS = new long[size];
addCount = 0;
}
/**
* Destroy the poller.
*/
protected void destroy() {
// Wait for polltime before doing anything, so that the poller threads
// exit, otherwise parallel descturction of sockets which are still
// in the poller can cause problems
try {
synchronized (this) {
this.wait(pollTime / 1000);
}
} catch (InterruptedException e) {
// Ignore
}
// Close all sockets in the add queue
for (int i = 0; i < addCount; i++) {
if (comet) {
processSocket(addS[i], SocketStatus.STOP);
} else {
Socket.destroy(addS[i]);
}
}
// Close all sockets still in the poller
int rv = Poll.pollset(serverPollset, desc);
if (rv > 0) {
for (int n = 0; n < rv; n++) {
if (comet) {
processSocket(desc[n*2+1], SocketStatus.STOP);
} else {
Socket.destroy(desc[n*2+1]);
}
}
}
Pool.destroy(pool);
keepAliveCount = 0;
addCount = 0;
}
/**
* Add specified socket and associated pool to the poller. The socket will
* be added to a temporary array, and polled first after a maximum amount
* of time equal to pollTime (in most cases, latency will be much lower,
* however).
*
* @param socket to add to the poller
*/
public void add(long socket) {
synchronized (this) {
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
if (addCount >= addS.length) {
// Can't do anything: close the socket right away
if (comet) {
processSocket(socket, SocketStatus.ERROR);
} else {
Socket.destroy(socket);
}
return;
}
addS[addCount] = socket;
addCount++;
this.notify();
}
}
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
public void run() {
long maintainTime = 0;
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
while (keepAliveCount < 1 && addCount < 1) {
// Reset maintain time.
maintainTime = 0;
try {
synchronized (this) {
this.wait();
}
} catch (InterruptedException e) {
// Ignore
}
}
try {
// Add sockets which are waiting to the poller
if (addCount > 0) {
synchronized (this) {
for (int i = (addCount - 1); i >= 0; i--) {
int rv = Poll.add
(serverPollset, addS[i], Poll.APR_POLLIN);
if (rv == Status.APR_SUCCESS) {
keepAliveCount++;
} else {
// Can't do anything: close the socket right away
if (comet) {
processSocket(addS[i], SocketStatus.ERROR);
} else {
Socket.destroy(addS[i]);
}
}
}
addCount = 0;
}
}
maintainTime += pollTime;
// Pool for the specified interval
int rv = Poll.poll(serverPollset, pollTime, desc, true);
if (rv > 0) {
keepAliveCount -= rv;
for (int n = 0; n < rv; n++) {
// Check for failed sockets and hand this socket off to a worker
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
|| (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN)))
|| (!comet && (!processSocket(desc[n*2+1])))) {
// Close socket and clear pool
if (comet) {
processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
} else {
Socket.destroy(desc[n*2+1]);
}
continue;
}
}
} else if (rv < 0) {
int errn = -rv;
/* Any non timeup or interrupted error is critical */
if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
if (errn > Status.APR_OS_START_USERERR) {
errn -= Status.APR_OS_START_USERERR;
}
log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
// Handle poll critical failure
synchronized (this) {
destroy();
init();
}
continue;
}
}
if (soTimeout > 0 && maintainTime > 1000000L && running) {
rv = Poll.maintain(serverPollset, desc, true);
maintainTime = 0;
if (rv > 0) {
keepAliveCount -= rv;
for (int n = 0; n < rv; n++) {
// Close socket and clear pool
if (comet) {
processSocket(desc[n], SocketStatus.TIMEOUT);
} else {
Socket.destroy(desc[n]);
}
}
}
}
} catch (Throwable t) {
log.error(sm.getString("endpoint.poll.error"), t);
}
}
synchronized (this) {
this.notifyAll();
}
}
}
// ----------------------------------------------------- Worker Inner Class
/**
* Server processor class.
*/
protected class Worker implements Runnable {
protected Thread thread = null;
protected boolean available = false;
protected long socket = 0;
protected SocketStatus status = null;
protected boolean options = false;
/**
* Process an incoming TCP/IP connection on the specified socket. Any
* exception that occurs during processing must be logged and swallowed.
* <b>NOTE</b>: This method is called from our Connector's thread. We
* must assign it to our own thread so that multiple simultaneous
* requests can be handled.
*
* @param socket TCP socket to process
*/
protected synchronized void assignWithOptions(long socket) {
// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
status = null;
options = true;
available = true;
notifyAll();
}
/**
* Process an incoming TCP/IP connection on the specified socket. Any
* exception that occurs during processing must be logged and swallowed.
* <b>NOTE</b>: This method is called from our Connector's thread. We
* must assign it to our own thread so that multiple simultaneous
* requests can be handled.
*
* @param socket TCP socket to process
*/
protected synchronized void assign(long socket) {
// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
status = null;
options = false;
available = true;
notifyAll();
}
protected synchronized void assign(long socket, SocketStatus status) {
// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
this.status = status;
options = false;
available = true;
notifyAll();
}
/**
* Await a newly assigned Socket from our Connector, or <code>null</code>
* if we are supposed to shut down.
*/
protected synchronized long await() {
// Wait for the Connector to provide a new Socket
while (!available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Notify the Connector that we have received this Socket
long socket = this.socket;
available = false;
notifyAll();
return (socket);
}
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
public void run() {
// Process requests until we receive a shutdown signal
while (running) {
// Wait for the next socket to be assigned
long socket = await();
if (socket == 0)
continue;
// Process the request from this socket
if ((status != null) && (handler.event(socket, status) == Handler.SocketState.CLOSED)) {
// Close socket and pool
Socket.destroy(socket);
socket = 0;
} else if ((status == null) && ((options && !setSocketOptions(socket))
|| handler.process(socket) == Handler.SocketState.CLOSED)) {
// Close socket and pool
Socket.destroy(socket);
socket = 0;
}
// Finish up this request
recycleWorkerThread(this);
}
}
/**
* Start the background processing thread.
*/
public void start() {
thread = new Thread(this);
thread.setName(getName() + "-" + (++curThreads));
thread.setDaemon(true);
thread.start();
}
}
// ----------------------------------------------- SendfileData Inner Class
/**
* SendfileData class.
*/
public static class SendfileData {
// File
public String fileName;
public long fd;
public long fdpool;
// Range information
public long start;
public long end;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -