📄 aprendpoint.java
字号:
// Socket and socket pool
public long socket;
// Position
public long pos;
// KeepAlive flag
public boolean keepAlive;
}
// --------------------------------------------------- Sendfile Inner Class
/**
* Sendfile class.
*/
public class Sendfile implements Runnable {
protected long sendfilePollset = 0;
protected long pool = 0;
protected long[] desc;
protected HashMap<Long, SendfileData> sendfileData;
protected int sendfileCount;
public int getSendfileCount() { return sendfileCount; }
protected ArrayList<SendfileData> addS;
/**
* Create the sendfile poller. With some versions of APR, the maximum poller size will
* be 62 (reocmpiling APR is necessary to remove this limitation).
*/
protected void init() {
pool = Pool.create(serverSockPool);
int size = sendfileSize / sendfileThreadCount;
sendfilePollset = allocatePoller(size, pool, soTimeout);
if (sendfilePollset == 0 && size > 1024) {
size = 1024;
sendfilePollset = allocatePoller(size, pool, soTimeout);
}
if (sendfilePollset == 0) {
size = 62;
sendfilePollset = allocatePoller(size, pool, soTimeout);
}
desc = new long[size * 2];
sendfileData = new HashMap<Long, SendfileData>(size);
addS = new ArrayList<SendfileData>();
}
/**
* Destroy the poller.
*/
protected void destroy() {
// Wait for polltime before doing anything, so that the poller threads
// exit, otherwise parallel descturction of sockets which are still
// in the poller can cause problems
try {
synchronized (this) {
this.wait(pollTime / 1000);
}
} catch (InterruptedException e) {
// Ignore
}
// Close any socket remaining in the add queue
for (int i = (addS.size() - 1); i >= 0; i--) {
SendfileData data = addS.get(i);
Socket.destroy(data.socket);
}
// Close all sockets still in the poller
int rv = Poll.pollset(sendfilePollset, desc);
if (rv > 0) {
for (int n = 0; n < rv; n++) {
Socket.destroy(desc[n*2+1]);
}
}
Pool.destroy(pool);
sendfileData.clear();
}
/**
* Add the sendfile data to the sendfile poller. Note that in most cases,
* the initial non blocking calls to sendfile will return right away, and
* will be handled asynchronously inside the kernel. As a result,
* the poller will never be used.
*
* @param data containing the reference to the data which should be snet
* @return true if all the data has been sent right away, and false
* otherwise
*/
public boolean add(SendfileData data) {
// Initialize fd from data given
try {
data.fdpool = Socket.pool(data.socket);
data.fd = File.open
(data.fileName, File.APR_FOPEN_READ
| File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,
0, data.fdpool);
data.pos = data.start;
// Set the socket to nonblocking mode
Socket.timeoutSet(data.socket, 0);
while (true) {
long nw = Socket.sendfilen(data.socket, data.fd,
data.pos, data.end - data.pos, 0);
if (nw < 0) {
if (!(-nw == Status.EAGAIN)) {
Socket.destroy(data.socket);
data.socket = 0;
return false;
} else {
// Break the loop and add the socket to poller.
break;
}
} else {
data.pos = data.pos + nw;
if (data.pos >= data.end) {
// Entire file has been sent
Pool.destroy(data.fdpool);
// Set back socket to blocking mode
Socket.timeoutSet(data.socket, soTimeout * 1000);
return true;
}
}
}
} catch (Exception e) {
log.error(sm.getString("endpoint.sendfile.error"), e);
return false;
}
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
synchronized (this) {
addS.add(data);
this.notify();
}
return false;
}
/**
* Remove socket from the poller.
*
* @param data the sendfile data which should be removed
*/
protected void remove(SendfileData data) {
int rv = Poll.remove(sendfilePollset, data.socket);
if (rv == Status.APR_SUCCESS) {
sendfileCount--;
}
sendfileData.remove(data);
}
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
public void run() {
// Loop until we receive a shutdown command
while (running) {
// Loop if endpoint is paused
while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
while (sendfileCount < 1 && addS.size() < 1) {
try {
synchronized (this) {
this.wait();
}
} catch (InterruptedException e) {
// Ignore
}
}
try {
// Add socket to the poller
if (addS.size() > 0) {
synchronized (this) {
for (int i = (addS.size() - 1); i >= 0; i--) {
SendfileData data = addS.get(i);
int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);
if (rv == Status.APR_SUCCESS) {
sendfileData.put(new Long(data.socket), data);
sendfileCount++;
} else {
log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv, Error.strerror(rv)));
// Can't do anything: close the socket right away
Socket.destroy(data.socket);
}
}
addS.clear();
}
}
// Pool for the specified interval
int rv = Poll.poll(sendfilePollset, pollTime, desc, false);
if (rv > 0) {
for (int n = 0; n < rv; n++) {
// Get the sendfile state
SendfileData state =
sendfileData.get(new Long(desc[n*2+1]));
// Problem events
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
// Close socket and clear pool
remove(state);
// Destroy file descriptor pool, which should close the file
// Close the socket, as the reponse would be incomplete
Socket.destroy(state.socket);
continue;
}
// Write some data using sendfile
long nw = Socket.sendfilen(state.socket, state.fd,
state.pos,
state.end - state.pos, 0);
if (nw < 0) {
// Close socket and clear pool
remove(state);
// Close the socket, as the reponse would be incomplete
// This will close the file too.
Socket.destroy(state.socket);
continue;
}
state.pos = state.pos + nw;
if (state.pos >= state.end) {
remove(state);
if (state.keepAlive) {
// Destroy file descriptor pool, which should close the file
Pool.destroy(state.fdpool);
Socket.timeoutSet(state.socket, soTimeout * 1000);
// If all done hand this socket off to a worker for
// processing of further requests
if (!processSocket(state.socket)) {
Socket.destroy(state.socket);
}
} else {
// Close the socket since this is
// the end of not keep-alive request.
Socket.destroy(state.socket);
}
}
}
} else if (rv < 0) {
int errn = -rv;
/* Any non timeup or interrupted error is critical */
if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
if (errn > Status.APR_OS_START_USERERR) {
errn -= Status.APR_OS_START_USERERR;
}
log.error(sm.getString("endpoint.poll.fail", "" + errn, Error.strerror(errn)));
// Handle poll critical failure
synchronized (this) {
destroy();
init();
}
continue;
}
}
/* TODO: See if we need to call the maintain for sendfile poller */
} catch (Throwable t) {
log.error(sm.getString("endpoint.poll.error"), t);
}
}
synchronized (this) {
this.notifyAll();
}
}
}
// ------------------------------------------------ Handler Inner Interface
/**
* Bare bones interface used for socket processing. Per thread data is to be
* stored in the ThreadWithAttributes extra folders, or alternately in
* thread local fields.
*/
public interface Handler {
public enum SocketState {
OPEN, CLOSED, LONG
}
public SocketState process(long socket);
public SocketState event(long socket, SocketStatus status);
}
// ------------------------------------------------- WorkerStack Inner Class
public class WorkerStack {
protected Worker[] workers = null;
protected int end = 0;
public WorkerStack(int size) {
workers = new Worker[size];
}
/**
* Put the object into the queue.
*
* @param object the object to be appended to the queue (first element).
*/
public void push(Worker worker) {
workers[end++] = worker;
}
/**
* Get the first object out of the queue. Return null if the queue
* is empty.
*/
public Worker pop() {
if (end > 0) {
return workers[--end];
}
return null;
}
/**
* Get the first object out of the queue, Return null if the queue
* is empty.
*/
public Worker peek() {
return workers[end];
}
/**
* Is the queue empty?
*/
public boolean isEmpty() {
return (end == 0);
}
/**
* How many elements are there in this queue?
*/
public int size() {
return (end);
}
}
// ---------------------------------------------- SocketProcessor Inner Class
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool. This will also set the socket options
* and do the handshake.
*/
protected class SocketWithOptionsProcessor implements Runnable {
protected long socket = 0;
public SocketWithOptionsProcessor(long socket) {
this.socket = socket;
}
public void run() {
// Process the request from this socket
if (!setSocketOptions(socket)
|| handler.process(socket) == Handler.SocketState.CLOSED) {
// Close socket and pool
Socket.destroy(socket);
socket = 0;
}
}
}
// ---------------------------------------------- SocketProcessor Inner Class
/**
* This class is the equivalent of the Wor
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -