📄 apr_thread_pool.c
字号:
void *owner, apr_interval_time_t time){ apr_thread_pool_task_t *t; apr_thread_pool_task_t *t_loc; apr_thread_t *thd; apr_status_t rv = APR_SUCCESS; apr_thread_mutex_lock(me->lock); t = task_new(me, func, param, 0, owner, time); if (NULL == t) { apr_thread_mutex_unlock(me->lock); return APR_ENOMEM; } t_loc = APR_RING_FIRST(me->scheduled_tasks); while (NULL != t_loc) { /* if the time is less than the entry insert ahead of it */ if (t->dispatch.time < t_loc->dispatch.time) { ++me->scheduled_task_cnt; APR_RING_INSERT_BEFORE(t_loc, t, link); break; } else { t_loc = APR_RING_NEXT(t_loc, link); if (t_loc == APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, link)) { ++me->scheduled_task_cnt; APR_RING_INSERT_TAIL(me->scheduled_tasks, t, apr_thread_pool_task, link); break; } } } /* there should be at least one thread for scheduled tasks */ if (0 == me->thd_cnt) { rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); if (APR_SUCCESS == rv) { ++me->thd_cnt; if (me->thd_cnt > me->thd_high) me->thd_high = me->thd_cnt; } } apr_thread_mutex_unlock(me->lock); apr_thread_mutex_lock(me->cond_lock); apr_thread_cond_signal(me->cond); apr_thread_mutex_unlock(me->cond_lock); return rv;}static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func, void *param, apr_byte_t priority, int push, void *owner){ apr_thread_pool_task_t *t; apr_thread_pool_task_t *t_loc; apr_thread_t *thd; apr_status_t rv = APR_SUCCESS; apr_thread_mutex_lock(me->lock); t = task_new(me, func, param, priority, owner, 0); if (NULL == t) { apr_thread_mutex_unlock(me->lock); return APR_ENOMEM; } t_loc = add_if_empty(me, t); if (NULL == t_loc) { goto FINAL_EXIT; } if (push) { while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != t_loc && t_loc->dispatch.priority >= t->dispatch.priority) { t_loc = APR_RING_NEXT(t_loc, link); } } APR_RING_INSERT_BEFORE(t_loc, t, link); if (!push) { if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) { me->task_idx[TASK_PRIORITY_SEG(t)] = t; } } FINAL_EXIT: me->task_cnt++; if (me->task_cnt > me->tasks_high) me->tasks_high = me->task_cnt; if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max && me->task_cnt > me->threshold)) { rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); if (APR_SUCCESS == rv) { ++me->thd_cnt; if (me->thd_cnt > me->thd_high) me->thd_high = me->thd_cnt; } } apr_thread_mutex_unlock(me->lock); apr_thread_mutex_lock(me->cond_lock); apr_thread_cond_signal(me->cond); apr_thread_mutex_unlock(me->cond_lock); return rv;}APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me, apr_thread_start_t func, void *param, apr_byte_t priority, void *owner){ return add_task(me, func, param, priority, 1, owner);}APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me, apr_thread_start_t func, void *param, apr_interval_time_t time, void *owner){ return schedule_task(me, func, param, owner, time);}APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me, apr_thread_start_t func, void *param, apr_byte_t priority, void *owner){ return add_task(me, func, param, priority, 0, owner);}static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me, void *owner){ apr_thread_pool_task_t *t_loc; apr_thread_pool_task_t *next; t_loc = APR_RING_FIRST(me->scheduled_tasks); while (t_loc != APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, link)) { next = APR_RING_NEXT(t_loc, link); /* if this is the owner remove it */ if (t_loc->owner == owner) { --me->scheduled_task_cnt; APR_RING_REMOVE(t_loc, link); } t_loc = next; } return APR_SUCCESS;}static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner){ apr_thread_pool_task_t *t_loc; apr_thread_pool_task_t *next; int seg; t_loc = APR_RING_FIRST(me->tasks); while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) { next = APR_RING_NEXT(t_loc, link); if (t_loc->owner == owner) { --me->task_cnt; seg = TASK_PRIORITY_SEG(t_loc); if (t_loc == me->task_idx[seg]) { me->task_idx[seg] = APR_RING_NEXT(t_loc, link); if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { me->task_idx[seg] = NULL; } } APR_RING_REMOVE(t_loc, link); } t_loc = next; } return APR_SUCCESS;}static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner){#ifndef NDEBUG apr_os_thread_t *os_thread;#endif struct apr_thread_list_elt *elt; apr_thread_mutex_lock(me->lock); elt = APR_RING_FIRST(me->busy_thds); while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) { if (elt->current_owner != owner) { elt = APR_RING_NEXT(elt, link); continue; }#ifndef NDEBUG /* make sure the thread is not the one calling tasks_cancel */ apr_os_thread_get(&os_thread, elt->thd);#ifdef WIN32 /* hack for apr win32 bug */ assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));#else assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));#endif#endif while (elt->current_owner == owner) { apr_thread_mutex_unlock(me->lock); apr_sleep(200 * 1000); apr_thread_mutex_lock(me->lock); } elt = APR_RING_FIRST(me->busy_thds); } apr_thread_mutex_unlock(me->lock); return;}APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me, void *owner){ apr_status_t rv = APR_SUCCESS; apr_thread_mutex_lock(me->lock); if (me->task_cnt > 0) { rv = remove_tasks(me, owner); } if (me->scheduled_task_cnt > 0) { rv = remove_scheduled_tasks(me, owner); } apr_thread_mutex_unlock(me->lock); wait_on_busy_threads(me, owner); return rv;}APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me){ return me->task_cnt;}APU_DECLARE(apr_size_t) apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me){ return me->scheduled_task_cnt;}APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me){ return me->thd_cnt;}APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me){ return me->thd_cnt - me->idle_cnt;}APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me){ return me->idle_cnt;}APU_DECLARE(apr_size_t) apr_thread_pool_tasks_run_count(apr_thread_pool_t * me){ return me->tasks_run;}APU_DECLARE(apr_size_t) apr_thread_pool_tasks_high_count(apr_thread_pool_t * me){ return me->tasks_high;}APU_DECLARE(apr_size_t) apr_thread_pool_threads_high_count(apr_thread_pool_t * me){ return me->thd_high;}APU_DECLARE(apr_size_t) apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me){ return me->thd_timed_out;}APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me){ return me->idle_max;}APU_DECLARE(apr_interval_time_t) apr_thread_pool_idle_wait_get(apr_thread_pool_t * me){ return me->idle_wait;}/* * This function stop extra idle threads to the cnt. * @return the number of threads stopped * NOTE: There could be busy threads become idle during this function */static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me, apr_size_t *cnt, int idle){ struct apr_thread_list *thds; apr_size_t n, n_dbg, i; struct apr_thread_list_elt *head, *tail, *elt; apr_thread_mutex_lock(me->lock); if (idle) { thds = me->idle_thds; n = me->idle_cnt; } else { thds = me->busy_thds; n = me->thd_cnt - me->idle_cnt; } if (n <= *cnt) { apr_thread_mutex_unlock(me->lock); *cnt = 0; return NULL; } n -= *cnt; head = APR_RING_FIRST(thds); for (i = 0; i < *cnt; i++) { head = APR_RING_NEXT(head, link); } tail = APR_RING_LAST(thds); if (idle) { APR_RING_UNSPLICE(head, tail, link); me->idle_cnt = *cnt; } n_dbg = 0; for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) { elt->state = TH_STOP; n_dbg++; } elt->state = TH_STOP; n_dbg++; assert(n == n_dbg); *cnt = n; apr_thread_mutex_unlock(me->lock); APR_RING_PREV(head, link) = NULL; APR_RING_NEXT(tail, link) = NULL; return head;}static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt){ apr_size_t n_dbg; struct apr_thread_list_elt *elt, *head, *tail; apr_status_t rv; elt = trim_threads(me, &cnt, 1); apr_thread_mutex_lock(me->cond_lock); apr_thread_cond_broadcast(me->cond); apr_thread_mutex_unlock(me->cond_lock); n_dbg = 0; if (NULL != (head = elt)) { while (elt) { tail = elt; apr_thread_join(&rv, elt->thd); elt = APR_RING_NEXT(elt, link); ++n_dbg; } apr_thread_mutex_lock(me->lock); APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail, apr_thread_list_elt, link); apr_thread_mutex_unlock(me->lock); } assert(cnt == n_dbg); return cnt;}/* don't join on busy threads for performance reasons, who knows how long will * the task takes to perform */static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt){ trim_threads(me, &cnt, 0); return cnt;}APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me, apr_size_t cnt){ me->idle_max = cnt; cnt = trim_idle_threads(me, cnt); return cnt;}APU_DECLARE(apr_interval_time_t) apr_thread_pool_idle_wait_set(apr_thread_pool_t * me, apr_interval_time_t timeout){ apr_interval_time_t oldtime; oldtime = me->idle_wait; me->idle_wait = timeout; return oldtime;}APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me){ return me->thd_max;}/* * This function stop extra working threads to the new limit. * NOTE: There could be busy threads become idle during this function */APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me, apr_size_t cnt){ unsigned int n; me->thd_max = cnt; if (0 == cnt || me->thd_cnt <= cnt) { return 0; } n = me->thd_cnt - cnt; if (n >= me->idle_cnt) { trim_busy_threads(me, n - me->idle_cnt); trim_idle_threads(me, 0); } else { trim_idle_threads(me, me->idle_cnt - n); } return n;}APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me){ return me->threshold;}APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me, apr_size_t val){ apr_size_t ov; ov = me->threshold; me->threshold = val; return ov;}APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd, void **owner){ apr_status_t rv; apr_thread_pool_task_t *task; void *data; rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd); if (rv != APR_SUCCESS) { return rv; } task = data; if (!task) { *owner = NULL; return APR_BADARG; } *owner = task->owner; return APR_SUCCESS;}#endif /* APR_HAS_THREADS *//* vim: set ts=4 sw=4 et cin tw=80: */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -