⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 apr_thread_pool.c

📁 Apache官方在今天放出产品系列2.2的最新版本2.2.11的源码包 最流行的HTTP服务器软件之一
💻 C
📖 第 1 页 / 共 2 页
字号:
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements.  See the NOTICE file distributed * with this work for additional information regarding copyright * ownership.  The ASF licenses this file to you under the Apache * License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License.  You may obtain a copy of * the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied.  See the License for the specific language governing * permissions and limitations under the License. */#include <assert.h>#include "apr_thread_pool.h"#include "apr_ring.h"#include "apr_thread_cond.h"#include "apr_portable.h"#if APR_HAS_THREADS#define TASK_PRIORITY_SEGS 4#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)typedef struct apr_thread_pool_task{    APR_RING_ENTRY(apr_thread_pool_task) link;    apr_thread_start_t func;    void *param;    void *owner;    union    {        apr_byte_t priority;        apr_time_t time;    } dispatch;} apr_thread_pool_task_t;APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);struct apr_thread_list_elt{    APR_RING_ENTRY(apr_thread_list_elt) link;    apr_thread_t *thd;    volatile void *current_owner;    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;};APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);struct apr_thread_pool{    apr_pool_t *pool;    volatile apr_size_t thd_max;    volatile apr_size_t idle_max;    volatile apr_interval_time_t idle_wait;    volatile apr_size_t thd_cnt;    volatile apr_size_t idle_cnt;    volatile apr_size_t task_cnt;    volatile apr_size_t scheduled_task_cnt;    volatile apr_size_t threshold;    volatile apr_size_t tasks_run;    volatile apr_size_t tasks_high;    volatile apr_size_t 
thd_high;    volatile apr_size_t thd_timed_out;    struct apr_thread_pool_tasks *tasks;    struct apr_thread_pool_tasks *scheduled_tasks;    struct apr_thread_list *busy_thds;    struct apr_thread_list *idle_thds;    apr_thread_mutex_t *lock;    apr_thread_mutex_t *cond_lock;    apr_thread_cond_t *cond;    volatile int terminated;    struct apr_thread_pool_tasks *recycled_tasks;    struct apr_thread_list *recycled_thds;    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];};static apr_status_t thread_pool_construct(apr_thread_pool_t * me,                                          apr_size_t init_threads,                                          apr_size_t max_threads){    apr_status_t rv;    int i;    me->thd_max = max_threads;    me->idle_max = init_threads;    me->threshold = init_threads / 2;    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,                                 me->pool);    if (APR_SUCCESS != rv) {        return rv;    }    rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED,                                 me->pool);    if (APR_SUCCESS != rv) {        apr_thread_mutex_destroy(me->lock);        return rv;    }    rv = apr_thread_cond_create(&me->cond, me->pool);    if (APR_SUCCESS != rv) {        apr_thread_mutex_destroy(me->lock);        apr_thread_mutex_destroy(me->cond_lock);        return rv;    }    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));    if (!me->tasks) {        goto CATCH_ENOMEM;    }    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));    if (!me->scheduled_tasks) {        goto CATCH_ENOMEM;    }    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));    if (!me->recycled_tasks) {        goto CATCH_ENOMEM;    }    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);    me->busy_thds = apr_palloc(me->pool, 
sizeof(*me->busy_thds));    if (!me->busy_thds) {        goto CATCH_ENOMEM;    }    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));    if (!me->idle_thds) {        goto CATCH_ENOMEM;    }    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));    if (!me->recycled_thds) {        goto CATCH_ENOMEM;    }    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;    me->idle_wait = 0;    me->terminated = 0;    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {        me->task_idx[i] = NULL;    }    goto FINAL_EXIT;  CATCH_ENOMEM:    rv = APR_ENOMEM;    apr_thread_mutex_destroy(me->lock);    apr_thread_mutex_destroy(me->cond_lock);    apr_thread_cond_destroy(me->cond);  FINAL_EXIT:    return rv;}/* * NOTE: This function is not thread safe by itself. 
Caller should hold the lock */static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me){    apr_thread_pool_task_t *task = NULL;    int seg;    /* check for scheduled tasks */    if (me->scheduled_task_cnt > 0) {        task = APR_RING_FIRST(me->scheduled_tasks);        assert(task != NULL);        assert(task !=               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,                                 link));        /* if it's time */        if (task->dispatch.time <= apr_time_now()) {            --me->scheduled_task_cnt;            APR_RING_REMOVE(task, link);            return task;        }    }    /* check for normal tasks if we're not returning a scheduled task */    if (me->task_cnt == 0) {        return NULL;    }    task = APR_RING_FIRST(me->tasks);    assert(task != NULL);    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));    --me->task_cnt;    seg = TASK_PRIORITY_SEG(task);    if (task == me->task_idx[seg]) {        me->task_idx[seg] = APR_RING_NEXT(task, link);        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,                                                   apr_thread_pool_task, link)            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {            me->task_idx[seg] = NULL;        }    }    APR_RING_REMOVE(task, link);    return task;}static apr_interval_time_t waiting_time(apr_thread_pool_t * me){    apr_thread_pool_task_t *task = NULL;    task = APR_RING_FIRST(me->scheduled_tasks);    assert(task != NULL);    assert(task !=           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,                             link));    return task->dispatch.time - apr_time_now();}/* * NOTE: This function is not thread safe by itself. 
Caller should hold the lock */

/*
 * Obtain a thread-list element for thread `t`.
 *
 * An element is taken from the recycled list when one is available;
 * otherwise a zeroed element is allocated from the pool.  The element's ring
 * link is (re)initialised, `thd` is set to `t`, `current_owner` is cleared
 * and `state` is set to TH_RUN.
 *
 * Returns the element, or NULL if the allocation fails.
 */
static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
                                           apr_thread_t * t)
{
    struct apr_thread_list_elt *elt;

    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
        elt = apr_pcalloc(me->pool, sizeof(*elt));
        if (NULL == elt) {
            return NULL;
        }
    }
    else {
        elt = APR_RING_FIRST(me->recycled_thds);
        APR_RING_REMOVE(elt, link);
    }

    APR_RING_ELEM_INIT(elt, link);
    elt->thd = t;
    elt->current_owner = NULL;
    elt->state = TH_RUN;

    return elt;
}

/*
 * The worker thread function.  Takes tasks from the queue and runs them
 * while any are available.  Otherwise the thread puts itself on the idle
 * thread list and waits on the condition variable to be woken up.
 *
 * A thread that is asked to stop after finishing a task, or that is surplus
 * to the idle limit, terminates directly by detaching itself and exiting.
 * Otherwise the thread is on the idle thread list when it leaves the main
 * loop and is expected to be joined.
 *
 * `param` is the owning apr_thread_pool_t.  The pool lock is held whenever
 * pool state is touched and released while a task function runs and while
 * waiting on the condition variable.
 */
static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
{
    /* rv receives the result of the condition waits; it is not inspected. */
    apr_status_t rv = APR_SUCCESS;
    apr_thread_pool_t *me = param;
    apr_thread_pool_task_t *task = NULL;
    apr_interval_time_t wait;
    struct apr_thread_list_elt *elt;

    apr_thread_mutex_lock(me->lock);
    elt = elt_new(me, t);
    if (!elt) {
        /*
         * NOTE(review): thd_cnt is not decremented on this path, while
         * thread_pool_cleanup() polls until thd_cnt reaches zero -- confirm
         * whether an allocation failure here can make destroy hang.
         */
        apr_thread_mutex_unlock(me->lock);
        apr_thread_exit(t, APR_ENOMEM);
    }

    while (!me->terminated && elt->state != TH_STOP) {
        /*
         * An element whose link does not point back at itself is on a ring,
         * i.e. it is not freshly initialised but was woken up while on the
         * idle list.
         */
        if (APR_RING_NEXT(elt, link) != elt) {
            --me->idle_cnt;
            APR_RING_REMOVE(elt, link);
        }

        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
        task = pop_task(me);
        while (NULL != task && !me->terminated) {
            ++me->tasks_run;
            elt->current_owner = task->owner;
            /* run the task without holding the pool lock */
            apr_thread_mutex_unlock(me->lock);
            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
            task->func(t, task->param);
            apr_thread_mutex_lock(me->lock);
            /* the task object goes back on the free list for task_new() */
            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
                                 apr_thread_pool_task, link);
            elt->current_owner = NULL;
            if (TH_STOP == elt->state) {
                break;
            }
            task = pop_task(me);
        }
        assert(NULL == elt->current_owner);
        /*
         * Leave the busy list unless the thread was asked to stop.
         * NOTE(review): presumably whoever sets TH_STOP has already unlinked
         * the element -- that code is not in this part of the file; confirm.
         */
        if (TH_STOP != elt->state)
            APR_RING_REMOVE(elt, link);

        /*
         * Decide whether this busy thread should exit now instead of going
         * idle.  A thread leaving this way detaches itself, so it is not
         * joinable.  It exits when the pool is terminating, when its state
         * is no longer TH_RUN, or when the idle list is already full, no
         * idle timeout is configured and it is not the case that scheduled
         * tasks exist while idle_max is 0.
         */
        if ((me->idle_cnt >= me->idle_max
             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
             && !me->idle_wait)
            || me->terminated || elt->state != TH_RUN) {
            --me->thd_cnt;
            if ((TH_PROBATION == elt->state) && me->idle_wait)
                ++me->thd_timed_out;
            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
                                 apr_thread_list_elt, link);
            apr_thread_mutex_unlock(me->lock);
            apr_thread_detach(t);
            apr_thread_exit(t, APR_SUCCESS);
            return NULL;        /* should not get here; safety net */
        }

        /* the busy thread becomes idle */
        ++me->idle_cnt;
        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);

        /*
         * Choose how long to sleep.  If there is a scheduled task, always
         * wake up in time for it, since there is no guarantee that any other
         * idle thread will.  Otherwise a thread beyond the idle limit waits
         * idle_wait on probation, and any other thread waits without a
         * timeout (wait = -1).
         */
        if (me->scheduled_task_cnt)
            wait = waiting_time(me);
        else if (me->idle_cnt > me->idle_max) {
            wait = me->idle_wait;
            elt->state = TH_PROBATION;
        }
        else
            wait = -1;

        /*
         * The pool lock is dropped before cond_lock is taken.
         * NOTE(review): a signal sent between these two calls would
         * presumably not be seen by this thread -- confirm against the code
         * that signals the condition.
         */
        apr_thread_mutex_unlock(me->lock);
        apr_thread_mutex_lock(me->cond_lock);
        if (wait >= 0) {
            rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait);
        }
        else {
            rv = apr_thread_cond_wait(me->cond, me->cond_lock);
        }
        apr_thread_mutex_unlock(me->cond_lock);
        apr_thread_mutex_lock(me->lock);
    }

    /* an idle thread was asked to stop; it will be joined */
    --me->thd_cnt;
    apr_thread_mutex_unlock(me->lock);
    apr_thread_exit(t, APR_SUCCESS);
    return NULL;                /* should not get here; safety net */
}

/*
 * Pool cleanup callback: shut the thread pool down.
 *
 * Marks the pool as terminated, sets the idle maximum to 0 (presumably this
 * makes idle threads stop -- apr_thread_pool_idle_max_set is defined
 * elsewhere; confirm), waits until every worker has decremented thd_cnt to
 * zero, then destroys the mutexes and the condition variable.
 *
 * `me` is the apr_thread_pool_t registered with the cleanup.
 * Always returns APR_SUCCESS.
 */
static apr_status_t thread_pool_cleanup(void *me)
{
    apr_thread_pool_t *_self = me;

    _self->terminated = 1;
    apr_thread_pool_idle_max_set(_self, 0);
    while (_self->thd_cnt) {
        apr_sleep(20 * 1000);   /* poll again after 20 ms */
    }
    apr_thread_mutex_destroy(_self->lock);
    apr_thread_mutex_destroy(_self->cond_lock);
    apr_thread_cond_destroy(_self->cond);
    return APR_SUCCESS;
}

/*
 * Create a thread pool allocated from `pool` and start its initial threads.
 *
 * @param me            receives the new thread pool; set to NULL if
 *                      constructing the pool fails
 * @param init_threads  number of worker threads to start immediately
 * @param max_threads   stored as the pool's thread maximum
 * @param pool          APR pool used for the thread pool and its threads
 * @return APR_ENOMEM if the pool object cannot be allocated, the error from
 *         thread_pool_construct() if that fails, otherwise the status of the
 *         last apr_thread_create() call (APR_SUCCESS when init_threads is 0
 *         or all threads were started).  If a thread cannot be created, the
 *         loop stops and the error is returned while *me stays set and the
 *         cleanup stays registered.
 */
APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
                                                 apr_size_t init_threads,
                                                 apr_size_t max_threads,
                                                 apr_pool_t * pool)
{
    apr_thread_t *t;
    apr_status_t rv = APR_SUCCESS;

    *me = apr_pcalloc(pool, sizeof(**me));
    if (!*me) {
        return APR_ENOMEM;
    }

    (*me)->pool = pool;

    rv = thread_pool_construct(*me, init_threads, max_threads);
    if (APR_SUCCESS != rv) {
        *me = NULL;
        return rv;
    }
    /* shut the thread pool down when the APR pool is cleared or destroyed */
    apr_pool_cleanup_register(pool, *me, thread_pool_cleanup,
                              apr_pool_cleanup_null);

    while (init_threads) {
        rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool);
        if (APR_SUCCESS != rv) {
            break;
        }
        ++(*me)->thd_cnt;
        /* track the high-water mark of the thread count */
        if ((*me)->thd_cnt > (*me)->thd_high)
            (*me)->thd_high = (*me)->thd_cnt;
        --init_threads;
    }

    return rv;
}

/*
 * Destroy the thread pool by running (and unregistering) its pool cleanup,
 * thread_pool_cleanup().  Returns that cleanup's status.
 */
APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
{
    return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
}

/*
 * Obtain and fill in a task object.
 *
 * A task is taken from the recycled list when one is available; otherwise a
 * zeroed task is allocated from the pool.  When `time` is greater than 0 the
 * task is a scheduled one and its due time is set to now + `time`;
 * otherwise `priority` is stored.  Both share the `dispatch` union, so only
 * one of the two is meaningful for a given task.
 *
 * Returns the task, or NULL if the allocation fails.
 *
 * NOTE: This function is not thread safe by itself. Caller should hold the
 * lock.
 */
static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
                                        apr_thread_start_t func,
                                        void *param, apr_byte_t priority,
                                        void *owner, apr_time_t time)
{
    apr_thread_pool_task_t *t;

    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
        t = apr_pcalloc(me->pool, sizeof(*t));
        if (NULL == t) {
            return NULL;
        }
    }
    else {
        t = APR_RING_FIRST(me->recycled_tasks);
        APR_RING_REMOVE(t, link);
    }

    APR_RING_ELEM_INIT(t, link);
    t->func = func;
    t->param = param;
    t->owner = owner;
    if (time > 0) {
        t->dispatch.time = apr_time_now() + time;
    }
    else {
        t->dispatch.priority = priority;
    }
    return t;
}

/*
 * Test it the task is the only one within the priority segment. 
 * If it is not, return the first element with same or lower priority. 
 * Otherwise, add the task into the queue and return NULL.
 *
 * NOTE: This function is not thread safe by itself. 
Caller should hold the lock */static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,                                            apr_thread_pool_task_t * const t){    int seg;    int next;    apr_thread_pool_task_t *t_next;    seg = TASK_PRIORITY_SEG(t);    if (me->task_idx[seg]) {        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=               me->task_idx[seg]);        t_next = me->task_idx[seg];        while (t_next->dispatch.priority > t->dispatch.priority) {            t_next = APR_RING_NEXT(t_next, link);            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==                t_next) {                return t_next;            }        }        return t_next;    }    for (next = seg - 1; next >= 0; next--) {        if (me->task_idx[next]) {            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);            break;        }    }    if (0 > next) {        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);    }    me->task_idx[seg] = t;    return NULL;}/**   schedule a task to run in "time" microseconds. Find the spot in the ring where*   the time fits. Adjust the short_time so the thread wakes up when the time is reached.*/static apr_status_t schedule_task(apr_thread_pool_t *me,                                  apr_thread_start_t func, void *param,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -