jk_lb_worker.c

来自「以便Apache与其他服务进行整合 Mod_JK安装」· C语言 代码 · 共 1,498 行 · 第 1/4 页

C
1,498
字号
                                jk_pool_realloc(s->pool, sz, result, osz);                            strcat(result, ";");                            strcat(result, id_start);                        }                    }                }            }        }    }    return result;}/* Retrieve session id from the cookie or the parameter * (parameter first) */static char *get_sessionid(jk_ws_service_t *s, jk_logger_t *l){    char *val;    val = get_path_param(s, JK_PATH_SESSION_IDENTIFIER);    if (!val) {        val = get_cookie(s, JK_SESSION_IDENTIFIER);    }    if (val && !*val) {        /* TODO: For now only log the empty sessions.         *       However we should probably return 400         *       (BAD_REQUEST) in this case         */        jk_log(l, JK_LOG_INFO,               "Detected empty session identifier.");        return NULL;    }    return val;}static void close_workers(lb_worker_t * p, int num_of_workers, jk_logger_t *l){    int i = 0;    for (i = 0; i < num_of_workers; i++) {        p->lb_workers[i].w->destroy(&(p->lb_workers[i].w), l);    }}/* If the worker is in error state run * retry on that worker. It will be marked as * operational if the retry timeout is elapsed. * The worker might still be unusable, but we try * anyway. * If the worker is in ok state and got no requests * since the last global maintenance, we mark its * state as not available. * Return the number of workers not in error state. */static int recover_workers(lb_worker_t *p,                            jk_uint64_t curmax,                            time_t now,                            jk_logger_t *l){    unsigned int i;    int non_error = 0;    int elapsed;    worker_record_t *w = NULL;    JK_TRACE_ENTER(l);    if (p->sequence != p->s->sequence)        jk_lb_pull(p, l);    for (i = 0; i < p->num_of_workers; i++) {        w = &p->lb_workers[i];        if (w->s->state == JK_LB_STATE_ERROR) {            elapsed = (int)difftime(now, w->s->error_time);            if (elapsed <= p->s->recover_wait_time) {                if (JK_IS_DEBUG_LEVEL(l))                    jk_log(l, JK_LOG_DEBUG,                           "worker %s will recover in %d seconds",                           w->s->name, p->s->recover_wait_time - elapsed);            }            else {                if (JK_IS_DEBUG_LEVEL(l))                    jk_log(l, JK_LOG_DEBUG,                           "worker %s is marked for recovery",                           w->s->name);                if (p->lbmethod != JK_LB_METHOD_BUSYNESS)                    w->s->lb_value = curmax;                w->s->reply_timeouts = 0;                w->s->state = JK_LB_STATE_RECOVER;                non_error++;            }        }        else {            non_error++;            if (w->s->state == JK_LB_STATE_OK &&                w->s->elected == w->s->elected_snapshot)                w->s->state = JK_LB_STATE_IDLE;        }        w->s->elected_snapshot = w->s->elected;    }    JK_TRACE_EXIT(l);    return non_error;}static int force_recovery(lb_worker_t *p,                          jk_logger_t *l){    unsigned int i;    int forced = 0;    worker_record_t *w = NULL;    JK_TRACE_ENTER(l);    for (i = 0; i < p->num_of_workers; i++) {        w = &p->lb_workers[i];        if (w->s->state == JK_LB_STATE_ERROR) {            if (JK_IS_DEBUG_LEVEL(l))                jk_log(l, JK_LOG_INFO,                       "worker %s is marked for recovery",                       w->s->name);            w->s->state = JK_LB_STATE_FORCE;            forced++;        }    }    JK_TRACE_EXIT(l);    return forced;}/* Divide old load values by the decay factor, * such that older values get less important * for the routing decisions. */static jk_uint64_t decay_load(lb_worker_t *p,                              time_t exponent,                              jk_logger_t *l){    unsigned int i;    jk_uint64_t curmax = 0;    JK_TRACE_ENTER(l);    if (p->lbmethod != JK_LB_METHOD_BUSYNESS) {        for (i = 0; i < p->num_of_workers; i++) {            p->lb_workers[i].s->lb_value >>= exponent;            if (p->lb_workers[i].s->lb_value > curmax) {                curmax = p->lb_workers[i].s->lb_value;            }            p->lb_workers[i].s->reply_timeouts >>= exponent;        }    }    JK_TRACE_EXIT(l);    return curmax;}static int JK_METHOD maintain_workers(jk_worker_t *p, time_t now, jk_logger_t *l){    unsigned int i = 0;    jk_uint64_t curmax = 0;    long delta;    JK_TRACE_ENTER(l);    if (p && p->worker_private) {        lb_worker_t *lb = (lb_worker_t *)p->worker_private;        for (i = 0; i < lb->num_of_workers; i++) {            if (lb->lb_workers[i].w->maintain) {                lb->lb_workers[i].w->maintain(lb->lb_workers[i].w, now, l);            }        }        jk_shm_lock();        /* Now we check for global maintenance (once for all processes).         * Checking workers for recovery and applying decay to the         * load values should not be done by each process individually.         * Therefore we globally sync and we use a global timestamp.         * Since it's possible that we come here a few milliseconds         * before the interval has passed, we allow a little tolerance.         */        delta = (long)difftime(now, lb->s->last_maintain_time) + JK_LB_MAINTAIN_TOLERANCE;        if (delta >= lb->maintain_time) {            lb->s->last_maintain_time = now;            if (JK_IS_DEBUG_LEVEL(l))                jk_log(l, JK_LOG_DEBUG,                       "decay with 2^%d",                       JK_LB_DECAY_MULT * delta / lb->maintain_time);            curmax = decay_load(lb, JK_LB_DECAY_MULT * delta / lb->maintain_time, l);            if (!recover_workers(lb, curmax, now, l)) {                force_recovery(lb, l);            }        }        jk_shm_unlock();    }    else {        JK_LOG_NULL_PARAMS(l);    }    JK_TRACE_EXIT(l);    return JK_TRUE;}static worker_record_t *find_by_session(lb_worker_t *p,                                        const char *name,                                        jk_logger_t *l){    worker_record_t *rc = NULL;    unsigned int i;    for (i = 0; i < p->num_of_workers; i++) {        if (strcmp(p->lb_workers[i].s->route, name) == 0) {            rc = &p->lb_workers[i];            rc->r = &(rc->s->route[0]);            break;        }    }    return rc;}static worker_record_t *find_best_bydomain(lb_worker_t *p,                                           const char *domain,                                           jk_logger_t *l){    unsigned int i;    int d = 0;    jk_uint64_t curmin = 0;    worker_record_t *candidate = NULL;    /* First try to see if we have available candidate */    for (i = 0; i < p->num_of_workers; i++) {        /* Skip all workers that are not member of domain */        if (strlen(p->lb_workers[i].s->domain) == 0 ||            strcmp(p->lb_workers[i].s->domain, domain))            continue;        /* Take into calculation only the workers that are         * not in error state, stopped, disabled or busy.         */        if (JK_WORKER_USABLE(p->lb_workers[i].s)) {            if (!candidate || p->lb_workers[i].s->distance < d ||                (p->lb_workers[i].s->lb_value < curmin &&                p->lb_workers[i].s->distance == d)) {                candidate = &p->lb_workers[i];                curmin = p->lb_workers[i].s->lb_value;                d = p->lb_workers[i].s->distance;            }        }    }    if (candidate) {        candidate->r = &(candidate->s->domain[0]);    }    return candidate;}static worker_record_t *find_best_byvalue(lb_worker_t *p,                                          jk_logger_t *l){    static unsigned int next_offset = 0;    unsigned int i;    unsigned int j;    unsigned int offset;    int d = 0;    jk_uint64_t curmin = 0;    /* find the least busy worker */    worker_record_t *candidate = NULL;    offset = next_offset;    /* First try to see if we have available candidate */    for (j = offset; j < p->num_of_workers + offset; j++) {        i = j % p->num_of_workers;        /* Take into calculation only the workers that are         * not in error state, stopped, disabled or busy.         */        if (JK_WORKER_USABLE(p->lb_workers[i].s)) {            if (!candidate || p->lb_workers[i].s->distance < d ||                (p->lb_workers[i].s->lb_value < curmin &&                p->lb_workers[i].s->distance == d)) {                candidate = &p->lb_workers[i];                curmin = p->lb_workers[i].s->lb_value;                d = p->lb_workers[i].s->distance;                next_offset = i + 1;            }        }    }    return candidate;}static worker_record_t *find_bysession_route(lb_worker_t *p,                                             const char *name,                                             jk_logger_t *l){    int uses_domain  = 0;    worker_record_t *candidate = NULL;    candidate = find_by_session(p, name, l);    if (!candidate) {        uses_domain = 1;        candidate = find_best_bydomain(p, name, l);    }    if (candidate) {        if (!JK_WORKER_USABLE_STICKY(candidate->s)) {            /* We have a worker that is error state or stopped.             * If it has a redirection set use that redirection worker.             * This enables to safely remove the member from the             * balancer. Of course you will need a some kind of             * session replication between those two remote.             */            if (p->sticky_session_force)                candidate = NULL;            else if (*candidate->s->redirect)                candidate = find_by_session(p, candidate->s->redirect, l);            else if (*candidate->s->domain && !uses_domain) {                uses_domain = 1;                candidate = find_best_bydomain(p, candidate->s->domain, l);            }            if (candidate && !JK_WORKER_USABLE_STICKY(candidate->s))                candidate = NULL;        }    }    return candidate;}static worker_record_t *find_failover_worker(lb_worker_t * p,                                             jk_logger_t *l){    worker_record_t *rc = NULL;    unsigned int i;    const char *redirect = NULL;    for (i = 0; i < p->num_of_workers; i++) {        if (strlen(p->lb_workers[i].s->redirect)) {            redirect = &(p->lb_workers[i].s->redirect[0]);            break;        }    }    if (redirect)        rc = find_bysession_route(p, redirect, l);    return rc;}static worker_record_t *find_best_worker(lb_worker_t * p,                                         jk_logger_t *l){    worker_record_t *rc = NULL;    rc = find_best_byvalue(p, l);    /* By default use worker route as session route */    if (rc)        rc->r = &(rc->s->route[0]);    else        rc = find_failover_worker(p, l);    return rc;}static worker_record_t *get_most_suitable_worker(lb_worker_t * p,                                                 char *sessionid,                                                 jk_ws_service_t *s,                                                 jk_logger_t *l){    worker_record_t *rc = NULL;    int r;    JK_TRACE_ENTER(l);    if (p->num_of_workers == 1) {        /* No need to find the best worker         * if there is a single one         */        if (JK_WORKER_USABLE_STICKY(p->lb_workers[0].s)) {            if (p->lb_workers[0].s->activation != JK_LB_ACTIVATION_DISABLED) {                p->lb_workers[0].r = &(p->lb_workers[0].s->route[0]);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?