⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jk_lb_worker.c

📁 jboss与apache集成的中间件,详情请参看文档说明.
💻 C
📖 第 1 页 / 共 3 页
字号:
         * retry on that worker. It will be marked as         * operational if the retry timeout is elapsed.         * The worker might still be unusable, but we try         * anyway.         */        if (JK_WORKER_IN_ERROR(p->lb_workers[i].s)) {            retry_worker(&p->lb_workers[i], p->s->recover_wait_time, l);        }        /* Take into calculation only the workers that are         * not in error state, stopped or not disabled.         */        if (JK_WORKER_USABLE(p->lb_workers[i].s)) {            mytraffic = (p->lb_workers[i].s->transferred/p->lb_workers[i].s->lb_factor) +                        (p->lb_workers[i].s->readed/p->lb_workers[i].s->lb_factor);            if (!candidate || mytraffic < curmin) {                candidate = &p->lb_workers[i];                curmin = mytraffic;            }        }    }    if (p->lblock == JK_LB_LOCK_PESSIMISTIC)        jk_shm_unlock();    return candidate;}static worker_record_t *find_bysession_route(lb_worker_t *p,                                             const char *name,                                             jk_logger_t *l){    unsigned int i;    int total_factor = 0;    int uses_domain  = 0;    worker_record_t *candidate = NULL;    candidate = find_by_session(p, name, l);    if (!candidate) {        uses_domain = 1;        candidate = find_best_bydomain(p, name, l);    }    if (candidate) {        if (JK_WORKER_IN_ERROR(candidate->s)) {            retry_worker(candidate, p->s->recover_wait_time, l);        }        if (candidate->s->in_error_state || candidate->s->is_stopped ) {            /* We have a worker that is error state or stopped.             * If it has a redirection set use that redirection worker.             * This enables to safely remove the member from the             * balancer. Of course you will need a some kind of             * session replication between those two remote.             */            if (p->s->sticky_session_force)                candidate = NULL;            else if (*candidate->s->redirect)                candidate = find_by_session(p, candidate->s->redirect, l);            else if (*candidate->s->domain && !uses_domain) {                uses_domain = 1;                candidate = find_best_bydomain(p, candidate->s->domain, l);            }            if (candidate && (candidate->s->in_error_state || candidate->s->is_stopped))                candidate = NULL;        }    }    if (candidate && !uses_domain &&        p->lbmethod == JK_LB_BYREQUESTS) {        if (p->lblock == JK_LB_LOCK_PESSIMISTIC)            jk_shm_lock();        for (i = 0; i < p->num_of_workers; i++) {            if (JK_WORKER_USABLE(p->lb_workers[i].s)) {                /* Skip all workers that are not member of candidate domain */                if (*candidate->s->domain &&                    strcmp(p->lb_workers[i].s->domain, candidate->s->domain))                    continue;                p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor;                total_factor += p->lb_workers[i].s->lb_factor;            }        }        candidate->s->lb_value -= total_factor;        if (p->lblock == JK_LB_LOCK_PESSIMISTIC)            jk_shm_unlock();    }    return candidate;}static worker_record_t *find_failover_worker(lb_worker_t * p,                                             jk_logger_t *l){    worker_record_t *rc = NULL;    unsigned int i;    const char *redirect = NULL;    for (i = 0; i < p->num_of_workers; i++) {        if (strlen(p->lb_workers[i].s->redirect)) {            redirect = &(p->lb_workers[i].s->redirect[0]);            break;        }    }    if (redirect)        rc = find_bysession_route(p, redirect, l);    return rc;}static worker_record_t *find_best_worker(lb_worker_t * p,                                         jk_logger_t *l){    worker_record_t *rc = NULL;    if (p->lbmethod == JK_LB_BYREQUESTS)        rc = find_best_byrequests(p, l);    else if (p->lbmethod == JK_LB_BYTRAFFIC)        rc = find_best_bytraffic(p, l);    /* By default use worker name as session route */    if (rc)        rc->r = &(rc->s->name[0]);    else        rc = find_failover_worker(p, l);    return rc;}static worker_record_t *get_most_suitable_worker(lb_worker_t * p,                                                 jk_ws_service_t *s,                                                 int attempt,                                                 jk_logger_t *l){    worker_record_t *rc = NULL;    char *sessionid = NULL;    int r;    JK_TRACE_ENTER(l);    if (p->num_of_workers == 1) {        /* No need to find the best worker         * if there is a single one         */        if (JK_WORKER_IN_ERROR(p->lb_workers[0].s)) {            retry_worker(&p->lb_workers[0], p->s->recover_wait_time, l);        }        /* Check if worker is marked for retry */        if(!p->lb_workers[0].s->in_error_state && !p->lb_workers[0].s->is_stopped) {            p->lb_workers[0].r = &(p->lb_workers[0].s->name[0]);            JK_TRACE_EXIT(l);            return &p->lb_workers[0];        }        else {            JK_TRACE_EXIT(l);            return NULL;        }    }    else if (p->s->sticky_session) {        /* Use sessionid only if sticky_session is         * defined for this load balancer         */        sessionid = get_sessionid(s);    }    JK_ENTER_CS(&(p->cs), r);    if (!r) {       jk_log(l, JK_LOG_ERROR,              "locking thread with errno=%d",              errno);        JK_TRACE_EXIT(l);        return NULL;    }    if (sessionid) {        char *session = sessionid;        if (JK_IS_DEBUG_LEVEL(l)) {            jk_log(l, JK_LOG_DEBUG,                   "total sessionid is %s",                    sessionid ? sessionid : "empty");        }        while (sessionid) {            char *next = strchr(sessionid, ';');            char *session_route = NULL;            if (next)               *next++ = '\0';            if (JK_IS_DEBUG_LEVEL(l))                jk_log(l, JK_LOG_DEBUG,                       "searching worker for partial sessionid %s",                       sessionid);            session_route = strchr(sessionid, '.');            if (session_route) {                ++session_route;                if (JK_IS_DEBUG_LEVEL(l))                    jk_log(l, JK_LOG_DEBUG,                           "searching worker for session route %s",                            session_route);                /* We have a session route. Whow! */                rc = find_bysession_route(p, session_route, l);                if (rc) {                    JK_LEAVE_CS(&(p->cs), r);                    if (JK_IS_DEBUG_LEVEL(l))                        jk_log(l, JK_LOG_DEBUG,                               "found worker %s for route %s and partial sessionid %s",                               rc->s->name, session_route, sessionid);                        JK_TRACE_EXIT(l);                    return rc;                }            }            /* Try next partial sessionid if present */            sessionid = next;            rc = NULL;        }        if (!rc && p->s->sticky_session_force) {           JK_LEAVE_CS(&(p->cs), r);            jk_log(l, JK_LOG_INFO,                   "all workers are in error state for session %s",                    session);            JK_TRACE_EXIT(l);            return NULL;        }    }    rc = find_best_worker(p, l);    JK_LEAVE_CS(&(p->cs), r);    if (rc && JK_IS_DEBUG_LEVEL(l)) {        jk_log(l, JK_LOG_DEBUG,               "found best worker (%s) using %s method", rc->s->name,               p->lbmethod == JK_LB_BYREQUESTS ? "by request" : "by traffic");    }    JK_TRACE_EXIT(l);    return rc;}static int JK_METHOD service(jk_endpoint_t *e,                             jk_ws_service_t *s,                             jk_logger_t *l, int *is_error){    JK_TRACE_ENTER(l);    if (e && e->endpoint_private && s && is_error) {        lb_endpoint_t *p = e->endpoint_private;        int attempt = 0;        int num_of_workers = p->worker->num_of_workers;        worker_record_t *prec = NULL;        /* Set returned error to OK */        *is_error = JK_HTTP_OK;        /* set the recovery post, for LB mode */        s->reco_buf = jk_b_new(s->pool);        jk_b_set_buffer_size(s->reco_buf, DEF_BUFFER_SZ);        jk_b_reset(s->reco_buf);        s->reco_status = RECO_INITED;        if (JK_IS_DEBUG_LEVEL(l))            jk_log(l, JK_LOG_DEBUG,                   "service sticky_session=%d",                   p->worker->s->sticky_session);        while (num_of_workers) {            worker_record_t *rec =                get_most_suitable_worker(p->worker, s, attempt++, l);            int rc;            /* Do not reuse previous worker, because             * that worker already failed.             */            if (rec && rec != prec) {                int is_service_error = JK_HTTP_OK;                int service_stat = JK_FALSE;                jk_endpoint_t *end = NULL;                s->jvm_route = rec->r;                rc = rec->w->get_endpoint(rec->w, &end, l);                if (JK_IS_DEBUG_LEVEL(l))                    jk_log(l, JK_LOG_DEBUG,                           "service worker=%s jvm_route=%s",                           rec->s->name, s->jvm_route);                rec->s->elected++;                if (rc && end) {                    /* Reset endpoint read and write sizes for                     * this request.                     */                    end->rd = end->wr = 0;                    /* Increment the number of workers serving request */                    p->worker->s->busy++;                    if (p->worker->s->busy > p->worker->s->max_busy)                        p->worker->s->max_busy = p->worker->s->busy;                    rec->s->busy++;                    if (rec->s->busy > rec->s->max_busy)                        rec->s->max_busy = rec->s->busy;                    service_stat = end->service(end, s, l, &is_service_error);                    /* Update partial reads and writes if any */                    rec->s->readed += end->rd;                    rec->s->transferred += end->wr;                    end->done(&end, l);                    /* When returning the endpoint mark the worker as not busy.                     * We have at least one endpoint free                     */                    rec->s->is_busy = JK_FALSE;                    /* Decrement the busy worker count */                    rec->s->busy--;                    p->worker->s->busy--;                    if (service_stat == JK_TRUE) {                        rec->s->in_error_state = JK_FALSE;                        rec->s->in_recovering = JK_FALSE;                        rec->s->error_time = 0;                        JK_TRACE_EXIT(l);                        return JK_TRUE;                    }                }                else {                    /* If we can not get the endpoint                     * mark the worker as busy rather then                     * as in error                     */                    rec->s->is_busy = JK_TRUE;                    jk_log(l, JK_LOG_INFO,                           "could not get free endpoint for worker %s",                           rec->s->name);                    /* Decrement the worker count and try another worker */                    --num_of_workers;                    prec = rec;                    continue;                }                if (service_stat == JK_FALSE) {                    /*                    * Service failed !!!                    *                    * Time for fault tolerance (if possible)...                    */                    rec->s->errors++;                    rec->s->in_error_state = JK_TRUE;                    rec->s->in_recovering = JK_FALSE;                    rec->s->error_time = time(NULL);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -