📄 event.c
字号:
ap_push_pool(worker_queue_info, cs->p); } return APR_SUCCESS;}/* get_worker: * reserve a worker thread, block if all are currently busy. * this prevents the worker queue from overflowing and lets * other processes accept new connections in the mean time. */static int get_worker(int *have_idle_worker_p){ apr_status_t rc; if (!*have_idle_worker_p) { rc = ap_queue_info_wait_for_idler(worker_queue_info); if (rc == APR_SUCCESS) { *have_idle_worker_p = 1; return 1; } else { if (!APR_STATUS_IS_EOF(rc)) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "ap_queue_info_wait_for_idler failed. " "Attempting to shutdown process gracefully"); signal_threads(ST_GRACEFUL); } return 0; } } else { /* already reserved a worker thread - must have hit a * transient error on a previous pass */ return 1; }}static void *listener_thread(apr_thread_t * thd, void *dummy){ apr_status_t rc; proc_info *ti = dummy; int process_slot = ti->pid; apr_pool_t *tpool = apr_thread_pool_get(thd); void *csd = NULL; apr_pool_t *ptrans; /* Pool for per-transaction stuff */ ap_listen_rec *lr; int have_idle_worker = 0; conn_state_t *cs; const apr_pollfd_t *out_pfd; apr_int32_t num = 0; apr_time_t time_now = 0; apr_interval_time_t timeout_interval; apr_time_t timeout_time; listener_poll_type *pt; free(ti); /* We set this to force apr_pollset to wakeup if there hasn't been any IO * on any of its sockets. This allows sockets to have been added * when no other keepalive operations where going on. * * current value is 1 second */ timeout_interval = 1000000; /* the following times out events that are really close in the future * to prevent extra poll calls * * current value is .1 second */#define TIMEOUT_FUDGE_FACTOR 100000 /* POLLSET_SCALE_FACTOR * ap_threads_per_child sets the size of * the pollset. I've seen 15 connections per active worker thread * running SPECweb99. * * However, with the newer apr_pollset, this is the number of sockets that * we will return to any *one* call to poll(). Therefore, there is no * reason to make it more than ap_threads_per_child. */#define POLLSET_SCALE_FACTOR 1 rc = apr_thread_mutex_create(&timeout_mutex, APR_THREAD_MUTEX_DEFAULT, tpool); if (rc != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "creation of the timeout mutex failed. Attempting to " "shutdown process gracefully"); signal_threads(ST_GRACEFUL); return NULL; } APR_RING_INIT(&timeout_head, conn_state_t, timeout_list); /* Create the main pollset */ rc = apr_pollset_create(&event_pollset, ap_threads_per_child * POLLSET_SCALE_FACTOR, tpool, APR_POLLSET_THREADSAFE); if (rc != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "apr_pollset_create with Thread Safety failed. " "Attempting to shutdown process gracefully"); signal_threads(ST_GRACEFUL); return NULL; } for (lr = ap_listeners; lr != NULL; lr = lr->next) { apr_pollfd_t pfd = { 0 }; pt = apr_pcalloc(tpool, sizeof(*pt)); pfd.desc_type = APR_POLL_SOCKET; pfd.desc.s = lr->sd; pfd.reqevents = APR_POLLIN; pt->type = PT_ACCEPT; pt->baton = lr; pfd.client_data = pt; apr_socket_opt_set(pfd.desc.s, APR_SO_NONBLOCK, 1); apr_pollset_add(event_pollset, &pfd); } /* Unblock the signal used to wake this thread up, and set a handler for * it. */ unblock_signal(LISTENER_SIGNAL); apr_signal(LISTENER_SIGNAL, dummy_signal_handler); while (!listener_may_exit) { if (requests_this_child <= 0) { check_infinite_requests(); } rc = apr_pollset_poll(event_pollset, timeout_interval, &num, &out_pfd); if (rc != APR_SUCCESS) { if (APR_STATUS_IS_EINTR(rc)) { continue; } if (!APR_STATUS_IS_TIMEUP(rc)) { ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "apr_pollset_poll failed. Attempting to " "shutdown process gracefully"); signal_threads(ST_GRACEFUL); } } if (listener_may_exit) break; while (num && get_worker(&have_idle_worker)) { pt = (listener_poll_type *) out_pfd->client_data; if (pt->type == PT_CSD) { /* one of the sockets is readable */ cs = (conn_state_t *) pt->baton; switch (cs->state) { case CONN_STATE_CHECK_REQUEST_LINE_READABLE: cs->state = CONN_STATE_READ_REQUEST_LINE; break; default: ap_log_error(APLOG_MARK, APLOG_ERR, rc, ap_server_conf, "event_loop: unexpected state %d", cs->state); AP_DEBUG_ASSERT(0); } apr_thread_mutex_lock(timeout_mutex); APR_RING_REMOVE(cs, timeout_list); apr_thread_mutex_unlock(timeout_mutex); rc = push2worker(out_pfd, event_pollset); if (rc != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, "push2worker failed"); } else { have_idle_worker = 0; } } else { /* A Listener Socket is ready for an accept() */ apr_pool_t *recycled_pool = NULL; lr = (ap_listen_rec *) pt->baton; ap_pop_pool(&recycled_pool, worker_queue_info); if (recycled_pool == NULL) { /* create a new transaction pool for each accepted socket */ apr_allocator_t *allocator; apr_allocator_create(&allocator); apr_allocator_max_free_set(allocator, ap_max_mem_free); apr_pool_create_ex(&ptrans, pconf, NULL, allocator); apr_allocator_owner_set(allocator, ptrans); if (ptrans == NULL) { ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, "Failed to create transaction pool"); signal_threads(ST_GRACEFUL); return NULL; } } else { ptrans = recycled_pool; } apr_pool_tag(ptrans, "transaction"); rc = lr->accept_func(&csd, lr, ptrans); /* later we trash rv and rely on csd to indicate * success/failure */ AP_DEBUG_ASSERT(rc == APR_SUCCESS || !csd); if (rc == APR_EGENERAL) { /* E[NM]FILE, ENOMEM, etc */ resource_shortage = 1; signal_threads(ST_GRACEFUL); } if (csd != NULL) { rc = ap_queue_push(worker_queue, csd, NULL, ptrans); if (rc != APR_SUCCESS) { /* trash the connection; we couldn't queue the connected * socket to a worker */ apr_socket_close(csd); ap_log_error(APLOG_MARK, APLOG_CRIT, rc, ap_server_conf, "ap_queue_push failed"); apr_pool_clear(ptrans); ap_push_pool(worker_queue_info, ptrans); } else { have_idle_worker = 0; } } else { apr_pool_clear(ptrans); ap_push_pool(worker_queue_info, ptrans); } } /* if:else on pt->type */ out_pfd++; num--; } /* while for processing poll */ /* XXX possible optimization: stash the current time for use as * r->request_time for new requests */ time_now = apr_time_now(); /* handle timed out sockets */ apr_thread_mutex_lock(timeout_mutex); cs = APR_RING_FIRST(&timeout_head); timeout_time = time_now + TIMEOUT_FUDGE_FACTOR; while (!APR_RING_EMPTY(&timeout_head, conn_state_t, timeout_list) && cs->expiration_time < timeout_time) { cs->state = CONN_STATE_LINGER; APR_RING_REMOVE(cs, timeout_list); apr_thread_mutex_unlock(timeout_mutex); if (!get_worker(&have_idle_worker)) { apr_thread_mutex_lock(timeout_mutex); APR_RING_INSERT_HEAD(&timeout_head, cs, conn_state_t, timeout_list); break; } rc = push2worker(&cs->pfd, event_pollset); if (rc != APR_SUCCESS) { return NULL; /* XXX return NULL looks wrong - not an init failure * that bypasses all the cleanup outside the main loop * break seems more like it * need to evaluate seriousness of push2worker failures */ } have_idle_worker = 0; apr_thread_mutex_lock(timeout_mutex); cs = APR_RING_FIRST(&timeout_head); } apr_thread_mutex_unlock(timeout_mutex); } /* listener main loop */ ap_close_listeners(); ap_queue_term(worker_queue); dying = 1; ap_scoreboard_image->parent[process_slot].quiescing = 1; /* wake up the main thread */ kill(ap_my_pid, SIGTERM); apr_thread_exit(thd, APR_SUCCESS); return NULL;}/* XXX For ungraceful termination/restart, we definitely don't want to * wait for active connections to finish but we may want to wait * for idle workers to get out of the queue code and release mutexes, * since those mutexes are cleaned up pretty soon and some systems * may not react favorably (i.e., segfault) if operations are attempted * on cleaned-up mutexes. */static void *APR_THREAD_FUNC worker_thread(apr_thread_t * thd, void *dummy){ proc_info *ti = dummy; int process_slot = ti->pid; int thread_slot = ti->tid; apr_socket_t *csd = NULL; conn_state_t *cs; apr_pool_t *ptrans; /* Pool for per-transaction stuff */ apr_status_t rv; int is_idle = 0; free(ti); ap_scoreboard_image->servers[process_slot][thread_slot].pid = ap_my_pid; ap_scoreboard_image->servers[process_slot][thread_slot].generation = ap_my_generation; ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING, NULL); while (!workers_may_exit) { if (!is_idle) { rv = ap_queue_info_set_idle(worker_queue_info, NULL); if (rv != APR_SUCCESS) { ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf, "ap_queue_info_set_idle failed. Attempting to " "shutdown process gracefully."); signal_threads(ST_GRACEFUL); break; } is_idle = 1; } ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_READY, NULL); worker_pop: if (workers_may_exit) { break; } rv = ap_queue_pop(worker_queue, &csd, &cs, &ptrans); if (rv != APR_SUCCESS) { /* We get APR_EOF during a graceful shutdown once all the * connections accepted by this server process have been handled. */ if (APR_STATUS_IS_EOF(rv)) { break; } /* We get APR_EINTR whenever ap_queue_pop() has been interrupted * from an explicit call to ap_queue_interrupt_all(). This allows * us to unblock threads stuck in ap_queue_pop() when a shutdown * is pending. * * If workers_may_exit is set and this is ungraceful termination/ * restart, we are bound to get an error on some systems (e.g., * AIX, which sanity-checks mutex operations) since the queue * may have already been cleaned up. Don't log the "error" if * workers_may_exit is set. */ else if (APR_STATUS_IS_EINTR(rv)) { goto worker_pop; } /* We got some other error. */ else if (!workers_may_exit) { ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf, "ap_queue_pop failed"); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -