📄 threadpool.c
字号:
}
/*
 * Thread entry point for the listener thread.
 *
 * Repeatedly takes an idle worker from idle_worker_stack, acquires the
 * accept mutex, picks a listening socket that is ready, accepts one
 * connection on it using the worker's pool, releases the accept mutex and
 * hands the accepted socket to that worker through its mutex/condition
 * variable.
 *
 * thd   - APR thread object for this thread; its pool backs the pollset.
 * dummy - malloc'ed proc_info (freed here); its pid field is used as the
 *         scoreboard process slot.
 *
 * When the loop ends the function sets workers_may_exit, tells a worker it
 * may still be holding to terminate, terminates the idle worker stack, marks
 * the process as dying/quiescing and sends SIGTERM to ap_my_pid before
 * calling apr_thread_exit().
 *
 * NOTE(review): worker_thread and start_threads are declared with
 * APR_THREAD_FUNC but this entry point is not, although it is also passed to
 * apr_thread_create() - confirm whether the omission is intentional.
 */
static void *listener_thread(apr_thread_t *thd, void * dummy)
{
proc_info * ti = dummy;
int process_slot = ti->pid;
apr_pool_t *tpool = apr_thread_pool_get(thd);
void *csd = NULL;
apr_pool_t *ptrans; /* Pool for per-transaction stuff */
int n;
apr_pollfd_t *pollset;
apr_status_t rv;
ap_listen_rec *lr, *last_lr = ap_listeners;
worker_wakeup_info *worker = NULL;
/* The only field needed (pid) has been copied out above. */
free(ti);
/* Build a pollset that watches every listening socket for readability.
 * NOTE(review): the return values of apr_poll_setup() and
 * apr_poll_socket_add() are not checked here.
 */
apr_poll_setup(&pollset, num_listensocks, tpool);
for(lr = ap_listeners ; lr != NULL ; lr = lr->next)
apr_poll_socket_add(pollset, lr->sd, APR_POLLIN);
/* Unblock the signal used to wake this thread up, and set a handler for
 * it.
 */
unblock_signal(LISTENER_SIGNAL);
apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
/* TODO: Switch to a system where threads reuse the results from earlier
 poll calls - manoj */
while (1) {
/* TODO: requests_this_child should be synchronized - aaron */
if (requests_this_child <= 0) {
check_infinite_requests();
}
if (listener_may_exit) break;
/* No worker in hand (first pass, or the previous one was given a
 * connection): pop one from the idle stack. An EOF status or any other
 * failure ends the listener loop.
 */
if (worker == NULL) {
rv = worker_stack_pop(idle_worker_stack, &worker);
if (APR_STATUS_IS_EOF(rv)) {
break;
}
else if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
"worker_stack_pop failed");
break;
}
/* The accept below allocates out of the chosen worker's pool. */
ptrans = worker->pool;
}
AP_DEBUG_ASSERT(worker->state == WORKER_IDLE);
/* Take the accept mutex before polling/accepting. On failure, request a
 * graceful shutdown (unless we are already exiting) and leave the loop.
 */
if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex)))
!= APR_SUCCESS) {
int level = APLOG_EMERG;
if (listener_may_exit) {
break;
}
if (ap_scoreboard_image->parent[process_slot].generation !=
ap_scoreboard_image->global->running_generation) {
level = APLOG_DEBUG; /* common to get these at restart time */
}
ap_log_error(APLOG_MARK, level, rv, ap_server_conf,
"apr_proc_mutex_lock failed. Attempting to shutdown "
"process gracefully.");
signal_threads(ST_GRACEFUL);
break; /* skip the lock release */
}
if (!APR_O_NONBLOCK_INHERITED && !ap_listeners->next) {
/* Only one listener, so skip the poll */
lr = ap_listeners;
}
else {
/* Poll until some listening socket is readable or we are told to exit. */
while (!listener_may_exit) {
apr_status_t ret;
apr_int16_t event;
ret = apr_poll(pollset, num_listensocks, &n, -1);
if (ret != APR_SUCCESS) {
if (APR_STATUS_IS_EINTR(ret)) {
continue;
}
/* apr_poll() will only return errors in catastrophic
 * circumstances. Let's try exiting gracefully, for now.
 * NOTE(review): control falls through to the listener_may_exit
 * check below; presumably signal_threads() sets that flag -
 * confirm. */
ap_log_error(APLOG_MARK, APLOG_ERR, ret, (const server_rec *)
ap_server_conf, "apr_poll: (listen)");
signal_threads(ST_GRACEFUL);
}
if (listener_may_exit) break;
/* find a listener: start with the entry after the one served last
 * time, wrap around at the end of the list, and stop once the scan
 * is back at last_lr. */
lr = last_lr;
do {
lr = lr->next;
if (lr == NULL) {
lr = ap_listeners;
}
/* XXX: Should we check for POLLERR? */
apr_poll_revents_get(&event, lr->sd, pollset);
if (event & APR_POLLIN) {
last_lr = lr;
goto got_fd;
}
} while (lr != last_lr);
}
}
got_fd:
/* Reached with the accept mutex held, either with a listener chosen in lr
 * or because listener_may_exit was set while polling. */
if (!listener_may_exit) {
rv = lr->accept_func(&csd, lr, ptrans);
/* later we trash rv and rely on csd to indicate success/failure */
AP_DEBUG_ASSERT(rv == APR_SUCCESS || !csd);
if (rv == APR_EGENERAL) {
/* E[NM]FILE, ENOMEM, etc */
resource_shortage = 1;
signal_threads(ST_GRACEFUL);
}
/* Release the accept mutex. NOTE(review): if the unlock fails while
 * listener_may_exit is set, the break below leaves the loop before a
 * socket accepted just above is handed to the worker. */
if ((rv = SAFE_ACCEPT(apr_proc_mutex_unlock(accept_mutex)))
!= APR_SUCCESS) {
int level = APLOG_EMERG;
if (listener_may_exit) {
break;
}
if (ap_scoreboard_image->parent[process_slot].generation !=
ap_scoreboard_image->global->running_generation) {
level = APLOG_DEBUG; /* common to get these at restart time */
}
ap_log_error(APLOG_MARK, level, rv, ap_server_conf,
"apr_proc_mutex_unlock failed. Attempting to "
"shutdown process gracefully.");
signal_threads(ST_GRACEFUL);
}
/* If nothing was accepted (csd is NULL) the same worker is kept and
 * reused on the next iteration. */
if (csd != NULL) {
/* Wake up the sleeping worker. */
apr_thread_mutex_lock(worker->mutex);
worker->csd = (apr_socket_t *)csd;
worker->state = WORKER_BUSY;
/* Posix allows us to signal this condition without
 * owning the associated mutex, but in that case it can
 * not guarantee predictable scheduling. See
 * _UNIX Network Programming: Interprocess Communication_
 * by W. Richard Stevens, Vol 2, 2nd Ed, pp. 170-171. */
apr_thread_cond_signal(worker->cond);
apr_thread_mutex_unlock(worker->mutex);
/* Force a fresh pop from the idle stack on the next iteration. */
worker = NULL;
}
}
else {
/* Exit was requested while we held the accept mutex: release it and
 * leave the loop. */
if ((rv = SAFE_ACCEPT(apr_proc_mutex_unlock(accept_mutex)))
!= APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
"apr_proc_mutex_unlock failed. Attempting to "
"shutdown process gracefully.");
signal_threads(ST_GRACEFUL);
}
break;
}
}
/* Shutdown path: every exit from the loop above lands here. */
workers_may_exit = 1;
/* A worker popped from the idle stack but never given a connection is
 * told to terminate. */
if (worker) {
apr_thread_mutex_lock(worker->mutex);
worker->state = WORKER_TERMINATED;
/* Posix allows us to signal this condition without
 * owning the associated mutex, but in that case it can
 * not guarantee predictable scheduling. See
 * _UNIX Network Programming: Interprocess Communication_
 * by W. Richard Stevens, Vol 2, 2nd Ed, pp. 170-171. */
apr_thread_cond_signal(worker->cond);
apr_thread_mutex_unlock(worker->mutex);
}
worker_stack_terminate(idle_worker_stack);
dying = 1;
ap_scoreboard_image->parent[process_slot].quiescing = 1;
/* wake up the main thread */
kill(ap_my_pid, SIGTERM);
apr_thread_exit(thd, APR_SUCCESS);
return NULL;
}
/* XXX For ungraceful termination/restart, we definitely don't want to
* wait for active connections to finish but we may want to wait
* for idle workers to get out of the queue code and release mutexes,
* since those mutexes are cleaned up pretty soon and some systems
* may not react favorably (i.e., segfault) if operations are attempted
* on cleaned-up mutexes.
*/
/*
 * Thread entry point for a worker thread.
 *
 * thd   - APR thread object for this thread; its pool holds the wakeup
 *         record, condition variable and mutex created here.
 * dummy - malloc'ed proc_info (freed here) supplying the scoreboard
 *         process slot (pid) and thread slot (tid).
 *
 * The thread builds a private allocator, transaction pool and bucket
 * allocator, then loops: publish SERVER_READY, wait on idle_worker_stack
 * until woken, process the socket placed in its wakeup record, and clear
 * the transaction pool.  The loop ends when workers_may_exit is set, the
 * wait reports EOF or an error, or the wakeup state is WORKER_TERMINATED.
 */
static void * APR_THREAD_FUNC worker_thread(apr_thread_t *thd, void * dummy)
{
    proc_info *info = dummy;
    int process_slot = info->pid;
    int thread_slot = info->tid;
    apr_pool_t *thread_pool = apr_thread_pool_get(thd);
    apr_pool_t *ptrans;              /* per-transaction pool */
    apr_allocator_t *allocator;
    apr_bucket_alloc_t *bucket_alloc;
    worker_wakeup_info *self;
    apr_status_t status;
    int final_state;

    /* Both slot numbers are already copied; release the descriptor. */
    free(info);

    ap_update_child_status_from_indexes(process_slot, thread_slot,
                                        SERVER_STARTING, NULL);

    /* Private allocator shared by the transaction pool and the bucket
     * allocator of this thread.
     */
    apr_allocator_create(&allocator);
    apr_allocator_max_free_set(allocator, ap_max_mem_free);
    /* XXX: why is ptrans's parent not tpool? --jcw 08/2003 */
    apr_pool_create_ex(&ptrans, NULL, NULL, allocator);
    apr_allocator_owner_set(allocator, ptrans);
    bucket_alloc = apr_bucket_alloc_create_ex(allocator);

    /* The wakeup record carries this thread's pool, condition variable
     * and mutex; it is what gets passed to worker_stack_wait().
     */
    self = (worker_wakeup_info *)apr_palloc(thread_pool, sizeof(*self));
    self->pool = ptrans;

    status = apr_thread_cond_create(&self->cond, thread_pool);
    if (status != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf,
                     "apr_thread_cond_create failed. Attempting to shutdown "
                     "process gracefully.");
        signal_threads(ST_GRACEFUL);
        apr_thread_exit(thd, status);
    }

    status = apr_thread_mutex_create(&self->mutex, APR_THREAD_MUTEX_DEFAULT,
                                     thread_pool);
    if (status != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_EMERG, status, ap_server_conf,
                     "apr_thread_mutex_create failed. Attempting to shutdown "
                     "process gracefully.");
        signal_threads(ST_GRACEFUL);
        apr_thread_exit(thd, status);
    }

    for (;;) {
        if (workers_may_exit) {
            break;
        }

        ap_update_child_status_from_indexes(process_slot, thread_slot,
                                            SERVER_READY, NULL);

        status = worker_stack_wait(idle_worker_stack, self);
        if (APR_STATUS_IS_EOF(status)) {
            /* The queue has been terminated. */
            break;
        }
        if (status != APR_SUCCESS) {
            /* Treat all other errors as fatal. */
            ap_log_error(APLOG_MARK, APLOG_CRIT, status, ap_server_conf,
                         "worker_stack_wait failed");
            break;
        }
        if (self->state == WORKER_TERMINATED) {
            /* They told us to quit. */
            break;
        }

        AP_DEBUG_ASSERT(self->state != WORKER_IDLE);
        process_socket(ptrans, self->csd,
                       process_slot, thread_slot, bucket_alloc);
        requests_this_child--; /* FIXME: should be synchronized - aaron */
        apr_pool_clear(ptrans);
    }

    /* Final scoreboard state depends on whether the process is dying. */
    final_state = (dying) ? SERVER_DEAD : SERVER_GRACEFUL;
    ap_update_child_status_from_indexes(process_slot, thread_slot,
                                        final_state, (request_rec *) NULL);

    apr_bucket_alloc_destroy(bucket_alloc);
    apr_thread_exit(thd, APR_SUCCESS);
    return NULL;
}
/*
 * Classify a signal number.
 *
 * Returns 1 when signum is SIGTERM or SIGINT, and 0 for every other value.
 */
static int check_signal(int signum)
{
    return (signum == SIGTERM || signum == SIGINT) ? 1 : 0;
}
/*
 * Create the listener thread for this child process.
 *
 * ts - thread starter record; supplies the child number and thread
 *      attributes, and receives the new thread handle in ts->listener.
 *
 * A proc_info is malloc'ed and passed to listener_thread, which frees it.
 * If the allocation or the thread creation fails, the failure is logged and
 * the child exits through clean_child_exit(APEXIT_CHILDFATAL).  On success
 * the OS-level thread handle is stored in listener_os_thread.
 */
static void create_listener_thread(thread_starter *ts)
{
    int my_child_num = ts->child_num_arg;
    apr_threadattr_t *thread_attr = ts->threadattr;
    proc_info *my_info;
    apr_status_t rv;

    my_info = (proc_info *)malloc(sizeof(proc_info));
    if (my_info == NULL) {
        /* Same handling as the worker allocation in start_threads: never
         * dereference a failed allocation.
         */
        ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf,
                     "malloc: out of memory");
        clean_child_exit(APEXIT_CHILDFATAL);
    }
    my_info->pid = my_child_num;
    my_info->tid = -1; /* listener thread doesn't have a thread slot */
    my_info->sd = 0;
    rv = apr_thread_create(&ts->listener, thread_attr, listener_thread,
                           my_info, pchild);
    if (rv != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
                     "apr_thread_create: unable to create listener thread");
        /* In case system resources are maxxed out, we don't want
         * Apache running away with the CPU trying to fork over and
         * over and over again if we exit.
         * XXX Jeff doesn't see how Apache is going to try to fork again since
         * the exit code is APEXIT_CHILDFATAL
         */
        apr_sleep(10 * APR_USEC_PER_SEC);
        clean_child_exit(APEXIT_CHILDFATAL);
    }
    /* Remember the native thread handle of the listener. */
    apr_os_thread_get(&listener_os_thread, ts->listener);
}
/* XXX under some circumstances not understood, children can get stuck
* in start_threads forever trying to take over slots which will
* never be cleaned up; for now there is an APLOG_DEBUG message issued
* every so often when this condition occurs
*/
static void * APR_THREAD_FUNC start_threads(apr_thread_t *thd, void *dummy)
{
thread_starter *ts = dummy;
apr_thread_t **threads = ts->threads;
apr_threadattr_t *thread_attr = ts->threadattr;
int child_num_arg = ts->child_num_arg;
int my_child_num = child_num_arg;
proc_info *my_info;
apr_status_t rv;
int i;
int threads_created = 0;
int loops;
int prev_threads_created;
idle_worker_stack = worker_stack_create(pchild, ap_threads_per_child);
if (idle_worker_stack == NULL) {
ap_log_error(APLOG_MARK, APLOG_ALERT, 0, ap_server_conf,
"worker_stack_create() failed");
clean_child_exit(APEXIT_CHILDFATAL);
}
loops = prev_threads_created = 0;
while (1) {
/* ap_threads_per_child does not include the listener thread */
for (i = 0; i < ap_threads_per_child; i++) {
int status = ap_scoreboard_image->servers[child_num_arg][i].status;
if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {
continue;
}
my_info = (proc_info *)malloc(sizeof(proc_info));
if (my_info == NULL) {
ap_log_error(APLOG_MARK, APLOG_ALERT, errno, ap_server_conf,
"malloc: out of memory");
clean_child_exit(APEXIT_CHILDFATAL);
}
my_info->pid = my_child_num;
my_info->tid = i;
my_info->sd = 0;
/* We are creating threads right now */
ap_update_child_status_from_indexes(my_child_num, i,
SERVER_STARTING, NULL);
/* We let each thread update its own scoreboard entry. This is
* done because it lets us deal with tid better.
*/
rv = apr_thread_create(&threads[i], thread_attr,
worker_thread, my_info, pchild);
if (rv != APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
"apr_thread_create: unable to create worker thread");
/* In case system resources are maxxed out, we don't want
Apache running away with the CPU trying to fork over and
over and over again if we exit. */
apr_sleep(10 * APR_USEC_PER_SEC);
clean_child_exit(APEXIT_CHILDFATAL);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -