jk_lb_worker.c

来自「以便Apache与其他服务进行整合 Mod_JK安装」· C语言 代码 · 共 1,498 行 · 第 1/4 页

C
1,498
字号
                JK_TRACE_EXIT(l);                return &p->lb_workers[0];            }        }        else {            JK_TRACE_EXIT(l);            return NULL;        }    }    if (p->lblock == JK_LB_LOCK_PESSIMISTIC)        r = jk_shm_lock();    else {        JK_ENTER_CS(&(p->cs), r);    }    if (!r) {       jk_log(l, JK_LOG_ERROR,              "locking failed (errno=%d)",              errno);    }    if (sessionid) {        char *session = sessionid;        while (sessionid) {            char *next = strchr(sessionid, ';');            char *session_route = NULL;            if (next)               *next++ = '\0';            if (JK_IS_DEBUG_LEVEL(l))                jk_log(l, JK_LOG_DEBUG,                       "searching worker for partial sessionid %s",                       sessionid);            session_route = strchr(sessionid, '.');            if (session_route) {                ++session_route;                if (JK_IS_DEBUG_LEVEL(l))                    jk_log(l, JK_LOG_DEBUG,                           "searching worker for session route %s",                           session_route);                /* We have a session route. Whow! */                rc = find_bysession_route(p, session_route, l);                if (rc) {                    if (p->lblock == JK_LB_LOCK_PESSIMISTIC)                        jk_shm_unlock();                    else {                        JK_LEAVE_CS(&(p->cs), r);                    }                    if (JK_IS_DEBUG_LEVEL(l))                        jk_log(l, JK_LOG_DEBUG,                               "found worker %s (%s) for route %s and partial sessionid %s",                               rc->s->name, rc->s->route, session_route, sessionid);                        JK_TRACE_EXIT(l);                    return rc;                }            }            /* Try next partial sessionid if present */            sessionid = next;            rc = NULL;        }        if (!rc && p->sticky_session_force) {            if (p->lblock == JK_LB_LOCK_PESSIMISTIC)                jk_shm_unlock();            else {                JK_LEAVE_CS(&(p->cs), r);            }            jk_log(l, JK_LOG_INFO,                   "all workers are in error state for session %s",                   session);            JK_TRACE_EXIT(l);            return NULL;        }    }    rc = find_best_worker(p, l);    if (p->lblock == JK_LB_LOCK_PESSIMISTIC)        jk_shm_unlock();    else {        JK_LEAVE_CS(&(p->cs), r);    }    if (rc && JK_IS_DEBUG_LEVEL(l)) {        jk_log(l, JK_LOG_DEBUG,               "found best worker %s (%s) using method '%s'",               rc->s->name, rc->s->route, jk_lb_get_method(p, l));    }    JK_TRACE_EXIT(l);    return rc;}static void lb_add_log_items(jk_ws_service_t *s,                             const char *const *log_names,                             worker_record_t *w,                             jk_logger_t *l){    const char **log_values = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT);    char *buf = jk_pool_alloc(s->pool, sizeof(char *) * JK_LB_NOTES_COUNT * JK_LB_UINT64_STR_SZ);    if (log_values && buf) {        /* JK_NOTE_LB_FIRST/LAST_NAME */        log_values[0] = w->s->name;        snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->lb_value);        /* JK_NOTE_LB_FIRST/LAST_VALUE */        log_values[1] = buf;        buf += JK_LB_UINT64_STR_SZ;        snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->elected);        /* JK_NOTE_LB_FIRST/LAST_ACCESSED */        log_values[2] = buf;        buf += JK_LB_UINT64_STR_SZ;        snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->readed);        /* JK_NOTE_LB_FIRST/LAST_READ */        log_values[3] = buf;        buf += JK_LB_UINT64_STR_SZ;        snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT64_T_FMT, w->s->transferred);        /* JK_NOTE_LB_FIRST/LAST_TRANSFERRED */        log_values[4] = buf;        buf += JK_LB_UINT64_STR_SZ;        snprintf(buf, JK_LB_UINT64_STR_SZ, "%" JK_UINT32_T_FMT, w->s->errors);        /* JK_NOTE_LB_FIRST/LAST_ERRORS */        log_values[5] = buf;        buf += JK_LB_UINT64_STR_SZ;        snprintf(buf, JK_LB_UINT64_STR_SZ, "%d", w->s->busy);        /* JK_NOTE_LB_FIRST/LAST_BUSY */        log_values[6] = buf;        /* JK_NOTE_LB_FIRST/LAST_ACTIVATION */        log_values[7] = jk_lb_get_activation(w, l);        /* JK_NOTE_LB_FIRST/LAST_STATE */        log_values[8] = jk_lb_get_state(w, l);        s->add_log_items(s, log_names, log_values, JK_LB_NOTES_COUNT);    }}static int JK_METHOD service(jk_endpoint_t *e,                             jk_ws_service_t *s,                             jk_logger_t *l, int *is_error){    lb_endpoint_t *p;    int attempt = 1;    worker_record_t *prec = NULL;    int num_of_workers;    int first = 1;    int was_forced = 0;    int rc = -1;    char *sessionid = NULL;    JK_TRACE_ENTER(l);    if (!e || !e->endpoint_private || !s || !is_error) {        JK_LOG_NULL_PARAMS(l);        if (is_error)            *is_error = JK_HTTP_SERVER_ERROR;        JK_TRACE_EXIT(l);        return JK_FALSE;    }    p = e->endpoint_private;    num_of_workers = p->worker->num_of_workers;    /* Set returned error to OK */    *is_error = JK_HTTP_OK;    /* set the recovery post, for LB mode */    s->reco_buf = jk_b_new(s->pool);    if (!s->reco_buf) {        *is_error = JK_HTTP_SERVER_ERROR;        jk_log(l, JK_LOG_ERROR,               "Failed allocating AJP message");        JK_TRACE_EXIT(l);        return JK_SERVER_ERROR;            }    if (jk_b_set_buffer_size(s->reco_buf, p->worker->max_packet_size)) {        *is_error = JK_HTTP_SERVER_ERROR;        jk_log(l, JK_LOG_ERROR,               "Failed allocating AJP message buffer");        JK_TRACE_EXIT(l);        return JK_SERVER_ERROR;            }    jk_b_reset(s->reco_buf);    s->reco_status = RECO_INITED;    jk_shm_lock();    if (p->worker->sequence != p->worker->s->sequence)        jk_lb_pull(p->worker, l);    jk_shm_unlock();    if (p->worker->sticky_session) {        /* Use sessionid only if sticky_session is         * defined for this load balancer         */        sessionid = get_sessionid(s, l);    }    if (JK_IS_DEBUG_LEVEL(l))        jk_log(l, JK_LOG_DEBUG,               "service sticky_session=%d id='%s'",               p->worker->sticky_session, sessionid ? sessionid : "empty");    while (attempt <= num_of_workers && rc == -1) {        worker_record_t *rec =            get_most_suitable_worker(p->worker, sessionid, s, l);        /* Do not reuse previous worker, because         * that worker already failed.         */        if (rec) {            int r;            int is_service_error = JK_HTTP_OK;            jk_endpoint_t *end = NULL;            int retry = 0;            int retry_wait = JK_LB_MIN_RETRY_WAIT;            s->route = rec->r;            prec = rec;            if (JK_IS_DEBUG_LEVEL(l))                jk_log(l, JK_LOG_DEBUG,                       "service worker=%s route=%s",                       rec->s->name, s->route);            if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                jk_shm_lock();            if (rec->s->state == JK_LB_STATE_RECOVER)                rec->s->state = JK_LB_STATE_PROBE;            if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                jk_shm_unlock();                                   while ((!(r=rec->w->get_endpoint(rec->w, &end, l)) || !end) && (retry < p->worker->s->retries)) {                retry++;                retry_wait *=2;                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                    jk_shm_lock();                if (retry_wait > JK_LB_MAX_RETRY_WAIT)                    retry_wait = JK_LB_MAX_RETRY_WAIT;                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                    jk_shm_unlock();                if (JK_IS_DEBUG_LEVEL(l))                    jk_log(l, JK_LOG_DEBUG,                           "could not get free endpoint for worker"                           " (retry %d, sleeping for %d ms)",                           retry, retry_wait);                jk_sleep(retry_wait);            }            if (!r || !end) {                /* If we can not get the endpoint                 * mark the worker as busy rather then                 * as in error if the retry number is                 * greater then the number of retries.                 */                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                    jk_shm_lock();                if (rec->s->state != JK_LB_STATE_ERROR)                    rec->s->state = JK_LB_STATE_BUSY;                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                    jk_shm_unlock();                jk_log(l, JK_LOG_INFO,                       "could not get free endpoint for worker %s (%d retries)",                       rec->s->name, retry);            }            else {                int service_stat = -1;                jk_uint64_t rd = 0;                jk_uint64_t wr = 0;                /* Reset endpoint read and write sizes for                 * this request.                 */                end->rd = end->wr = 0;                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                    jk_shm_lock();                rec->s->elected++;                /* Increment the number of workers serving request */                p->worker->s->busy++;                if (p->worker->s->busy > p->worker->s->max_busy)                    p->worker->s->max_busy = p->worker->s->busy;                rec->s->busy++;                if (rec->s->busy > rec->s->max_busy)                    rec->s->max_busy = rec->s->busy;                if ( (p->worker->lbmethod == JK_LB_METHOD_REQUESTS) ||                     (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) ||                     (p->worker->lbmethod == JK_LB_METHOD_SESSIONS &&                      !sessionid) )                    rec->s->lb_value += rec->s->lb_mult;                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                    jk_shm_unlock();                service_stat = end->service(end, s, l, &is_service_error);                rd = end->rd;                wr = end->wr;                end->done(&end, l);                if (p->worker->lblock == JK_LB_LOCK_PESSIMISTIC)                    jk_shm_lock();                /* Update partial reads and writes if any */                rec->s->readed += rd;                rec->s->transferred += wr;                if (p->worker->lbmethod == JK_LB_METHOD_TRAFFIC) {                    rec->s->lb_value += (rd+wr)*rec->s->lb_mult;                }                else if (p->worker->lbmethod == JK_LB_METHOD_BUSYNESS) {                    if (rec->s->lb_value >= rec->s->lb_mult) {                        rec->s->lb_value -= rec->s->lb_mult;                    }                    else {                        rec->s->lb_value = 0;                        if (JK_IS_DEBUG_LEVEL(l)) {                            jk_log(l, JK_LOG_DEBUG,                                   "worker %s has load value to low (%"                                   JK_UINT64_T_FMT                                   " < %"                                   JK_UINT64_T_FMT                                   ") ",                                   "- correcting to 0",                                   rec->s->name,                                   rec->s->lb_value,                                   rec->s->lb_mult);                        }                    }                }                /* When returning the endpoint mark the worker as not busy.                 * We have at least one endpoint free                 */                if (rec->s->state == JK_LB_STATE_BUSY)                    rec->s->state = JK_LB_STATE_OK;                /* Decrement the busy worker count.                 * Check if the busy was reset to zero by graceful                 * restart of the server.                 */                if (rec->s->busy)                    rec->s->busy--;                if (p->worker->s->busy)                    p->worker->s->busy--;                if (service_stat == JK_TRUE) {                    rec->s->state = JK_LB_STATE_OK;                    rec->s->error_time = 0;                    rc = JK_TRUE;                }                else if (service_stat == JK_CLIENT_ERROR) {                    /*                    * Client error !!!                    * Since this is bad request do not fail over.                    */                    rec->s->client_errors++;                    rec->s->state = JK_LB_STATE_OK;                    rec->s->error_time = 0;                    jk_log(l, JK_LOG_INFO,                           "unrecoverable error %d, request failed."                           " Client failed in the middle of request,"                           " we can't recover to another instance.",                           is_service_error);                    *is_error = is_service_error;                    rc = JK_CLIENT_ERROR;                }                else {                    if (is_service_error != JK_HTTP_SERVER_BUSY) {                        /*                        * Error is not recoverable - break with an error.                        */                        jk_log(l, JK_LOG_ERROR,                            "unrecoverable error %d, request failed."                            " Tomcat failed in the middle of request,"                            " we can't recover to another instance.",                            is_service_error);                        *is_error = is_service_error;                        rc = JK_FALSE;                    }                    if (service_stat == JK_REPLY_TIMEOUT) {                        rec->s->reply_timeouts++;                    }                    if (service_stat != JK_STATUS_ERROR &&                        (service_stat != JK_REPLY_TIMEOUT ||                        rec->s->reply_timeouts > (unsigned)p->worker->s->max_reply_timeouts)) {                        /*                        * Service failed !!!                        * Time for fault tolerance (if possible)...                        */                        rec->s->errors++;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?